2 Copyright (C) 2012-2017 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcp_video.h"
27 #include "dcp_content_type.h"
28 #include "audio_mapping.h"
32 #include "audio_buffers.h"
36 #include "reel_writer.h"
38 #include <dcp/locale_convert.h>
39 #include <boost/foreach.hpp>
/* Logging convenience macros.  Each one forwards to _film->log()->log() with a fixed
   LogEntry type.  The variants without _NC build the message with String::compose;
   the _NC variants pass their arguments straight through.
   NOTE(review): every expansion already ends in a semicolon, so a call written as
   LOG_X (...); expands to two statements.  Take care when using one of these as the
   unbraced body of an if which has an else.
*/
47 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
48 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
49 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
50 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 #define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING);
52 #define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING);
53 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
55 /* OS X strikes again */
66 using boost::shared_ptr;
67 using boost::weak_ptr;
68 using boost::dynamic_pointer_cast;
/** Construct a Writer.
 *  The body locks the Job pointer and asserts that it is still valid, creates one
 *  ReelWriter for each period returned by _film->reels(), points the audio reel and
 *  every text reel iterator at the first reel, and throws InvalidSignerError if the
 *  film is signed but the configured signer chain is not valid.
 *  @param film Film being written; also passed to each ReelWriter.
 *  @param j Job associated with the write; the locked pointer is passed to each ReelWriter.
 */
71 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
76 , _queued_full_in_memory (0)
77 /* These will be reset to sensible values when J2KEncoder is created */
78 , _maximum_frames_in_memory (8)
79 , _maximum_queue_size (8)
85 shared_ptr<Job> job = _job.lock ();
86 DCPOMATIC_ASSERT (job);
89 list<DCPTimePeriod> const reels = _film->reels ();
90 BOOST_FOREACH (DCPTimePeriod p, reels) {
91 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
94 /* We can keep track of the current audio, subtitle and closed caption reels easily because audio
95 and captions arrive to the Writer in sequence. This is not so for video.
97 _audio_reel = _reels.begin ();
98 for (int i = 0; i < TEXT_COUNT; ++i) {
99 _text_reel[i] = _reels.begin ();
102 /* Check that the signer is OK if we need one */
104 if (_film->is_signed() && !Config::instance()->signer_chain()->valid(&reason)) {
105 throw InvalidSignerError (reason);
112 _thread = new boost::thread (boost::bind (&Writer::thread, this));
113 #ifdef DCPOMATIC_LINUX
114 pthread_setname_np (_thread->native_handle(), "writer");
120 terminate_thread (false);
123 /** Pass a video frame to the writer for writing to disk at some point.
124 * This method can be called with frames out of order.
125 * @param encoded JPEG2000-encoded data.
126 * @param frame Frame index within the DCP.
127 * @param eyes Eyes that this frame image is for.
 * The call takes _state_mutex and, while more than _maximum_frames_in_memory FULL
 * frames are held in memory, wakes the writer thread and waits on _full_condition.
 * The frame is queued as a FULL item with a reel-relative frame index; for a 3D film
 * a frame given as EYES_BOTH is queued twice (the second entry as EYES_RIGHT).
 * _queued_full_in_memory is incremented once per queued item.
130 Writer::write (Data encoded, Frame frame, Eyes eyes)
132 boost::mutex::scoped_lock lock (_state_mutex);
134 while (_queued_full_in_memory > _maximum_frames_in_memory) {
135 /* There are too many full frames in memory; wake the main writer thread and
136 wait until it sorts everything out */
137 _empty_condition.notify_all ();
138 _full_condition.wait (lock);
142 qi.type = QueueItem::FULL;
143 qi.encoded = encoded;
144 qi.reel = video_reel (frame);
145 qi.frame = frame - _reels[qi.reel].start ();
147 if (_film->three_d() && eyes == EYES_BOTH) {
148 /* 2D material in a 3D DCP; fake the 3D */
150 _queue.push_back (qi);
151 ++_queued_full_in_memory;
152 qi.eyes = EYES_RIGHT;
153 _queue.push_back (qi);
154 ++_queued_full_in_memory;
157 _queue.push_back (qi);
158 ++_queued_full_in_memory;
161 /* Now there's something to do: wake anything wait()ing on _empty_condition */
162 _empty_condition.notify_all ();
166 Writer::can_repeat (Frame frame) const
168 return frame > _reels[video_reel(frame)].start();
171 /** Repeat the last frame that was written to a reel as a new frame.
172 * @param frame Frame index within the DCP of the new (repeated) frame.
173 * @param eyes Eyes that this repeated frame image is for.
 * The call takes _state_mutex and waits on _full_condition while the queue holds more
 * than _maximum_queue_size items and have_sequenced_image_at_queue_head() is true.
 * It then queues a REPEAT item with a reel-relative frame index (two items for an
 * EYES_BOTH frame in a 3D film, the second as EYES_RIGHT) and notifies _empty_condition.
176 Writer::repeat (Frame frame, Eyes eyes)
178 boost::mutex::scoped_lock lock (_state_mutex);
180 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
181 /* The queue is too big, and the main writer thread can run and fix it, so
182 wake it and wait until it has done.
184 _empty_condition.notify_all ();
185 _full_condition.wait (lock);
189 qi.type = QueueItem::REPEAT;
190 qi.reel = video_reel (frame);
191 qi.frame = frame - _reels[qi.reel].start ();
192 if (_film->three_d() && eyes == EYES_BOTH) {
194 _queue.push_back (qi);
195 qi.eyes = EYES_RIGHT;
196 _queue.push_back (qi);
199 _queue.push_back (qi);
202 /* Now there's something to do: wake anything wait()ing on _empty_condition */
203 _empty_condition.notify_all ();
/** Queue a FAKE item for a frame, using frame information read from the film's info
 *  file for the reel that contains the frame.
 *  The call takes _state_mutex and waits on _full_condition under the same condition
 *  as repeat() before touching the queue.
 *  @param frame Frame index within the DCP; converted to a reel-relative index here.
 *  @param eyes Eyes passed to read_frame_info; EYES_BOTH in a 3D film queues two items.
 *  A ReadFileError naming the info file can be thrown (presumably when the file cannot
 *  be opened -- confirm the guarding condition).
 *  NOTE(review): confirm that `file` is closed on every path, including when
 *  read_frame_info throws.
 */
207 Writer::fake_write (Frame frame, Eyes eyes)
209 boost::mutex::scoped_lock lock (_state_mutex);
211 while (_queue.size() > _maximum_queue_size && have_sequenced_image_at_queue_head()) {
212 /* The queue is too big, and the main writer thread can run and fix it, so
213 wake it and wait until it has done.
215 _empty_condition.notify_all ();
216 _full_condition.wait (lock);
219 size_t const reel = video_reel (frame);
220 Frame const reel_frame = frame - _reels[reel].start ();
222 FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
224 throw ReadFileError (_film->info_file(_reels[reel].period()));
226 dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
230 qi.type = QueueItem::FAKE;
233 qi.frame = reel_frame;
234 if (_film->three_d() && eyes == EYES_BOTH) {
236 _queue.push_back (qi);
237 qi.eyes = EYES_RIGHT;
238 _queue.push_back (qi);
241 _queue.push_back (qi);
244 /* Now there's something to do: wake anything wait()ing on _empty_condition */
245 _empty_condition.notify_all ();
248 /** Write some audio frames to the DCP.
249 * @param audio Audio data.
250 * @param time Time of this data within the DCP.
251 * This method is not thread safe.
 * Audio which ends at or before the end of the current audio reel is written to that
 * reel in one go.  Otherwise it is split at the reel boundary and the part before the
 * boundary is written to the current reel.  Audio arriving when _audio_reel is already
 * at the end of _reels is ignored.
 * NOTE(review): both part lengths are rounded up with frames_ceil; confirm that
 * part_frames[0] + part_frames[1] can never exceed audio->frames(), since the second
 * copy_from reads part_frames[1] frames starting at offset part_frames[0].
254 Writer::write (shared_ptr<const AudioBuffers> audio, DCPTime const time)
256 DCPOMATIC_ASSERT (audio);
258 int const afr = _film->audio_frame_rate();
260 DCPTime const end = time + DCPTime::from_frames(audio->frames(), afr);
262 /* The audio we get might span a reel boundary, and if so we have to write it in bits */
267 if (_audio_reel == _reels.end ()) {
268 /* This audio is off the end of the last reel; ignore it */
272 if (end <= _audio_reel->period().to) {
273 /* Easy case: we can write all the audio to this reel */
274 _audio_reel->write (audio);
277 /* Split the audio into two and write the first part */
278 DCPTime part_lengths[2] = {
279 _audio_reel->period().to - t,
280 end - _audio_reel->period().to
283 Frame part_frames[2] = {
284 part_lengths[0].frames_ceil(afr),
285 part_lengths[1].frames_ceil(afr)
288 if (part_frames[0]) {
289 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[0]));
290 part->copy_from (audio.get(), part_frames[0], 0, 0);
291 _audio_reel->write (part);
294 if (part_frames[1]) {
295 shared_ptr<AudioBuffers> part (new AudioBuffers (audio->channels(), part_frames[1]));
296 part->copy_from (audio.get(), part_frames[1], part_frames[0], 0);
303 t += part_lengths[0];
308 /** This must be called with _state_mutex locked; Writer::repeat() and Writer::fake_write() call it that way, as well as the writer thread. */
/* Looks only at the item at the front of _queue and compares it with what was last
   written to that item's reel:
   - an EYES_BOTH item is in sequence when its frame is last_written_video_frame() + 1;
   - an EYES_RIGHT item is checked against EYES_LEFT having been written last for the same frame;
   - an EYES_LEFT item is checked against EYES_RIGHT having been written last for the previous frame.
*/
310 Writer::have_sequenced_image_at_queue_head ()
312 if (_queue.empty ()) {
318 QueueItem const & f = _queue.front();
319 ReelWriter const & reel = _reels[f.reel];
321 /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
323 if (f.eyes == EYES_BOTH) {
325 return f.frame == (reel.last_written_video_frame() + 1);
330 if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
334 if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
/* Writer thread loop (presumably the body of Writer::thread() -- confirm).
   With _state_mutex held it:
   - waits on _empty_condition until _finish is set, more than _maximum_frames_in_memory
     FULL frames are in memory, or the item at the head of the queue is in sequence;
   - when finishing, logs any left-over queue items as warnings;
   - writes every in-sequence item at the head of the queue to its ReelWriter, using
     write, fake_write or repeat_write according to the item type;
   - while too many FULL frames remain in memory, picks an encoded FULL item from the
     back of the queue and writes its data to disk with write_via_temp.
   _full_condition is notified so that callers blocked in write(), repeat() or
   fake_write() can re-check their conditions.
*/
347 boost::mutex::scoped_lock lock (_state_mutex);
351 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
352 /* We've got something to do: go and do it */
356 /* Nothing to do: wait until something happens which may indicate that we do */
357 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
358 _empty_condition.wait (lock);
359 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
362 if (_finish && _queue.empty()) {
366 /* We stop here if we have been asked to finish, and if either the queue
367 is empty or we do not have a sequenced image at its head (if this is the
368 case we will never terminate as no new frames will be sent once
371 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
372 /* (Hopefully temporarily) log anything that was not written */
373 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
374 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
375 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
376 if (i->type == QueueItem::FULL) {
377 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
379 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
386 /* Write any frames that we can write; i.e. those that are in sequence. */
387 while (have_sequenced_image_at_queue_head ()) {
388 QueueItem qi = _queue.front ();
390 if (qi.type == QueueItem::FULL && qi.encoded) {
391 --_queued_full_in_memory;
396 ReelWriter& reel = _reels[qi.reel];
399 case QueueItem::FULL:
400 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
402 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
404 reel.write (qi.encoded, qi.frame, qi.eyes);
407 case QueueItem::FAKE:
408 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
409 reel.fake_write (qi.frame, qi.eyes, qi.size);
412 case QueueItem::REPEAT:
413 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
414 reel.repeat_write (qi.frame, qi.eyes);
420 _full_condition.notify_all ();
423 while (_queued_full_in_memory > _maximum_frames_in_memory) {
424 /* Too many frames in memory which can't yet be written to the stream.
425 Write some FULL frames to disk.
428 /* Find one from the back of the queue */
430 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
431 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
435 DCPOMATIC_ASSERT (i != _queue.rend());
437 /* For the log message below */
438 int const awaiting = _reels[_queue.front().reel].last_written_video_frame();
441 /* i is valid here, even though we don't hold a lock on the mutex,
442 since list iterators are unaffected by insertion and only this
443 thread could erase the last item in the list.
446 LOG_GENERAL ("Writer full; pushes %1 to disk while awaiting %2", i->frame, awaiting);
448 i->encoded->write_via_temp (
449 _film->j2c_path (i->reel, i->frame, i->eyes, true),
450 _film->j2c_path (i->reel, i->frame, i->eyes, false)
455 --_queued_full_in_memory;
456 _full_condition.notify_all ();
/** Wake anything waiting on _empty_condition or _full_condition (with _state_mutex
 *  held) and then deal with _thread if it is joinable.
 *  @param can_throw Callers in this file pass both false and true; presumably this says
 *  whether errors may be propagated to the caller -- confirm.
 */
466 Writer::terminate_thread (bool can_throw)
468 boost::mutex::scoped_lock lock (_state_mutex);
474 _empty_condition.notify_all ();
475 _full_condition.notify_all ();
478 if (_thread->joinable ()) {
/* Finishing sequence (presumably the body of Writer::finish() -- confirm):
   terminate the writer thread, finish the ReelWriters, create the DCP and CPL objects,
   calculate the digests of every reel on a pool of threads driven by a boost::asio
   io_service, add each reel to the CPL, fill in the XML metadata (creator and issuer
   fall back to a "DCP-o-matic <version> <commit>" string when the configured value is
   empty), re-check the signer if the film is signed, write the XML and finally write
   the cover sheet.
*/
497 LOG_GENERAL_NC ("Terminating writer thread");
499 terminate_thread (true);
501 LOG_GENERAL_NC ("Finishing ReelWriters");
503 BOOST_FOREACH (ReelWriter& i, _reels) {
507 LOG_GENERAL_NC ("Writing XML");
509 dcp::DCP dcp (_film->dir (_film->dcp_name()));
511 shared_ptr<dcp::CPL> cpl (
514 _film->dcp_content_type()->libdcp_kind ()
520 /* Calculate digests for each reel in parallel */
/* NOTE(review): job is dereferenced straight after _job.lock() with no check, unlike
   the DCPOMATIC_ASSERT in the constructor; confirm the Job cannot have gone away here.
   The raw pointer from job.get() is also handed to the digest worker threads.
*/
522 shared_ptr<Job> job = _job.lock ();
523 job->sub (_("Computing digests"));
525 boost::asio::io_service service;
526 boost::thread_group pool;
528 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
530 int const threads = max (1, Config::instance()->master_encoding_threads ());
532 for (int i = 0; i < threads; ++i) {
533 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
536 BOOST_FOREACH (ReelWriter& i, _reels) {
537 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
538 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
545 /* Add reels to CPL */
547 BOOST_FOREACH (ReelWriter& i, _reels) {
548 cpl->add (i.create_reel (_reel_assets, _fonts));
551 dcp::XMLMetadata meta;
552 meta.annotation_text = cpl->annotation_text ();
553 meta.creator = Config::instance()->dcp_creator ();
554 if (meta.creator.empty ()) {
555 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
557 meta.issuer = Config::instance()->dcp_issuer ();
558 if (meta.issuer.empty ()) {
559 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
561 meta.set_issue_date_now ();
563 cpl->set_metadata (meta);
565 shared_ptr<const dcp::CertificateChain> signer;
566 if (_film->is_signed ()) {
567 signer = Config::instance()->signer_chain ();
568 /* We did check earlier, but check again here to be on the safe side */
570 if (!signer->valid (&reason)) {
571 throw InvalidSignerError (reason);
575 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
578 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
581 write_cover_sheet ();
585 Writer::write_cover_sheet ()
587 boost::filesystem::path const cover = _film->file ("COVER_SHEET.txt");
588 FILE* f = fopen_boost (cover, "w");
590 throw OpenFileError (cover, errno, false);
593 string text = Config::instance()->cover_sheet ();
594 boost::algorithm::replace_all (text, "$CPL_NAME", _film->name());
595 boost::algorithm::replace_all (text, "$TYPE", _film->dcp_content_type()->pretty_name());
596 boost::algorithm::replace_all (text, "$CONTAINER", _film->container()->container_nickname());
597 boost::algorithm::replace_all (text, "$AUDIO_LANGUAGE", _film->isdcf_metadata().audio_language);
598 boost::algorithm::replace_all (text, "$SUBTITLE_LANGUAGE", _film->isdcf_metadata().subtitle_language);
600 boost::uintmax_t size = 0;
602 boost::filesystem::recursive_directory_iterator i = boost::filesystem::recursive_directory_iterator(_film->dir(_film->dcp_name()));
603 i != boost::filesystem::recursive_directory_iterator();
605 if (boost::filesystem::is_regular_file (i->path ())) {
606 size += boost::filesystem::file_size (i->path ());
610 if (size > (1000000000L)) {
611 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1GB", dcp::locale_convert<string> (size / 1000000000.0, 1, true)));
613 boost::algorithm::replace_all (text, "$SIZE", String::compose ("%1MB", dcp::locale_convert<string> (size / 1000000.0, 1, true)));
616 pair<int, int> ch = audio_channel_types (_film->mapped_audio_channels(), _film->audio_channels());
617 string description = String::compose("%1.%2", ch.first, ch.second);
619 if (description == "0.0") {
620 description = _("None");
621 } else if (description == "1.0") {
622 description = _("Mono");
623 } else if (description == "2.0") {
624 description = _("Stereo");
626 boost::algorithm::replace_all (text, "$AUDIO", description);
629 _film->length().split (_film->video_frame_rate(), h, m, s, fr);
631 if (h == 0 && m == 0) {
632 length = String::compose("%1s", s);
633 } else if (h == 0 && m > 0) {
634 length = String::compose("%1m%2s", m, s);
635 } else if (h > 0 && m > 0) {
636 length = String::compose("%1h%2m%3s", h, m, s);
639 boost::algorithm::replace_all (text, "$LENGTH", length);
641 fwrite (text.c_str(), 1, text.length(), f);
645 /** @param frame Frame index within the whole DCP.
646 * @return true if we can fake-write this frame.
 * The final result is true when the frame's index relative to the start of its reel is
 * non-zero and less than the reel's first_nonexistant_frame().  The _film->encrypted()
 * case is handled before that (presumably by refusing -- confirm).
649 Writer::can_fake_write (Frame frame) const
651 if (_film->encrypted()) {
652 /* We need to re-write the frame because the asset ID is embedded in the HMAC... I think... */
656 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
657 parameters in the asset writer.
660 ReelWriter const & reel = _reels[video_reel(frame)];
662 /* Make frame relative to the start of the reel */
663 frame -= reel.start ();
664 return (frame != 0 && frame < reel.first_nonexistant_frame());
/** Write a caption to the current reel for its text type.
 *  While the current reel for this type ends at or before period.from the loop runs
 *  (presumably moving _text_reel[type] on to the next reel -- confirm); the iterator is
 *  asserted not to have reached the end of _reels before the caption is written.
 *  @param text Caption to write.
 *  @param type Type of text; used to index _text_reel.
 *  @param period Period of the caption, compared with each reel's period.
 */
668 Writer::write (PlayerCaption text, TextType type, DCPTimePeriod period)
670 while (_text_reel[type]->period().to <= period.from) {
672 DCPOMATIC_ASSERT (_text_reel[type] != _reels.end());
675 DCPOMATIC_ASSERT (_text_reel[type] != _reels.end());
677 _text_reel[type]->write (text, type, period);
/** Remember some fonts for later.
 *  Each font in the list is checked against the fonts already held in _fonts and may be
 *  appended to _fonts; per the comment below the intention is that _fonts holds each
 *  font only once.
 *  @param fonts Fonts to remember.
 */
681 Writer::write (list<shared_ptr<Font> > fonts)
683 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
685 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
687 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
694 _fonts.push_back (i);
700 operator< (QueueItem const & a, QueueItem const & b)
702 if (a.reel != b.reel) {
703 return a.reel < b.reel;
706 if (a.frame != b.frame) {
707 return a.frame < b.frame;
710 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
714 operator== (QueueItem const & a, QueueItem const & b)
716 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
720 Writer::set_encoder_threads (int threads)
722 boost::mutex::scoped_lock lm (_state_mutex);
723 _maximum_frames_in_memory = lrint (threads * Config::instance()->frames_in_memory_multiplier());
724 _maximum_queue_size = threads * 16;
728 Writer::write (ReferencedReelAsset asset)
730 _reel_assets.push_back (asset);
/** Find the reel that contains a video frame.
 *  The frame is converted to a DCPTime at the film's video frame rate and tested against
 *  the period of the reels in turn; DCPOMATIC_ASSERT fails if no reel contains it.
 *  @param frame Frame index within the DCP.
 *  NOTE(review): the parameter is an int while callers pass a Frame; if Frame is wider
 *  than int, confirm that large indices cannot be narrowed here.
 */
734 Writer::video_reel (int frame) const
736 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
738 while (i < _reels.size() && !_reels[i].period().contains (t)) {
742 DCPOMATIC_ASSERT (i < _reels.size ());
747 Writer::set_digest_progress (Job* job, float progress)
749 /* I believe this is thread-safe */
750 _digest_progresses[boost::this_thread::get_id()] = progress;
752 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
753 float min_progress = FLT_MAX;
754 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
755 min_progress = min (min_progress, i->second);
758 job->set_progress (min_progress);