Another writer deadlock fix.
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2017 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #include "writer.h"
22 #include "compose.hpp"
23 #include "film.h"
24 #include "ratio.h"
25 #include "log.h"
26 #include "dcp_video.h"
27 #include "dcp_content_type.h"
28 #include "audio_mapping.h"
29 #include "config.h"
30 #include "job.h"
31 #include "cross.h"
32 #include "audio_buffers.h"
33 #include "version.h"
34 #include "font.h"
35 #include "util.h"
36 #include "reel_writer.h"
37 #include <dcp/cpl.h>
38 #include <dcp/locale_convert.h>
39 #include <boost/foreach.hpp>
40 #include <fstream>
41 #include <cerrno>
42 #include <iostream>
43 #include <cfloat>
44
45 #include "i18n.h"
46
47 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
48 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
49 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
50 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 #define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING);
52 #define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING);
53 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
54
55 /* OS X strikes again */
56 #undef set_key
57
58 using std::make_pair;
59 using std::pair;
60 using std::string;
61 using std::list;
62 using std::cout;
63 using std::map;
64 using std::min;
65 using std::max;
66 using boost::shared_ptr;
67 using boost::weak_ptr;
68 using boost::dynamic_pointer_cast;
69 using dcp::Data;
70
71 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
72         : _film (film)
73         , _job (j)
74         , _thread (0)
75         , _finish (false)
76         , _queued_full_in_memory (0)
77         /* These will be reset to sensible values when J2KEncoder is created */
78         , _maximum_frames_in_memory (8)
79         , _maximum_queue_size (8)
80         , _full_written (0)
81         , _fake_written (0)
82         , _repeat_written (0)
83         , _pushed_to_disk (0)
84 {
85         shared_ptr<Job> job = _job.lock ();
86         DCPOMATIC_ASSERT (job);
87
88         int reel_index = 0;
89         list<DCPTimePeriod> const reels = _film->reels ();
90         BOOST_FOREACH (DCPTimePeriod p, reels) {
91                 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
92         }
93
94         /* We can keep track of the current audio and subtitle reels easily because audio
95            and subs arrive to the Writer in sequence.  This is not so for video.
96         */
97         _audio_reel = _reels.begin ();
98         _subtitle_reel = _reels.begin ();
99
100         /* Check that the signer is OK if we need one */
101         string reason;
102         if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
103                 throw InvalidSignerError (reason);
104         }
105 }
106
107 void
108 Writer::start ()
109 {
110         _thread = new boost::thread (boost::bind (&Writer::thread, this));
111 #ifdef DCPOMATIC_LINUX
112         pthread_setname_np (_thread->native_handle(), "writer");
113 #endif
114 }
115
116 Writer::~Writer ()
117 {
118         terminate_thread (false);
119 }
120
121 /** Pass a video frame to the writer for writing to disk at some point.
122  *  This method can be called with frames out of order.
123  *  @param encoded JPEG2000-encoded data.
124  *  @param frame Frame index within the DCP.
125  *  @param eyes Eyes that this frame image is for.
126  */
127 void
128 Writer::write (Data encoded, Frame frame, Eyes eyes)
129 {
130         boost::mutex::scoped_lock lock (_state_mutex);
131
132         while (_queued_full_in_memory > _maximum_frames_in_memory) {
133                 /* There are too many full frames in memory; wait until that is sorted out */
134                 _full_condition.wait (lock);
135         }
136
137         QueueItem qi;
138         qi.type = QueueItem::FULL;
139         qi.encoded = encoded;
140         qi.reel = video_reel (frame);
141         qi.frame = frame - _reels[qi.reel].start ();
142
143         if (_film->three_d() && eyes == EYES_BOTH) {
144                 /* 2D material in a 3D DCP; fake the 3D */
145                 qi.eyes = EYES_LEFT;
146                 _queue.push_back (qi);
147                 ++_queued_full_in_memory;
148                 qi.eyes = EYES_RIGHT;
149                 _queue.push_back (qi);
150                 ++_queued_full_in_memory;
151         } else {
152                 qi.eyes = eyes;
153                 _queue.push_back (qi);
154                 ++_queued_full_in_memory;
155         }
156
157         /* Now there's something to do: wake anything wait()ing on _empty_condition */
158         _empty_condition.notify_all ();
159 }
160
161 bool
162 Writer::can_repeat (Frame frame) const
163 {
164         return frame > _reels[video_reel(frame)].start();
165 }
166
167 /** Repeat the last frame that was written to a reel as a new frame.
168  *  @param frame Frame index within the DCP of the new (repeated) frame.
169  *  @param eyes Eyes that this repeated frame image is for.
170  */
171 void
172 Writer::repeat (Frame frame, Eyes eyes)
173 {
174         boost::mutex::scoped_lock lock (_state_mutex);
175
176         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
177                 /* The queue is too big, and the main writer thread can run and fix it, so
178                    wait until it has done.
179                 */
180                 _full_condition.wait (lock);
181         }
182
183         QueueItem qi;
184         qi.type = QueueItem::REPEAT;
185         qi.reel = video_reel (frame);
186         qi.frame = frame - _reels[qi.reel].start ();
187         if (_film->three_d() && eyes == EYES_BOTH) {
188                 qi.eyes = EYES_LEFT;
189                 _queue.push_back (qi);
190                 qi.eyes = EYES_RIGHT;
191                 _queue.push_back (qi);
192         } else {
193                 qi.eyes = eyes;
194                 _queue.push_back (qi);
195         }
196
197         /* Now there's something to do: wake anything wait()ing on _empty_condition */
198         _empty_condition.notify_all ();
199 }
200
201 void
202 Writer::fake_write (Frame frame, Eyes eyes)
203 {
204         boost::mutex::scoped_lock lock (_state_mutex);
205
206         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
207                 /* The queue is too big, and the main writer thread can run and fix it, so
208                    wait until it has done.
209                 */
210                 _full_condition.wait (lock);
211         }
212
213         size_t const reel = video_reel (frame);
214         Frame const reel_frame = frame - _reels[reel].start ();
215
216         FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
217         if (!file) {
218                 throw ReadFileError (_film->info_file(_reels[reel].period()));
219         }
220         dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
221         fclose (file);
222
223         QueueItem qi;
224         qi.type = QueueItem::FAKE;
225         qi.size = info.size;
226         qi.reel = reel;
227         qi.frame = reel_frame;
228         if (_film->three_d() && eyes == EYES_BOTH) {
229                 qi.eyes = EYES_LEFT;
230                 _queue.push_back (qi);
231                 qi.eyes = EYES_RIGHT;
232                 _queue.push_back (qi);
233         } else {
234                 qi.eyes = eyes;
235                 _queue.push_back (qi);
236         }
237
238         /* Now there's something to do: wake anything wait()ing on _empty_condition */
239         _empty_condition.notify_all ();
240 }
241
242 /** Write some audio frames to the DCP.
243  *  @param audio Audio data.
244  *  @param time Time of this data within the DCP.
245  *  This method is not thread safe.
246  */
247 void
248 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
249 {
250         DCPOMATIC_ASSERT (audio);
251
252         int const afr = _film->audio_frame_rate();
253
254         DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
255
256         /* The audio we get might span a reel boundary, and if so we have to write it in bits */
257
258         DCPTime t = time;
259         while (t < end) {
260
261                 if (_audio_reel == _reels.end ()) {
262                         /* This audio is off the end of the last reel; ignore it */
263                         return;
264                 }
265
266                 if (end <= _audio_reel->period().to) {
267                         /* Easy case: we can write all the audio to this reel */
268                         _audio_reel->write (audio);
269                         t = end;
270                 } else {
271                         /* Split the audio into two and write the first part */
272                         DCPTime part_lengths[2] = {
273                                 _audio_reel->period().to - t,
274                                 end - _audio_reel->period().to
275                         };
276
277                         Frame part_frames[2] = {
278                                 part_lengths[0].frames_ceil(afr),
279                                 part_lengths[1].frames_ceil(afr)
280                         };
281
282                         if (part_frames[0]) {
283                                 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
284                                 part->copy_from (audio.get(), part_frames[0], 0, 0);
285                                 _audio_reel->write (part);
286                         }
287
288                         if (part_frames[1]) {
289                                 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
290                                 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
291                                 audio = part;
292                         } else {
293                                 audio.reset ();
294                         }
295
296                         ++_audio_reel;
297                         t += part_lengths[0];
298                 }
299         }
300 }
301
302 /** This must be called from Writer::thread() with an appropriate lock held */
303 bool
304 Writer::have_sequenced_image_at_queue_head ()
305 {
306         if (_queue.empty ()) {
307                 return false;
308         }
309
310         _queue.sort ();
311
312         QueueItem const & f = _queue.front();
313         ReelWriter const & reel = _reels[f.reel];
314
315         /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
316
317         if (f.eyes == EYES_BOTH) {
318                 /* 2D */
319                 return f.frame == (reel.last_written_video_frame() + 1);
320         }
321
322         /* 3D */
323
324         if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
325                 return true;
326         }
327
328         if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
329                 return true;
330         }
331
332         return false;
333 }
334
335 void
336 Writer::thread ()
337 try
338 {
339         while (true)
340         {
341                 boost::mutex::scoped_lock lock (_state_mutex);
342
343                 while (true) {
344
345                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
346                                 /* We've got something to do: go and do it */
347                                 break;
348                         }
349
350                         /* Nothing to do: wait until something happens which may indicate that we do */
351                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
352                         _empty_condition.wait (lock);
353                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
354                 }
355
356                 if (_finish && _queue.empty()) {
357                         return;
358                 }
359
360                 /* We stop here if we have been asked to finish, and if either the queue
361                    is empty or we do not have a sequenced image at its head (if this is the
362                    case we will never terminate as no new frames will be sent once
363                    _finish is true).
364                 */
365                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
366                         /* (Hopefully temporarily) log anything that was not written */
367                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
368                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
369                                 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
370                                         if (i->type == QueueItem::FULL) {
371                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
372                                         } else {
373                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
374                                         }
375                                 }
376                         }
377                         return;
378                 }
379
380                 /* Write any frames that we can write; i.e. those that are in sequence. */
381                 while (have_sequenced_image_at_queue_head ()) {
382                         QueueItem qi = _queue.front ();
383                         _queue.pop_front ();
384                         if (qi.type == QueueItem::FULL && qi.encoded) {
385                                 --_queued_full_in_memory;
386                         }
387
388                         lock.unlock ();
389
390                         ReelWriter& reel = _reels[qi.reel];
391
392                         switch (qi.type) {
393                         case QueueItem::FULL:
394                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
395                                 if (!qi.encoded) {
396                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
397                                 }
398                                 reel.write (qi.encoded, qi.frame, qi.eyes);
399                                 ++_full_written;
400                                 break;
401                         case QueueItem::FAKE:
402                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
403                                 reel.fake_write (qi.frame, qi.eyes, qi.size);
404                                 ++_fake_written;
405                                 break;
406                         case QueueItem::REPEAT:
407                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
408                                 reel.repeat_write (qi.frame, qi.eyes);
409                                 ++_repeat_written;
410                                 break;
411                         }
412
413                         lock.lock ();
414                         _full_condition.notify_all ();
415                 }
416
417                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
418                         /* Too many frames in memory which can't yet be written to the stream.
419                            Write some FULL frames to disk.
420                         */
421
422                         /* Find one from the back of the queue */
423                         _queue.sort ();
424                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
425                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
426                                 ++i;
427                         }
428
429                         DCPOMATIC_ASSERT (i != _queue.rend());
430                         ++_pushed_to_disk;
431                         /* For the log message below */
432                         int const awaiting = _reels[_queue.front().reel].last_written_video_frame();
433                         lock.unlock ();
434
435                         /* i is valid here, even though we don't hold a lock on the mutex,
436                            since list iterators are unaffected by insertion and only this
437                            thread could erase the last item in the list.
438                         */
439
440                         LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
441
442                         i->encoded->write_via_temp (
443                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
444                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
445                                 );
446
447                         lock.lock ();
448                         i->encoded.reset ();
449                         --_queued_full_in_memory;
450                         _full_condition.notify_all ();
451                 }
452         }
453 }
454 catch (...)
455 {
456         store_current ();
457 }
458
459 void
460 Writer::terminate_thread (bool can_throw)
461 {
462         boost::mutex::scoped_lock lock (_state_mutex);
463         if (_thread == 0) {
464                 return;
465         }
466
467         _finish = true;
468         _empty_condition.notify_all ();
469         _full_condition.notify_all ();
470         lock.unlock ();
471
472         if (_thread->joinable ()) {
473                 _thread->join ();
474         }
475
476         if (can_throw) {
477                 rethrow ();
478         }
479
480         delete _thread;
481         _thread = 0;
482 }
483
484 void
485 Writer::finish ()
486 {
487         if (!_thread) {
488                 return;
489         }
490
491         LOG_GENERAL_NC ("Terminating writer thread");
492
493         terminate_thread (true);
494
495         LOG_GENERAL_NC ("Finishing ReelWriters");
496
497         BOOST_FOREACH (ReelWriter& i, _reels) {
498                 i.finish ();
499         }
500
501         LOG_GENERAL_NC ("Writing XML");
502
503         dcp::DCP dcp (_film->dir (_film->dcp_name()));
504
505         shared_ptr<dcp::CPL> cpl (
506                 new dcp::CPL (
507                         _film->dcp_name(),
508                         _film->dcp_content_type()->libdcp_kind ()
509                         )
510                 );
511
512         dcp.add (cpl);
513
514         /* Calculate digests for each reel in parallel */
515
516         shared_ptr<Job> job = _job.lock ();
517         job->sub (_("Computing digests"));
518
519         boost::asio::io_service service;
520         boost::thread_group pool;
521
522         shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
523
524         int const threads = max (1, Config::instance()->master_encoding_threads ());
525
526         for (int i = 0; i < threads; ++i) {
527                 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
528         }
529
530         BOOST_FOREACH (ReelWriter& i, _reels) {
531                 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
532                 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
533         }
534
535         work.reset ();
536         pool.join_all ();
537         service.stop ();
538
539         /* Add reels to CPL */
540
541         BOOST_FOREACH (ReelWriter& i, _reels) {
542                 cpl->add (i.create_reel (_reel_assets, _fonts));
543         }
544
545         dcp::XMLMetadata meta;
546         meta.annotation_text = cpl->annotation_text ();
547         meta.creator = Config::instance()->dcp_creator ();
548         if (meta.creator.empty ()) {
549                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
550         }
551         meta.issuer = Config::instance()->dcp_issuer ();
552         if (meta.issuer.empty ()) {
553                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
554         }
555         meta.set_issue_date_now ();
556
557         cpl->set_metadata (meta);
558
559         shared_ptr<const dcp::CertificateChain> signer;
560         if (_film->is_signed ()) {
561                 signer = Config::instance()->signer_chain ();
562                 /* We did check earlier, but check again here to be on the safe side */
563                 string reason;
564                 if (!signer->valid (&reason)) {
565                         throw InvalidSignerError (reason);
566                 }
567         }
568
569         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
570
571         LOG_GENERAL (
572                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
573                 );
574
575         write_cover_sheet ();
576 }
577
578 void
579 Writer::write_cover_sheet ()
580 {
581         boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
582         FILE* f = fopen_boost (cover, "w");
583         if (!f) {
584                 throw OpenFileError (cover, errno, false);
585         }
586
587         string text = Config::instance()->cover_sheet ();
588         boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
589         boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
590         boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
591         boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
592         boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", _film->isdcf_metadata().subtitle_language);
593
594         boost::uintmax_t size = 0;
595         for (
596                 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
597                 i != boost::filesystem::recursive_directory_iterator();
598                 ++i) {
599                 if (boost::filesystem::is_regular_file (i->path ())) {
600                         size += boost::filesystem::file_size (i->path ());
601                 }
602         }
603
604         if (size > (1000000000L)) {
605                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
606         } else {
607                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
608         }
609
610         pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
611         string description = String::compose("%1.%2", ch.first, ch.second);
612
613         if (description == "0.0") {
614                 description = _("None");
615         } else if (description == "1.0") {
616                 description = _("Mono");
617         } else if (description == "2.0") {
618                 description = _("Stereo");
619         }
620         boost::algorithm::replace_all (text, "$AUDIO", description);
621
622         int h, m, s, fr;
623         _film->length().split (_film->video_frame_rate(), h, m, s, fr);
624         string length;
625         if (h == 0 && m == 0) {
626                 length = String::compose("%1s", s);
627         } else if (h == 0 && m > 0) {
628                 length = String::compose("%1m%2s", m, s);
629         } else if (h > 0 && m > 0) {
630                 length = String::compose("%1h%2m%3s", h, m, s);
631         }
632
633         boost::algorithm::replace_all (text, "$LENGTH", length);
634
635         fwrite (text.c_str(), 1, text.length(), f);
636         fclose (f);
637 }
638
639 /** @param frame Frame index within the whole DCP.
640  *  @return true if we can fake-write this frame.
641  */
642 bool
643 Writer::can_fake_write (Frame frame) const
644 {
645         if (_film->encrypted()) {
646                 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
647                 return false;
648         }
649
650         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
651            parameters in the asset writer.
652         */
653
654         ReelWriter const & reel = _reels[video_reel(frame)];
655
656         /* Make frame relative to the start of the reel */
657         frame -= reel.start ();
658         return (frame != 0 && frame < reel.first_nonexistant_frame());
659 }
660
661 void
662 Writer::write (PlayerSubtitles subs, DCPTimePeriod period)
663 {
664         if (subs.text.empty ()) {
665                 return;
666         }
667
668         while (_subtitle_reel->period().to <= period.from) {
669                 ++_subtitle_reel;
670                 DCPOMATIC_ASSERT (_subtitle_reel != _reels.end());
671         }
672
673         DCPOMATIC_ASSERT (_subtitle_reel != _reels.end());
674
675         _subtitle_reel->write (subs);
676 }
677
678 void
679 Writer::write (list<shared_ptr<Font> > fonts)
680 {
681         /* Just keep a list of unique fonts and we'll deal with them in ::finish */
682
683         BOOST_FOREACH (shared_ptr<Font> i, fonts) {
684                 bool got = false;
685                 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
686                         if (*i == *j) {
687                                 got = true;
688                         }
689                 }
690
691                 if (!got) {
692                         _fonts.push_back (i);
693                 }
694         }
695 }
696
697 bool
698 operator< (QueueItem const & a, QueueItem const & b)
699 {
700         if (a.reel != b.reel) {
701                 return a.reel < b.reel;
702         }
703
704         if (a.frame != b.frame) {
705                 return a.frame < b.frame;
706         }
707
708         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
709 }
710
711 bool
712 operator== (QueueItem const & a, QueueItem const & b)
713 {
714         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
715 }
716
717 void
718 Writer::set_encoder_threads (int threads)
719 {
720         boost::mutex::scoped_lock lm (_state_mutex);
721         _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
722         _maximum_queue_size = threads * 16;
723 }
724
725 void
726 Writer::write (ReferencedReelAsset asset)
727 {
728         _reel_assets.push_back (asset);
729 }
730
731 size_t
732 Writer::video_reel (int frame) const
733 {
734         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
735         size_t i = 0;
736         while (i < _reels.size() && !_reels[i].period().contains (t)) {
737                 ++i;
738         }
739
740         DCPOMATIC_ASSERT (i < _reels.size ());
741         return i;
742 }
743
744 void
745 Writer::set_digest_progress (Job* job, float progress)
746 {
747         /* I believe this is thread-safe */
748         _digest_progresses[boost::this_thread::get_id()] = progress;
749
750         boost::mutex::scoped_lock lm (_digest_progresses_mutex);
751         float min_progress = FLT_MAX;
752         for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
753                 min_progress = min (min_progress, i->second);
754         }
755
756         job->set_progress (min_progress);
757 }