Fix bugs in thread termination causing occasional pthread
[dcpomatic.git] / src / lib / writer.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 #include "writer.h"
22 #include "compose.hpp"
23 #include "film.h"
24 #include "ratio.h"
25 #include "log.h"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
30 #include "config.h"
31 #include "job.h"
32 #include "cross.h"
33 #include "audio_buffers.h"
34 #include "version.h"
35 #include "font.h"
36 #include "util.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
39 #include <dcp/cpl.h>
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
42 #include <fstream>
43 #include <cerrno>
44 #include <iostream>
45 #include <cfloat>
46
47 #include "i18n.h"
48
49 /* OS X strikes again */
50 #undef set_key
51
52 using std::make_pair;
53 using std::pair;
54 using std::string;
55 using std::list;
56 using std::cout;
57 using std::map;
58 using std::min;
59 using std::max;
60 using std::vector;
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
65 using dcp::Data;
66 using namespace dcpomatic;
67
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
69         : _film (film)
70         , _job (j)
71         , _finish (false)
72         , _queued_full_in_memory (0)
73         /* These will be reset to sensible values when J2KEncoder is created */
74         , _maximum_frames_in_memory (8)
75         , _maximum_queue_size (8)
76         , _full_written (0)
77         , _fake_written (0)
78         , _repeat_written (0)
79         , _pushed_to_disk (0)
80 {
81         shared_ptr<Job> job = _job.lock ();
82         DCPOMATIC_ASSERT (job);
83
84         int reel_index = 0;
85         list<DCPTimePeriod> const reels = _film->reels ();
86         BOOST_FOREACH (DCPTimePeriod p, reels) {
87                 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
88         }
89
90         _last_written.resize (reels.size());
91
92         /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
93            and captions arrive to the Writer in sequence.  This is not so for video.
94         */
95         _audio_reel = _reels.begin ();
96         _subtitle_reel = _reels.begin ();
97         BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
98                 _caption_reels[i] = _reels.begin ();
99         }
100         _atmos_reel = _reels.begin ();
101
102         /* Check that the signer is OK */
103         string reason;
104         if (!Config::instance()->signer_chain()->valid(&reason)) {
105                 throw InvalidSignerError (reason);
106         }
107 }
108
109 void
110 Writer::start ()
111 {
112         _thread = boost::thread (boost::bind(&Writer::thread, this));
113 #ifdef DCPOMATIC_LINUX
114         pthread_setname_np (_thread.native_handle(), "writer");
115 #endif
116 }
117
118 Writer::~Writer ()
119 {
120         terminate_thread (false);
121 }
122
123 /** Pass a video frame to the writer for writing to disk at some point.
124  *  This method can be called with frames out of order.
125  *  @param encoded JPEG2000-encoded data.
126  *  @param frame Frame index within the DCP.
127  *  @param eyes Eyes that this frame image is for.
128  */
129 void
130 Writer::write (Data encoded, Frame frame, Eyes eyes)
131 {
132         boost::mutex::scoped_lock lock (_state_mutex);
133
134         while (_queued_full_in_memory > _maximum_frames_in_memory) {
135                 /* There are too many full frames in memory; wake the main writer thread and
136                    wait until it sorts everything out */
137                 _empty_condition.notify_all ();
138                 _full_condition.wait (lock);
139         }
140
141         QueueItem qi;
142         qi.type = QueueItem::FULL;
143         qi.encoded = encoded;
144         qi.reel = video_reel (frame);
145         qi.frame = frame - _reels[qi.reel].start ();
146
147         if (_film->three_d() && eyes == EYES_BOTH) {
148                 /* 2D material in a 3D DCP; fake the 3D */
149                 qi.eyes = EYES_LEFT;
150                 _queue.push_back (qi);
151                 ++_queued_full_in_memory;
152                 qi.eyes = EYES_RIGHT;
153                 _queue.push_back (qi);
154                 ++_queued_full_in_memory;
155         } else {
156                 qi.eyes = eyes;
157                 _queue.push_back (qi);
158                 ++_queued_full_in_memory;
159         }
160
161         /* Now there's something to do: wake anything wait()ing on _empty_condition */
162         _empty_condition.notify_all ();
163 }
164
165 bool
166 Writer::can_repeat (Frame frame) const
167 {
168         return frame > _reels[video_reel(frame)].start();
169 }
170
171 /** Repeat the last frame that was written to a reel as a new frame.
172  *  @param frame Frame index within the DCP of the new (repeated) frame.
173  *  @param eyes Eyes that this repeated frame image is for.
174  */
175 void
176 Writer::repeat (Frame frame, Eyes eyes)
177 {
178         boost::mutex::scoped_lock lock (_state_mutex);
179
180         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
181                 /* The queue is too big, and the main writer thread can run and fix it, so
182                    wake it and wait until it has done.
183                 */
184                 _empty_condition.notify_all ();
185                 _full_condition.wait (lock);
186         }
187
188         QueueItem qi;
189         qi.type = QueueItem::REPEAT;
190         qi.reel = video_reel (frame);
191         qi.frame = frame - _reels[qi.reel].start ();
192         if (_film->three_d() && eyes == EYES_BOTH) {
193                 qi.eyes = EYES_LEFT;
194                 _queue.push_back (qi);
195                 qi.eyes = EYES_RIGHT;
196                 _queue.push_back (qi);
197         } else {
198                 qi.eyes = eyes;
199                 _queue.push_back (qi);
200         }
201
202         /* Now there's something to do: wake anything wait()ing on _empty_condition */
203         _empty_condition.notify_all ();
204 }
205
206 void
207 Writer::fake_write (Frame frame, Eyes eyes)
208 {
209         boost::mutex::scoped_lock lock (_state_mutex);
210
211         while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
212                 /* The queue is too big, and the main writer thread can run and fix it, so
213                    wake it and wait until it has done.
214                 */
215                 _empty_condition.notify_all ();
216                 _full_condition.wait (lock);
217         }
218
219         size_t const reel = video_reel (frame);
220         Frame const frame_in_reel = frame - _reels[reel].start ();
221
222         QueueItem qi;
223         qi.type = QueueItem::FAKE;
224
225         {
226                 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
227                 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
228         }
229
230         qi.reel = reel;
231         qi.frame = frame_in_reel;
232         if (_film->three_d() && eyes == EYES_BOTH) {
233                 qi.eyes = EYES_LEFT;
234                 _queue.push_back (qi);
235                 qi.eyes = EYES_RIGHT;
236                 _queue.push_back (qi);
237         } else {
238                 qi.eyes = eyes;
239                 _queue.push_back (qi);
240         }
241
242         /* Now there's something to do: wake anything wait()ing on _empty_condition */
243         _empty_condition.notify_all ();
244 }
245
246 /** Write some audio frames to the DCP.
247  *  @param audio Audio data.
248  *  @param time Time of this data within the DCP.
249  *  This method is not thread safe.
250  */
251 void
252 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
253 {
254         DCPOMATIC_ASSERT (audio);
255
256         int const afr = _film->audio_frame_rate();
257
258         DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
259
260         /* The audio we get might span a reel boundary, and if so we have to write it in bits */
261
262         DCPTime t = time;
263         while (t < end) {
264
265                 if (_audio_reel == _reels.end ()) {
266                         /* This audio is off the end of the last reel; ignore it */
267                         return;
268                 }
269
270                 if (end <= _audio_reel->period().to) {
271                         /* Easy case: we can write all the audio to this reel */
272                         _audio_reel->write (audio);
273                         t = end;
274                 } else if (_audio_reel->period().to <= t) {
275                         /* This reel is entirely before the start of our audio; just skip the reel */
276                         ++_audio_reel;
277                 } else {
278                         /* This audio is over a reel boundary; split the audio into two and write the first part */
279                         DCPTime part_lengths[2] = {
280                                 _audio_reel->period().to - t,
281                                 end - _audio_reel->period().to
282                         };
283
284                         Frame part_frames[2] = {
285                                 part_lengths[0].frames_ceil(afr),
286                                 part_lengths[1].frames_ceil(afr)
287                         };
288
289                         if (part_frames[0]) {
290                                 shared_ptr<AudioBuffers> part (new AudioBuffers(audio, part_frames[0], 0));
291                                 _audio_reel->write (part);
292                         }
293
294                         if (part_frames[1]) {
295                                 audio.reset (new AudioBuffers(audio, part_frames[1], part_frames[0]));
296                         } else {
297                                 audio.reset ();
298                         }
299
300                         ++_audio_reel;
301                         t += part_lengths[0];
302                 }
303         }
304 }
305
306
307 void
308 Writer::write (shared_ptr<const dcp::AtmosFrame> atmos, DCPTime time, AtmosMetadata metadata)
309 {
310         if (_atmos_reel->period().to == time) {
311                 ++_atmos_reel;
312                 DCPOMATIC_ASSERT (_atmos_reel != _reels.end());
313         }
314
315         /* We assume that we get a video frame's worth of data here */
316         _atmos_reel->write (atmos, metadata);
317 }
318
319
320 /** Caller must hold a lock on _state_mutex */
321 bool
322 Writer::have_sequenced_image_at_queue_head ()
323 {
324         if (_queue.empty ()) {
325                 return false;
326         }
327
328         _queue.sort ();
329         QueueItem const & f = _queue.front();
330         return _last_written[f.reel].next(f);
331 }
332
333
334 bool
335 Writer::LastWritten::next (QueueItem qi) const
336 {
337         if (qi.eyes == EYES_BOTH) {
338                 /* 2D */
339                 return qi.frame == (_frame + 1);
340         }
341
342         /* 3D */
343
344         if (_eyes == EYES_LEFT && qi.frame == _frame && qi.eyes == EYES_RIGHT) {
345                 return true;
346         }
347
348         if (_eyes == EYES_RIGHT && qi.frame == (_frame + 1) && qi.eyes == EYES_LEFT) {
349                 return true;
350         }
351
352         return false;
353 }
354
355
356 void
357 Writer::LastWritten::update (QueueItem qi)
358 {
359         _frame = qi.frame;
360         _eyes = qi.eyes;
361 }
362
363
364 void
365 Writer::thread ()
366 try
367 {
368         while (true)
369         {
370                 boost::mutex::scoped_lock lock (_state_mutex);
371
372                 while (true) {
373
374                         if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
375                                 /* We've got something to do: go and do it */
376                                 break;
377                         }
378
379                         /* Nothing to do: wait until something happens which may indicate that we do */
380                         LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
381                         _empty_condition.wait (lock);
382                         LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
383                 }
384
385                 if (_finish && _queue.empty()) {
386                         return;
387                 }
388
389                 /* We stop here if we have been asked to finish, and if either the queue
390                    is empty or we do not have a sequenced image at its head (if this is the
391                    case we will never terminate as no new frames will be sent once
392                    _finish is true).
393                 */
394                 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
395                         /* (Hopefully temporarily) log anything that was not written */
396                         if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
397                                 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
398                                 BOOST_FOREACH (QueueItem const& i, _queue) {
399                                         if (i.type == QueueItem::FULL) {
400                                                 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
401                                         } else {
402                                                 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
403                                         }
404                                 }
405                         }
406                         return;
407                 }
408
409                 /* Write any frames that we can write; i.e. those that are in sequence. */
410                 while (have_sequenced_image_at_queue_head ()) {
411                         QueueItem qi = _queue.front ();
412                         _last_written[qi.reel].update (qi);
413                         _queue.pop_front ();
414                         if (qi.type == QueueItem::FULL && qi.encoded) {
415                                 --_queued_full_in_memory;
416                         }
417
418                         lock.unlock ();
419
420                         ReelWriter& reel = _reels[qi.reel];
421
422                         switch (qi.type) {
423                         case QueueItem::FULL:
424                                 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
425                                 if (!qi.encoded) {
426                                         qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
427                                 }
428                                 reel.write (qi.encoded, qi.frame, qi.eyes);
429                                 ++_full_written;
430                                 break;
431                         case QueueItem::FAKE:
432                                 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
433                                 reel.fake_write (qi.size);
434                                 ++_fake_written;
435                                 break;
436                         case QueueItem::REPEAT:
437                                 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
438                                 reel.repeat_write (qi.frame, qi.eyes);
439                                 ++_repeat_written;
440                                 break;
441                         }
442
443                         lock.lock ();
444                         _full_condition.notify_all ();
445                 }
446
447                 while (_queued_full_in_memory > _maximum_frames_in_memory) {
448                         /* Too many frames in memory which can't yet be written to the stream.
449                            Write some FULL frames to disk.
450                         */
451
452                         /* Find one from the back of the queue */
453                         _queue.sort ();
454                         list<QueueItem>::reverse_iterator i = _queue.rbegin ();
455                         while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
456                                 ++i;
457                         }
458
459                         DCPOMATIC_ASSERT (i != _queue.rend());
460                         ++_pushed_to_disk;
461                         /* For the log message below */
462                         int const awaiting = _last_written[_queue.front().reel].frame() + 1;
463                         lock.unlock ();
464
465                         /* i is valid here, even though we don't hold a lock on the mutex,
466                            since list iterators are unaffected by insertion and only this
467                            thread could erase the last item in the list.
468                         */
469
470                         LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
471
472                         i->encoded->write_via_temp (
473                                 _film->j2c_path (i->reel, i->frame, i->eyes, true),
474                                 _film->j2c_path (i->reel, i->frame, i->eyes, false)
475                                 );
476
477                         lock.lock ();
478                         i->encoded.reset ();
479                         --_queued_full_in_memory;
480                         _full_condition.notify_all ();
481                 }
482         }
483 }
484 catch (...)
485 {
486         store_current ();
487 }
488
489 void
490 Writer::terminate_thread (bool can_throw)
491 {
492         boost::this_thread::disable_interruption dis;
493
494         boost::mutex::scoped_lock lock (_state_mutex);
495
496         _finish = true;
497         _empty_condition.notify_all ();
498         _full_condition.notify_all ();
499         lock.unlock ();
500
501         try {
502                 _thread.join ();
503         } catch (...) {}
504
505         if (can_throw) {
506                 rethrow ();
507         }
508 }
509
510 void
511 Writer::finish ()
512 {
513         if (!_thread.joinable()) {
514                 return;
515         }
516
517         LOG_GENERAL_NC ("Terminating writer thread");
518
519         terminate_thread (true);
520
521         LOG_GENERAL_NC ("Finishing ReelWriters");
522
523         BOOST_FOREACH (ReelWriter& i, _reels) {
524                 i.finish ();
525         }
526
527         LOG_GENERAL_NC ("Writing XML");
528
529         dcp::DCP dcp (_film->dir (_film->dcp_name()));
530
531         shared_ptr<dcp::CPL> cpl (
532                 new dcp::CPL (
533                         _film->dcp_name(),
534                         _film->dcp_content_type()->libdcp_kind ()
535                         )
536                 );
537
538         dcp.add (cpl);
539
540         /* Calculate digests for each reel in parallel */
541
542         shared_ptr<Job> job = _job.lock ();
543         job->sub (_("Computing digests"));
544
545         boost::asio::io_service service;
546         boost::thread_group pool;
547
548         shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
549
550         int const threads = max (1, Config::instance()->master_encoding_threads ());
551
552         for (int i = 0; i < threads; ++i) {
553                 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
554         }
555
556         BOOST_FOREACH (ReelWriter& i, _reels) {
557                 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
558                 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
559         }
560
561         work.reset ();
562         pool.join_all ();
563         service.stop ();
564
565         /* Add reels to CPL */
566
567         BOOST_FOREACH (ReelWriter& i, _reels) {
568                 cpl->add (i.create_reel (_reel_assets, _fonts));
569         }
570
571         dcp::XMLMetadata meta;
572         meta.annotation_text = cpl->annotation_text ();
573         meta.creator = Config::instance()->dcp_creator ();
574         if (meta.creator.empty ()) {
575                 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
576         }
577         meta.issuer = Config::instance()->dcp_issuer ();
578         if (meta.issuer.empty ()) {
579                 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
580         }
581         meta.set_issue_date_now ();
582
583         cpl->set_metadata (meta);
584         cpl->set_ratings (vector_to_list(_film->ratings()));
585         cpl->set_content_version_label_text (_film->content_version());
586
587         shared_ptr<const dcp::CertificateChain> signer;
588         signer = Config::instance()->signer_chain ();
589         /* We did check earlier, but check again here to be on the safe side */
590         string reason;
591         if (!signer->valid (&reason)) {
592                 throw InvalidSignerError (reason);
593         }
594
595         dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
596
597         LOG_GENERAL (
598                 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
599                 );
600
601         write_cover_sheet ();
602 }
603
604 void
605 Writer::write_cover_sheet ()
606 {
607         boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
608         FILE* f = fopen_boost (cover, "w");
609         if (!f) {
610                 throw OpenFileError (cover, errno, OpenFileError::WRITE);
611         }
612
613         string text = Config::instance()->cover_sheet ();
614         boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
615         boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
616         boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
617         boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
618
619         optional<string> subtitle_language;
620         BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
621                 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
622                         if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
623                                 subtitle_language = j->language ();
624                         }
625                 }
626         }
627         boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
628
629         boost::uintmax_t size = 0;
630         for (
631                 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
632                 i != boost::filesystem::recursive_directory_iterator();
633                 ++i) {
634                 if (boost::filesystem::is_regular_file (i->path ())) {
635                         size += boost::filesystem::file_size (i->path ());
636                 }
637         }
638
639         if (size > (1000000000L)) {
640                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
641         } else {
642                 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
643         }
644
645         pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
646         string description = String::compose("%1.%2", ch.first, ch.second);
647
648         if (description == "0.0") {
649                 description = _("None");
650         } else if (description == "1.0") {
651                 description = _("Mono");
652         } else if (description == "2.0") {
653                 description = _("Stereo");
654         }
655         boost::algorithm::replace_all (text, "$AUDIO", description);
656
657         int h, m, s, fr;
658         _film->length().split (_film->video_frame_rate(), h, m, s, fr);
659         string length;
660         if (h == 0 && m == 0) {
661                 length = String::compose("%1s", s);
662         } else if (h == 0 && m > 0) {
663                 length = String::compose("%1m%2s", m, s);
664         } else if (h > 0 && m > 0) {
665                 length = String::compose("%1h%2m%3s", h, m, s);
666         }
667
668         boost::algorithm::replace_all (text, "$LENGTH", length);
669
670         checked_fwrite (text.c_str(), text.length(), f, cover);
671         fclose (f);
672 }
673
674 /** @param frame Frame index within the whole DCP.
675  *  @return true if we can fake-write this frame.
676  */
677 bool
678 Writer::can_fake_write (Frame frame) const
679 {
680         if (_film->encrypted()) {
681                 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
682                 return false;
683         }
684
685         /* We have to do a proper write of the first frame so that we can set up the JPEG2000
686            parameters in the asset writer.
687         */
688
689         ReelWriter const & reel = _reels[video_reel(frame)];
690
691         /* Make frame relative to the start of the reel */
692         frame -= reel.start ();
693         return (frame != 0 && frame < reel.first_nonexistant_frame());
694 }
695
696 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
697 void
698 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
699 {
700         vector<ReelWriter>::iterator* reel = 0;
701
702         switch (type) {
703         case TEXT_OPEN_SUBTITLE:
704                 reel = &_subtitle_reel;
705                 break;
706         case TEXT_CLOSED_CAPTION:
707                 DCPOMATIC_ASSERT (track);
708                 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
709                 reel = &_caption_reels[*track];
710                 break;
711         default:
712                 DCPOMATIC_ASSERT (false);
713         }
714
715         DCPOMATIC_ASSERT (*reel != _reels.end());
716         while ((*reel)->period().to <= period.from) {
717                 ++(*reel);
718                 DCPOMATIC_ASSERT (*reel != _reels.end());
719         }
720
721         (*reel)->write (text, type, track, period);
722 }
723
724 void
725 Writer::write (list<shared_ptr<Font> > fonts)
726 {
727         /* Just keep a list of unique fonts and we'll deal with them in ::finish */
728
729         BOOST_FOREACH (shared_ptr<Font> i, fonts) {
730                 bool got = false;
731                 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
732                         if (*i == *j) {
733                                 got = true;
734                         }
735                 }
736
737                 if (!got) {
738                         _fonts.push_back (i);
739                 }
740         }
741 }
742
743 bool
744 operator< (QueueItem const & a, QueueItem const & b)
745 {
746         if (a.reel != b.reel) {
747                 return a.reel < b.reel;
748         }
749
750         if (a.frame != b.frame) {
751                 return a.frame < b.frame;
752         }
753
754         return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
755 }
756
757 bool
758 operator== (QueueItem const & a, QueueItem const & b)
759 {
760         return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
761 }
762
763 void
764 Writer::set_encoder_threads (int threads)
765 {
766         boost::mutex::scoped_lock lm (_state_mutex);
767         _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
768         _maximum_queue_size = threads * 16;
769 }
770
771 void
772 Writer::write (ReferencedReelAsset asset)
773 {
774         _reel_assets.push_back (asset);
775 }
776
777 size_t
778 Writer::video_reel (int frame) const
779 {
780         DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
781         size_t i = 0;
782         while (i < _reels.size() && !_reels[i].period().contains (t)) {
783                 ++i;
784         }
785
786         DCPOMATIC_ASSERT (i < _reels.size ());
787         return i;
788 }
789
790 void
791 Writer::set_digest_progress (Job* job, float progress)
792 {
793         boost::mutex::scoped_lock lm (_digest_progresses_mutex);
794
795         _digest_progresses[boost::this_thread::get_id()] = progress;
796         float min_progress = FLT_MAX;
797         for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
798                 min_progress = min (min_progress, i->second);
799         }
800
801         job->set_progress (min_progress);
802
803         Waker waker;
804         waker.nudge ();
805 }