Stop double-calculation of hashes.
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 #include "writer.h"
21 #include "compose.hpp"
22 #include "film.h"
23 #include "ratio.h"
24 #include "log.h"
25 #include "dcp_video.h"
26 #include "dcp_content_type.h"
27 #include "audio_mapping.h"
28 #include "config.h"
29 #include "job.h"
30 #include "cross.h"
31 #include "audio_buffers.h"
32 #include "md5_digester.h"
33 #include "version.h"
34 #include "font.h"
35 #include "util.h"
36 #include "reel_writer.h"
37 #include <dcp/cpl.h>
38 #include <boost/foreach.hpp>
39 #include <fstream>
40 #include <cerrno>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
47 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
48 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
49 #define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING);
50 #define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING);
51 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
52
53 /* OS X strikes again */
54 #undef set_key
55
56 using std::make_pair;
57 using std::pair;
58 using std::string;
59 using std::list;
60 using std::cout;
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using dcp::Data;
65
66 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
67         : _film (film)
68         , _job (j)
69         , _thread (0)
70         , _finish (false)
71         , _queued_full_in_memory (0)
72         , _maximum_frames_in_memory (0)
73         , _full_written (0)
74         , _fake_written (0)
75         , _repeat_written (0)
76         , _pushed_to_disk (0)
77 {
78         /* Remove any old DCP */
79         boost::filesystem::remove_all (_film->dir (_film->dcp_name ()));
80
81         shared_ptr<Job> job = _job.lock ();
82         DCPOMATIC_ASSERT (job);
83
84         BOOST_FOREACH (DCPTimePeriod p, _film->reels ()) {
85                 _reels.push_back (ReelWriter (film, p, job));
86         }
87
88         /* We can keep track of the current audio and subtitle reels easily because audio
89            and subs arrive to the Writer in sequence.  This is not so for video.
90         */
91         _audio_reel = _reels.begin ();
92         _subtitle_reel = _reels.begin ();
93
94         /* Check that the signer is OK if we need one */
95         if (_film->is_signed() && !Config::instance()->signer_chain()->valid ()) {
96                 throw InvalidSignerError ();
97         }
98
99         job->sub (_("Encoding image data"));
100 }
101
102 void
103 Writer::start ()
104 {
105         _thread = new boost::thread (boost::bind (&Writer::thread, this));
106 }
107
108 Writer::~Writer ()
109 {
110         terminate_thread (false);
111 }
112
113 /** Pass a video frame to the writer for writing to disk at some point.
114  *  This method can be called with frames out of order.
115  *  @param encoded JPEG2000-encoded data.
116  *  @param frame Frame index within the DCP.
117  *  @param eyes Eyes that this frame image is for.
118  */
119 void
120 Writer::write (Data encoded, Frame frame, Eyes eyes)
121 {
122         boost::mutex::scoped_lock lock (_state_mutex);
123
124         while (_queued_full_in_memory > _maximum_frames_in_memory) {
125                 /* The queue is too big; wait until that is sorted out */
126                 _full_condition.wait (lock);
127         }
128
129         QueueItem qi;
130         qi.type = QueueItem::FULL;
131         qi.encoded = encoded;
132         qi.reel = video_reel (frame);
133         qi.frame = frame - _reels[qi.reel].start ();
134
135         if (_film->three_d() && eyes == EYES_BOTH) {
136                 /* 2D material in a 3D DCP; fake the 3D */
137                 qi.eyes = EYES_LEFT;
138                 _queue.push_back (qi);
139                 ++_queued_full_in_memory;
140                 qi.eyes = EYES_RIGHT;
141                 _queue.push_back (qi);
142                 ++_queued_full_in_memory;
143         } else {
144                 qi.eyes = eyes;
145                 _queue.push_back (qi);
146                 ++_queued_full_in_memory;
147         }
148
149         /* Now there's something to do: wake anything wait()ing on _empty_condition */
150         _empty_condition.notify_all ();
151 }
152
153 bool
154 Writer::can_repeat (Frame frame) const
155 {
156         return frame > _reels[video_reel(frame)].start();
157 }
158
159 /** Repeat the last frame that was written to a reel as a new frame.
160  *  @param frame Frame index within the DCP of the new (repeated) frame.
161  *  @param eyes Eyes that this repeated frame image is for.
162  */
163 void
164 Writer::repeat (Frame frame, Eyes eyes)
165 {
166         boost::mutex::scoped_lock lock (_state_mutex);
167
168         while (_queued_full_in_memory > _maximum_frames_in_memory) {
169                 /* The queue is too big; wait until that is sorted out */
170                 _full_condition.wait (lock);
171         }
172
173         QueueItem qi;
174         qi.type = QueueItem::REPEAT;
175         qi.reel = video_reel (frame);
176         qi.frame = frame - _reels[qi.reel].start ();
177         if (_film->three_d() && eyes == EYES_BOTH) {
178                 qi.eyes = EYES_LEFT;
179                 _queue.push_back (qi);
180                 qi.eyes = EYES_RIGHT;
181                 _queue.push_back (qi);
182         } else {
183                 qi.eyes = eyes;
184                 _queue.push_back (qi);
185         }
186
187         /* Now there's something to do: wake anything wait()ing on _empty_condition */
188         _empty_condition.notify_all ();
189 }
190
191 void
192 Writer::fake_write (Frame frame, Eyes eyes)
193 {
194         boost::mutex::scoped_lock lock (_state_mutex);
195
196         while (_queued_full_in_memory > _maximum_frames_in_memory) {
197                 /* The queue is too big; wait until that is sorted out */
198                 _full_condition.wait (lock);
199         }
200
201         size_t const reel = video_reel (frame);
202         Frame const reel_frame = frame - _reels[reel].start ();
203
204         FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
205         if (!file) {
206                 throw ReadFileError (_film->info_file(_reels[reel].period()));
207         }
208         dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
209         fclose (file);
210
211         QueueItem qi;
212         qi.type = QueueItem::FAKE;
213         qi.size = info.size;
214         qi.reel = reel;
215         qi.frame = reel_frame;
216         if (_film->three_d() && eyes == EYES_BOTH) {
217                 qi.eyes = EYES_LEFT;
218                 _queue.push_back (qi);
219                 qi.eyes = EYES_RIGHT;
220                 _queue.push_back (qi);
221         } else {
222                 qi.eyes = eyes;
223                 _queue.push_back (qi);
224         }
225
226         /* Now there's something to do: wake anything wait()ing on _empty_condition */
227         _empty_condition.notify_all ();
228 }
229
230 /** Write one video frame's worth of audio frames to the DCP.
231  *  @param audio Audio data or 0 if there is no audio to be written here (i.e. it is referenced).
232  *  This method is not thread safe.
233  */
234 void
235 Writer::write (shared_ptr<const AudioBuffers> audio)
236 {
237         if (_audio_reel == _reels.end ()) {
238                 /* This audio is off the end of the last reel; ignore it */
239                 return;
240         }
241
242         _audio_reel->write (audio);
243
244         /* written is in video frames, not audio frames */
245         if (_audio_reel->total_written_audio_frames() >= _audio_reel->period().duration().frames_floor (_film->video_frame_rate())) {
246                 ++_audio_reel;
247         }
248 }
249
250 /** This must be called from Writer::thread() with an appropriate lock held */
251 bool
252 Writer::have_sequenced_image_at_queue_head ()
253 {
254         if (_queue.empty ()) {
255                 return false;
256         }
257
258         _queue.sort ();
259
260         QueueItem const & f = _queue.front();
261         ReelWriter const & reel = _reels[f.reel];
262
263         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
264
265         if (f.eyes == EYES_BOTH) {
266                 /* 2D */
267                 return f.frame == (reel.last_written_video_frame() + 1);
268         }
269
270         /* 3D */
271
272         if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
273                 return true;
274         }
275
276         if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
277                 return true;
278         }
279
280         return false;
281 }
282
283 void
284 Writer::thread ()
285 try
286 {
287         while (true)
288         {
289                 boost::mutex::scoped_lock lock (_state_mutex);
290
291                 while (true) {
292
293                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
294                                 /* We've got something to do: go and do it */
295                                 break;
296                         }
297
298                         /* Nothing to do: wait until something happens which may indicate that we do */
299                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
300                         _empty_condition.wait (lock);
301                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
302                 }
303
304                 if (_finish && _queue.empty()) {
305                         return;
306                 }
307
308                 /* We stop here if we have been asked to finish, and if either the queue
309                    is empty or we do not have a sequenced image at its head (if this is the
310                    case we will never terminate as no new frames will be sent once
311                    _finish is true).
312                 */
313                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
314                         /* (Hopefully temporarily) log anything that was not written */
315                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
316                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
317                                 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
318                                         if (i->type == QueueItem::FULL) {
319                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, i->eyes);
320                                         } else {
321                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, i->eyes);
322                                         }
323                                 }
324                         }
325                         return;
326                 }
327
328                 /* Write any frames that we can write; i.e. those that are in sequence. */
329                 while (have_sequenced_image_at_queue_head ()) {
330                         QueueItem qi = _queue.front ();
331                         _queue.pop_front ();
332                         if (qi.type == QueueItem::FULL && qi.encoded) {
333                                 --_queued_full_in_memory;
334                         }
335
336                         lock.unlock ();
337
338                         ReelWriter& reel = _reels[qi.reel];
339
340                         switch (qi.type) {
341                         case QueueItem::FULL:
342                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, qi.eyes);
343                                 if (!qi.encoded) {
344                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
345                                 }
346                                 reel.write (qi.encoded, qi.frame, qi.eyes);
347                                 ++_full_written;
348                                 break;
349                         case QueueItem::FAKE:
350                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
351                                 reel.fake_write (qi.frame, qi.eyes, qi.size);
352                                 ++_fake_written;
353                                 break;
354                         case QueueItem::REPEAT:
355                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
356                                 reel.repeat_write (qi.frame, qi.eyes);
357                                 ++_repeat_written;
358                                 break;
359                         }
360
361                         lock.lock ();
362
363                         shared_ptr<Job> job = _job.lock ();
364                         DCPOMATIC_ASSERT (job);
365                         int64_t total = _film->length().frames_round (_film->video_frame_rate ());
366                         if (_film->three_d ()) {
367                                 /* _full_written and so on are incremented for each eye, so we need to double the total
368                                    frames to get the correct progress.
369                                 */
370                                 total *= 2;
371                         }
372                         if (total) {
373                                 job->set_progress (float (_full_written + _fake_written + _repeat_written) / total);
374                         }
375                 }
376
377                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
378                         /* Too many frames in memory which can't yet be written to the stream.
379                            Write some FULL frames to disk.
380                         */
381
382                         /* Find one from the back of the queue */
383                         _queue.sort ();
384                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
385                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
386                                 ++i;
387                         }
388
389                         DCPOMATIC_ASSERT (i != _queue.rend());
390                         ++_pushed_to_disk;
391                         lock.unlock ();
392
393                         /* i is valid here, even though we don't hold a lock on the mutex,
394                            since list iterators are unaffected by insertion and only this
395                            thread could erase the last item in the list.
396                         */
397
398                         LOG_GENERAL ("Writer full; pushes %1 to disk", i->frame);
399
400                         i->encoded->write_via_temp (
401                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
402                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
403                                 );
404
405                         lock.lock ();
406                         i->encoded.reset ();
407                         --_queued_full_in_memory;
408                 }
409
410                 /* The queue has probably just gone down a bit; notify anything wait()ing on _full_condition */
411                 _full_condition.notify_all ();
412         }
413 }
414 catch (...)
415 {
416         store_current ();
417 }
418
419 void
420 Writer::terminate_thread (bool can_throw)
421 {
422         boost::mutex::scoped_lock lock (_state_mutex);
423         if (_thread == 0) {
424                 return;
425         }
426
427         _finish = true;
428         _empty_condition.notify_all ();
429         _full_condition.notify_all ();
430         lock.unlock ();
431
432         if (_thread->joinable ()) {
433                 _thread->join ();
434         }
435
436         if (can_throw) {
437                 rethrow ();
438         }
439
440         delete _thread;
441         _thread = 0;
442 }
443
444 void
445 Writer::finish ()
446 {
447         if (!_thread) {
448                 return;
449         }
450
451         LOG_GENERAL_NC ("Terminating writer thread");
452
453         terminate_thread (true);
454
455         LOG_GENERAL_NC ("Finishing ReelWriters");
456
457         BOOST_FOREACH (ReelWriter& i, _reels) {
458                 i.finish ();
459         }
460
461         LOG_GENERAL_NC ("Writing XML");
462
463         dcp::DCP dcp (_film->dir (_film->dcp_name()));
464
465         shared_ptr<dcp::CPL> cpl (
466                 new dcp::CPL (
467                         _film->dcp_name(),
468                         _film->dcp_content_type()->libdcp_kind ()
469                         )
470                 );
471
472         dcp.add (cpl);
473
474         BOOST_FOREACH (ReelWriter& i, _reels) {
475
476                 shared_ptr<Job> job = _job.lock ();
477                 DCPOMATIC_ASSERT (job);
478                 i.calculate_digests (job);
479
480                 cpl->add (i.create_reel (_reel_assets, _fonts));
481         }
482
483         dcp::XMLMetadata meta;
484         meta.creator = Config::instance()->dcp_creator ();
485         if (meta.creator.empty ()) {
486                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
487         }
488         meta.issuer = Config::instance()->dcp_issuer ();
489         if (meta.issuer.empty ()) {
490                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
491         }
492         meta.set_issue_date_now ();
493
494         cpl->set_metadata (meta);
495
496         shared_ptr<const dcp::CertificateChain> signer;
497         if (_film->is_signed ()) {
498                 signer = Config::instance()->signer_chain ();
499                 /* We did check earlier, but check again here to be on the safe side */
500                 if (!signer->valid ()) {
501                         throw InvalidSignerError ();
502                 }
503         }
504
505         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer);
506
507         LOG_GENERAL (
508                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
509                 );
510 }
511
512 /** @param frame Frame index within the whole DCP.
513  *  @return true if we can fake-write this frame.
514  */
515 bool
516 Writer::can_fake_write (Frame frame) const
517 {
518         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
519            parameters in the asset writer.
520         */
521
522         ReelWriter const & reel = _reels[video_reel(frame)];
523
524         /* Make frame relative to the start of the reel */
525         frame -= reel.start ();
526         return (frame != 0 && frame < reel.first_nonexistant_frame());
527 }
528
529 void
530 Writer::write (PlayerSubtitles subs)
531 {
532         if (subs.text.empty ()) {
533                 return;
534         }
535
536         if (_subtitle_reel->period().to <= subs.from) {
537                 ++_subtitle_reel;
538         }
539
540         _subtitle_reel->write (subs);
541 }
542
543 void
544 Writer::write (list<shared_ptr<Font> > fonts)
545 {
546         /* Just keep a list of fonts and we'll deal with them in ::finish */
547         copy (fonts.begin (), fonts.end (), back_inserter (_fonts));
548 }
549
550 bool
551 operator< (QueueItem const & a, QueueItem const & b)
552 {
553         if (a.reel != b.reel) {
554                 return a.reel < b.reel;
555         }
556
557         if (a.frame != b.frame) {
558                 return a.frame < b.frame;
559         }
560
561         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
562 }
563
564 bool
565 operator== (QueueItem const & a, QueueItem const & b)
566 {
567         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
568 }
569
570 void
571 Writer::set_encoder_threads (int threads)
572 {
573         _maximum_frames_in_memory = lrint (threads * 1.1);
574 }
575
576 void
577 Writer::write (ReferencedReelAsset asset)
578 {
579         _reel_assets.push_back (asset);
580 }
581
582 size_t
583 Writer::video_reel (int frame) const
584 {
585         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
586         size_t i = 0;
587         while (i < _reels.size() && !_reels[i].period().contains (t)) {
588                 ++i;
589         }
590
591         DCPOMATIC_ASSERT (i < _reels.size ());
592         return i;
593 }