2 Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
49 /* OS X strikes again */
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
66 using namespace dcpomatic;
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
72 , _queued_full_in_memory (0)
73 /* These will be reset to sensible values when J2KEncoder is created */
74 , _maximum_frames_in_memory (8)
75 , _maximum_queue_size (8)
81 shared_ptr<Job> job = _job.lock ();
82 DCPOMATIC_ASSERT (job);
85 list<DCPTimePeriod> const reels = _film->reels ();
86 BOOST_FOREACH (DCPTimePeriod p, reels) {
87 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
90 _last_written.resize (reels.size());
92 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
93 and captions arrive to the Writer in sequence. This is not so for video.
95 _audio_reel = _reels.begin ();
96 _subtitle_reel = _reels.begin ();
97 BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
98 _caption_reels[i] = _reels.begin ();
100 _atmos_reel = _reels.begin ();
102 /* Check that the signer is OK */
104 if (!Config::instance()->signer_chain()->valid(&reason)) {
105 throw InvalidSignerError (reason);
112 _thread = boost::thread (boost::bind(&Writer::thread, this));
113 #ifdef DCPOMATIC_LINUX
114 pthread_setname_np (_thread.native_handle(), "writer");
120 terminate_thread (false);
123 /** Pass a video frame to the writer for writing to disk at some point.
124 * This method can be called with frames out of order.
125 * @param encoded JPEG2000-encoded data.
126 * @param frame Frame index within the DCP.
127 * @param eyes Eyes that this frame image is for.
130 Writer::write (Data encoded, Frame frame, Eyes eyes)
132 boost::mutex::scoped_lock lock (_state_mutex);
134 while (_queued_full_in_memory > _maximum_frames_in_memory) {
135 /* There are too many full frames in memory; wake the main writer thread and
136 wait until it sorts everything out */
137 _empty_condition.notify_all ();
138 _full_condition.wait (lock);
142 qi.type = QueueItem::FULL;
143 qi.encoded = encoded;
144 qi.reel = video_reel (frame);
145 qi.frame = frame - _reels[qi.reel].start ();
147 if (_film->three_d() && eyes == EYES_BOTH) {
148 /* 2D material in a 3D DCP; fake the 3D */
150 _queue.push_back (qi);
151 ++_queued_full_in_memory;
152 qi.eyes = EYES_RIGHT;
153 _queue.push_back (qi);
154 ++_queued_full_in_memory;
157 _queue.push_back (qi);
158 ++_queued_full_in_memory;
161 /* Now there's something to do: wake anything wait()ing on _empty_condition */
162 _empty_condition.notify_all ();
166 Writer::can_repeat (Frame frame) const
168 return frame > _reels[video_reel(frame)].start();
171 /** Repeat the last frame that was written to a reel as a new frame.
172 * @param frame Frame index within the DCP of the new (repeated) frame.
173 * @param eyes Eyes that this repeated frame image is for.
176 Writer::repeat (Frame frame, Eyes eyes)
178 boost::mutex::scoped_lock lock (_state_mutex);
180 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
181 /* The queue is too big, and the main writer thread can run and fix it, so
182 wake it and wait until it has done.
184 _empty_condition.notify_all ();
185 _full_condition.wait (lock);
189 qi.type = QueueItem::REPEAT;
190 qi.reel = video_reel (frame);
191 qi.frame = frame - _reels[qi.reel].start ();
192 if (_film->three_d() && eyes == EYES_BOTH) {
194 _queue.push_back (qi);
195 qi.eyes = EYES_RIGHT;
196 _queue.push_back (qi);
199 _queue.push_back (qi);
202 /* Now there's something to do: wake anything wait()ing on _empty_condition */
203 _empty_condition.notify_all ();
207 Writer::fake_write (Frame frame, Eyes eyes)
209 boost::mutex::scoped_lock lock (_state_mutex);
211 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
212 /* The queue is too big, and the main writer thread can run and fix it, so
213 wake it and wait until it has done.
215 _empty_condition.notify_all ();
216 _full_condition.wait (lock);
219 size_t const reel = video_reel (frame);
220 Frame const frame_in_reel = frame - _reels[reel].start ();
223 qi.type = QueueItem::FAKE;
226 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
227 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
231 qi.frame = frame_in_reel;
232 if (_film->three_d() && eyes == EYES_BOTH) {
234 _queue.push_back (qi);
235 qi.eyes = EYES_RIGHT;
236 _queue.push_back (qi);
239 _queue.push_back (qi);
242 /* Now there's something to do: wake anything wait()ing on _empty_condition */
243 _empty_condition.notify_all ();
246 /** Write some audio frames to the DCP.
247 * @param audio Audio data.
248 * @param time Time of this data within the DCP.
249 * This method is not thread safe.
252 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
254 DCPOMATIC_ASSERT (audio);
256 int const afr = _film->audio_frame_rate();
258 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
260 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
265 if (_audio_reel == _reels.end ()) {
266 /* This audio is off the end of the last reel; ignore it */
270 if (end <= _audio_reel->period().to) {
271 /* Easy case: we can write all the audio to this reel */
272 _audio_reel->write (audio);
274 } else if (_audio_reel->period().to <= t) {
275 /* This reel is entirely before the start of our audio; just skip the reel */
278 /* This audio is over a reel boundary; split the audio into two and write the first part */
279 DCPTime part_lengths[2] = {
280 _audio_reel->period().to - t,
281 end - _audio_reel->period().to
284 Frame part_frames[2] = {
285 part_lengths[0].frames_ceil(afr),
286 part_lengths[1].frames_ceil(afr)
289 if (part_frames[0]) {
290 shared_ptr<AudioBuffers> part (new AudioBuffers(audio, part_frames[0], 0));
291 _audio_reel->write (part);
294 if (part_frames[1]) {
295 audio.reset (new AudioBuffers(audio, part_frames[1], part_frames[0]));
301 t += part_lengths[0];
308 Writer::write (shared_ptr<const dcp::AtmosFrame> atmos, DCPTime time, AtmosMetadata metadata)
310 if (_atmos_reel->period().to == time) {
312 DCPOMATIC_ASSERT (_atmos_reel != _reels.end());
315 /* We assume that we get a video frame's worth of data here */
316 _atmos_reel->write (atmos, metadata);
320 /** Caller must hold a lock on _state_mutex */
322 Writer::have_sequenced_image_at_queue_head ()
324 if (_queue.empty ()) {
329 QueueItem const & f = _queue.front();
330 return _last_written[f.reel].next(f);
335 Writer::LastWritten::next (QueueItem qi) const
337 if (qi.eyes == EYES_BOTH) {
339 return qi.frame == (_frame + 1);
344 if (_eyes == EYES_LEFT && qi.frame == _frame && qi.eyes == EYES_RIGHT) {
348 if (_eyes == EYES_RIGHT && qi.frame == (_frame + 1) && qi.eyes == EYES_LEFT) {
357 Writer::LastWritten::update (QueueItem qi)
370 boost::mutex::scoped_lock lock (_state_mutex);
374 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
375 /* We've got something to do: go and do it */
379 /* Nothing to do: wait until something happens which may indicate that we do */
380 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
381 _empty_condition.wait (lock);
382 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
385 if (_finish && _queue.empty()) {
389 /* We stop here if we have been asked to finish, and if either the queue
390 is empty or we do not have a sequenced image at its head (if this is the
391 case we will never terminate as no new frames will be sent once
394 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
395 /* (Hopefully temporarily) log anything that was not written */
396 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
397 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
398 BOOST_FOREACH (QueueItem const& i, _queue) {
399 if (i.type == QueueItem::FULL) {
400 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
402 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
409 /* Write any frames that we can write; i.e. those that are in sequence. */
410 while (have_sequenced_image_at_queue_head ()) {
411 QueueItem qi = _queue.front ();
412 _last_written[qi.reel].update (qi);
414 if (qi.type == QueueItem::FULL && qi.encoded) {
415 --_queued_full_in_memory;
420 ReelWriter& reel = _reels[qi.reel];
423 case QueueItem::FULL:
424 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
426 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
428 reel.write (qi.encoded, qi.frame, qi.eyes);
431 case QueueItem::FAKE:
432 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
433 reel.fake_write (qi.size);
436 case QueueItem::REPEAT:
437 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
438 reel.repeat_write (qi.frame, qi.eyes);
444 _full_condition.notify_all ();
447 while (_queued_full_in_memory > _maximum_frames_in_memory) {
448 /* Too many frames in memory which can't yet be written to the stream.
449 Write some FULL frames to disk.
452 /* Find one from the back of the queue */
454 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
455 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
459 DCPOMATIC_ASSERT (i != _queue.rend());
461 /* For the log message below */
462 int const awaiting = _last_written[_queue.front().reel].frame() + 1;
465 /* i is valid here, even though we don't hold a lock on the mutex,
466 since list iterators are unaffected by insertion and only this
467 thread could erase the last item in the list.
470 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
472 i->encoded->write_via_temp (
473 _film->j2c_path (i->reel, i->frame, i->eyes, true),
474 _film->j2c_path (i->reel, i->frame, i->eyes, false)
479 --_queued_full_in_memory;
480 _full_condition.notify_all ();
490 Writer::terminate_thread (bool can_throw)
492 boost::this_thread::disable_interruption dis;
494 boost::mutex::scoped_lock lock (_state_mutex);
497 _empty_condition.notify_all ();
498 _full_condition.notify_all ();
513 if (!_thread.joinable()) {
517 LOG_GENERAL_NC ("Terminating writer thread");
519 terminate_thread (true);
521 LOG_GENERAL_NC ("Finishing ReelWriters");
523 BOOST_FOREACH (ReelWriter& i, _reels) {
527 LOG_GENERAL_NC ("Writing XML");
529 dcp::DCP dcp (_film->dir (_film->dcp_name()));
531 shared_ptr<dcp::CPL> cpl (
534 _film->dcp_content_type()->libdcp_kind ()
540 /* Calculate digests for each reel in parallel */
542 shared_ptr<Job> job = _job.lock ();
543 job->sub (_("Computing digests"));
545 boost::asio::io_service service;
546 boost::thread_group pool;
548 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
550 int const threads = max (1, Config::instance()->master_encoding_threads ());
552 for (int i = 0; i < threads; ++i) {
553 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
556 BOOST_FOREACH (ReelWriter& i, _reels) {
557 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
558 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
565 /* Add reels to CPL */
567 BOOST_FOREACH (ReelWriter& i, _reels) {
568 cpl->add (i.create_reel (_reel_assets, _fonts));
571 dcp::XMLMetadata meta;
572 meta.annotation_text = cpl->annotation_text ();
573 meta.creator = Config::instance()->dcp_creator ();
574 if (meta.creator.empty ()) {
575 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
577 meta.issuer = Config::instance()->dcp_issuer ();
578 if (meta.issuer.empty ()) {
579 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
581 meta.set_issue_date_now ();
583 cpl->set_metadata (meta);
584 cpl->set_ratings (vector_to_list(_film->ratings()));
585 cpl->set_content_version_label_text (_film->content_version());
587 shared_ptr<const dcp::CertificateChain> signer;
588 signer = Config::instance()->signer_chain ();
589 /* We did check earlier, but check again here to be on the safe side */
591 if (!signer->valid (&reason)) {
592 throw InvalidSignerError (reason);
595 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
598 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
601 write_cover_sheet ();
605 Writer::write_cover_sheet ()
607 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
608 FILE* f = fopen_boost (cover, "w");
610 throw OpenFileError (cover, errno, OpenFileError::WRITE);
613 string text = Config::instance()->cover_sheet ();
614 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
615 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
616 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
617 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
619 optional<string> subtitle_language;
620 BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
621 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
622 if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
623 subtitle_language = j->language ();
627 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
629 boost::uintmax_t size = 0;
631 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
632 i != boost::filesystem::recursive_directory_iterator();
634 if (boost::filesystem::is_regular_file (i->path ())) {
635 size += boost::filesystem::file_size (i->path ());
639 if (size > (1000000000L)) {
640 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
642 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
645 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
646 string description = String::compose("%1.%2", ch.first, ch.second);
648 if (description == "0.0") {
649 description = _("None");
650 } else if (description == "1.0") {
651 description = _("Mono");
652 } else if (description == "2.0") {
653 description = _("Stereo");
655 boost::algorithm::replace_all (text, "$AUDIO", description);
658 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
660 if (h == 0 && m == 0) {
661 length = String::compose("%1s", s);
662 } else if (h == 0 && m > 0) {
663 length = String::compose("%1m%2s", m, s);
664 } else if (h > 0 && m > 0) {
665 length = String::compose("%1h%2m%3s", h, m, s);
668 boost::algorithm::replace_all (text, "$LENGTH", length);
670 checked_fwrite (text.c_str(), text.length(), f, cover);
674 /** @param frame Frame index within the whole DCP.
675 * @return true if we can fake-write this frame.
678 Writer::can_fake_write (Frame frame) const
680 if (_film->encrypted()) {
681 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
685 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
686 parameters in the asset writer.
689 ReelWriter const & reel = _reels[video_reel(frame)];
691 /* Make frame relative to the start of the reel */
692 frame -= reel.start ();
693 return (frame != 0 && frame < reel.first_nonexistant_frame());
696 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
698 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
700 vector<ReelWriter>::iterator* reel = 0;
703 case TEXT_OPEN_SUBTITLE:
704 reel = &_subtitle_reel;
706 case TEXT_CLOSED_CAPTION:
707 DCPOMATIC_ASSERT (track);
708 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
709 reel = &_caption_reels[*track];
712 DCPOMATIC_ASSERT (false);
715 DCPOMATIC_ASSERT (*reel != _reels.end());
716 while ((*reel)->period().to <= period.from) {
718 DCPOMATIC_ASSERT (*reel != _reels.end());
721 (*reel)->write (text, type, track, period);
725 Writer::write (list<shared_ptr<Font> > fonts)
727 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
729 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
731 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
738 _fonts.push_back (i);
744 operator< (QueueItem const & a, QueueItem const & b)
746 if (a.reel != b.reel) {
747 return a.reel < b.reel;
750 if (a.frame != b.frame) {
751 return a.frame < b.frame;
754 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
758 operator== (QueueItem const & a, QueueItem const & b)
760 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
764 Writer::set_encoder_threads (int threads)
766 boost::mutex::scoped_lock lm (_state_mutex);
767 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
768 _maximum_queue_size = threads * 16;
772 Writer::write (ReferencedReelAsset asset)
774 _reel_assets.push_back (asset);
778 Writer::video_reel (int frame) const
780 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
782 while (i < _reels.size() && !_reels[i].period().contains (t)) {
786 DCPOMATIC_ASSERT (i < _reels.size ());
791 Writer::set_digest_progress (Job* job, float progress)
793 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
795 _digest_progresses[boost::this_thread::get_id()] = progress;
796 float min_progress = FLT_MAX;
797 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
798 min_progress = min (min_progress, i->second);
801 job->set_progress (min_progress);