Fix cross-thread access to info files. May help with #1618.
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #include "writer.h"
22 #include "compose.hpp"
23 #include "film.h"
24 #include "ratio.h"
25 #include "log.h"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
30 #include "config.h"
31 #include "job.h"
32 #include "cross.h"
33 #include "audio_buffers.h"
34 #include "version.h"
35 #include "font.h"
36 #include "util.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
39 #include <dcp/cpl.h>
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
42 #include <fstream>
43 #include <cerrno>
44 #include <iostream>
45 #include <cfloat>
46
47 #include "i18n.h"
48
49 /* OS X strikes again */
50 #undef set_key
51
52 using std::make_pair;
53 using std::pair;
54 using std::string;
55 using std::list;
56 using std::cout;
57 using std::map;
58 using std::min;
59 using std::max;
60 using std::vector;
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
65 using dcp::Data;
66 using namespace dcpomatic;
67
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
69         : _film (film)
70         , _job (j)
71         , _thread (0)
72         , _finish (false)
73         , _queued_full_in_memory (0)
74         /* These will be reset to sensible values when J2KEncoder is created */
75         , _maximum_frames_in_memory (8)
76         , _maximum_queue_size (8)
77         , _full_written (0)
78         , _fake_written (0)
79         , _repeat_written (0)
80         , _pushed_to_disk (0)
81 {
82         shared_ptr<Job> job = _job.lock ();
83         DCPOMATIC_ASSERT (job);
84
85         int reel_index = 0;
86         list<DCPTimePeriod> const reels = _film->reels ();
87         BOOST_FOREACH (DCPTimePeriod p, reels) {
88                 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
89         }
90
91         /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
92            and captions arrive to the Writer in sequence.  This is not so for video.
93         */
94         _audio_reel = _reels.begin ();
95         _subtitle_reel = _reels.begin ();
96         BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
97                 _caption_reels[i] = _reels.begin ();
98         }
99
100         /* Check that the signer is OK if we need one */
101         string reason;
102         if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
103                 throw InvalidSignerError (reason);
104         }
105 }
106
107 void
108 Writer::start ()
109 {
110         _thread = new boost::thread (boost::bind (&Writer::thread, this));
111 #ifdef DCPOMATIC_LINUX
112         pthread_setname_np (_thread->native_handle(), "writer");
113 #endif
114 }
115
116 Writer::~Writer ()
117 {
118         terminate_thread (false);
119 }
120
121 /** Pass a video frame to the writer for writing to disk at some point.
122  *  This method can be called with frames out of order.
123  *  @param encoded JPEG2000-encoded data.
124  *  @param frame Frame index within the DCP.
125  *  @param eyes Eyes that this frame image is for.
126  */
127 void
128 Writer::write (Data encoded, Frame frame, Eyes eyes)
129 {
130         boost::mutex::scoped_lock lock (_state_mutex);
131
132         while (_queued_full_in_memory > _maximum_frames_in_memory) {
133                 /* There are too many full frames in memory; wake the main writer thread and
134                    wait until it sorts everything out */
135                 _empty_condition.notify_all ();
136                 _full_condition.wait (lock);
137         }
138
139         QueueItem qi;
140         qi.type = QueueItem::FULL;
141         qi.encoded = encoded;
142         qi.reel = video_reel (frame);
143         qi.frame = frame - _reels[qi.reel].start ();
144
145         if (_film->three_d() && eyes == EYES_BOTH) {
146                 /* 2D material in a 3D DCP; fake the 3D */
147                 qi.eyes = EYES_LEFT;
148                 _queue.push_back (qi);
149                 ++_queued_full_in_memory;
150                 qi.eyes = EYES_RIGHT;
151                 _queue.push_back (qi);
152                 ++_queued_full_in_memory;
153         } else {
154                 qi.eyes = eyes;
155                 _queue.push_back (qi);
156                 ++_queued_full_in_memory;
157         }
158
159         /* Now there's something to do: wake anything wait()ing on _empty_condition */
160         _empty_condition.notify_all ();
161 }
162
163 bool
164 Writer::can_repeat (Frame frame) const
165 {
166         return frame > _reels[video_reel(frame)].start();
167 }
168
169 /** Repeat the last frame that was written to a reel as a new frame.
170  *  @param frame Frame index within the DCP of the new (repeated) frame.
171  *  @param eyes Eyes that this repeated frame image is for.
172  */
173 void
174 Writer::repeat (Frame frame, Eyes eyes)
175 {
176         boost::mutex::scoped_lock lock (_state_mutex);
177
178         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
179                 /* The queue is too big, and the main writer thread can run and fix it, so
180                    wake it and wait until it has done.
181                 */
182                 _empty_condition.notify_all ();
183                 _full_condition.wait (lock);
184         }
185
186         QueueItem qi;
187         qi.type = QueueItem::REPEAT;
188         qi.reel = video_reel (frame);
189         qi.frame = frame - _reels[qi.reel].start ();
190         if (_film->three_d() && eyes == EYES_BOTH) {
191                 qi.eyes = EYES_LEFT;
192                 _queue.push_back (qi);
193                 qi.eyes = EYES_RIGHT;
194                 _queue.push_back (qi);
195         } else {
196                 qi.eyes = eyes;
197                 _queue.push_back (qi);
198         }
199
200         /* Now there's something to do: wake anything wait()ing on _empty_condition */
201         _empty_condition.notify_all ();
202 }
203
204 void
205 Writer::fake_write (Frame frame, Eyes eyes)
206 {
207         boost::mutex::scoped_lock lock (_state_mutex);
208
209         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
210                 /* The queue is too big, and the main writer thread can run and fix it, so
211                    wake it and wait until it has done.
212                 */
213                 _empty_condition.notify_all ();
214                 _full_condition.wait (lock);
215         }
216
217         size_t const reel = video_reel (frame);
218         Frame const reel_frame = frame - _reels[reel].start ();
219
220         QueueItem qi;
221         qi.type = QueueItem::FAKE;
222
223         {
224                 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
225                 qi.size = _reels[reel].read_frame_info(info_file, reel_frame, eyes).size;
226         }
227
228         qi.reel = reel;
229         qi.frame = reel_frame;
230         if (_film->three_d() && eyes == EYES_BOTH) {
231                 qi.eyes = EYES_LEFT;
232                 _queue.push_back (qi);
233                 qi.eyes = EYES_RIGHT;
234                 _queue.push_back (qi);
235         } else {
236                 qi.eyes = eyes;
237                 _queue.push_back (qi);
238         }
239
240         /* Now there's something to do: wake anything wait()ing on _empty_condition */
241         _empty_condition.notify_all ();
242 }
243
244 /** Write some audio frames to the DCP.
245  *  @param audio Audio data.
246  *  @param time Time of this data within the DCP.
247  *  This method is not thread safe.
248  */
249 void
250 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
251 {
252         DCPOMATIC_ASSERT (audio);
253
254         int const afr = _film->audio_frame_rate();
255
256         DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
257
258         /* The audio we get might span a reel boundary, and if so we have to write it in bits */
259
260         DCPTime t = time;
261         while (t < end) {
262
263                 if (_audio_reel == _reels.end ()) {
264                         /* This audio is off the end of the last reel; ignore it */
265                         return;
266                 }
267
268                 if (end <= _audio_reel->period().to) {
269                         /* Easy case: we can write all the audio to this reel */
270                         _audio_reel->write (audio);
271                         t = end;
272                 } else {
273                         /* Split the audio into two and write the first part */
274                         DCPTime part_lengths[2] = {
275                                 _audio_reel->period().to - t,
276                                 end - _audio_reel->period().to
277                         };
278
279                         Frame part_frames[2] = {
280                                 part_lengths[0].frames_ceil(afr),
281                                 part_lengths[1].frames_ceil(afr)
282                         };
283
284                         if (part_frames[0]) {
285                                 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
286                                 part->copy_from (audio.get(), part_frames[0], 0, 0);
287                                 _audio_reel->write (part);
288                         }
289
290                         if (part_frames[1]) {
291                                 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
292                                 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
293                                 audio = part;
294                         } else {
295                                 audio.reset ();
296                         }
297
298                         ++_audio_reel;
299                         t += part_lengths[0];
300                 }
301         }
302 }
303
304 /** This must be called from Writer::thread() with an appropriate lock held */
305 bool
306 Writer::have_sequenced_image_at_queue_head ()
307 {
308         if (_queue.empty ()) {
309                 return false;
310         }
311
312         _queue.sort ();
313
314         QueueItem const & f = _queue.front();
315         ReelWriter const & reel = _reels[f.reel];
316
317         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
318
319         if (f.eyes == EYES_BOTH) {
320                 /* 2D */
321                 return f.frame == (reel.last_written_video_frame() + 1);
322         }
323
324         /* 3D */
325
326         if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
327                 return true;
328         }
329
330         if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
331                 return true;
332         }
333
334         return false;
335 }
336
337 void
338 Writer::thread ()
339 try
340 {
341         while (true)
342         {
343                 boost::mutex::scoped_lock lock (_state_mutex);
344
345                 while (true) {
346
347                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
348                                 /* We've got something to do: go and do it */
349                                 break;
350                         }
351
352                         /* Nothing to do: wait until something happens which may indicate that we do */
353                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
354                         _empty_condition.wait (lock);
355                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
356                 }
357
358                 if (_finish && _queue.empty()) {
359                         return;
360                 }
361
362                 /* We stop here if we have been asked to finish, and if either the queue
363                    is empty or we do not have a sequenced image at its head (if this is the
364                    case we will never terminate as no new frames will be sent once
365                    _finish is true).
366                 */
367                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
368                         /* (Hopefully temporarily) log anything that was not written */
369                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
370                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
371                                 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
372                                         if (i->type == QueueItem::FULL) {
373                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
374                                         } else {
375                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
376                                         }
377                                 }
378                         }
379                         return;
380                 }
381
382                 /* Write any frames that we can write; i.e. those that are in sequence. */
383                 while (have_sequenced_image_at_queue_head ()) {
384                         QueueItem qi = _queue.front ();
385                         _queue.pop_front ();
386                         if (qi.type == QueueItem::FULL && qi.encoded) {
387                                 --_queued_full_in_memory;
388                         }
389
390                         lock.unlock ();
391
392                         ReelWriter& reel = _reels[qi.reel];
393
394                         switch (qi.type) {
395                         case QueueItem::FULL:
396                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
397                                 if (!qi.encoded) {
398                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
399                                 }
400                                 reel.write (qi.encoded, qi.frame, qi.eyes);
401                                 ++_full_written;
402                                 break;
403                         case QueueItem::FAKE:
404                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
405                                 reel.fake_write (qi.frame, qi.eyes, qi.size);
406                                 ++_fake_written;
407                                 break;
408                         case QueueItem::REPEAT:
409                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
410                                 reel.repeat_write (qi.frame, qi.eyes);
411                                 ++_repeat_written;
412                                 break;
413                         }
414
415                         lock.lock ();
416                         _full_condition.notify_all ();
417                 }
418
419                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
420                         /* Too many frames in memory which can't yet be written to the stream.
421                            Write some FULL frames to disk.
422                         */
423
424                         /* Find one from the back of the queue */
425                         _queue.sort ();
426                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
427                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
428                                 ++i;
429                         }
430
431                         DCPOMATIC_ASSERT (i != _queue.rend());
432                         ++_pushed_to_disk;
433                         /* For the log message below */
434                         int const awaiting = _reels[_queue.front().reel].last_written_video_frame() + 1;
435                         lock.unlock ();
436
437                         /* i is valid here, even though we don't hold a lock on the mutex,
438                            since list iterators are unaffected by insertion and only this
439                            thread could erase the last item in the list.
440                         */
441
442                         LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
443
444                         i->encoded->write_via_temp (
445                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
446                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
447                                 );
448
449                         lock.lock ();
450                         i->encoded.reset ();
451                         --_queued_full_in_memory;
452                         _full_condition.notify_all ();
453                 }
454         }
455 }
456 catch (...)
457 {
458         store_current ();
459 }
460
461 void
462 Writer::terminate_thread (bool can_throw)
463 {
464         boost::mutex::scoped_lock lock (_state_mutex);
465         if (_thread == 0) {
466                 return;
467         }
468
469         _finish = true;
470         _empty_condition.notify_all ();
471         _full_condition.notify_all ();
472         lock.unlock ();
473
474         if (_thread->joinable ()) {
475                 _thread->join ();
476         }
477
478         if (can_throw) {
479                 rethrow ();
480         }
481
482         delete _thread;
483         _thread = 0;
484 }
485
486 void
487 Writer::finish ()
488 {
489         if (!_thread) {
490                 return;
491         }
492
493         LOG_GENERAL_NC ("Terminating writer thread");
494
495         terminate_thread (true);
496
497         LOG_GENERAL_NC ("Finishing ReelWriters");
498
499         BOOST_FOREACH (ReelWriter& i, _reels) {
500                 i.finish ();
501         }
502
503         LOG_GENERAL_NC ("Writing XML");
504
505         dcp::DCP dcp (_film->dir (_film->dcp_name()));
506
507         shared_ptr<dcp::CPL> cpl (
508                 new dcp::CPL (
509                         _film->dcp_name(),
510                         _film->dcp_content_type()->libdcp_kind ()
511                         )
512                 );
513
514         dcp.add (cpl);
515
516         /* Calculate digests for each reel in parallel */
517
518         shared_ptr<Job> job = _job.lock ();
519         job->sub (_("Computing digests"));
520
521         boost::asio::io_service service;
522         boost::thread_group pool;
523
524         shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
525
526         int const threads = max (1, Config::instance()->master_encoding_threads ());
527
528         for (int i = 0; i < threads; ++i) {
529                 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
530         }
531
532         BOOST_FOREACH (ReelWriter& i, _reels) {
533                 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
534                 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
535         }
536
537         work.reset ();
538         pool.join_all ();
539         service.stop ();
540
541         /* Add reels to CPL */
542
543         BOOST_FOREACH (ReelWriter& i, _reels) {
544                 cpl->add (i.create_reel (_reel_assets, _fonts));
545         }
546
547         dcp::XMLMetadata meta;
548         meta.annotation_text = cpl->annotation_text ();
549         meta.creator = Config::instance()->dcp_creator ();
550         if (meta.creator.empty ()) {
551                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
552         }
553         meta.issuer = Config::instance()->dcp_issuer ();
554         if (meta.issuer.empty ()) {
555                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
556         }
557         meta.set_issue_date_now ();
558
559         cpl->set_metadata (meta);
560         cpl->set_ratings (vector_to_list(_film->ratings()));
561
562         shared_ptr<const dcp::CertificateChain> signer;
563         if (_film->is_signed ()) {
564                 signer = Config::instance()->signer_chain ();
565                 /* We did check earlier, but check again here to be on the safe side */
566                 string reason;
567                 if (!signer->valid (&reason)) {
568                         throw InvalidSignerError (reason);
569                 }
570         }
571
572         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
573
574         LOG_GENERAL (
575                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
576                 );
577
578         write_cover_sheet ();
579 }
580
581 void
582 Writer::write_cover_sheet ()
583 {
584         boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
585         FILE* f = fopen_boost (cover, "w");
586         if (!f) {
587                 throw OpenFileError (cover, errno, OpenFileError::WRITE);
588         }
589
590         string text = Config::instance()->cover_sheet ();
591         boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
592         boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
593         boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
594         boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
595
596         optional<string> subtitle_language;
597         BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
598                 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
599                         if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
600                                 subtitle_language = j->language ();
601                         }
602                 }
603         }
604         boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
605
606         boost::uintmax_t size = 0;
607         for (
608                 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
609                 i != boost::filesystem::recursive_directory_iterator();
610                 ++i) {
611                 if (boost::filesystem::is_regular_file (i->path ())) {
612                         size += boost::filesystem::file_size (i->path ());
613                 }
614         }
615
616         if (size > (1000000000L)) {
617                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
618         } else {
619                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
620         }
621
622         pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
623         string description = String::compose("%1.%2", ch.first, ch.second);
624
625         if (description == "0.0") {
626                 description = _("None");
627         } else if (description == "1.0") {
628                 description = _("Mono");
629         } else if (description == "2.0") {
630                 description = _("Stereo");
631         }
632         boost::algorithm::replace_all (text, "$AUDIO", description);
633
634         int h, m, s, fr;
635         _film->length().split (_film->video_frame_rate(), h, m, s, fr);
636         string length;
637         if (h == 0 && m == 0) {
638                 length = String::compose("%1s", s);
639         } else if (h == 0 && m > 0) {
640                 length = String::compose("%1m%2s", m, s);
641         } else if (h > 0 && m > 0) {
642                 length = String::compose("%1h%2m%3s", h, m, s);
643         }
644
645         boost::algorithm::replace_all (text, "$LENGTH", length);
646
647         checked_fwrite (text.c_str(), text.length(), f, cover);
648         fclose (f);
649 }
650
651 /** @param frame Frame index within the whole DCP.
652  *  @return true if we can fake-write this frame.
653  */
654 bool
655 Writer::can_fake_write (Frame frame) const
656 {
657         if (_film->encrypted()) {
658                 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
659                 return false;
660         }
661
662         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
663            parameters in the asset writer.
664         */
665
666         ReelWriter const & reel = _reels[video_reel(frame)];
667
668         /* Make frame relative to the start of the reel */
669         frame -= reel.start ();
670         return (frame != 0 && frame < reel.first_nonexistant_frame());
671 }
672
673 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
674 void
675 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
676 {
677         vector<ReelWriter>::iterator* reel = 0;
678
679         switch (type) {
680         case TEXT_OPEN_SUBTITLE:
681                 reel = &_subtitle_reel;
682                 break;
683         case TEXT_CLOSED_CAPTION:
684                 DCPOMATIC_ASSERT (track);
685                 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
686                 reel = &_caption_reels[*track];
687                 break;
688         default:
689                 DCPOMATIC_ASSERT (false);
690         }
691
692         DCPOMATIC_ASSERT (*reel != _reels.end());
693         while ((*reel)->period().to <= period.from) {
694                 ++(*reel);
695                 DCPOMATIC_ASSERT (*reel != _reels.end());
696         }
697
698         (*reel)->write (text, type, track, period);
699 }
700
701 void
702 Writer::write (list<shared_ptr<Font> > fonts)
703 {
704         /* Just keep a list of unique fonts and we'll deal with them in ::finish */
705
706         BOOST_FOREACH (shared_ptr<Font> i, fonts) {
707                 bool got = false;
708                 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
709                         if (*i == *j) {
710                                 got = true;
711                         }
712                 }
713
714                 if (!got) {
715                         _fonts.push_back (i);
716                 }
717         }
718 }
719
720 bool
721 operator< (QueueItem const & a, QueueItem const & b)
722 {
723         if (a.reel != b.reel) {
724                 return a.reel < b.reel;
725         }
726
727         if (a.frame != b.frame) {
728                 return a.frame < b.frame;
729         }
730
731         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
732 }
733
734 bool
735 operator== (QueueItem const & a, QueueItem const & b)
736 {
737         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
738 }
739
740 void
741 Writer::set_encoder_threads (int threads)
742 {
743         boost::mutex::scoped_lock lm (_state_mutex);
744         _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
745         _maximum_queue_size = threads * 16;
746 }
747
748 void
749 Writer::write (ReferencedReelAsset asset)
750 {
751         _reel_assets.push_back (asset);
752 }
753
754 size_t
755 Writer::video_reel (int frame) const
756 {
757         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
758         size_t i = 0;
759         while (i < _reels.size() && !_reels[i].period().contains (t)) {
760                 ++i;
761         }
762
763         DCPOMATIC_ASSERT (i < _reels.size ());
764         return i;
765 }
766
767 void
768 Writer::set_digest_progress (Job* job, float progress)
769 {
770         /* I believe this is thread-safe */
771         _digest_progresses[boost::this_thread::get_id()] = progress;
772
773         boost::mutex::scoped_lock lm (_digest_progresses_mutex);
774         float min_progress = FLT_MAX;
775         for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
776                 min_progress = min (min_progress, i->second);
777         }
778
779         job->set_progress (min_progress);
780 }