Distinguish master DoM encode threads count from the server count.
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2016 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #include "writer.h"
22 #include "compose.hpp"
23 #include "film.h"
24 #include "ratio.h"
25 #include "log.h"
26 #include "dcp_video.h"
27 #include "dcp_content_type.h"
28 #include "audio_mapping.h"
29 #include "config.h"
30 #include "job.h"
31 #include "cross.h"
32 #include "audio_buffers.h"
33 #include "version.h"
34 #include "font.h"
35 #include "util.h"
36 #include "reel_writer.h"
37 #include <dcp/cpl.h>
38 #include <boost/foreach.hpp>
39 #include <fstream>
40 #include <cerrno>
41 #include <iostream>
42 #include <cfloat>
43
44 #include "i18n.h"
45
46 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
47 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
48 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
49 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
50 #define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING);
51 #define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING);
52 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
53
54 /* OS X strikes again */
55 #undef set_key
56
57 using std::make_pair;
58 using std::pair;
59 using std::string;
60 using std::list;
61 using std::cout;
62 using std::map;
63 using std::min;
64 using std::max;
65 using boost::shared_ptr;
66 using boost::weak_ptr;
67 using boost::dynamic_pointer_cast;
68 using dcp::Data;
69
70 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
71         : _film (film)
72         , _job (j)
73         , _thread (0)
74         , _finish (false)
75         , _queued_full_in_memory (0)
76         , _maximum_frames_in_memory (0)
77         , _full_written (0)
78         , _fake_written (0)
79         , _repeat_written (0)
80         , _pushed_to_disk (0)
81 {
82         shared_ptr<Job> job = _job.lock ();
83         DCPOMATIC_ASSERT (job);
84
85         int reel_index = 0;
86         list<DCPTimePeriod> const reels = _film->reels ();
87         BOOST_FOREACH (DCPTimePeriod p, reels) {
88                 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
89         }
90
91         /* We can keep track of the current audio and subtitle reels easily because audio
92            and subs arrive to the Writer in sequence.  This is not so for video.
93         */
94         _audio_reel = _reels.begin ();
95         _subtitle_reel = _reels.begin ();
96
97         /* Check that the signer is OK if we need one */
98         string reason;
99         if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
100                 throw InvalidSignerError (reason);
101         }
102 }
103
104 void
105 Writer::start ()
106 {
107         _thread = new boost::thread (boost::bind (&Writer::thread, this));
108 }
109
110 Writer::~Writer ()
111 {
112         terminate_thread (false);
113 }
114
115 /** Pass a video frame to the writer for writing to disk at some point.
116  *  This method can be called with frames out of order.
117  *  @param encoded JPEG2000-encoded data.
118  *  @param frame Frame index within the DCP.
119  *  @param eyes Eyes that this frame image is for.
120  */
121 void
122 Writer::write (Data encoded, Frame frame, Eyes eyes)
123 {
124         boost::mutex::scoped_lock lock (_state_mutex);
125
126         while (_queued_full_in_memory > _maximum_frames_in_memory) {
127                 /* The queue is too big; wait until that is sorted out */
128                 _full_condition.wait (lock);
129         }
130
131         QueueItem qi;
132         qi.type = QueueItem::FULL;
133         qi.encoded = encoded;
134         qi.reel = video_reel (frame);
135         qi.frame = frame - _reels[qi.reel].start ();
136
137         if (_film->three_d() && eyes == EYES_BOTH) {
138                 /* 2D material in a 3D DCP; fake the 3D */
139                 qi.eyes = EYES_LEFT;
140                 _queue.push_back (qi);
141                 ++_queued_full_in_memory;
142                 qi.eyes = EYES_RIGHT;
143                 _queue.push_back (qi);
144                 ++_queued_full_in_memory;
145         } else {
146                 qi.eyes = eyes;
147                 _queue.push_back (qi);
148                 ++_queued_full_in_memory;
149         }
150
151         /* Now there's something to do: wake anything wait()ing on _empty_condition */
152         _empty_condition.notify_all ();
153 }
154
155 bool
156 Writer::can_repeat (Frame frame) const
157 {
158         return frame > _reels[video_reel(frame)].start();
159 }
160
161 /** Repeat the last frame that was written to a reel as a new frame.
162  *  @param frame Frame index within the DCP of the new (repeated) frame.
163  *  @param eyes Eyes that this repeated frame image is for.
164  */
165 void
166 Writer::repeat (Frame frame, Eyes eyes)
167 {
168         boost::mutex::scoped_lock lock (_state_mutex);
169
170         while (_queued_full_in_memory > _maximum_frames_in_memory) {
171                 /* The queue is too big; wait until that is sorted out */
172                 _full_condition.wait (lock);
173         }
174
175         QueueItem qi;
176         qi.type = QueueItem::REPEAT;
177         qi.reel = video_reel (frame);
178         qi.frame = frame - _reels[qi.reel].start ();
179         if (_film->three_d() && eyes == EYES_BOTH) {
180                 qi.eyes = EYES_LEFT;
181                 _queue.push_back (qi);
182                 qi.eyes = EYES_RIGHT;
183                 _queue.push_back (qi);
184         } else {
185                 qi.eyes = eyes;
186                 _queue.push_back (qi);
187         }
188
189         /* Now there's something to do: wake anything wait()ing on _empty_condition */
190         _empty_condition.notify_all ();
191 }
192
193 void
194 Writer::fake_write (Frame frame, Eyes eyes)
195 {
196         boost::mutex::scoped_lock lock (_state_mutex);
197
198         while (_queued_full_in_memory > _maximum_frames_in_memory) {
199                 /* The queue is too big; wait until that is sorted out */
200                 _full_condition.wait (lock);
201         }
202
203         size_t const reel = video_reel (frame);
204         Frame const reel_frame = frame - _reels[reel].start ();
205
206         FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
207         if (!file) {
208                 throw ReadFileError (_film->info_file(_reels[reel].period()));
209         }
210         dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
211         fclose (file);
212
213         QueueItem qi;
214         qi.type = QueueItem::FAKE;
215         qi.size = info.size;
216         qi.reel = reel;
217         qi.frame = reel_frame;
218         if (_film->three_d() && eyes == EYES_BOTH) {
219                 qi.eyes = EYES_LEFT;
220                 _queue.push_back (qi);
221                 qi.eyes = EYES_RIGHT;
222                 _queue.push_back (qi);
223         } else {
224                 qi.eyes = eyes;
225                 _queue.push_back (qi);
226         }
227
228         /* Now there's something to do: wake anything wait()ing on _empty_condition */
229         _empty_condition.notify_all ();
230 }
231
232 /** Write some audio frames to the DCP.
233  *  @param audio Audio data or 0 if there is no audio to be written here (i.e. it is referenced).
234  *  This method is not thread safe.
235  */
236 void
237 Writer::write (shared_ptr<const AudioBuffers> audio)
238 {
239         /* The audio we get might span a reel boundary, and if so we have to write it in bits */
240
241         int32_t offset = 0;
242         while (offset < audio->frames ()) {
243
244                 if (_audio_reel == _reels.end ()) {
245                         /* This audio is off the end of the last reel; ignore it */
246                         return;
247                 }
248
249                 int32_t const this_time = min (
250                         audio->frames() - offset,
251                         (int32_t) (_audio_reel->period().duration().frames_floor(_film->audio_frame_rate()) - _audio_reel->total_written_audio_frames())
252                         );
253
254                 if (this_time == audio->frames()) {
255                         /* Easy case: we can write all the audio to this reel */
256                         _audio_reel->write (audio);
257                 } else {
258                         /* Write the part we can */
259                         shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), this_time));
260                         part->copy_from (audio.get(), this_time, offset, 0);
261                         _audio_reel->write (part);
262                         ++_audio_reel;
263                 }
264
265                 offset += this_time;
266         }
267 }
268
269 /** This must be called from Writer::thread() with an appropriate lock held */
270 bool
271 Writer::have_sequenced_image_at_queue_head ()
272 {
273         if (_queue.empty ()) {
274                 return false;
275         }
276
277         _queue.sort ();
278
279         QueueItem const & f = _queue.front();
280         ReelWriter const & reel = _reels[f.reel];
281
282         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
283
284         if (f.eyes == EYES_BOTH) {
285                 /* 2D */
286                 return f.frame == (reel.last_written_video_frame() + 1);
287         }
288
289         /* 3D */
290
291         if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
292                 return true;
293         }
294
295         if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
296                 return true;
297         }
298
299         return false;
300 }
301
302 void
303 Writer::thread ()
304 try
305 {
306         while (true)
307         {
308                 boost::mutex::scoped_lock lock (_state_mutex);
309
310                 while (true) {
311
312                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
313                                 /* We've got something to do: go and do it */
314                                 break;
315                         }
316
317                         /* Nothing to do: wait until something happens which may indicate that we do */
318                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
319                         _empty_condition.wait (lock);
320                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
321                 }
322
323                 if (_finish && _queue.empty()) {
324                         return;
325                 }
326
327                 /* We stop here if we have been asked to finish, and if either the queue
328                    is empty or we do not have a sequenced image at its head (if this is the
329                    case we will never terminate as no new frames will be sent once
330                    _finish is true).
331                 */
332                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
333                         /* (Hopefully temporarily) log anything that was not written */
334                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
335                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
336                                 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
337                                         if (i->type == QueueItem::FULL) {
338                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
339                                         } else {
340                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
341                                         }
342                                 }
343                         }
344                         return;
345                 }
346
347                 /* Write any frames that we can write; i.e. those that are in sequence. */
348                 while (have_sequenced_image_at_queue_head ()) {
349                         QueueItem qi = _queue.front ();
350                         _queue.pop_front ();
351                         if (qi.type == QueueItem::FULL && qi.encoded) {
352                                 --_queued_full_in_memory;
353                         }
354
355                         lock.unlock ();
356
357                         ReelWriter& reel = _reels[qi.reel];
358
359                         switch (qi.type) {
360                         case QueueItem::FULL:
361                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
362                                 if (!qi.encoded) {
363                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
364                                 }
365                                 reel.write (qi.encoded, qi.frame, qi.eyes);
366                                 ++_full_written;
367                                 break;
368                         case QueueItem::FAKE:
369                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
370                                 reel.fake_write (qi.frame, qi.eyes, qi.size);
371                                 ++_fake_written;
372                                 break;
373                         case QueueItem::REPEAT:
374                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
375                                 reel.repeat_write (qi.frame, qi.eyes);
376                                 ++_repeat_written;
377                                 break;
378                         }
379
380                         lock.lock ();
381                 }
382
383                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
384                         /* Too many frames in memory which can't yet be written to the stream.
385                            Write some FULL frames to disk.
386                         */
387
388                         /* Find one from the back of the queue */
389                         _queue.sort ();
390                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
391                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
392                                 ++i;
393                         }
394
395                         DCPOMATIC_ASSERT (i != _queue.rend());
396                         ++_pushed_to_disk;
397                         /* For the log message below */
398                         int const awaiting = _reels[_queue.front().reel].last_written_video_frame();
399                         lock.unlock ();
400
401                         /* i is valid here, even though we don't hold a lock on the mutex,
402                            since list iterators are unaffected by insertion and only this
403                            thread could erase the last item in the list.
404                         */
405
406                         LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
407
408                         i->encoded->write_via_temp (
409                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
410                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
411                                 );
412
413                         lock.lock ();
414                         i->encoded.reset ();
415                         --_queued_full_in_memory;
416                 }
417
418                 /* The queue has probably just gone down a bit; notify anything wait()ing on _full_condition */
419                 _full_condition.notify_all ();
420         }
421 }
422 catch (...)
423 {
424         store_current ();
425 }
426
427 void
428 Writer::terminate_thread (bool can_throw)
429 {
430         boost::mutex::scoped_lock lock (_state_mutex);
431         if (_thread == 0) {
432                 return;
433         }
434
435         _finish = true;
436         _empty_condition.notify_all ();
437         _full_condition.notify_all ();
438         lock.unlock ();
439
440         if (_thread->joinable ()) {
441                 _thread->join ();
442         }
443
444         if (can_throw) {
445                 rethrow ();
446         }
447
448         delete _thread;
449         _thread = 0;
450 }
451
452 void
453 Writer::finish ()
454 {
455         if (!_thread) {
456                 return;
457         }
458
459         LOG_GENERAL_NC ("Terminating writer thread");
460
461         terminate_thread (true);
462
463         LOG_GENERAL_NC ("Finishing ReelWriters");
464
465         BOOST_FOREACH (ReelWriter& i, _reels) {
466                 i.finish ();
467         }
468
469         LOG_GENERAL_NC ("Writing XML");
470
471         dcp::DCP dcp (_film->dir (_film->dcp_name()));
472
473         shared_ptr<dcp::CPL> cpl (
474                 new dcp::CPL (
475                         _film->dcp_name(),
476                         _film->dcp_content_type()->libdcp_kind ()
477                         )
478                 );
479
480         dcp.add (cpl);
481
482         /* Calculate digests for each reel in parallel */
483
484         shared_ptr<Job> job = _job.lock ();
485         job->sub (_("Computing digests"));
486
487         boost::asio::io_service service;
488         boost::thread_group pool;
489
490         shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
491
492         int const threads = max (1, Config::instance()->master_encoding_threads ());
493
494         for (int i = 0; i < threads; ++i) {
495                 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
496         }
497
498         BOOST_FOREACH (ReelWriter& i, _reels) {
499                 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
500                 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
501         }
502
503         work.reset ();
504         pool.join_all ();
505         service.stop ();
506
507         /* Add reels to CPL */
508
509         BOOST_FOREACH (ReelWriter& i, _reels) {
510                 cpl->add (i.create_reel (_reel_assets, _fonts));
511         }
512
513         dcp::XMLMetadata meta;
514         meta.annotation_text = cpl->annotation_text ();
515         meta.creator = Config::instance()->dcp_creator ();
516         if (meta.creator.empty ()) {
517                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
518         }
519         meta.issuer = Config::instance()->dcp_issuer ();
520         if (meta.issuer.empty ()) {
521                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
522         }
523         meta.set_issue_date_now ();
524
525         cpl->set_metadata (meta);
526
527         shared_ptr<const dcp::CertificateChain> signer;
528         if (_film->is_signed ()) {
529                 signer = Config::instance()->signer_chain ();
530                 /* We did check earlier, but check again here to be on the safe side */
531                 string reason;
532                 if (!signer->valid (&reason)) {
533                         throw InvalidSignerError (reason);
534                 }
535         }
536
537         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
538
539         LOG_GENERAL (
540                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
541                 );
542 }
543
544 /** @param frame Frame index within the whole DCP.
545  *  @return true if we can fake-write this frame.
546  */
547 bool
548 Writer::can_fake_write (Frame frame) const
549 {
550         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
551            parameters in the asset writer.
552         */
553
554         ReelWriter const & reel = _reels[video_reel(frame)];
555
556         /* Make frame relative to the start of the reel */
557         frame -= reel.start ();
558         return (frame != 0 && frame < reel.first_nonexistant_frame());
559 }
560
561 void
562 Writer::write (PlayerSubtitles subs, DCPTimePeriod period)
563 {
564         if (subs.text.empty ()) {
565                 return;
566         }
567
568         if (_subtitle_reel->period().to <= period.from) {
569                 ++_subtitle_reel;
570         }
571
572         _subtitle_reel->write (subs);
573 }
574
575 void
576 Writer::write (list<shared_ptr<Font> > fonts)
577 {
578         /* Just keep a list of unique fonts and we'll deal with them in ::finish */
579
580         BOOST_FOREACH (shared_ptr<Font> i, fonts) {
581                 bool got = false;
582                 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
583                         if (*i == *j) {
584                                 got = true;
585                         }
586                 }
587
588                 if (!got) {
589                         _fonts.push_back (i);
590                 }
591         }
592 }
593
594 bool
595 operator< (QueueItem const & a, QueueItem const & b)
596 {
597         if (a.reel != b.reel) {
598                 return a.reel < b.reel;
599         }
600
601         if (a.frame != b.frame) {
602                 return a.frame < b.frame;
603         }
604
605         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
606 }
607
608 bool
609 operator== (QueueItem const & a, QueueItem const & b)
610 {
611         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
612 }
613
614 void
615 Writer::set_encoder_threads (int threads)
616 {
617         /* I think the scaling factor here should be the ratio of the longest frame
618            encode time to the shortest; if the thread count is T, longest time is L
619            and the shortest time S we could encode L/S frames per thread whilst waiting
620            for the L frame to encode so we might have to store LT/S frames.
621
622            However we don't want to use too much memory, so keep it a bit lower than we'd
623            perhaps like.  A J2K frame is typically about 1Mb so 3 here will mean we could
624            use about 240Mb with 72 encoding threads.
625         */
626         _maximum_frames_in_memory = lrint (threads * 3);
627 }
628
629 void
630 Writer::write (ReferencedReelAsset asset)
631 {
632         _reel_assets.push_back (asset);
633 }
634
635 size_t
636 Writer::video_reel (int frame) const
637 {
638         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
639         size_t i = 0;
640         while (i < _reels.size() && !_reels[i].period().contains (t)) {
641                 ++i;
642         }
643
644         DCPOMATIC_ASSERT (i < _reels.size ());
645         return i;
646 }
647
648 void
649 Writer::set_digest_progress (Job* job, float progress)
650 {
651         /* I believe this is thread-safe */
652         _digest_progresses[boost::this_thread::get_id()] = progress;
653
654         boost::mutex::scoped_lock lm (_digest_progresses_mutex);
655         float min_progress = FLT_MAX;
656         for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
657                 min_progress = min (min_progress, i->second);
658         }
659
660         job->set_progress (min_progress);
661 }