Work around deadlock when destroying J2KEncoder with a full writer queue (#2784).
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 #include "audio_buffers.h"
23 #include "audio_mapping.h"
24 #include "compose.hpp"
25 #include "config.h"
26 #include "constants.h"
27 #include "cross.h"
28 #include "dcp_content_type.h"
29 #include "dcp_video.h"
30 #include "dcpomatic_log.h"
31 #include "film.h"
32 #include "film_util.h"
33 #include "job.h"
34 #include "log.h"
35 #include "ratio.h"
36 #include "reel_writer.h"
37 #include "text_content.h"
38 #include "util.h"
39 #include "version.h"
40 #include "writer.h"
41 #include <dcp/cpl.h>
42 #include <dcp/locale_convert.h>
43 #include <dcp/raw_convert.h>
44 #include <dcp/reel_closed_caption_asset.h>
45 #include <dcp/reel_file_asset.h>
46 #include <dcp/reel_subtitle_asset.h>
47 #include <cerrno>
48 #include <cfloat>
49 #include <set>
50
51 #include "i18n.h"
52
53
54 /* OS X strikes again */
55 #undef set_key
56
57
58 using std::cout;
59 using std::dynamic_pointer_cast;
60 using std::make_shared;
61 using std::max;
62 using std::min;
63 using std::shared_ptr;
64 using std::set;
65 using std::string;
66 using std::vector;
67 using std::weak_ptr;
68 using boost::optional;
69 #if BOOST_VERSION >= 106100
70 using namespace boost::placeholders;
71 #endif
72 using dcp::Data;
73 using dcp::ArrayData;
74 using namespace dcpomatic;
75
76
77 /** @param weak_job Job to report progress to, or 0.
78  *  @param text_only true to enable only the text (subtitle/ccap) parts of the writer.
79  */
80 Writer::Writer(weak_ptr<const Film> weak_film, weak_ptr<Job> weak_job, bool text_only)
81         : WeakConstFilm (weak_film)
82         , _job(weak_job)
83         /* These will be reset to sensible values when J2KEncoder is created */
84         , _maximum_frames_in_memory (8)
85         , _maximum_queue_size (8)
86         , _text_only (text_only)
87 {
88         auto job = _job.lock ();
89
90         int reel_index = 0;
91         auto const reels = film()->reels();
92         for (auto p: reels) {
93                 _reels.push_back (ReelWriter(weak_film, p, job, reel_index++, reels.size(), text_only));
94         }
95
96         _last_written.resize (reels.size());
97
98         /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
99            and captions arrive to the Writer in sequence.  This is not so for video.
100         */
101         _audio_reel = _reels.begin ();
102         _subtitle_reel = _reels.begin ();
103         for (auto i: film()->closed_caption_tracks()) {
104                 _caption_reels[i] = _reels.begin ();
105         }
106         _atmos_reel = _reels.begin ();
107
108         /* Check that the signer is OK */
109         string reason;
110         if (!Config::instance()->signer_chain()->valid(&reason)) {
111                 throw InvalidSignerError (reason);
112         }
113 }
114
115
116 void
117 Writer::start ()
118 {
119         if (!_text_only) {
120                 _thread = boost::thread (boost::bind(&Writer::thread, this));
121 #ifdef DCPOMATIC_LINUX
122                 pthread_setname_np (_thread.native_handle(), "writer");
123 #endif
124         }
125 }
126
127
128 Writer::~Writer ()
129 {
130         if (!_text_only) {
131                 terminate_thread (false);
132         }
133 }
134
135
136 /** Pass a video frame to the writer for writing to disk at some point.
137  *  This method can be called with frames out of order.
138  *  @param encoded JPEG2000-encoded data.
139  *  @param frame Frame index within the DCP.
140  *  @param eyes Eyes that this frame image is for.
141  */
142 void
143 Writer::write (shared_ptr<const Data> encoded, Frame frame, Eyes eyes)
144 {
145         boost::mutex::scoped_lock lock (_state_mutex);
146
147         if (_zombie) {
148                 return;
149         }
150
151         while (_queued_full_in_memory > _maximum_frames_in_memory) {
152                 /* There are too many full frames in memory; wake the main writer thread and
153                    wait until it sorts everything out */
154                 _empty_condition.notify_all ();
155                 _full_condition.wait (lock);
156         }
157
158         QueueItem qi;
159         qi.type = QueueItem::Type::FULL;
160         qi.encoded = encoded;
161         qi.reel = video_reel (frame);
162         qi.frame = frame - _reels[qi.reel].start ();
163
164         DCPOMATIC_ASSERT((film()->three_d() && eyes != Eyes::BOTH) || (!film()->three_d() && eyes == Eyes::BOTH));
165
166         qi.eyes = eyes;
167         _queue.push_back(qi);
168         ++_queued_full_in_memory;
169
170         /* Now there's something to do: wake anything wait()ing on _empty_condition */
171         _empty_condition.notify_all ();
172 }
173
174
175 bool
176 Writer::can_repeat (Frame frame) const
177 {
178         return frame > _reels[video_reel(frame)].start();
179 }
180
181
182 /** Repeat the last frame that was written to a reel as a new frame.
183  *  @param frame Frame index within the DCP of the new (repeated) frame.
184  *  @param eyes Eyes that this repeated frame image is for.
185  */
186 void
187 Writer::repeat (Frame frame, Eyes eyes)
188 {
189         boost::mutex::scoped_lock lock (_state_mutex);
190
191         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
192                 /* The queue is too big, and the main writer thread can run and fix it, so
193                    wake it and wait until it has done.
194                 */
195                 _empty_condition.notify_all ();
196                 _full_condition.wait (lock);
197         }
198
199         QueueItem qi;
200         qi.type = QueueItem::Type::REPEAT;
201         qi.reel = video_reel (frame);
202         qi.frame = frame - _reels[qi.reel].start ();
203         if (film()->three_d() && eyes == Eyes::BOTH) {
204                 qi.eyes = Eyes::LEFT;
205                 _queue.push_back (qi);
206                 qi.eyes = Eyes::RIGHT;
207                 _queue.push_back (qi);
208         } else {
209                 qi.eyes = eyes;
210                 _queue.push_back (qi);
211         }
212
213         /* Now there's something to do: wake anything wait()ing on _empty_condition */
214         _empty_condition.notify_all ();
215 }
216
217
218 void
219 Writer::fake_write (Frame frame, Eyes eyes)
220 {
221         boost::mutex::scoped_lock lock (_state_mutex);
222
223         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
224                 /* The queue is too big, and the main writer thread can run and fix it, so
225                    wake it and wait until it has done.
226                 */
227                 _empty_condition.notify_all ();
228                 _full_condition.wait (lock);
229         }
230
231         size_t const reel = video_reel (frame);
232         Frame const frame_in_reel = frame - _reels[reel].start ();
233
234         QueueItem qi;
235         qi.type = QueueItem::Type::FAKE;
236
237         {
238                 shared_ptr<InfoFileHandle> info_file = film()->info_file_handle(_reels[reel].period(), true);
239                 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
240         }
241
242         DCPOMATIC_ASSERT((film()->three_d() && eyes != Eyes::BOTH) || (!film()->three_d() && eyes == Eyes::BOTH));
243
244         qi.reel = reel;
245         qi.frame = frame_in_reel;
246         qi.eyes = eyes;
247         _queue.push_back(qi);
248
249         /* Now there's something to do: wake anything wait()ing on _empty_condition */
250         _empty_condition.notify_all ();
251 }
252
253
254 /** Write some audio frames to the DCP.
255  *  @param audio Audio data.
256  *  @param time Time of this data within the DCP.
257  *  This method is not thread safe.
258  */
259 void
260 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
261 {
262         DCPOMATIC_ASSERT (audio);
263
264         int const afr = film()->audio_frame_rate();
265
266         DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
267
268         /* The audio we get might span a reel boundary, and if so we have to write it in bits */
269
270         DCPTime t = time;
271         while (t < end) {
272
273                 if (_audio_reel == _reels.end ()) {
274                         /* This audio is off the end of the last reel; ignore it */
275                         return;
276                 }
277
278                 if (end <= _audio_reel->period().to) {
279                         /* Easy case: we can write all the audio to this reel */
280                         _audio_reel->write (audio);
281                         t = end;
282                 } else if (_audio_reel->period().to <= t) {
283                         /* This reel is entirely before the start of our audio; just skip the reel */
284                         ++_audio_reel;
285                 } else {
286                         /* This audio is over a reel boundary; split the audio into two and write the first part */
287                         DCPTime part_lengths[2] = {
288                                 _audio_reel->period().to - t,
289                                 end - _audio_reel->period().to
290                         };
291
292                         /* Be careful that part_lengths[0] + part_lengths[1] can't be bigger than audio->frames() */
293                         Frame part_frames[2] = {
294                                 part_lengths[0].frames_ceil(afr),
295                                 part_lengths[1].frames_floor(afr)
296                         };
297
298                         DCPOMATIC_ASSERT ((part_frames[0] + part_frames[1]) <= audio->frames());
299
300                         if (part_frames[0]) {
301                                 auto part = make_shared<AudioBuffers>(audio, part_frames[0], 0);
302                                 _audio_reel->write (part);
303                         }
304
305                         if (part_frames[1]) {
306                                 audio = make_shared<AudioBuffers>(audio, part_frames[1], part_frames[0]);
307                         } else {
308                                 audio.reset ();
309                         }
310
311                         ++_audio_reel;
312                         t += part_lengths[0];
313                 }
314         }
315 }
316
317
318 void
319 Writer::write (shared_ptr<const dcp::AtmosFrame> atmos, DCPTime time, AtmosMetadata metadata)
320 {
321         if (_atmos_reel->period().to == time) {
322                 ++_atmos_reel;
323                 DCPOMATIC_ASSERT (_atmos_reel != _reels.end());
324         }
325
326         /* We assume that we get a video frame's worth of data here */
327         _atmos_reel->write (atmos, metadata);
328 }
329
330
331 /** Caller must hold a lock on _state_mutex */
332 bool
333 Writer::have_sequenced_image_at_queue_head ()
334 {
335         if (_queue.empty ()) {
336                 return false;
337         }
338
339         _queue.sort ();
340         auto const & f = _queue.front();
341         return _last_written[f.reel].next(f);
342 }
343
344
345 bool
346 Writer::LastWritten::next (QueueItem qi) const
347 {
348         if (qi.eyes == Eyes::BOTH) {
349                 /* 2D */
350                 return qi.frame == (_frame + 1);
351         }
352
353         /* 3D */
354
355         if (_eyes == Eyes::LEFT && qi.frame == _frame && qi.eyes == Eyes::RIGHT) {
356                 return true;
357         }
358
359         if (_eyes == Eyes::RIGHT && qi.frame == (_frame + 1) && qi.eyes == Eyes::LEFT) {
360                 return true;
361         }
362
363         return false;
364 }
365
366
367 void
368 Writer::LastWritten::update (QueueItem qi)
369 {
370         _frame = qi.frame;
371         _eyes = qi.eyes;
372 }
373
374
375 void
376 Writer::thread ()
377 try
378 {
379         start_of_thread ("Writer");
380
381         while (true)
382         {
383                 boost::mutex::scoped_lock lock (_state_mutex);
384                 if (_zombie) {
385                         return;
386                 }
387
388                 while (true) {
389
390                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
391                                 /* We've got something to do: go and do it */
392                                 break;
393                         }
394
395                         /* Nothing to do: wait until something happens which may indicate that we do */
396                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
397                         _empty_condition.wait (lock);
398                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
399                 }
400
401                 /* We stop here if we have been asked to finish, and if either the queue
402                    is empty or we do not have a sequenced image at its head (if this is the
403                    case we will never terminate as no new frames will be sent once
404                    _finish is true).
405                 */
406                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
407                         /* (Hopefully temporarily) log anything that was not written */
408                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
409                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
410                                 for (auto const& i: _queue) {
411                                         if (i.type == QueueItem::Type::FULL) {
412                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
413                                         } else {
414                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
415                                         }
416                                 }
417                         }
418                         return;
419                 }
420
421                 /* Write any frames that we can write; i.e. those that are in sequence. */
422                 while (have_sequenced_image_at_queue_head ()) {
423                         auto qi = _queue.front ();
424                         _last_written[qi.reel].update (qi);
425                         _queue.pop_front ();
426                         if (qi.type == QueueItem::Type::FULL && qi.encoded) {
427                                 --_queued_full_in_memory;
428                         }
429
430                         lock.unlock ();
431
432                         auto& reel = _reels[qi.reel];
433
434                         switch (qi.type) {
435                         case QueueItem::Type::FULL:
436                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
437                                 if (!qi.encoded) {
438                                         qi.encoded.reset (new ArrayData(film()->j2c_path(qi.reel, qi.frame, qi.eyes, false)));
439                                 }
440                                 reel.write (qi.encoded, qi.frame, qi.eyes);
441                                 ++_full_written;
442                                 break;
443                         case QueueItem::Type::FAKE:
444                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
445                                 reel.fake_write (qi.size);
446                                 ++_fake_written;
447                                 break;
448                         case QueueItem::Type::REPEAT:
449                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
450                                 reel.repeat_write (qi.frame, qi.eyes);
451                                 ++_repeat_written;
452                                 break;
453                         }
454
455                         lock.lock ();
456                         _full_condition.notify_all ();
457                 }
458
459                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
460                         /* Too many frames in memory which can't yet be written to the stream.
461                            Write some FULL frames to disk.
462                         */
463
464                         /* Find one from the back of the queue */
465                         _queue.sort ();
466                         auto item = _queue.rbegin();
467                         while (item != _queue.rend() && (item->type != QueueItem::Type::FULL || !item->encoded)) {
468                                 ++item;
469                         }
470
471                         DCPOMATIC_ASSERT(item != _queue.rend());
472                         ++_pushed_to_disk;
473                         /* For the log message below */
474                         int const awaiting = _last_written[_queue.front().reel].frame() + 1;
475                         lock.unlock ();
476
477                         /* i is valid here, even though we don't hold a lock on the mutex,
478                            since list iterators are unaffected by insertion and only this
479                            thread could erase the last item in the list.
480                         */
481
482                         LOG_GENERAL("Writer full; pushes %1 to disk while awaiting %2", item->frame, awaiting);
483
484                         item->encoded->write_via_temp(
485                                 film()->j2c_path(item->reel, item->frame, item->eyes, true),
486                                 film()->j2c_path(item->reel, item->frame, item->eyes, false)
487                                 );
488
489                         lock.lock ();
490                         item->encoded.reset();
491                         --_queued_full_in_memory;
492                         _full_condition.notify_all ();
493                 }
494         }
495 }
496 catch (...)
497 {
498         store_current ();
499 }
500
501
502 void
503 Writer::terminate_thread (bool can_throw)
504 {
505         boost::this_thread::disable_interruption dis;
506
507         boost::mutex::scoped_lock lock (_state_mutex);
508
509         _finish = true;
510         _empty_condition.notify_all ();
511         _full_condition.notify_all ();
512         lock.unlock ();
513
514         try {
515                 _thread.join ();
516         } catch (...) {}
517
518         if (can_throw) {
519                 rethrow ();
520         }
521 }
522
523
524 void
525 Writer::calculate_digests ()
526 {
527         auto job = _job.lock ();
528         if (job) {
529                 job->sub (_("Computing digests"));
530         }
531
532         boost::asio::io_service service;
533         boost::thread_group pool;
534
535         auto work = make_shared<boost::asio::io_service::work>(service);
536
537         int const threads = max (1, Config::instance()->master_encoding_threads());
538
539         for (int i = 0; i < threads; ++i) {
540                 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
541         }
542
543         std::function<void (int, int64_t, int64_t)> set_progress;
544         if (job) {
545                 set_progress = boost::bind(&Writer::set_digest_progress, this, job.get(), _1, _2, _3);
546         } else {
547                 set_progress = [](int, int64_t, int64_t) {
548                         boost::this_thread::interruption_point();
549                 };
550         }
551
552         int index = 0;
553
554         for (auto& i: _reels) {
555                 service.post(
556                         boost::bind(
557                                 &ReelWriter::calculate_digests,
558                                 &i,
559                                 std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
560                                 ));
561                 ++index;
562         }
563         service.post(
564                 boost::bind(
565                         &Writer::calculate_referenced_digests,
566                         this,
567                         std::function<void (int64_t, int64_t)>(boost::bind(set_progress, index, _1, _2))
568                         ));
569
570         work.reset ();
571
572         try {
573                 pool.join_all ();
574         } catch (boost::thread_interrupted) {
575                 /* join_all was interrupted, so we need to interrupt the threads
576                  * in our pool then try again to join them.
577                  */
578                 pool.interrupt_all ();
579                 pool.join_all ();
580         }
581
582         service.stop ();
583 }
584
585
586 /** @param output_dcp Path to DCP folder to write */
587 void
588 Writer::finish (boost::filesystem::path output_dcp)
589 {
590         if (_thread.joinable()) {
591                 LOG_GENERAL_NC ("Terminating writer thread");
592                 terminate_thread (true);
593         }
594
595         LOG_GENERAL_NC ("Finishing ReelWriters");
596
597         for (auto& reel: _reels) {
598                 write_hanging_text(reel);
599                 reel.finish(output_dcp);
600         }
601
602         LOG_GENERAL_NC ("Writing XML");
603
604         dcp::DCP dcp (output_dcp);
605
606         auto cpl = make_shared<dcp::CPL>(
607                 film()->dcp_name(),
608                 film()->dcp_content_type()->libdcp_kind(),
609                 film()->interop() ? dcp::Standard::INTEROP : dcp::Standard::SMPTE
610                 );
611
612         dcp.add (cpl);
613
614         calculate_digests ();
615
616         /* Add reels */
617
618         for (auto& i: _reels) {
619                 cpl->add(i.create_reel(_reel_assets, output_dcp, _have_subtitles, _have_closed_captions));
620         }
621
622         /* Add metadata */
623
624         auto creator = Config::instance()->dcp_creator();
625         if (creator.empty()) {
626                 creator = String::compose("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
627         }
628
629         auto issuer = Config::instance()->dcp_issuer();
630         if (issuer.empty()) {
631                 issuer = String::compose("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
632         }
633
634         cpl->set_creator (creator);
635         cpl->set_issuer (issuer);
636
637         cpl->set_ratings (film()->ratings());
638
639         vector<dcp::ContentVersion> cv;
640         for (auto i: film()->content_versions()) {
641                 /* Make sure we don't end up writing an empty <LabelText> node as some validators
642                  * complain about that.
643                  */
644                 cv.push_back(!i.empty() ? dcp::ContentVersion(i) : dcp::ContentVersion("1"));
645         }
646         if (cv.empty()) {
647                 cv = { dcp::ContentVersion("1") };
648         }
649         cpl->set_content_versions (cv);
650
651         cpl->set_full_content_title_text (film()->name());
652         cpl->set_full_content_title_text_language (film()->name_language());
653         if (film()->release_territory()) {
654                 cpl->set_release_territory (*film()->release_territory());
655         }
656         cpl->set_version_number (film()->version_number());
657         cpl->set_status (film()->status());
658         if (film()->chain()) {
659                 cpl->set_chain (*film()->chain());
660         }
661         if (film()->distributor()) {
662                 cpl->set_distributor (*film()->distributor());
663         }
664         if (film()->facility()) {
665                 cpl->set_facility (*film()->facility());
666         }
667         if (film()->luminance()) {
668                 cpl->set_luminance (*film()->luminance());
669         }
670         if (film()->sign_language_video_language()) {
671                 cpl->set_sign_language_video_language (*film()->sign_language_video_language());
672         }
673
674         dcp::MCASoundField field;
675         if (channel_is_mapped(film(), dcp::Channel::BSL) || channel_is_mapped(film(), dcp::Channel::BSR)) {
676                 field = dcp::MCASoundField::SEVEN_POINT_ONE;
677         } else {
678                 field = dcp::MCASoundField::FIVE_POINT_ONE;
679         }
680
681         auto const audio_channels = film()->audio_channels();
682         dcp::MainSoundConfiguration msc(field, audio_channels);
683         for (auto i: film()->mapped_audio_channels()) {
684                 if (i < audio_channels) {
685                         msc.set_mapping(i, static_cast<dcp::Channel>(i));
686                 }
687         }
688
689         cpl->set_main_sound_configuration(msc);
690         cpl->set_main_sound_sample_rate (film()->audio_frame_rate());
691         cpl->set_main_picture_stored_area (film()->frame_size());
692
693         auto active_area = film()->active_area();
694         if (active_area.width > 0 && active_area.height > 0) {
695                 /* It's not allowed to have a zero active area width or height, and the sizes must be multiples of 2 */
696                 cpl->set_main_picture_active_area({ active_area.width & ~1, active_area.height & ~1});
697         }
698
699         auto sl = film()->subtitle_languages().second;
700         if (!sl.empty()) {
701                 cpl->set_additional_subtitle_languages(sl);
702         }
703
704         auto signer = Config::instance()->signer_chain();
705         /* We did check earlier, but check again here to be on the safe side */
706         string reason;
707         if (!signer->valid (&reason)) {
708                 throw InvalidSignerError (reason);
709         }
710
711         dcp.set_issuer(issuer);
712         dcp.set_creator(creator);
713         dcp.set_annotation_text(film()->dcp_name());
714
715         dcp.write_xml(signer, !film()->limit_to_smpte_bv20(), Config::instance()->dcp_metadata_filename_format());
716
717         LOG_GENERAL (
718                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
719                 );
720
721         write_cover_sheet (output_dcp);
722 }
723
724
725 void
726 Writer::write_cover_sheet (boost::filesystem::path output_dcp)
727 {
728         auto const cover = film()->file("COVER_SHEET.txt");
729         dcp::File file(cover, "w");
730         if (!file) {
731                 throw OpenFileError (cover, errno, OpenFileError::WRITE);
732         }
733
734         auto text = Config::instance()->cover_sheet ();
735         boost::algorithm::replace_all (text, "$CPL_NAME", film()->name());
736         auto cpls = film()->cpls();
737         if (!cpls.empty()) {
738                 boost::algorithm::replace_all (text, "$CPL_FILENAME", cpls[0].cpl_file.filename().string());
739         }
740         boost::algorithm::replace_all (text, "$TYPE", film()->dcp_content_type()->pretty_name());
741         boost::algorithm::replace_all (text, "$CONTAINER", film()->container()->container_nickname());
742
743         auto audio_language = film()->audio_language();
744         if (audio_language) {
745                 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", audio_language->description());
746         } else {
747                 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _("None"));
748         }
749
750         auto subtitle_languages = film()->subtitle_languages();
751         if (subtitle_languages.first) {
752                 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_languages.first->description());
753         } else {
754                 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", _("None"));
755         }
756
757         boost::uintmax_t size = 0;
758         for (
759                 auto i = dcp::filesystem::recursive_directory_iterator(output_dcp);
760                 i != dcp::filesystem::recursive_directory_iterator();
761                 ++i) {
762                 if (dcp::filesystem::is_regular_file(i->path())) {
763                         size += dcp::filesystem::file_size(i->path());
764                 }
765         }
766
767         if (size > (1000000000L)) {
768                 boost::algorithm::replace_all (text, "$SIZE", String::compose("%1GB", dcp::locale_convert<string>(size / 1000000000.0, 1, true)));
769         } else {
770                 boost::algorithm::replace_all (text, "$SIZE", String::compose("%1MB", dcp::locale_convert<string>(size / 1000000.0, 1, true)));
771         }
772
773         auto ch = audio_channel_types (film()->mapped_audio_channels(), film()->audio_channels());
774         auto description = String::compose("%1.%2", ch.first, ch.second);
775
776         if (description == "0.0") {
777                 description = _("None");
778         } else if (description == "1.0") {
779                 description = _("Mono");
780         } else if (description == "2.0") {
781                 description = _("Stereo");
782         }
783         boost::algorithm::replace_all (text, "$AUDIO", description);
784
785         auto const hmsf = film()->length().split(film()->video_frame_rate());
786         string length;
787         if (hmsf.h == 0 && hmsf.m == 0) {
788                 length = String::compose("%1s", hmsf.s);
789         } else if (hmsf.h == 0 && hmsf.m > 0) {
790                 length = String::compose("%1m%2s", hmsf.m, hmsf.s);
791         } else if (hmsf.h > 0 && hmsf.m > 0) {
792                 length = String::compose("%1h%2m%3s", hmsf.h, hmsf.m, hmsf.s);
793         }
794
795         boost::algorithm::replace_all (text, "$LENGTH", length);
796
797         file.checked_write(text.c_str(), text.length());
798 }
799
800
801 /** @param frame Frame index within the whole DCP.
802  *  @return true if we can fake-write this frame.
803  */
804 bool
805 Writer::can_fake_write (Frame frame) const
806 {
807         if (film()->encrypted()) {
808                 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
809                 return false;
810         }
811
812         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
813            parameters in the asset writer.
814         */
815
816         auto const & reel = _reels[video_reel(frame)];
817
818         /* Make frame relative to the start of the reel */
819         frame -= reel.start ();
820         return (frame != 0 && frame < reel.first_nonexistent_frame());
821 }
822
823
824 /** @param track Closed caption track if type == TextType::CLOSED_CAPTION */
825 void
826 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
827 {
828         vector<ReelWriter>::iterator* reel = nullptr;
829
830         switch (type) {
831         case TextType::OPEN_SUBTITLE:
832                 reel = &_subtitle_reel;
833                 _have_subtitles = true;
834                 break;
835         case TextType::CLOSED_CAPTION:
836                 DCPOMATIC_ASSERT (track);
837                 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
838                 reel = &_caption_reels[*track];
839                 _have_closed_captions.insert (*track);
840                 break;
841         default:
842                 DCPOMATIC_ASSERT (false);
843         }
844
845         DCPOMATIC_ASSERT (*reel != _reels.end());
846         while ((*reel)->period().to <= period.from) {
847                 ++(*reel);
848                 DCPOMATIC_ASSERT (*reel != _reels.end());
849                 write_hanging_text (**reel);
850         }
851
852         auto back_off = [this](DCPTimePeriod period) {
853                 period.to -= DCPTime::from_frames(2, film()->video_frame_rate());
854                 return period;
855         };
856
857         if (period.to > (*reel)->period().to) {
858                 /* This text goes off the end of the reel.  Store parts of it that should go into
859                  * other reels.
860                  */
861                 for (auto i = std::next(*reel); i != _reels.end(); ++i) {
862                         auto overlap = i->period().overlap(period);
863                         if (overlap) {
864                                 _hanging_texts.push_back (HangingText{text, type, track, back_off(*overlap)});
865                         }
866                 }
867                 /* Back off from the reel boundary by a couple of frames to avoid tripping checks
868                  * for subtitles being too close together.
869                  */
870                 period.to = (*reel)->period().to;
871                 period = back_off(period);
872         }
873
874         (*reel)->write(text, type, track, period, _fonts, _chosen_interop_font);
875 }
876
877
878 void
879 Writer::write (vector<shared_ptr<Font>> fonts)
880 {
881         if (fonts.empty()) {
882                 return;
883         }
884
885         /* Fonts may come in with empty IDs but we don't want to put those in the DCP */
886         auto fix_id = [](string id) {
887                 return id.empty() ? "font" : id;
888         };
889
890         if (film()->interop()) {
891                 /* Interop will ignore second and subsequent <LoadFont>s so we don't want to
892                  * even write them as they upset some validators.  Set up _fonts so that every
893                  * font used by any subtitle will be written with the same ID.
894                  */
895                 for (size_t i = 0; i < fonts.size(); ++i) {
896                         _fonts.put(fonts[i], fix_id(fonts[0]->id()));
897                 }
898                 _chosen_interop_font = fonts[0];
899         } else {
900                 for (auto font: fonts) {
901                         _fonts.put(font, fix_id(font->id()));
902                 }
903         }
904 }
905
906
907 bool
908 operator< (QueueItem const & a, QueueItem const & b)
909 {
910         if (a.reel != b.reel) {
911                 return a.reel < b.reel;
912         }
913
914         if (a.frame != b.frame) {
915                 return a.frame < b.frame;
916         }
917
918         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
919 }
920
921
922 bool
923 operator== (QueueItem const & a, QueueItem const & b)
924 {
925         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
926 }
927
928
929 void
930 Writer::set_encoder_threads (int threads)
931 {
932         boost::mutex::scoped_lock lm (_state_mutex);
933         _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
934         _maximum_queue_size = threads * 16;
935 }
936
937
938 void
939 Writer::write (ReferencedReelAsset asset)
940 {
941         _reel_assets.push_back (asset);
942
943         if (dynamic_pointer_cast<dcp::ReelSubtitleAsset>(asset.asset)) {
944                 _have_subtitles = true;
945         } else if (auto ccap = dynamic_pointer_cast<dcp::ReelClosedCaptionAsset>(asset.asset)) {
946                 /* This feels quite fragile. We have a referenced reel and want to know if it's
947                  * part of a given closed-caption track so that we can fill if it has any
948                  * missing reels.  I guess for that purpose almost any DCPTextTrack values are
949                  * fine so long as they are consistent.
950                  */
951                 DCPTextTrack track;
952                 track.name = ccap->annotation_text().get_value_or("");
953                 track.language = dcp::LanguageTag(ccap->language().get_value_or("en-US"));
954                 if (_have_closed_captions.find(track) == _have_closed_captions.end()) {
955                         _have_closed_captions.insert(track);
956                 }
957         }
958 }
959
960
961 size_t
962 Writer::video_reel (int frame) const
963 {
964         auto t = DCPTime::from_frames (frame, film()->video_frame_rate());
965         size_t reel_index = 0;
966         while (reel_index < _reels.size() && !_reels[reel_index].period().contains(t)) {
967                 ++reel_index;
968         }
969
970         DCPOMATIC_ASSERT(reel_index < _reels.size ());
971         return reel_index;
972 }
973
974
975 /** Update job progress with information about the progress of a single digest calculation
976  *  thread.
977  *  @param id Unique identifier for the thread whose progress has changed.
978  *  @param done Number of bytes that this thread has processed.
979  *  @param size Total number of bytes that this thread must process.
980  */
981 void
982 Writer::set_digest_progress(Job* job, int id, int64_t done, int64_t size)
983 {
984         boost::mutex::scoped_lock lm (_digest_progresses_mutex);
985
986         /* Update the progress for this thread */
987         _digest_progresses[id] = std::make_pair(done, size);
988
989         /* Get the total progress across all threads and use it to set job progress */
990         int64_t total_done = 0;
991         int64_t total_size = 0;
992         for (auto const& i: _digest_progresses) {
993                 total_done += i.second.first;
994                 total_size += i.second.second;
995         }
996
997         job->set_progress(float(total_done) / total_size);
998
999         Waker waker;
1000         waker.nudge ();
1001
1002         boost::this_thread::interruption_point();
1003 }
1004
1005
1006 /** Calculate hashes for any referenced MXF assets which do not already have one */
1007 void
1008 Writer::calculate_referenced_digests(std::function<void (int64_t, int64_t)> set_progress)
1009 try
1010 {
1011         int64_t total_size = 0;
1012         for (auto const& i: _reel_assets) {
1013                 auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
1014                 if (file && !file->hash()) {
1015                         auto filename = file->asset_ref().asset()->file();
1016                         DCPOMATIC_ASSERT(filename);
1017                         total_size += boost::filesystem::file_size(*filename);
1018                 }
1019         }
1020
1021         int64_t total_done = 0;
1022         for (auto const& i: _reel_assets) {
1023                 auto file = dynamic_pointer_cast<dcp::ReelFileAsset>(i.asset);
1024                 if (file && !file->hash()) {
1025                         file->asset_ref().asset()->hash([&total_done, total_size, set_progress](int64_t done, int64_t) {
1026                                 set_progress(total_done + done, total_size);
1027                         });
1028                         total_done += boost::filesystem::file_size(*file->asset_ref().asset()->file());
1029                         file->set_hash (file->asset_ref().asset()->hash());
1030                 }
1031         }
1032 } catch (boost::thread_interrupted) {
1033         /* set_progress contains an interruption_point, so any of these methods
1034          * may throw thread_interrupted, at which point we just give up.
1035          */
1036 }
1037
1038
1039 void
1040 Writer::write_hanging_text (ReelWriter& reel)
1041 {
1042         vector<HangingText> new_hanging_texts;
1043         for (auto i: _hanging_texts) {
1044                 if (i.period.from == reel.period().from) {
1045                         reel.write(i.text, i.type, i.track, i.period, _fonts, _chosen_interop_font);
1046                 } else {
1047                         new_hanging_texts.push_back (i);
1048                 }
1049         }
1050         _hanging_texts = new_hanging_texts;
1051 }
1052
1053
1054 /** Set the writer so that it has no queue and drops any pending or future requests to write images */
1055 void
1056 Writer::zombify()
1057 {
1058         boost::mutex::scoped_lock lock(_state_mutex);
1059
1060         _queue.clear();
1061         _queued_full_in_memory = 0;
1062         _zombie = true;
1063         _full_condition.notify_all();
1064 }
1065