Yet more waking (in hash computation).
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #include "writer.h"
22 #include "compose.hpp"
23 #include "film.h"
24 #include "ratio.h"
25 #include "log.h"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
30 #include "config.h"
31 #include "job.h"
32 #include "cross.h"
33 #include "audio_buffers.h"
34 #include "version.h"
35 #include "font.h"
36 #include "util.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
39 #include <dcp/cpl.h>
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
42 #include <fstream>
43 #include <cerrno>
44 #include <iostream>
45 #include <cfloat>
46
47 #include "i18n.h"
48
49 /* OS X strikes again */
50 #undef set_key
51
52 using std::make_pair;
53 using std::pair;
54 using std::string;
55 using std::list;
56 using std::cout;
57 using std::map;
58 using std::min;
59 using std::max;
60 using std::vector;
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
65 using dcp::Data;
66 using namespace dcpomatic;
67
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
69         : _film (film)
70         , _job (j)
71         , _thread (0)
72         , _finish (false)
73         , _queued_full_in_memory (0)
74         /* These will be reset to sensible values when J2KEncoder is created */
75         , _maximum_frames_in_memory (8)
76         , _maximum_queue_size (8)
77         , _full_written (0)
78         , _fake_written (0)
79         , _repeat_written (0)
80         , _pushed_to_disk (0)
81 {
82         shared_ptr<Job> job = _job.lock ();
83         DCPOMATIC_ASSERT (job);
84
85         int reel_index = 0;
86         list<DCPTimePeriod> const reels = _film->reels ();
87         BOOST_FOREACH (DCPTimePeriod p, reels) {
88                 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
89         }
90
91         /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
92            and captions arrive to the Writer in sequence.  This is not so for video.
93         */
94         _audio_reel = _reels.begin ();
95         _subtitle_reel = _reels.begin ();
96         BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
97                 _caption_reels[i] = _reels.begin ();
98         }
99
100         /* Check that the signer is OK if we need one */
101         string reason;
102         if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
103                 throw InvalidSignerError (reason);
104         }
105 }
106
107 void
108 Writer::start ()
109 {
110         _thread = new boost::thread (boost::bind (&Writer::thread, this));
111 #ifdef DCPOMATIC_LINUX
112         pthread_setname_np (_thread->native_handle(), "writer");
113 #endif
114 }
115
116 Writer::~Writer ()
117 {
118         terminate_thread (false);
119 }
120
121 /** Pass a video frame to the writer for writing to disk at some point.
122  *  This method can be called with frames out of order.
123  *  @param encoded JPEG2000-encoded data.
124  *  @param frame Frame index within the DCP.
125  *  @param eyes Eyes that this frame image is for.
126  */
127 void
128 Writer::write (Data encoded, Frame frame, Eyes eyes)
129 {
130         boost::mutex::scoped_lock lock (_state_mutex);
131
132         while (_queued_full_in_memory > _maximum_frames_in_memory) {
133                 /* There are too many full frames in memory; wake the main writer thread and
134                    wait until it sorts everything out */
135                 _empty_condition.notify_all ();
136                 _full_condition.wait (lock);
137         }
138
139         QueueItem qi;
140         qi.type = QueueItem::FULL;
141         qi.encoded = encoded;
142         qi.reel = video_reel (frame);
143         qi.frame = frame - _reels[qi.reel].start ();
144
145         if (_film->three_d() && eyes == EYES_BOTH) {
146                 /* 2D material in a 3D DCP; fake the 3D */
147                 qi.eyes = EYES_LEFT;
148                 _queue.push_back (qi);
149                 ++_queued_full_in_memory;
150                 qi.eyes = EYES_RIGHT;
151                 _queue.push_back (qi);
152                 ++_queued_full_in_memory;
153         } else {
154                 qi.eyes = eyes;
155                 _queue.push_back (qi);
156                 ++_queued_full_in_memory;
157         }
158
159         /* Now there's something to do: wake anything wait()ing on _empty_condition */
160         _empty_condition.notify_all ();
161 }
162
163 bool
164 Writer::can_repeat (Frame frame) const
165 {
166         return frame > _reels[video_reel(frame)].start();
167 }
168
169 /** Repeat the last frame that was written to a reel as a new frame.
170  *  @param frame Frame index within the DCP of the new (repeated) frame.
171  *  @param eyes Eyes that this repeated frame image is for.
172  */
173 void
174 Writer::repeat (Frame frame, Eyes eyes)
175 {
176         boost::mutex::scoped_lock lock (_state_mutex);
177
178         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
179                 /* The queue is too big, and the main writer thread can run and fix it, so
180                    wake it and wait until it has done.
181                 */
182                 _empty_condition.notify_all ();
183                 _full_condition.wait (lock);
184         }
185
186         QueueItem qi;
187         qi.type = QueueItem::REPEAT;
188         qi.reel = video_reel (frame);
189         qi.frame = frame - _reels[qi.reel].start ();
190         if (_film->three_d() && eyes == EYES_BOTH) {
191                 qi.eyes = EYES_LEFT;
192                 _queue.push_back (qi);
193                 qi.eyes = EYES_RIGHT;
194                 _queue.push_back (qi);
195         } else {
196                 qi.eyes = eyes;
197                 _queue.push_back (qi);
198         }
199
200         /* Now there's something to do: wake anything wait()ing on _empty_condition */
201         _empty_condition.notify_all ();
202 }
203
204 void
205 Writer::fake_write (Frame frame, Eyes eyes)
206 {
207         boost::mutex::scoped_lock lock (_state_mutex);
208
209         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
210                 /* The queue is too big, and the main writer thread can run and fix it, so
211                    wake it and wait until it has done.
212                 */
213                 _empty_condition.notify_all ();
214                 _full_condition.wait (lock);
215         }
216
217         size_t const reel = video_reel (frame);
218         Frame const reel_frame = frame - _reels[reel].start ();
219
220         QueueItem qi;
221         qi.type = QueueItem::FAKE;
222
223         {
224                 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
225                 qi.size = _reels[reel].read_frame_info(info_file, reel_frame, eyes).size;
226         }
227
228         qi.reel = reel;
229         qi.frame = reel_frame;
230         if (_film->three_d() && eyes == EYES_BOTH) {
231                 qi.eyes = EYES_LEFT;
232                 _queue.push_back (qi);
233                 qi.eyes = EYES_RIGHT;
234                 _queue.push_back (qi);
235         } else {
236                 qi.eyes = eyes;
237                 _queue.push_back (qi);
238         }
239
240         /* Now there's something to do: wake anything wait()ing on _empty_condition */
241         _empty_condition.notify_all ();
242 }
243
244 /** Write some audio frames to the DCP.
245  *  @param audio Audio data.
246  *  @param time Time of this data within the DCP.
247  *  This method is not thread safe.
248  */
249 void
250 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
251 {
252         DCPOMATIC_ASSERT (audio);
253
254         int const afr = _film->audio_frame_rate();
255
256         DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
257
258         /* The audio we get might span a reel boundary, and if so we have to write it in bits */
259
260         DCPTime t = time;
261         while (t < end) {
262
263                 if (_audio_reel == _reels.end ()) {
264                         /* This audio is off the end of the last reel; ignore it */
265                         return;
266                 }
267
268                 if (end <= _audio_reel->period().to) {
269                         /* Easy case: we can write all the audio to this reel */
270                         _audio_reel->write (audio);
271                         t = end;
272                 } else if (_audio_reel->period().to <= t) {
273                         /* This reel is entirely before the start of our audio; just skip the reel */
274                         ++_audio_reel;
275                 } else {
276                         /* This audio is over a reel boundary; split the audio into two and write the first part */
277                         DCPTime part_lengths[2] = {
278                                 _audio_reel->period().to - t,
279                                 end - _audio_reel->period().to
280                         };
281
282                         Frame part_frames[2] = {
283                                 part_lengths[0].frames_ceil(afr),
284                                 part_lengths[1].frames_ceil(afr)
285                         };
286
287                         if (part_frames[0]) {
288                                 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
289                                 part->copy_from (audio.get(), part_frames[0], 0, 0);
290                                 _audio_reel->write (part);
291                         }
292
293                         if (part_frames[1]) {
294                                 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
295                                 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
296                                 audio = part;
297                         } else {
298                                 audio.reset ();
299                         }
300
301                         ++_audio_reel;
302                         t += part_lengths[0];
303                 }
304         }
305 }
306
307 /** This must be called from Writer::thread() with an appropriate lock held */
308 bool
309 Writer::have_sequenced_image_at_queue_head ()
310 {
311         if (_queue.empty ()) {
312                 return false;
313         }
314
315         _queue.sort ();
316
317         QueueItem const & f = _queue.front();
318         ReelWriter const & reel = _reels[f.reel];
319
320         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
321
322         if (f.eyes == EYES_BOTH) {
323                 /* 2D */
324                 return f.frame == (reel.last_written_video_frame() + 1);
325         }
326
327         /* 3D */
328
329         if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
330                 return true;
331         }
332
333         if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
334                 return true;
335         }
336
337         return false;
338 }
339
340 void
341 Writer::thread ()
342 try
343 {
344         while (true)
345         {
346                 boost::mutex::scoped_lock lock (_state_mutex);
347
348                 while (true) {
349
350                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
351                                 /* We've got something to do: go and do it */
352                                 break;
353                         }
354
355                         /* Nothing to do: wait until something happens which may indicate that we do */
356                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
357                         _empty_condition.wait (lock);
358                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
359                 }
360
361                 if (_finish && _queue.empty()) {
362                         return;
363                 }
364
365                 /* We stop here if we have been asked to finish, and if either the queue
366                    is empty or we do not have a sequenced image at its head (if this is the
367                    case we will never terminate as no new frames will be sent once
368                    _finish is true).
369                 */
370                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
371                         /* (Hopefully temporarily) log anything that was not written */
372                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
373                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
374                                 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
375                                         if (i->type == QueueItem::FULL) {
376                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
377                                         } else {
378                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
379                                         }
380                                 }
381                         }
382                         return;
383                 }
384
385                 /* Write any frames that we can write; i.e. those that are in sequence. */
386                 while (have_sequenced_image_at_queue_head ()) {
387                         QueueItem qi = _queue.front ();
388                         _queue.pop_front ();
389                         if (qi.type == QueueItem::FULL && qi.encoded) {
390                                 --_queued_full_in_memory;
391                         }
392
393                         lock.unlock ();
394
395                         ReelWriter& reel = _reels[qi.reel];
396
397                         switch (qi.type) {
398                         case QueueItem::FULL:
399                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
400                                 if (!qi.encoded) {
401                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
402                                 }
403                                 reel.write (qi.encoded, qi.frame, qi.eyes);
404                                 ++_full_written;
405                                 break;
406                         case QueueItem::FAKE:
407                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
408                                 reel.fake_write (qi.frame, qi.eyes, qi.size);
409                                 ++_fake_written;
410                                 break;
411                         case QueueItem::REPEAT:
412                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
413                                 reel.repeat_write (qi.frame, qi.eyes);
414                                 ++_repeat_written;
415                                 break;
416                         }
417
418                         lock.lock ();
419                         _full_condition.notify_all ();
420                 }
421
422                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
423                         /* Too many frames in memory which can't yet be written to the stream.
424                            Write some FULL frames to disk.
425                         */
426
427                         /* Find one from the back of the queue */
428                         _queue.sort ();
429                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
430                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
431                                 ++i;
432                         }
433
434                         DCPOMATIC_ASSERT (i != _queue.rend());
435                         ++_pushed_to_disk;
436                         /* For the log message below */
437                         int const awaiting = _reels[_queue.front().reel].last_written_video_frame() + 1;
438                         lock.unlock ();
439
440                         /* i is valid here, even though we don't hold a lock on the mutex,
441                            since list iterators are unaffected by insertion and only this
442                            thread could erase the last item in the list.
443                         */
444
445                         LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
446
447                         i->encoded->write_via_temp (
448                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
449                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
450                                 );
451
452                         lock.lock ();
453                         i->encoded.reset ();
454                         --_queued_full_in_memory;
455                         _full_condition.notify_all ();
456                 }
457         }
458 }
459 catch (...)
460 {
461         store_current ();
462 }
463
464 void
465 Writer::terminate_thread (bool can_throw)
466 {
467         boost::mutex::scoped_lock lock (_state_mutex);
468         if (_thread == 0) {
469                 return;
470         }
471
472         _finish = true;
473         _empty_condition.notify_all ();
474         _full_condition.notify_all ();
475         lock.unlock ();
476
477         if (_thread->joinable ()) {
478                 _thread->join ();
479         }
480
481         if (can_throw) {
482                 rethrow ();
483         }
484
485         delete _thread;
486         _thread = 0;
487 }
488
489 void
490 Writer::finish ()
491 {
492         if (!_thread) {
493                 return;
494         }
495
496         LOG_GENERAL_NC ("Terminating writer thread");
497
498         terminate_thread (true);
499
500         LOG_GENERAL_NC ("Finishing ReelWriters");
501
502         BOOST_FOREACH (ReelWriter& i, _reels) {
503                 i.finish ();
504         }
505
506         LOG_GENERAL_NC ("Writing XML");
507
508         dcp::DCP dcp (_film->dir (_film->dcp_name()));
509
510         shared_ptr<dcp::CPL> cpl (
511                 new dcp::CPL (
512                         _film->dcp_name(),
513                         _film->dcp_content_type()->libdcp_kind ()
514                         )
515                 );
516
517         dcp.add (cpl);
518
519         /* Calculate digests for each reel in parallel */
520
521         shared_ptr<Job> job = _job.lock ();
522         job->sub (_("Computing digests"));
523
524         boost::asio::io_service service;
525         boost::thread_group pool;
526
527         shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
528
529         int const threads = max (1, Config::instance()->master_encoding_threads ());
530
531         for (int i = 0; i < threads; ++i) {
532                 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
533         }
534
535         BOOST_FOREACH (ReelWriter& i, _reels) {
536                 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
537                 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
538         }
539
540         work.reset ();
541         pool.join_all ();
542         service.stop ();
543
544         /* Add reels to CPL */
545
546         BOOST_FOREACH (ReelWriter& i, _reels) {
547                 cpl->add (i.create_reel (_reel_assets, _fonts));
548         }
549
550         dcp::XMLMetadata meta;
551         meta.annotation_text = cpl->annotation_text ();
552         meta.creator = Config::instance()->dcp_creator ();
553         if (meta.creator.empty ()) {
554                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
555         }
556         meta.issuer = Config::instance()->dcp_issuer ();
557         if (meta.issuer.empty ()) {
558                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
559         }
560         meta.set_issue_date_now ();
561
562         cpl->set_metadata (meta);
563         cpl->set_ratings (vector_to_list(_film->ratings()));
564
565         shared_ptr<const dcp::CertificateChain> signer;
566         if (_film->is_signed ()) {
567                 signer = Config::instance()->signer_chain ();
568                 /* We did check earlier, but check again here to be on the safe side */
569                 string reason;
570                 if (!signer->valid (&reason)) {
571                         throw InvalidSignerError (reason);
572                 }
573         }
574
575         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
576
577         LOG_GENERAL (
578                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
579                 );
580
581         write_cover_sheet ();
582 }
583
584 void
585 Writer::write_cover_sheet ()
586 {
587         boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
588         FILE* f = fopen_boost (cover, "w");
589         if (!f) {
590                 throw OpenFileError (cover, errno, OpenFileError::WRITE);
591         }
592
593         string text = Config::instance()->cover_sheet ();
594         boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
595         boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
596         boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
597         boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
598
599         optional<string> subtitle_language;
600         BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
601                 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
602                         if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
603                                 subtitle_language = j->language ();
604                         }
605                 }
606         }
607         boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
608
609         boost::uintmax_t size = 0;
610         for (
611                 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
612                 i != boost::filesystem::recursive_directory_iterator();
613                 ++i) {
614                 if (boost::filesystem::is_regular_file (i->path ())) {
615                         size += boost::filesystem::file_size (i->path ());
616                 }
617         }
618
619         if (size > (1000000000L)) {
620                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
621         } else {
622                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
623         }
624
625         pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
626         string description = String::compose("%1.%2", ch.first, ch.second);
627
628         if (description == "0.0") {
629                 description = _("None");
630         } else if (description == "1.0") {
631                 description = _("Mono");
632         } else if (description == "2.0") {
633                 description = _("Stereo");
634         }
635         boost::algorithm::replace_all (text, "$AUDIO", description);
636
637         int h, m, s, fr;
638         _film->length().split (_film->video_frame_rate(), h, m, s, fr);
639         string length;
640         if (h == 0 && m == 0) {
641                 length = String::compose("%1s", s);
642         } else if (h == 0 && m > 0) {
643                 length = String::compose("%1m%2s", m, s);
644         } else if (h > 0 && m > 0) {
645                 length = String::compose("%1h%2m%3s", h, m, s);
646         }
647
648         boost::algorithm::replace_all (text, "$LENGTH", length);
649
650         checked_fwrite (text.c_str(), text.length(), f, cover);
651         fclose (f);
652 }
653
654 /** @param frame Frame index within the whole DCP.
655  *  @return true if we can fake-write this frame.
656  */
657 bool
658 Writer::can_fake_write (Frame frame) const
659 {
660         if (_film->encrypted()) {
661                 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
662                 return false;
663         }
664
665         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
666            parameters in the asset writer.
667         */
668
669         ReelWriter const & reel = _reels[video_reel(frame)];
670
671         /* Make frame relative to the start of the reel */
672         frame -= reel.start ();
673         return (frame != 0 && frame < reel.first_nonexistant_frame());
674 }
675
676 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
677 void
678 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
679 {
680         vector<ReelWriter>::iterator* reel = 0;
681
682         switch (type) {
683         case TEXT_OPEN_SUBTITLE:
684                 reel = &_subtitle_reel;
685                 break;
686         case TEXT_CLOSED_CAPTION:
687                 DCPOMATIC_ASSERT (track);
688                 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
689                 reel = &_caption_reels[*track];
690                 break;
691         default:
692                 DCPOMATIC_ASSERT (false);
693         }
694
695         DCPOMATIC_ASSERT (*reel != _reels.end());
696         while ((*reel)->period().to <= period.from) {
697                 ++(*reel);
698                 DCPOMATIC_ASSERT (*reel != _reels.end());
699         }
700
701         (*reel)->write (text, type, track, period);
702 }
703
704 void
705 Writer::write (list<shared_ptr<Font> > fonts)
706 {
707         /* Just keep a list of unique fonts and we'll deal with them in ::finish */
708
709         BOOST_FOREACH (shared_ptr<Font> i, fonts) {
710                 bool got = false;
711                 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
712                         if (*i == *j) {
713                                 got = true;
714                         }
715                 }
716
717                 if (!got) {
718                         _fonts.push_back (i);
719                 }
720         }
721 }
722
723 bool
724 operator< (QueueItem const & a, QueueItem const & b)
725 {
726         if (a.reel != b.reel) {
727                 return a.reel < b.reel;
728         }
729
730         if (a.frame != b.frame) {
731                 return a.frame < b.frame;
732         }
733
734         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
735 }
736
737 bool
738 operator== (QueueItem const & a, QueueItem const & b)
739 {
740         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
741 }
742
743 void
744 Writer::set_encoder_threads (int threads)
745 {
746         boost::mutex::scoped_lock lm (_state_mutex);
747         _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
748         _maximum_queue_size = threads * 16;
749 }
750
751 void
752 Writer::write (ReferencedReelAsset asset)
753 {
754         _reel_assets.push_back (asset);
755 }
756
757 size_t
758 Writer::video_reel (int frame) const
759 {
760         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
761         size_t i = 0;
762         while (i < _reels.size() && !_reels[i].period().contains (t)) {
763                 ++i;
764         }
765
766         DCPOMATIC_ASSERT (i < _reels.size ());
767         return i;
768 }
769
770 void
771 Writer::set_digest_progress (Job* job, float progress)
772 {
773         /* I believe this is thread-safe */
774         _digest_progresses[boost::this_thread::get_id()] = progress;
775
776         boost::mutex::scoped_lock lm (_digest_progresses_mutex);
777         float min_progress = FLT_MAX;
778         for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
779                 min_progress = min (min_progress, i->second);
780         }
781
782         job->set_progress (min_progress);
783
784         Waker waker;
785         waker.nudge ();
786 }