2 Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
40 #include <dcp/locale_convert.h>
41 #include <dcp/reel_mxf.h>
42 #include <boost/foreach.hpp>
50 /* OS X strikes again */
62 using boost::shared_ptr;
63 using boost::weak_ptr;
64 using boost::dynamic_pointer_cast;
65 using boost::optional;
66 #if BOOST_VERSION >= 106100
67 using namespace boost::placeholders;
71 using namespace dcpomatic;
76 ignore_progress (float)
82 /** @param j Job to report progress to, or 0 */
83 Writer::Writer (weak_ptr<const Film> weak_film, weak_ptr<Job> j)
84 : WeakConstFilm (weak_film)
87 , _queued_full_in_memory (0)
88 /* These will be reset to sensible values when J2KEncoder is created */
89 , _maximum_frames_in_memory (8)
90 , _maximum_queue_size (8)
96 shared_ptr<Job> job = _job.lock ();
99 list<DCPTimePeriod> const reels = film()->reels();
100 BOOST_FOREACH (DCPTimePeriod p, reels) {
101 _reels.push_back (ReelWriter(weak_film, p, job, reel_index++, reels.size()));
104 _last_written.resize (reels.size());
106 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
107 and captions arrive to the Writer in sequence. This is not so for video.
109 _audio_reel = _reels.begin ();
110 _subtitle_reel = _reels.begin ();
111 BOOST_FOREACH (DCPTextTrack i, film()->closed_caption_tracks()) {
112 _caption_reels[i] = _reels.begin ();
114 _atmos_reel = _reels.begin ();
116 /* Check that the signer is OK */
118 if (!Config::instance()->signer_chain()->valid(&reason)) {
119 throw InvalidSignerError (reason);
126 _thread = boost::thread (boost::bind(&Writer::thread, this));
127 #ifdef DCPOMATIC_LINUX
128 pthread_setname_np (_thread.native_handle(), "writer");
134 terminate_thread (false);
137 /** Pass a video frame to the writer for writing to disk at some point.
138 * This method can be called with frames out of order.
139 * @param encoded JPEG2000-encoded data.
140 * @param frame Frame index within the DCP.
141 * @param eyes Eyes that this frame image is for.
144 Writer::write (shared_ptr<const Data> encoded, Frame frame, Eyes eyes)
146 boost::mutex::scoped_lock lock (_state_mutex);
148 while (_queued_full_in_memory > _maximum_frames_in_memory) {
149 /* There are too many full frames in memory; wake the main writer thread and
150 wait until it sorts everything out */
151 _empty_condition.notify_all ();
152 _full_condition.wait (lock);
156 qi.type = QueueItem::FULL;
157 qi.encoded = encoded;
158 qi.reel = video_reel (frame);
159 qi.frame = frame - _reels[qi.reel].start ();
161 if (film()->three_d() && eyes == EYES_BOTH) {
162 /* 2D material in a 3D DCP; fake the 3D */
164 _queue.push_back (qi);
165 ++_queued_full_in_memory;
166 qi.eyes = EYES_RIGHT;
167 _queue.push_back (qi);
168 ++_queued_full_in_memory;
171 _queue.push_back (qi);
172 ++_queued_full_in_memory;
175 /* Now there's something to do: wake anything wait()ing on _empty_condition */
176 _empty_condition.notify_all ();
180 Writer::can_repeat (Frame frame) const
182 return frame > _reels[video_reel(frame)].start();
185 /** Repeat the last frame that was written to a reel as a new frame.
186 * @param frame Frame index within the DCP of the new (repeated) frame.
187 * @param eyes Eyes that this repeated frame image is for.
190 Writer::repeat (Frame frame, Eyes eyes)
192 boost::mutex::scoped_lock lock (_state_mutex);
194 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
195 /* The queue is too big, and the main writer thread can run and fix it, so
196 wake it and wait until it has done.
198 _empty_condition.notify_all ();
199 _full_condition.wait (lock);
203 qi.type = QueueItem::REPEAT;
204 qi.reel = video_reel (frame);
205 qi.frame = frame - _reels[qi.reel].start ();
206 if (film()->three_d() && eyes == EYES_BOTH) {
208 _queue.push_back (qi);
209 qi.eyes = EYES_RIGHT;
210 _queue.push_back (qi);
213 _queue.push_back (qi);
216 /* Now there's something to do: wake anything wait()ing on _empty_condition */
217 _empty_condition.notify_all ();
221 Writer::fake_write (Frame frame, Eyes eyes)
223 boost::mutex::scoped_lock lock (_state_mutex);
225 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
226 /* The queue is too big, and the main writer thread can run and fix it, so
227 wake it and wait until it has done.
229 _empty_condition.notify_all ();
230 _full_condition.wait (lock);
233 size_t const reel = video_reel (frame);
234 Frame const frame_in_reel = frame - _reels[reel].start ();
237 qi.type = QueueItem::FAKE;
240 shared_ptr<InfoFileHandle> info_file = film()->info_file_handle(_reels[reel].period(), true);
241 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
245 qi.frame = frame_in_reel;
246 if (film()->three_d() && eyes == EYES_BOTH) {
248 _queue.push_back (qi);
249 qi.eyes = EYES_RIGHT;
250 _queue.push_back (qi);
253 _queue.push_back (qi);
256 /* Now there's something to do: wake anything wait()ing on _empty_condition */
257 _empty_condition.notify_all ();
260 /** Write some audio frames to the DCP.
261 * @param audio Audio data.
262 * @param time Time of this data within the DCP.
263 * This method is not thread safe.
266 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
268 DCPOMATIC_ASSERT (audio);
270 int const afr = film()->audio_frame_rate();
272 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
274 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
279 if (_audio_reel == _reels.end ()) {
280 /* This audio is off the end of the last reel; ignore it */
284 if (end <= _audio_reel->period().to) {
285 /* Easy case: we can write all the audio to this reel */
286 _audio_reel->write (audio);
288 } else if (_audio_reel->period().to <= t) {
289 /* This reel is entirely before the start of our audio; just skip the reel */
292 /* This audio is over a reel boundary; split the audio into two and write the first part */
293 DCPTime part_lengths[2] = {
294 _audio_reel->period().to - t,
295 end - _audio_reel->period().to
298 Frame part_frames[2] = {
299 part_lengths[0].frames_ceil(afr),
300 part_lengths[1].frames_ceil(afr)
303 if (part_frames[0]) {
304 shared_ptr<AudioBuffers> part (new AudioBuffers(audio, part_frames[0], 0));
305 _audio_reel->write (part);
308 if (part_frames[1]) {
309 audio.reset (new AudioBuffers(audio, part_frames[1], part_frames[0]));
315 t += part_lengths[0];
322 Writer::write (shared_ptr<const dcp::AtmosFrame> atmos, DCPTime time, AtmosMetadata metadata)
324 if (_atmos_reel->period().to == time) {
326 DCPOMATIC_ASSERT (_atmos_reel != _reels.end());
329 /* We assume that we get a video frame's worth of data here */
330 _atmos_reel->write (atmos, metadata);
334 /** Caller must hold a lock on _state_mutex */
336 Writer::have_sequenced_image_at_queue_head ()
338 if (_queue.empty ()) {
343 QueueItem const & f = _queue.front();
344 return _last_written[f.reel].next(f);
349 Writer::LastWritten::next (QueueItem qi) const
351 if (qi.eyes == EYES_BOTH) {
353 return qi.frame == (_frame + 1);
358 if (_eyes == EYES_LEFT && qi.frame == _frame && qi.eyes == EYES_RIGHT) {
362 if (_eyes == EYES_RIGHT && qi.frame == (_frame + 1) && qi.eyes == EYES_LEFT) {
371 Writer::LastWritten::update (QueueItem qi)
384 boost::mutex::scoped_lock lock (_state_mutex);
388 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
389 /* We've got something to do: go and do it */
393 /* Nothing to do: wait until something happens which may indicate that we do */
394 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
395 _empty_condition.wait (lock);
396 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
399 /* We stop here if we have been asked to finish, and if either the queue
400 is empty or we do not have a sequenced image at its head (if this is the
401 case we will never terminate as no new frames will be sent once
404 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
405 /* (Hopefully temporarily) log anything that was not written */
406 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
407 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
408 BOOST_FOREACH (QueueItem const& i, _queue) {
409 if (i.type == QueueItem::FULL) {
410 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
412 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
419 /* Write any frames that we can write; i.e. those that are in sequence. */
420 while (have_sequenced_image_at_queue_head ()) {
421 QueueItem qi = _queue.front ();
422 _last_written[qi.reel].update (qi);
424 if (qi.type == QueueItem::FULL && qi.encoded) {
425 --_queued_full_in_memory;
430 ReelWriter& reel = _reels[qi.reel];
433 case QueueItem::FULL:
434 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
436 qi.encoded.reset (new ArrayData(film()->j2c_path(qi.reel, qi.frame, qi.eyes, false)));
438 reel.write (qi.encoded, qi.frame, qi.eyes);
441 case QueueItem::FAKE:
442 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
443 reel.fake_write (qi.size);
446 case QueueItem::REPEAT:
447 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
448 reel.repeat_write (qi.frame, qi.eyes);
454 _full_condition.notify_all ();
457 while (_queued_full_in_memory > _maximum_frames_in_memory) {
458 /* Too many frames in memory which can't yet be written to the stream.
459 Write some FULL frames to disk.
462 /* Find one from the back of the queue */
464 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
465 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
469 DCPOMATIC_ASSERT (i != _queue.rend());
471 /* For the log message below */
472 int const awaiting = _last_written[_queue.front().reel].frame() + 1;
475 /* i is valid here, even though we don't hold a lock on the mutex,
476 since list iterators are unaffected by insertion and only this
477 thread could erase the last item in the list.
480 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
482 i->encoded->write_via_temp (
483 film()->j2c_path(i->reel, i->frame, i->eyes, true),
484 film()->j2c_path(i->reel, i->frame, i->eyes, false)
489 --_queued_full_in_memory;
490 _full_condition.notify_all ();
500 Writer::terminate_thread (bool can_throw)
502 boost::this_thread::disable_interruption dis;
504 boost::mutex::scoped_lock lock (_state_mutex);
507 _empty_condition.notify_all ();
508 _full_condition.notify_all ();
523 if (!_thread.joinable()) {
527 LOG_GENERAL_NC ("Terminating writer thread");
529 terminate_thread (true);
531 LOG_GENERAL_NC ("Finishing ReelWriters");
533 BOOST_FOREACH (ReelWriter& i, _reels) {
537 LOG_GENERAL_NC ("Writing XML");
539 dcp::DCP dcp (film()->dir(film()->dcp_name()));
541 shared_ptr<dcp::CPL> cpl (
544 film()->dcp_content_type()->libdcp_kind ()
550 /* Calculate digests for each reel in parallel */
552 shared_ptr<Job> job = _job.lock ();
554 job->sub (_("Computing digests"));
557 boost::asio::io_service service;
558 boost::thread_group pool;
560 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
562 int const threads = max (1, Config::instance()->master_encoding_threads ());
564 for (int i = 0; i < threads; ++i) {
565 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
568 boost::function<void (float)> set_progress;
570 set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
572 set_progress = &ignore_progress;
575 BOOST_FOREACH (ReelWriter& i, _reels) {
576 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
578 service.post (boost::bind (&Writer::calculate_referenced_digests, this, set_progress));
586 BOOST_FOREACH (ReelWriter& i, _reels) {
587 cpl->add (i.create_reel (_reel_assets, _fonts));
592 string creator = Config::instance()->dcp_creator();
593 if (creator.empty()) {
594 creator = String::compose("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
597 string issuer = Config::instance()->dcp_issuer();
598 if (issuer.empty()) {
599 issuer = String::compose("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
602 cpl->set_ratings (film()->ratings());
604 vector<dcp::ContentVersion> cv;
605 BOOST_FOREACH (string i, film()->content_versions()) {
606 cv.push_back (dcp::ContentVersion(i));
608 cpl->set_content_versions (cv);
610 cpl->set_full_content_title_text (film()->name());
611 cpl->set_full_content_title_text_language (film()->name_language());
612 cpl->set_release_territory (film()->release_territory());
613 cpl->set_version_number (film()->version_number());
614 cpl->set_status (film()->status());
615 cpl->set_chain (film()->chain());
616 cpl->set_distributor (film()->distributor());
617 cpl->set_facility (film()->facility());
618 cpl->set_luminance (film()->luminance());
620 list<int> ac = film()->mapped_audio_channels();
621 dcp::MCASoundField field = (
622 find(ac.begin(), ac.end(), static_cast<int>(dcp::BSL)) != ac.end() ||
623 find(ac.begin(), ac.end(), static_cast<int>(dcp::BSR)) != ac.end()
624 ) ? dcp::SEVEN_POINT_ONE : dcp::FIVE_POINT_ONE;
626 dcp::MainSoundConfiguration msc (field, film()->audio_channels());
627 BOOST_FOREACH (int i, ac) {
628 if (i < film()->audio_channels()) {
629 msc.set_mapping (i, static_cast<dcp::Channel>(i));
633 cpl->set_main_sound_configuration (msc.to_string());
634 cpl->set_main_sound_sample_rate (film()->audio_frame_rate());
635 cpl->set_main_picture_stored_area (film()->frame_size());
636 cpl->set_main_picture_active_area (film()->active_area());
638 vector<dcp::LanguageTag> sl = film()->subtitle_languages();
640 cpl->set_additional_subtitle_languages(std::vector<dcp::LanguageTag>(sl.begin() + 1, sl.end()));
643 shared_ptr<const dcp::CertificateChain> signer;
644 signer = Config::instance()->signer_chain ();
645 /* We did check earlier, but check again here to be on the safe side */
647 if (!signer->valid (&reason)) {
648 throw InvalidSignerError (reason);
652 film()->interop() ? dcp::INTEROP : dcp::SMPTE,
655 dcp::LocalTime().as_string(),
656 String::compose("Created by libdcp %1", dcp::version),
658 Config::instance()->dcp_metadata_filename_format()
662 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
665 write_cover_sheet ();
669 Writer::write_cover_sheet ()
671 boost::filesystem::path const cover = film()->file("COVER_SHEET.txt");
672 FILE* f = fopen_boost (cover, "w");
674 throw OpenFileError (cover, errno, OpenFileError::WRITE);
677 string text = Config::instance()->cover_sheet ();
678 boost::algorithm::replace_all (text, "$CPL_NAME", film()->name());
679 boost::algorithm::replace_all (text, "$TYPE", film()->dcp_content_type()->pretty_name());
680 boost::algorithm::replace_all (text, "$CONTAINER", film()->container()->container_nickname());
681 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", film()->isdcf_metadata().audio_language);
683 vector<dcp::LanguageTag> subtitle_languages = film()->subtitle_languages();
684 if (subtitle_languages.empty()) {
685 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", "None");
687 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_languages.front().description());
690 boost::uintmax_t size = 0;
692 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(film()->dir(film()->dcp_name()));
693 i != boost::filesystem::recursive_directory_iterator();
695 if (boost::filesystem::is_regular_file (i->path ())) {
696 size += boost::filesystem::file_size (i->path ());
700 if (size > (1000000000L)) {
701 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
703 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
706 pair<int, int> ch = audio_channel_types (film()->mapped_audio_channels(), film()->audio_channels());
707 string description = String::compose("%1.%2", ch.first, ch.second);
709 if (description == "0.0") {
710 description = _("None");
711 } else if (description == "1.0") {
712 description = _("Mono");
713 } else if (description == "2.0") {
714 description = _("Stereo");
716 boost::algorithm::replace_all (text, "$AUDIO", description);
719 film()->length().split(film()->video_frame_rate(), h, m, s, fr);
721 if (h == 0 && m == 0) {
722 length = String::compose("%1s", s);
723 } else if (h == 0 && m > 0) {
724 length = String::compose("%1m%2s", m, s);
725 } else if (h > 0 && m > 0) {
726 length = String::compose("%1h%2m%3s", h, m, s);
729 boost::algorithm::replace_all (text, "$LENGTH", length);
731 checked_fwrite (text.c_str(), text.length(), f, cover);
735 /** @param frame Frame index within the whole DCP.
736 * @return true if we can fake-write this frame.
739 Writer::can_fake_write (Frame frame) const
741 if (film()->encrypted()) {
742 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
746 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
747 parameters in the asset writer.
750 ReelWriter const & reel = _reels[video_reel(frame)];
752 /* Make frame relative to the start of the reel */
753 frame -= reel.start ();
754 return (frame != 0 && frame < reel.first_nonexistant_frame());
757 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
759 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
761 vector<ReelWriter>::iterator* reel = 0;
764 case TEXT_OPEN_SUBTITLE:
765 reel = &_subtitle_reel;
767 case TEXT_CLOSED_CAPTION:
768 DCPOMATIC_ASSERT (track);
769 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
770 reel = &_caption_reels[*track];
773 DCPOMATIC_ASSERT (false);
776 DCPOMATIC_ASSERT (*reel != _reels.end());
777 while ((*reel)->period().to <= period.from) {
779 DCPOMATIC_ASSERT (*reel != _reels.end());
782 (*reel)->write (text, type, track, period);
786 Writer::write (list<shared_ptr<Font> > fonts)
788 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
790 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
792 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
799 _fonts.push_back (i);
805 operator< (QueueItem const & a, QueueItem const & b)
807 if (a.reel != b.reel) {
808 return a.reel < b.reel;
811 if (a.frame != b.frame) {
812 return a.frame < b.frame;
815 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
819 operator== (QueueItem const & a, QueueItem const & b)
821 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
825 Writer::set_encoder_threads (int threads)
827 boost::mutex::scoped_lock lm (_state_mutex);
828 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
829 _maximum_queue_size = threads * 16;
833 Writer::write (ReferencedReelAsset asset)
835 _reel_assets.push_back (asset);
839 Writer::video_reel (int frame) const
841 DCPTime t = DCPTime::from_frames (frame, film()->video_frame_rate());
843 while (i < _reels.size() && !_reels[i].period().contains (t)) {
847 DCPOMATIC_ASSERT (i < _reels.size ());
852 Writer::set_digest_progress (Job* job, float progress)
854 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
856 _digest_progresses[boost::this_thread::get_id()] = progress;
857 float min_progress = FLT_MAX;
858 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
859 min_progress = min (min_progress, i->second);
862 job->set_progress (min_progress);
869 /** Calculate hashes for any referenced MXF assets which do not already have one */
871 Writer::calculate_referenced_digests (boost::function<void (float)> set_progress)
873 BOOST_FOREACH (ReferencedReelAsset const& i, _reel_assets) {
874 shared_ptr<dcp::ReelMXF> mxf = dynamic_pointer_cast<dcp::ReelMXF>(i.asset);
875 if (mxf && !mxf->hash()) {
876 mxf->asset_ref().asset()->hash (set_progress);
877 mxf->set_hash (mxf->asset_ref().asset()->hash());