2 Copyright (C) 2012-2016 Carl Hetherington <cth@carlh.net>
4 This file is part of DCP-o-matic.
6 DCP-o-matic is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 2 of the License, or
9 (at your option) any later version.
11 DCP-o-matic is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with DCP-o-matic. If not, see <http://www.gnu.org/licenses/>.
22 #include "compose.hpp"
26 #include "dcp_video.h"
27 #include "dcp_content_type.h"
28 #include "audio_mapping.h"
32 #include "audio_buffers.h"
36 #include "reel_writer.h"
38 #include <boost/foreach.hpp>
46 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
47 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
48 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
49 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
50 #define LOG_WARNING_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_WARNING);
51 #define LOG_WARNING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_WARNING);
52 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
54 /* OS X strikes again */
65 using boost::shared_ptr;
66 using boost::weak_ptr;
67 using boost::dynamic_pointer_cast;
70 Writer::Writer (shared_ptr<const Film> film, weak_ptr<Job> j)
75 , _queued_full_in_memory (0)
76 , _maximum_frames_in_memory (0)
82 /* Remove any old DCP */
83 boost::filesystem::remove_all (_film->dir (_film->dcp_name ()));
85 shared_ptr<Job> job = _job.lock ();
86 DCPOMATIC_ASSERT (job);
89 list<DCPTimePeriod> const reels = _film->reels ();
90 BOOST_FOREACH (DCPTimePeriod p, reels) {
91 _reels.push_back (ReelWriter (film, p, job, reel_index++, reels.size(), _film->content_summary(p)));
94 /* We can keep track of the current audio and subtitle reels easily because audio
95 and subs arrive to the Writer in sequence. This is not so for video.
97 _audio_reel = _reels.begin ();
98 _subtitle_reel = _reels.begin ();
100 /* Check that the signer is OK if we need one */
101 if (_film->is_signed() && !Config::instance()->signer_chain()->valid ()) {
102 throw InvalidSignerError ();
109 _thread = new boost::thread (boost::bind (&Writer::thread, this));
114 terminate_thread (false);
117 /** Pass a video frame to the writer for writing to disk at some point.
118 * This method can be called with frames out of order.
119 * @param encoded JPEG2000-encoded data.
120 * @param frame Frame index within the DCP.
121 * @param eyes Eyes that this frame image is for.
124 Writer::write (Data encoded, Frame frame, Eyes eyes)
126 boost::mutex::scoped_lock lock (_state_mutex);
128 while (_queued_full_in_memory > _maximum_frames_in_memory) {
129 /* The queue is too big; wait until that is sorted out */
130 _full_condition.wait (lock);
134 qi.type = QueueItem::FULL;
135 qi.encoded = encoded;
136 qi.reel = video_reel (frame);
137 qi.frame = frame - _reels[qi.reel].start ();
139 if (_film->three_d() && eyes == EYES_BOTH) {
140 /* 2D material in a 3D DCP; fake the 3D */
142 _queue.push_back (qi);
143 ++_queued_full_in_memory;
144 qi.eyes = EYES_RIGHT;
145 _queue.push_back (qi);
146 ++_queued_full_in_memory;
149 _queue.push_back (qi);
150 ++_queued_full_in_memory;
153 /* Now there's something to do: wake anything wait()ing on _empty_condition */
154 _empty_condition.notify_all ();
158 Writer::can_repeat (Frame frame) const
160 return frame > _reels[video_reel(frame)].start();
163 /** Repeat the last frame that was written to a reel as a new frame.
164 * @param frame Frame index within the DCP of the new (repeated) frame.
165 * @param eyes Eyes that this repeated frame image is for.
168 Writer::repeat (Frame frame, Eyes eyes)
170 boost::mutex::scoped_lock lock (_state_mutex);
172 while (_queued_full_in_memory > _maximum_frames_in_memory) {
173 /* The queue is too big; wait until that is sorted out */
174 _full_condition.wait (lock);
178 qi.type = QueueItem::REPEAT;
179 qi.reel = video_reel (frame);
180 qi.frame = frame - _reels[qi.reel].start ();
181 if (_film->three_d() && eyes == EYES_BOTH) {
183 _queue.push_back (qi);
184 qi.eyes = EYES_RIGHT;
185 _queue.push_back (qi);
188 _queue.push_back (qi);
191 /* Now there's something to do: wake anything wait()ing on _empty_condition */
192 _empty_condition.notify_all ();
196 Writer::fake_write (Frame frame, Eyes eyes)
198 boost::mutex::scoped_lock lock (_state_mutex);
200 while (_queued_full_in_memory > _maximum_frames_in_memory) {
201 /* The queue is too big; wait until that is sorted out */
202 _full_condition.wait (lock);
205 size_t const reel = video_reel (frame);
206 Frame const reel_frame = frame - _reels[reel].start ();
208 FILE* file = fopen_boost (_film->info_file(_reels[reel].period()), "rb");
210 throw ReadFileError (_film->info_file(_reels[reel].period()));
212 dcp::FrameInfo info = _reels[reel].read_frame_info (file, reel_frame, eyes);
216 qi.type = QueueItem::FAKE;
219 qi.frame = reel_frame;
220 if (_film->three_d() && eyes == EYES_BOTH) {
222 _queue.push_back (qi);
223 qi.eyes = EYES_RIGHT;
224 _queue.push_back (qi);
227 _queue.push_back (qi);
230 /* Now there's something to do: wake anything wait()ing on _empty_condition */
231 _empty_condition.notify_all ();
234 /** Write one video frame's worth of audio frames to the DCP.
235 * @param audio Audio data or 0 if there is no audio to be written here (i.e. it is referenced).
236 * This method is not thread safe.
239 Writer::write (shared_ptr<const AudioBuffers> audio)
241 if (_audio_reel == _reels.end ()) {
242 /* This audio is off the end of the last reel; ignore it */
246 _audio_reel->write (audio);
248 /* written is in video frames, not audio frames */
249 if (_audio_reel->total_written_audio_frames() >= _audio_reel->period().duration().frames_floor (_film->video_frame_rate())) {
254 /** This must be called from Writer::thread() with an appropriate lock held */
256 Writer::have_sequenced_image_at_queue_head ()
258 if (_queue.empty ()) {
264 QueueItem const & f = _queue.front();
265 ReelWriter const & reel = _reels[f.reel];
267 /* The queue should contain only EYES_LEFT/EYES_RIGHT pairs or EYES_BOTH */
269 if (f.eyes == EYES_BOTH) {
271 return f.frame == (reel.last_written_video_frame() + 1);
276 if (reel.last_written_eyes() == EYES_LEFT && f.frame == reel.last_written_video_frame() && f.eyes == EYES_RIGHT) {
280 if (reel.last_written_eyes() == EYES_RIGHT && f.frame == (reel.last_written_video_frame() + 1) && f.eyes == EYES_LEFT) {
293 boost::mutex::scoped_lock lock (_state_mutex);
297 if (_finish || _queued_full_in_memory > _maximum_frames_in_memory || have_sequenced_image_at_queue_head ()) {
298 /* We've got something to do: go and do it */
302 /* Nothing to do: wait until something happens which may indicate that we do */
303 LOG_TIMING (N_("writer-sleep queue=%1"), _queue.size());
304 _empty_condition.wait (lock);
305 LOG_TIMING (N_("writer-wake queue=%1"), _queue.size());
308 if (_finish && _queue.empty()) {
312 /* We stop here if we have been asked to finish, and if either the queue
313 is empty or we do not have a sequenced image at its head (if this is the
314 case we will never terminate as no new frames will be sent once
317 if (_finish && (!have_sequenced_image_at_queue_head() || _queue.empty())) {
318 /* (Hopefully temporarily) log anything that was not written */
319 if (!_queue.empty() && !have_sequenced_image_at_queue_head()) {
320 LOG_WARNING (N_("Finishing writer with a left-over queue of %1:"), _queue.size());
321 for (list<QueueItem>::const_iterator i = _queue.begin(); i != _queue.end(); ++i) {
322 if (i->type == QueueItem::FULL) {
323 LOG_WARNING (N_("- type FULL, frame %1, eyes %2"), i->frame, (int) i->eyes);
325 LOG_WARNING (N_("- type FAKE, size %1, frame %2, eyes %3"), i->size, i->frame, (int) i->eyes);
332 /* Write any frames that we can write; i.e. those that are in sequence. */
333 while (have_sequenced_image_at_queue_head ()) {
334 QueueItem qi = _queue.front ();
336 if (qi.type == QueueItem::FULL && qi.encoded) {
337 --_queued_full_in_memory;
342 ReelWriter& reel = _reels[qi.reel];
345 case QueueItem::FULL:
346 LOG_DEBUG_ENCODE (N_("Writer FULL-writes %1 (%2)"), qi.frame, (int) qi.eyes);
348 qi.encoded = Data (_film->j2c_path (qi.reel, qi.frame, qi.eyes, false));
350 reel.write (qi.encoded, qi.frame, qi.eyes);
353 case QueueItem::FAKE:
354 LOG_DEBUG_ENCODE (N_("Writer FAKE-writes %1"), qi.frame);
355 reel.fake_write (qi.frame, qi.eyes, qi.size);
358 case QueueItem::REPEAT:
359 LOG_DEBUG_ENCODE (N_("Writer REPEAT-writes %1"), qi.frame);
360 reel.repeat_write (qi.frame, qi.eyes);
368 while (_queued_full_in_memory > _maximum_frames_in_memory) {
369 /* Too many frames in memory which can't yet be written to the stream.
370 Write some FULL frames to disk.
373 /* Find one from the back of the queue */
375 list<QueueItem>::reverse_iterator i = _queue.rbegin ();
376 while (i != _queue.rend() && (i->type != QueueItem::FULL || !i->encoded)) {
380 DCPOMATIC_ASSERT (i != _queue.rend());
384 /* i is valid here, even though we don't hold a lock on the mutex,
385 since list iterators are unaffected by insertion and only this
386 thread could erase the last item in the list.
389 LOG_GENERAL ("Writer full; pushes %1 to disk", i->frame);
391 i->encoded->write_via_temp (
392 _film->j2c_path (i->reel, i->frame, i->eyes, true),
393 _film->j2c_path (i->reel, i->frame, i->eyes, false)
398 --_queued_full_in_memory;
401 /* The queue has probably just gone down a bit; notify anything wait()ing on _full_condition */
402 _full_condition.notify_all ();
411 Writer::terminate_thread (bool can_throw)
413 boost::mutex::scoped_lock lock (_state_mutex);
419 _empty_condition.notify_all ();
420 _full_condition.notify_all ();
423 if (_thread->joinable ()) {
442 LOG_GENERAL_NC ("Terminating writer thread");
444 terminate_thread (true);
446 LOG_GENERAL_NC ("Finishing ReelWriters");
448 BOOST_FOREACH (ReelWriter& i, _reels) {
452 LOG_GENERAL_NC ("Writing XML");
454 dcp::DCP dcp (_film->dir (_film->dcp_name()));
456 shared_ptr<dcp::CPL> cpl (
459 _film->dcp_content_type()->libdcp_kind ()
465 /* Calculate digests for each reel in parallel */
467 shared_ptr<Job> job = _job.lock ();
468 job->sub (_("Computing digests"));
470 boost::asio::io_service service;
471 boost::thread_group pool;
473 shared_ptr<boost::asio::io_service::work> work (new boost::asio::io_service::work (service));
475 int const threads = max (1, Config::instance()->num_local_encoding_threads ());
477 for (int i = 0; i < threads; ++i) {
478 pool.create_thread (boost::bind (&boost::asio::io_service::run, &service));
481 BOOST_FOREACH (ReelWriter& i, _reels) {
482 boost::function<void (float)> set_progress = boost::bind (&Writer::set_digest_progress, this, job.get(), _1);
483 service.post (boost::bind (&ReelWriter::calculate_digests, &i, set_progress));
490 /* Add reels to CPL */
492 BOOST_FOREACH (ReelWriter& i, _reels) {
493 cpl->add (i.create_reel (_reel_assets, _fonts));
496 dcp::XMLMetadata meta;
497 meta.creator = Config::instance()->dcp_creator ();
498 if (meta.creator.empty ()) {
499 meta.creator = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
501 meta.issuer = Config::instance()->dcp_issuer ();
502 if (meta.issuer.empty ()) {
503 meta.issuer = String::compose ("DCP-o-matic %1 %2", dcpomatic_version, dcpomatic_git_commit);
505 meta.set_issue_date_now ();
507 cpl->set_metadata (meta);
509 shared_ptr<const dcp::CertificateChain> signer;
510 if (_film->is_signed ()) {
511 signer = Config::instance()->signer_chain ();
512 /* We did check earlier, but check again here to be on the safe side */
513 if (!signer->valid ()) {
514 throw InvalidSignerError ();
518 dcp.write_xml (_film->interop () ? dcp::INTEROP : dcp::SMPTE, meta, signer, Config::instance()->dcp_metadata_filename_format());
521 N_("Wrote %1 FULL, %2 FAKE, %3 REPEAT, %4 pushed to disk"), _full_written, _fake_written, _repeat_written, _pushed_to_disk
525 /** @param frame Frame index within the whole DCP.
526 * @return true if we can fake-write this frame.
529 Writer::can_fake_write (Frame frame) const
531 /* We have to do a proper write of the first frame so that we can set up the JPEG2000
532 parameters in the asset writer.
535 ReelWriter const & reel = _reels[video_reel(frame)];
537 /* Make frame relative to the start of the reel */
538 frame -= reel.start ();
539 return (frame != 0 && frame < reel.first_nonexistant_frame());
543 Writer::write (PlayerSubtitles subs)
545 if (subs.text.empty ()) {
549 if (_subtitle_reel->period().to <= subs.from) {
553 _subtitle_reel->write (subs);
557 Writer::write (list<shared_ptr<Font> > fonts)
559 /* Just keep a list of unique fonts and we'll deal with them in ::finish */
561 BOOST_FOREACH (shared_ptr<Font> i, fonts) {
563 BOOST_FOREACH (shared_ptr<Font> j, _fonts) {
570 _fonts.push_back (i);
576 operator< (QueueItem const & a, QueueItem const & b)
578 if (a.reel != b.reel) {
579 return a.reel < b.reel;
582 if (a.frame != b.frame) {
583 return a.frame < b.frame;
586 return static_cast<int> (a.eyes) < static_cast<int> (b.eyes);
590 operator== (QueueItem const & a, QueueItem const & b)
592 return a.reel == b.reel && a.frame == b.frame && a.eyes == b.eyes;
596 Writer::set_encoder_threads (int threads)
598 /* I think the scaling factor here should be the ratio of the longest frame
599 encode time to the shortest; if the thread count is T, longest time is L
600 and the shortest time S we could encode L/S frames per thread whilst waiting
601 for the L frame to encode so we might have to store LT/S frames.
603 However we don't want to use too much memory, so keep it a bit lower than we'd
604 perhaps like. A J2K frame is typically about 1Mb so 3 here will mean we could
605 use about 240Mb with 72 encoding threads.
607 _maximum_frames_in_memory = lrint (threads * 3);
611 Writer::write (ReferencedReelAsset asset)
613 _reel_assets.push_back (asset);
617 Writer::video_reel (int frame) const
619 DCPTime t = DCPTime::from_frames (frame, _film->video_frame_rate ());
621 while (i < _reels.size() && !_reels[i].period().contains (t)) {
625 DCPOMATIC_ASSERT (i < _reels.size ());
630 Writer::set_digest_progress (Job* job, float progress)
632 /* I believe this is thread-safe */
633 _digest_progresses[boost::this_thread::get_id()] = progress;
635 boost::mutex::scoped_lock lm (_digest_progresses_mutex);
636 float min_progress = FLT_MAX;
637 for (map<boost::thread::id, float>::const_iterator i = _digest_progresses.begin(); i != _digest_progresses.end(); ++i) {
638 min_progress = min (min_progress, i->second);
641 job->set_progress (min_progress);