Rename SafeStringStream -> locked_stringstream. Bump deps for removal of stringstream.
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2016 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #include "writer.h"
22 #include "compose.hpp"
23 #include "film.h"
24 #include "ratio.h"
25 #include "log.h"
26 #include "dcp_video.h"
27 #include "dcp_content_type.h"
28 #include "audio_mapping.h"
29 #include "config.h"
30 #include "job.h"
31 #include "cross.h"
32 #include "audio_buffers.h"
33 #include "version.h"
34 #include "font.h"
35 #include "util.h"
36 #include "reel_writer.h"
37 #include <dcp/cpl.h>
38 #include <boost/foreach.hpp>
39 #include <fstream>
40 #include <cerrno>
41 #include <iostream>
42
43 #include "i18n.h"
44
/* Logging shorthands.  Each one expects a member called _film to be in scope.
   The plain variants format their arguments with String::compose before logging;
   the _NC variants hand their arguments to log() unchanged.

   The definitions deliberately do not end in a semicolon: the caller supplies it,
   so that `LOG_X (...);' is exactly one statement and is safe inside an unbraced
   if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
#define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING)
#define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
52
53 /* OS X strikes again */
54 #undef set_key
55
56 using std::make_pair;
57 using std::pair;
58 using std::string;
59 using std::list;
60 using std::cout;
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using dcp::Data;
65
66 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
67         : _film (film)
68         , _job (j)
69         , _thread (0)
70         , _finish (false)
71         , _queued_full_in_memory (0)
72         , _maximum_frames_in_memory (0)
73         , _full_written (0)
74         , _fake_written (0)
75         , _repeat_written (0)
76         , _pushed_to_disk (0)
77 {
78         /* Remove any old DCP */
79         boost::filesystem::remove_all (_film->dir (_film->dcp_name ()));
80
81         shared_ptr<Job> job = _job.lock ();
82         DCPOMATIC_ASSERT (job);
83
84         BOOST_FOREACH (DCPTimePeriod p, _film->reels ()) {
85                 _reels.push_back (ReelWriter (film, p, job));
86         }
87
88         /* We can keep track of the current audio and subtitle reels easily because audio
89            and subs arrive to the Writer in sequence.  This is not so for video.
90         */
91         _audio_reel = _reels.begin ();
92         _subtitle_reel = _reels.begin ();
93
94         /* Check that the signer is OK if we need one */
95         if (_film->is_signed() && !Config::instance()->signer_chain()->valid ()) {
96                 throw InvalidSignerError ();
97         }
98 }
99
100 void
101 Writer::start ()
102 {
103         _thread = new boost::thread (boost::bind (&Writer::thread, this));
104 }
105
106 Writer::~Writer ()
107 {
108         terminate_thread (false);
109 }
110
111 /** Pass a video frame to the writer for writing to disk at some point.
112  *  This method can be called with frames out of order.
113  *  @param encoded JPEG2000-encoded data.
114  *  @param frame Frame index within the DCP.
115  *  @param eyes Eyes that this frame image is for.
116  */
117 void
118 Writer::write (Data encoded, Frame frame, Eyes eyes)
119 {
120         boost::mutex::scoped_lock lock (_state_mutex);
121
122         while (_queued_full_in_memory > _maximum_frames_in_memory) {
123                 /* The queue is too big; wait until that is sorted out */
124                 _full_condition.wait (lock);
125         }
126
127         QueueItem qi;
128         qi.type = QueueItem::FULL;
129         qi.encoded = encoded;
130         qi.reel = video_reel (frame);
131         qi.frame = frame - _reels[qi.reel].start ();
132
133         if (_film->three_d() && eyes == EYES_BOTH) {
134                 /* 2D material in a 3D DCP; fake the 3D */
135                 qi.eyes = EYES_LEFT;
136                 _queue.push_back (qi);
137                 ++_queued_full_in_memory;
138                 qi.eyes = EYES_RIGHT;
139                 _queue.push_back (qi);
140                 ++_queued_full_in_memory;
141         } else {
142                 qi.eyes = eyes;
143                 _queue.push_back (qi);
144                 ++_queued_full_in_memory;
145         }
146
147         /* Now there's something to do: wake anything wait()ing on _empty_condition */
148         _empty_condition.notify_all ();
149 }
150
151 bool
152 Writer::can_repeat (Frame frame) const
153 {
154         return frame > _reels[video_reel(frame)].start();
155 }
156
157 /** Repeat the last frame that was written to a reel as a new frame.
158  *  @param frame Frame index within the DCP of the new (repeated) frame.
159  *  @param eyes Eyes that this repeated frame image is for.
160  */
161 void
162 Writer::repeat (Frame frame, Eyes eyes)
163 {
164         boost::mutex::scoped_lock lock (_state_mutex);
165
166         while (_queued_full_in_memory > _maximum_frames_in_memory) {
167                 /* The queue is too big; wait until that is sorted out */
168                 _full_condition.wait (lock);
169         }
170
171         QueueItem qi;
172         qi.type = QueueItem::REPEAT;
173         qi.reel = video_reel (frame);
174         qi.frame = frame - _reels[qi.reel].start ();
175         if (_film->three_d() && eyes == EYES_BOTH) {
176                 qi.eyes = EYES_LEFT;
177                 _queue.push_back (qi);
178                 qi.eyes = EYES_RIGHT;
179                 _queue.push_back (qi);
180         } else {
181                 qi.eyes = eyes;
182                 _queue.push_back (qi);
183         }
184
185         /* Now there's something to do: wake anything wait()ing on _empty_condition */
186         _empty_condition.notify_all ();
187 }
188
189 void
190 Writer::fake_write (Frame frame, Eyes eyes)
191 {
192         boost::mutex::scoped_lock lock (_state_mutex);
193
194         while (_queued_full_in_memory > _maximum_frames_in_memory) {
195                 /* The queue is too big; wait until that is sorted out */
196                 _full_condition.wait (lock);
197         }
198
199         size_t const reel = video_reel (frame);
200         Frame const reel_frame = frame - _reels[reel].start ();
201
202         FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
203         if (!file) {
204                 throw ReadFileError (_film->info_file(_reels[reel].period()));
205         }
206         dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
207         fclose (file);
208
209         QueueItem qi;
210         qi.type = QueueItem::FAKE;
211         qi.size = info.size;
212         qi.reel = reel;
213         qi.frame = reel_frame;
214         if (_film->three_d() && eyes == EYES_BOTH) {
215                 qi.eyes = EYES_LEFT;
216                 _queue.push_back (qi);
217                 qi.eyes = EYES_RIGHT;
218                 _queue.push_back (qi);
219         } else {
220                 qi.eyes = eyes;
221                 _queue.push_back (qi);
222         }
223
224         /* Now there's something to do: wake anything wait()ing on _empty_condition */
225         _empty_condition.notify_all ();
226 }
227
228 /** Write one video frame's worth of audio frames to the DCP.
229  *  @param audio Audio data or 0 if there is no audio to be written here (i.e. it is referenced).
230  *  This method is not thread safe.
231  */
232 void
233 Writer::write (shared_ptr<const AudioBuffers> audio)
234 {
235         if (_audio_reel == _reels.end ()) {
236                 /* This audio is off the end of the last reel; ignore it */
237                 return;
238         }
239
240         _audio_reel->write (audio);
241
242         /* written is in video frames, not audio frames */
243         if (_audio_reel->total_written_audio_frames() >= _audio_reel->period().duration().frames_floor (_film->video_frame_rate())) {
244                 ++_audio_reel;
245         }
246 }
247
248 /** This must be called from Writer::thread() with an appropriate lock held */
249 bool
250 Writer::have_sequenced_image_at_queue_head ()
251 {
252         if (_queue.empty ()) {
253                 return false;
254         }
255
256         _queue.sort ();
257
258         QueueItem const & f = _queue.front();
259         ReelWriter const & reel = _reels[f.reel];
260
261         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
262
263         if (f.eyes == EYES_BOTH) {
264                 /* 2D */
265                 return f.frame == (reel.last_written_video_frame() + 1);
266         }
267
268         /* 3D */
269
270         if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
271                 return true;
272         }
273
274         if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
275                 return true;
276         }
277
278         return false;
279 }
280
281 void
282 Writer::thread ()
283 try
284 {
285         while (true)
286         {
287                 boost::mutex::scoped_lock lock (_state_mutex);
288
289                 while (true) {
290
291                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
292                                 /* We've got something to do: go and do it */
293                                 break;
294                         }
295
296                         /* Nothing to do: wait until something happens which may indicate that we do */
297                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
298                         _empty_condition.wait (lock);
299                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
300                 }
301
302                 if (_finish && _queue.empty()) {
303                         return;
304                 }
305
306                 /* We stop here if we have been asked to finish, and if either the queue
307                    is empty or we do not have a sequenced image at its head (if this is the
308                    case we will never terminate as no new frames will be sent once
309                    _finish is true).
310                 */
311                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
312                         /* (Hopefully temporarily) log anything that was not written */
313                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
314                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
315                                 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
316                                         if (i->type == QueueItem::FULL) {
317                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, i->eyes);
318                                         } else {
319                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, i->eyes);
320                                         }
321                                 }
322                         }
323                         return;
324                 }
325
326                 /* Write any frames that we can write; i.e. those that are in sequence. */
327                 while (have_sequenced_image_at_queue_head ()) {
328                         QueueItem qi = _queue.front ();
329                         _queue.pop_front ();
330                         if (qi.type == QueueItem::FULL && qi.encoded) {
331                                 --_queued_full_in_memory;
332                         }
333
334                         lock.unlock ();
335
336                         ReelWriter& reel = _reels[qi.reel];
337
338                         switch (qi.type) {
339                         case QueueItem::FULL:
340                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, qi.eyes);
341                                 if (!qi.encoded) {
342                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
343                                 }
344                                 reel.write (qi.encoded, qi.frame, qi.eyes);
345                                 ++_full_written;
346                                 break;
347                         case QueueItem::FAKE:
348                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
349                                 reel.fake_write (qi.frame, qi.eyes, qi.size);
350                                 ++_fake_written;
351                                 break;
352                         case QueueItem::REPEAT:
353                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
354                                 reel.repeat_write (qi.frame, qi.eyes);
355                                 ++_repeat_written;
356                                 break;
357                         }
358
359                         lock.lock ();
360                 }
361
362                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
363                         /* Too many frames in memory which can't yet be written to the stream.
364                            Write some FULL frames to disk.
365                         */
366
367                         /* Find one from the back of the queue */
368                         _queue.sort ();
369                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
370                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
371                                 ++i;
372                         }
373
374                         DCPOMATIC_ASSERT (i != _queue.rend());
375                         ++_pushed_to_disk;
376                         lock.unlock ();
377
378                         /* i is valid here, even though we don't hold a lock on the mutex,
379                            since list iterators are unaffected by insertion and only this
380                            thread could erase the last item in the list.
381                         */
382
383                         LOG_GENERAL ("Writer full; pushes %1 to disk", i->frame);
384
385                         i->encoded->write_via_temp (
386                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
387                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
388                                 );
389
390                         lock.lock ();
391                         i->encoded.reset ();
392                         --_queued_full_in_memory;
393                 }
394
395                 /* The queue has probably just gone down a bit; notify anything wait()ing on _full_condition */
396                 _full_condition.notify_all ();
397         }
398 }
399 catch (...)
400 {
401         store_current ();
402 }
403
404 void
405 Writer::terminate_thread (bool can_throw)
406 {
407         boost::mutex::scoped_lock lock (_state_mutex);
408         if (_thread == 0) {
409                 return;
410         }
411
412         _finish = true;
413         _empty_condition.notify_all ();
414         _full_condition.notify_all ();
415         lock.unlock ();
416
417         if (_thread->joinable ()) {
418                 _thread->join ();
419         }
420
421         if (can_throw) {
422                 rethrow ();
423         }
424
425         delete _thread;
426         _thread = 0;
427 }
428
429 void
430 Writer::finish ()
431 {
432         if (!_thread) {
433                 return;
434         }
435
436         LOG_GENERAL_NC ("Terminating writer thread");
437
438         terminate_thread (true);
439
440         LOG_GENERAL_NC ("Finishing ReelWriters");
441
442         BOOST_FOREACH (ReelWriter& i, _reels) {
443                 i.finish ();
444         }
445
446         LOG_GENERAL_NC ("Writing XML");
447
448         dcp::DCP dcp (_film->dir (_film->dcp_name()));
449
450         shared_ptr<dcp::CPL> cpl (
451                 new dcp::CPL (
452                         _film->dcp_name(),
453                         _film->dcp_content_type()->libdcp_kind ()
454                         )
455                 );
456
457         dcp.add (cpl);
458
459         BOOST_FOREACH (ReelWriter& i, _reels) {
460
461                 shared_ptr<Job> job = _job.lock ();
462                 DCPOMATIC_ASSERT (job);
463                 i.calculate_digests (job);
464
465                 cpl->add (i.create_reel (_reel_assets, _fonts));
466         }
467
468         dcp::XMLMetadata meta;
469         meta.creator = Config::instance()->dcp_creator ();
470         if (meta.creator.empty ()) {
471                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
472         }
473         meta.issuer = Config::instance()->dcp_issuer ();
474         if (meta.issuer.empty ()) {
475                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
476         }
477         meta.set_issue_date_now ();
478
479         cpl->set_metadata (meta);
480
481         shared_ptr<const dcp::CertificateChain> signer;
482         if (_film->is_signed ()) {
483                 signer = Config::instance()->signer_chain ();
484                 /* We did check earlier, but check again here to be on the safe side */
485                 if (!signer->valid ()) {
486                         throw InvalidSignerError ();
487                 }
488         }
489
490         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer);
491
492         LOG_GENERAL (
493                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
494                 );
495 }
496
497 /** @param frame Frame index within the whole DCP.
498  *  @return true if we can fake-write this frame.
499  */
500 bool
501 Writer::can_fake_write (Frame frame) const
502 {
503         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
504            parameters in the asset writer.
505         */
506
507         ReelWriter const & reel = _reels[video_reel(frame)];
508
509         /* Make frame relative to the start of the reel */
510         frame -= reel.start ();
511         return (frame != 0 && frame < reel.first_nonexistant_frame());
512 }
513
514 void
515 Writer::write (PlayerSubtitles subs)
516 {
517         if (subs.text.empty ()) {
518                 return;
519         }
520
521         if (_subtitle_reel->period().to <= subs.from) {
522                 ++_subtitle_reel;
523         }
524
525         _subtitle_reel->write (subs);
526 }
527
528 void
529 Writer::write (list<shared_ptr<Font> > fonts)
530 {
531         /* Just keep a list of unique fonts and we'll deal with them in ::finish */
532
533         BOOST_FOREACH (shared_ptr<Font> i, fonts) {
534                 bool got = false;
535                 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
536                         if (*i == *j) {
537                                 got = true;
538                         }
539                 }
540
541                 if (!got) {
542                         _fonts.push_back (i);
543                 }
544         }
545 }
546
547 bool
548 operator< (QueueItem const & a, QueueItem const & b)
549 {
550         if (a.reel != b.reel) {
551                 return a.reel < b.reel;
552         }
553
554         if (a.frame != b.frame) {
555                 return a.frame < b.frame;
556         }
557
558         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
559 }
560
561 bool
562 operator== (QueueItem const & a, QueueItem const & b)
563 {
564         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
565 }
566
567 void
568 Writer::set_encoder_threads (int threads)
569 {
570         /* I think the scaling factor here should be the ratio of the longest frame
571            encode time to the shortest; if the thread count is T, longest time is L
572            and the shortest time S we could encode L/S frames per thread whilst waiting
573            for the L frame to encode so we might have to store LT/S frames.
574
575            However we don't want to use too much memory, so keep it a bit lower than we'd
576            perhaps like.  A J2K frame is typically about 1Mb so 3 here will mean we could
577            use about 240Mb with 72 encoding threads.
578         */
579         _maximum_frames_in_memory = lrint (threads * 3);
580 }
581
582 void
583 Writer::write (ReferencedReelAsset asset)
584 {
585         _reel_assets.push_back (asset);
586 }
587
588 size_t
589 Writer::video_reel (int frame) const
590 {
591         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
592         size_t i = 0;
593         while (i < _reels.size() && !_reels[i].period().contains (t)) {
594                 ++i;
595         }
596
597         DCPOMATIC_ASSERT (i < _reels.size ());
598         return i;
599 }