2 Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
49 /* OS X strikes again */
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
67 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
72 , _queued_full_in_memory (0)
73 /* These will be reset to sensible values when J2KEncoder is created */
74 , _maximum_frames_in_memory (8)
75 , _maximum_queue_size (8)
81 shared_ptr<Job> job = _job.lock ();
82 DCPOMATIC_ASSERT (job);
85 list<DCPTimePeriod> const reels = _film->reels ();
86 BOOST_FOREACH (DCPTimePeriod p, reels) {
87 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
90 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
91 and captions arrive to the Writer in sequence. This is not so for video.
93 _audio_reel = _reels.begin ();
94 _subtitle_reel = _reels.begin ();
95 BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
96 _caption_reels[i] = _reels.begin ();
99 /* Check that the signer is OK if we need one */
101 if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
102 throw InvalidSignerError (reason);
109 _thread = new boost::thread (boost::bind (&Writer::thread, this));
110 #ifdef DCPOMATIC_LINUX
111 pthread_setname_np (_thread->native_handle(), "writer");
117 terminate_thread (false);
120 /** Pass a video frame to the writer for writing to disk at some point.
121 * This method can be called with frames out of order.
122 * @param encoded JPEG2000-encoded data.
123 * @param frame Frame index within the DCP.
124 * @param eyes Eyes that this frame image is for.
127 Writer::write (Data encoded, Frame frame, Eyes eyes)
129 boost::mutex::scoped_lock lock (_state_mutex);
131 while (_queued_full_in_memory > _maximum_frames_in_memory) {
132 /* There are too many full frames in memory; wake the main writer thread and
133 wait until it sorts everything out */
134 _empty_condition.notify_all ();
135 _full_condition.wait (lock);
139 qi.type = QueueItem::FULL;
140 qi.encoded = encoded;
141 qi.reel = video_reel (frame);
142 qi.frame = frame - _reels[qi.reel].start ();
144 if (_film->three_d() && eyes == EYES_BOTH) {
145 /* 2D material in a 3D DCP; fake the 3D */
147 _queue.push_back (qi);
148 ++_queued_full_in_memory;
149 qi.eyes = EYES_RIGHT;
150 _queue.push_back (qi);
151 ++_queued_full_in_memory;
154 _queue.push_back (qi);
155 ++_queued_full_in_memory;
158 /* Now there's something to do: wake anything wait()ing on _empty_condition */
159 _empty_condition.notify_all ();
163 Writer::can_repeat (Frame frame) const
165 return frame > _reels[video_reel(frame)].start();
168 /** Repeat the last frame that was written to a reel as a new frame.
169 * @param frame Frame index within the DCP of the new (repeated) frame.
170 * @param eyes Eyes that this repeated frame image is for.
173 Writer::repeat (Frame frame, Eyes eyes)
175 boost::mutex::scoped_lock lock (_state_mutex);
177 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
178 /* The queue is too big, and the main writer thread can run and fix it, so
179 wake it and wait until it has done.
181 _empty_condition.notify_all ();
182 _full_condition.wait (lock);
186 qi.type = QueueItem::REPEAT;
187 qi.reel = video_reel (frame);
188 qi.frame = frame - _reels[qi.reel].start ();
189 if (_film->three_d() && eyes == EYES_BOTH) {
191 _queue.push_back (qi);
192 qi.eyes = EYES_RIGHT;
193 _queue.push_back (qi);
196 _queue.push_back (qi);
199 /* Now there's something to do: wake anything wait()ing on _empty_condition */
200 _empty_condition.notify_all ();
204 Writer::fake_write (Frame frame, Eyes eyes)
206 boost::mutex::scoped_lock lock (_state_mutex);
208 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
209 /* The queue is too big, and the main writer thread can run and fix it, so
210 wake it and wait until it has done.
212 _empty_condition.notify_all ();
213 _full_condition.wait (lock);
216 size_t const reel = video_reel (frame);
217 Frame const reel_frame = frame - _reels[reel].start ();
220 qi.type = QueueItem::FAKE;
223 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
224 qi.size = _reels[reel].read_frame_info(info_file, reel_frame, eyes).size;
228 qi.frame = reel_frame;
229 if (_film->three_d() && eyes == EYES_BOTH) {
231 _queue.push_back (qi);
232 qi.eyes = EYES_RIGHT;
233 _queue.push_back (qi);
236 _queue.push_back (qi);
239 /* Now there's something to do: wake anything wait()ing on _empty_condition */
240 _empty_condition.notify_all ();
243 /** Write some audio frames to the DCP.
244 * @param audio Audio data.
245 * @param time Time of this data within the DCP.
246 * This method is not thread safe.
249 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
251 DCPOMATIC_ASSERT (audio);
253 int const afr = _film->audio_frame_rate();
255 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
257 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
262 if (_audio_reel == _reels.end ()) {
263 /* This audio is off the end of the last reel; ignore it */
267 if (end <= _audio_reel->period().to) {
268 /* Easy case: we can write all the audio to this reel */
269 _audio_reel->write (audio);
272 /* Split the audio into two and write the first part */
273 DCPTime part_lengths[2] = {
274 _audio_reel->period().to - t,
275 end - _audio_reel->period().to
278 Frame part_frames[2] = {
279 part_lengths[0].frames_ceil(afr),
280 part_lengths[1].frames_ceil(afr)
283 if (part_frames[0]) {
284 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
285 part->copy_from (audio.get(), part_frames[0], 0, 0);
286 _audio_reel->write (part);
289 if (part_frames[1]) {
290 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
291 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
298 t += part_lengths[0];
303 /** This must be called from Writer::thread() with an appropriate lock held */
305 Writer::have_sequenced_image_at_queue_head ()
307 if (_queue.empty ()) {
313 QueueItem const & f = _queue.front();
314 ReelWriter const & reel = _reels[f.reel];
316 /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
318 if (f.eyes == EYES_BOTH) {
320 return f.frame == (reel.last_written_video_frame() + 1);
325 if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
329 if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
342 boost::mutex::scoped_lock lock (_state_mutex);
346 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
347 /* We've got something to do: go and do it */
351 /* Nothing to do: wait until something happens which may indicate that we do */
352 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
353 _empty_condition.wait (lock);
354 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
357 if (_finish && _queue.empty()) {
361 /* We stop here if we have been asked to finish, and if either the queue
362 is empty or we do not have a sequenced image at its head (if this is the
363 case we will never terminate as no new frames will be sent once
366 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
367 /* (Hopefully temporarily) log anything that was not written */
368 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
369 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
370 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
371 if (i->type == QueueItem::FULL) {
372 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
374 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
381 /* Write any frames that we can write; i.e. those that are in sequence. */
382 while (have_sequenced_image_at_queue_head ()) {
383 QueueItem qi = _queue.front ();
385 if (qi.type == QueueItem::FULL && qi.encoded) {
386 --_queued_full_in_memory;
391 ReelWriter& reel = _reels[qi.reel];
394 case QueueItem::FULL:
395 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
397 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
399 reel.write (qi.encoded, qi.frame, qi.eyes);
402 case QueueItem::FAKE:
403 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
404 reel.fake_write (qi.frame, qi.eyes, qi.size);
407 case QueueItem::REPEAT:
408 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
409 reel.repeat_write (qi.frame, qi.eyes);
415 _full_condition.notify_all ();
418 while (_queued_full_in_memory > _maximum_frames_in_memory) {
419 /* Too many frames in memory which can't yet be written to the stream.
420 Write some FULL frames to disk.
423 /* Find one from the back of the queue */
425 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
426 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
430 DCPOMATIC_ASSERT (i != _queue.rend());
432 /* For the log message below */
433 int const awaiting = _reels[_queue.front().reel].last_written_video_frame() + 1;
436 /* i is valid here, even though we don't hold a lock on the mutex,
437 since list iterators are unaffected by insertion and only this
438 thread could erase the last item in the list.
441 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
443 i->encoded->write_via_temp (
444 _film->j2c_path (i->reel, i->frame, i->eyes, true),
445 _film->j2c_path (i->reel, i->frame, i->eyes, false)
450 --_queued_full_in_memory;
451 _full_condition.notify_all ();
461 Writer::terminate_thread (bool can_throw)
463 boost::mutex::scoped_lock lock (_state_mutex);
469 _empty_condition.notify_all ();
470 _full_condition.notify_all ();
473 if (_thread->joinable ()) {
492 LOG_GENERAL_NC ("Terminating writer thread");
494 terminate_thread (true);
496 LOG_GENERAL_NC ("Finishing ReelWriters");
498 BOOST_FOREACH (ReelWriter& i, _reels) {
502 LOG_GENERAL_NC ("Writing XML");
504 dcp::DCP dcp (_film->dir (_film->dcp_name()));
506 shared_ptr<dcp::CPL> cpl (
509 _film->dcp_content_type()->libdcp_kind ()
515 /* Calculate digests for each reel in parallel */
517 shared_ptr<Job> job = _job.lock ();
518 job->sub (_("Computing digests"));
520 boost::asio::io_service service;
521 boost::thread_group pool;
523 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
525 int const threads = max (1, Config::instance()->master_encoding_threads ());
527 for (int i = 0; i < threads; ++i) {
528 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
531 BOOST_FOREACH (ReelWriter& i, _reels) {
532 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
533 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
540 /* Add reels to CPL */
542 BOOST_FOREACH (ReelWriter& i, _reels) {
543 cpl->add (i.create_reel (_reel_assets, _fonts));
546 dcp::XMLMetadata meta;
547 meta.annotation_text = cpl->annotation_text ();
548 meta.creator = Config::instance()->dcp_creator ();
549 if (meta.creator.empty ()) {
550 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
552 meta.issuer = Config::instance()->dcp_issuer ();
553 if (meta.issuer.empty ()) {
554 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
556 meta.set_issue_date_now ();
558 cpl->set_metadata (meta);
560 shared_ptr<const dcp::CertificateChain> signer;
561 if (_film->is_signed ()) {
562 signer = Config::instance()->signer_chain ();
563 /* We did check earlier, but check again here to be on the safe side */
565 if (!signer->valid (&reason)) {
566 throw InvalidSignerError (reason);
570 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
573 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
576 write_cover_sheet ();
580 Writer::write_cover_sheet ()
582 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
583 FILE* f = fopen_boost (cover, "w");
585 throw OpenFileError (cover, errno, OpenFileError::WRITE);
588 string text = Config::instance()->cover_sheet ();
589 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
590 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
591 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
592 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
594 optional<string> subtitle_language;
595 BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
596 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
597 if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
598 subtitle_language = j->language ();
602 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
604 boost::uintmax_t size = 0;
606 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
607 i != boost::filesystem::recursive_directory_iterator();
609 if (boost::filesystem::is_regular_file (i->path ())) {
610 size += boost::filesystem::file_size (i->path ());
614 if (size > (1000000000L)) {
615 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
617 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
620 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
621 string description = String::compose("%1.%2", ch.first, ch.second);
623 if (description == "0.0") {
624 description = _("None");
625 } else if (description == "1.0") {
626 description = _("Mono");
627 } else if (description == "2.0") {
628 description = _("Stereo");
630 boost::algorithm::replace_all (text, "$AUDIO", description);
633 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
635 if (h == 0 && m == 0) {
636 length = String::compose("%1s", s);
637 } else if (h == 0 && m > 0) {
638 length = String::compose("%1m%2s", m, s);
639 } else if (h > 0 && m > 0) {
640 length = String::compose("%1h%2m%3s", h, m, s);
643 boost::algorithm::replace_all (text, "$LENGTH", length);
645 checked_fwrite (text.c_str(), text.length(), f, cover);
649 /** @param frame Frame index within the whole DCP.
650 * @return true if we can fake-write this frame.
653 Writer::can_fake_write (Frame frame) const
655 if (_film->encrypted()) {
656 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
660 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
661 parameters in the asset writer.
664 ReelWriter const & reel = _reels[video_reel(frame)];
666 /* Make frame relative to the start of the reel */
667 frame -= reel.start ();
668 return (frame != 0 && frame < reel.first_nonexistant_frame());
671 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
673 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
675 vector<ReelWriter>::iterator* reel = 0;
678 case TEXT_OPEN_SUBTITLE:
679 reel = &_subtitle_reel;
681 case TEXT_CLOSED_CAPTION:
682 DCPOMATIC_ASSERT (track);
683 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
684 reel = &_caption_reels[*track];
687 DCPOMATIC_ASSERT (false);
690 DCPOMATIC_ASSERT (*reel != _reels.end());
691 while ((*reel)->period().to <= period.from) {
693 DCPOMATIC_ASSERT (*reel != _reels.end());
696 (*reel)->write (text, type, track, period);
700 Writer::write (list<shared_ptr<Font> > fonts)
702 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
704 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
706 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
713 _fonts.push_back (i);
719 operator< (QueueItem const & a, QueueItem const & b)
721 if (a.reel != b.reel) {
722 return a.reel < b.reel;
725 if (a.frame != b.frame) {
726 return a.frame < b.frame;
729 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
733 operator== (QueueItem const & a, QueueItem const & b)
735 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
739 Writer::set_encoder_threads (int threads)
741 boost::mutex::scoped_lock lm (_state_mutex);
742 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
743 _maximum_queue_size = threads * 16;
747 Writer::write (ReferencedReelAsset asset)
749 _reel_assets.push_back (asset);
753 Writer::video_reel (int frame) const
755 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
757 while (i < _reels.size() && !_reels[i].period().contains (t)) {
761 DCPOMATIC_ASSERT (i < _reels.size ());
766 Writer::set_digest_progress (Job* job, float progress)
768 /* I believe this is thread-safe */
769 _digest_progresses[boost::this_thread::get_id()] = progress;
771 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
772 float min_progress = FLT_MAX;
773 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
774 min_progress = min (min_progress, i->second);
777 job->set_progress (min_progress);