2 Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcpomatic_log.h"
27 #include "dcp_video.h"
28 #include "dcp_content_type.h"
29 #include "audio_mapping.h"
33 #include "audio_buffers.h"
37 #include "reel_writer.h"
38 #include "text_content.h"
40 #include <dcp/locale_convert.h>
41 #include <boost/foreach.hpp>
49 /* OS X strikes again */
61 using boost::shared_ptr;
62 using boost::weak_ptr;
63 using boost::dynamic_pointer_cast;
64 using boost::optional;
66 using namespace dcpomatic;
68 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
72 , _queued_full_in_memory (0)
73 /* These will be reset to sensible values when J2KEncoder is created */
74 , _maximum_frames_in_memory (8)
75 , _maximum_queue_size (8)
81 shared_ptr<Job> job = _job.lock ();
82 DCPOMATIC_ASSERT (job);
85 list<DCPTimePeriod> const reels = _film->reels ();
86 BOOST_FOREACH (DCPTimePeriod p, reels) {
87 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
90 _last_written.resize (reels.size());
92 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
93 and captions arrive to the Writer in sequence. This is not so for video.
95 _audio_reel = _reels.begin ();
96 _subtitle_reel = _reels.begin ();
97 BOOST_FOREACH (DCPTextTrack i, _film->closed_caption_tracks()) {
98 _caption_reels[i] = _reels.begin ();
100 _atmos_reel = _reels.begin ();
102 /* Check that the signer is OK */
104 if (!Config::instance()->signer_chain()->valid(&reason)) {
105 throw InvalidSignerError (reason);
112 _thread = boost::thread (boost::bind(&Writer::thread, this));
113 #ifdef DCPOMATIC_LINUX
114 pthread_setname_np (_thread.native_handle(), "writer");
120 terminate_thread (false);
123 /** Pass a video frame to the writer for writing to disk at some point.
124 * This method can be called with frames out of order.
125 * @param encoded JPEG2000-encoded data.
126 * @param frame Frame index within the DCP.
127 * @param eyes Eyes that this frame image is for.
130 Writer::write (Data encoded, Frame frame, Eyes eyes)
132 boost::mutex::scoped_lock lock (_state_mutex);
134 while (_queued_full_in_memory > _maximum_frames_in_memory) {
135 /* There are too many full frames in memory; wake the main writer thread and
136 wait until it sorts everything out */
137 _empty_condition.notify_all ();
138 _full_condition.wait (lock);
142 qi.type = QueueItem::FULL;
143 qi.encoded = encoded;
144 qi.reel = video_reel (frame);
145 qi.frame = frame - _reels[qi.reel].start ();
147 if (_film->three_d() && eyes == EYES_BOTH) {
148 /* 2D material in a 3D DCP; fake the 3D */
150 _queue.push_back (qi);
151 ++_queued_full_in_memory;
152 qi.eyes = EYES_RIGHT;
153 _queue.push_back (qi);
154 ++_queued_full_in_memory;
157 _queue.push_back (qi);
158 ++_queued_full_in_memory;
161 /* Now there's something to do: wake anything wait()ing on _empty_condition */
162 _empty_condition.notify_all ();
166 Writer::can_repeat (Frame frame) const
168 return frame > _reels[video_reel(frame)].start();
171 /** Repeat the last frame that was written to a reel as a new frame.
172 * @param frame Frame index within the DCP of the new (repeated) frame.
173 * @param eyes Eyes that this repeated frame image is for.
176 Writer::repeat (Frame frame, Eyes eyes)
178 boost::mutex::scoped_lock lock (_state_mutex);
180 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
181 /* The queue is too big, and the main writer thread can run and fix it, so
182 wake it and wait until it has done.
184 _empty_condition.notify_all ();
185 _full_condition.wait (lock);
189 qi.type = QueueItem::REPEAT;
190 qi.reel = video_reel (frame);
191 qi.frame = frame - _reels[qi.reel].start ();
192 if (_film->three_d() && eyes == EYES_BOTH) {
194 _queue.push_back (qi);
195 qi.eyes = EYES_RIGHT;
196 _queue.push_back (qi);
199 _queue.push_back (qi);
202 /* Now there's something to do: wake anything wait()ing on _empty_condition */
203 _empty_condition.notify_all ();
207 Writer::fake_write (Frame frame, Eyes eyes)
209 boost::mutex::scoped_lock lock (_state_mutex);
211 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
212 /* The queue is too big, and the main writer thread can run and fix it, so
213 wake it and wait until it has done.
215 _empty_condition.notify_all ();
216 _full_condition.wait (lock);
219 size_t const reel = video_reel (frame);
220 Frame const frame_in_reel = frame - _reels[reel].start ();
223 qi.type = QueueItem::FAKE;
226 shared_ptr<InfoFileHandle> info_file = _film->info_file_handle(_reels[reel].period(), true);
227 qi.size = _reels[reel].read_frame_info(info_file, frame_in_reel, eyes).size;
231 qi.frame = frame_in_reel;
232 if (_film->three_d() && eyes == EYES_BOTH) {
234 _queue.push_back (qi);
235 qi.eyes = EYES_RIGHT;
236 _queue.push_back (qi);
239 _queue.push_back (qi);
242 /* Now there's something to do: wake anything wait()ing on _empty_condition */
243 _empty_condition.notify_all ();
246 /** Write some audio frames to the DCP.
247 * @param audio Audio data.
248 * @param time Time of this data within the DCP.
249 * This method is not thread safe.
252 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
254 DCPOMATIC_ASSERT (audio);
256 int const afr = _film->audio_frame_rate();
258 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
260 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
265 if (_audio_reel == _reels.end ()) {
266 /* This audio is off the end of the last reel; ignore it */
270 if (end <= _audio_reel->period().to) {
271 /* Easy case: we can write all the audio to this reel */
272 _audio_reel->write (audio);
274 } else if (_audio_reel->period().to <= t) {
275 /* This reel is entirely before the start of our audio; just skip the reel */
278 /* This audio is over a reel boundary; split the audio into two and write the first part */
279 DCPTime part_lengths[2] = {
280 _audio_reel->period().to - t,
281 end - _audio_reel->period().to
284 Frame part_frames[2] = {
285 part_lengths[0].frames_ceil(afr),
286 part_lengths[1].frames_ceil(afr)
289 if (part_frames[0]) {
290 shared_ptr<AudioBuffers> part (new AudioBuffers(audio, part_frames[0], 0));
291 _audio_reel->write (part);
294 if (part_frames[1]) {
295 audio.reset (new AudioBuffers(audio, part_frames[1], part_frames[0]));
301 t += part_lengths[0];
308 Writer::write (shared_ptr<const dcp::AtmosFrame> atmos, DCPTime time, AtmosMetadata metadata)
310 if (_atmos_reel->period().to == time) {
312 DCPOMATIC_ASSERT (_atmos_reel != _reels.end());
315 /* We assume that we get a video frame's worth of data here */
316 _atmos_reel->write (atmos, metadata);
320 /** Caller must hold a lock on _state_mutex */
322 Writer::have_sequenced_image_at_queue_head ()
324 if (_queue.empty ()) {
329 QueueItem const & f = _queue.front();
330 return _last_written[f.reel].next(f);
335 Writer::LastWritten::next (QueueItem qi) const
337 if (qi.eyes == EYES_BOTH) {
339 return qi.frame == (_frame + 1);
344 if (_eyes == EYES_LEFT && qi.frame == _frame && qi.eyes == EYES_RIGHT) {
348 if (_eyes == EYES_RIGHT && qi.frame == (_frame + 1) && qi.eyes == EYES_LEFT) {
357 Writer::LastWritten::update (QueueItem qi)
370 boost::mutex::scoped_lock lock (_state_mutex);
374 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
375 /* We've got something to do: go and do it */
379 /* Nothing to do: wait until something happens which may indicate that we do */
380 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
381 _empty_condition.wait (lock);
382 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
385 if (_finish && _queue.empty()) {
389 /* We stop here if we have been asked to finish, and if either the queue
390 is empty or we do not have a sequenced image at its head (if this is the
391 case we will never terminate as no new frames will be sent once
394 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
395 /* (Hopefully temporarily) log anything that was not written */
396 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
397 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
398 BOOST_FOREACH (QueueItem const& i, _queue) {
399 if (i.type == QueueItem::FULL) {
400 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i.frame, (int) i.eyes);
402 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i.size, i.frame, (int) i.eyes);
409 /* Write any frames that we can write; i.e. those that are in sequence. */
410 while (have_sequenced_image_at_queue_head ()) {
411 QueueItem qi = _queue.front ();
412 _last_written[qi.reel].update (qi);
414 if (qi.type == QueueItem::FULL && qi.encoded) {
415 --_queued_full_in_memory;
420 ReelWriter& reel = _reels[qi.reel];
423 case QueueItem::FULL:
424 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
426 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
428 reel.write (qi.encoded, qi.frame, qi.eyes);
431 case QueueItem::FAKE:
432 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
433 reel.fake_write (qi.size);
436 case QueueItem::REPEAT:
437 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
438 reel.repeat_write (qi.frame, qi.eyes);
444 _full_condition.notify_all ();
447 while (_queued_full_in_memory > _maximum_frames_in_memory) {
448 /* Too many frames in memory which can't yet be written to the stream.
449 Write some FULL frames to disk.
452 /* Find one from the back of the queue */
454 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
455 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
459 DCPOMATIC_ASSERT (i != _queue.rend());
461 /* For the log message below */
462 int const awaiting = _last_written[_queue.front().reel].frame() + 1;
465 /* i is valid here, even though we don't hold a lock on the mutex,
466 since list iterators are unaffected by insertion and only this
467 thread could erase the last item in the list.
470 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
472 i->encoded->write_via_temp (
473 _film->j2c_path (i->reel, i->frame, i->eyes, true),
474 _film->j2c_path (i->reel, i->frame, i->eyes, false)
479 --_queued_full_in_memory;
480 _full_condition.notify_all ();
490 Writer::terminate_thread (bool can_throw)
492 boost::this_thread::disable_interruption dis;
494 boost::mutex::scoped_lock lock (_state_mutex);
497 _empty_condition.notify_all ();
498 _full_condition.notify_all ();
513 if (!_thread.joinable()) {
517 LOG_GENERAL_NC ("Terminating writer thread");
519 terminate_thread (true);
521 LOG_GENERAL_NC ("Finishing ReelWriters");
523 BOOST_FOREACH (ReelWriter& i, _reels) {
527 LOG_GENERAL_NC ("Writing XML");
529 dcp::DCP dcp (_film->dir (_film->dcp_name()));
531 shared_ptr<dcp::CPL> cpl (
534 _film->dcp_content_type()->libdcp_kind ()
540 /* Calculate digests for each reel in parallel */
542 shared_ptr<Job> job = _job.lock ();
543 job->sub (_("Computing digests"));
545 boost::asio::io_service service;
546 boost::thread_group pool;
548 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
550 int const threads = max (1, Config::instance()->master_encoding_threads ());
552 for (int i = 0; i < threads; ++i) {
553 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
556 BOOST_FOREACH (ReelWriter& i, _reels) {
557 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
558 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
565 /* Add reels to CPL */
567 BOOST_FOREACH (ReelWriter& i, _reels) {
568 cpl->add (i.create_reel (_reel_assets, _fonts));
571 string creator = Config::instance()->dcp_creator();
572 if (creator.empty()) {
573 creator = String::compose("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
576 string issuer = Config::instance()->dcp_issuer();
577 if (issuer.empty()) {
578 issuer = String::compose("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
581 cpl->set_ratings (_film->ratings());
583 dcp::ContentVersion cv = cpl->content_version ();
584 cv.label_text = _film->content_version();
585 cpl->set_content_version (cv);
587 cpl->set_full_content_title_text (_film->name());
589 shared_ptr<const dcp::CertificateChain> signer;
590 signer = Config::instance()->signer_chain ();
591 /* We did check earlier, but check again here to be on the safe side */
593 if (!signer->valid (&reason)) {
594 throw InvalidSignerError (reason);
598 _film->interop() ? dcp::INTEROP : dcp::SMPTE,
601 dcp::LocalTime().as_string(),
602 String::compose("Created by libdcp %1", dcp::version),
604 Config::instance()->dcp_metadata_filename_format()
608 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
611 write_cover_sheet ();
615 Writer::write_cover_sheet ()
617 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
618 FILE* f = fopen_boost (cover, "w");
620 throw OpenFileError (cover, errno, OpenFileError::WRITE);
623 string text = Config::instance()->cover_sheet ();
624 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
625 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
626 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
627 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
629 optional<string> subtitle_language;
630 BOOST_FOREACH (shared_ptr<Content> i, _film->content()) {
631 BOOST_FOREACH (shared_ptr<TextContent> j, i->text) {
632 if (j->type() == TEXT_OPEN_SUBTITLE && j->use()) {
633 subtitle_language = j->language ();
637 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", subtitle_language.get_value_or("None"));
639 boost::uintmax_t size = 0;
641 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
642 i != boost::filesystem::recursive_directory_iterator();
644 if (boost::filesystem::is_regular_file (i->path ())) {
645 size += boost::filesystem::file_size (i->path ());
649 if (size > (1000000000L)) {
650 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
652 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
655 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
656 string description = String::compose("%1.%2", ch.first, ch.second);
658 if (description == "0.0") {
659 description = _("None");
660 } else if (description == "1.0") {
661 description = _("Mono");
662 } else if (description == "2.0") {
663 description = _("Stereo");
665 boost::algorithm::replace_all (text, "$AUDIO", description);
668 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
670 if (h == 0 && m == 0) {
671 length = String::compose("%1s", s);
672 } else if (h == 0 && m > 0) {
673 length = String::compose("%1m%2s", m, s);
674 } else if (h > 0 && m > 0) {
675 length = String::compose("%1h%2m%3s", h, m, s);
678 boost::algorithm::replace_all (text, "$LENGTH", length);
680 checked_fwrite (text.c_str(), text.length(), f, cover);
684 /** @param frame Frame index within the whole DCP.
685 * @return true if we can fake-write this frame.
688 Writer::can_fake_write (Frame frame) const
690 if (_film->encrypted()) {
691 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
695 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
696 parameters in the asset writer.
699 ReelWriter const & reel = _reels[video_reel(frame)];
701 /* Make frame relative to the start of the reel */
702 frame -= reel.start ();
703 return (frame != 0 && frame < reel.first_nonexistant_frame());
706 /** @param track Closed caption track if type == TEXT_CLOSED_CAPTION */
708 Writer::write (PlayerText text, TextType type, optional<DCPTextTrack> track, DCPTimePeriod period)
710 vector<ReelWriter>::iterator* reel = 0;
713 case TEXT_OPEN_SUBTITLE:
714 reel = &_subtitle_reel;
716 case TEXT_CLOSED_CAPTION:
717 DCPOMATIC_ASSERT (track);
718 DCPOMATIC_ASSERT (_caption_reels.find(*track) != _caption_reels.end());
719 reel = &_caption_reels[*track];
722 DCPOMATIC_ASSERT (false);
725 DCPOMATIC_ASSERT (*reel != _reels.end());
726 while ((*reel)->period().to <= period.from) {
728 DCPOMATIC_ASSERT (*reel != _reels.end());
731 (*reel)->write (text, type, track, period);
735 Writer::write (list<shared_ptr<Font> > fonts)
737 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
739 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
741 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
748 _fonts.push_back (i);
754 operator< (QueueItem const & a, QueueItem const & b)
756 if (a.reel != b.reel) {
757 return a.reel < b.reel;
760 if (a.frame != b.frame) {
761 return a.frame < b.frame;
764 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
768 operator== (QueueItem const & a, QueueItem const & b)
770 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
774 Writer::set_encoder_threads (int threads)
776 boost::mutex::scoped_lock lm (_state_mutex);
777 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
778 _maximum_queue_size = threads * 16;
782 Writer::write (ReferencedReelAsset asset)
784 _reel_assets.push_back (asset);
788 Writer::video_reel (int frame) const
790 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
792 while (i < _reels.size() && !_reels[i].period().contains (t)) {
796 DCPOMATIC_ASSERT (i < _reels.size ());
801 Writer::set_digest_progress (Job* job, float progress)
803 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
805 _digest_progresses[boost::this_thread::get_id()] = progress;
806 float min_progress = FLT_MAX;
807 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
808 min_progress = min (min_progress, i->second);
811 job->set_progress (min_progress);