2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
43 using std::stringstream;
48 using namespace boost;
50 int const Encoder::_history_size = 25;
51 unsigned int const Encoder::_maximum_frames_in_memory = 8;
53 /** @param f Film that we are encoding.
56 Encoder::Encoder (shared_ptr<const Film> f)
58 , _just_skipped (false)
59 , _video_frames_in (0)
60 , _audio_frames_in (0)
61 , _video_frames_out (0)
62 , _audio_frames_out (0)
63 #ifdef HAVE_SWRESAMPLE
66 , _have_a_real_frame (false)
67 , _terminate_encoder (false)
69 , _finish_writer (false)
70 , _last_written_frame (-1)
72 if (_film->audio_stream()) {
73 /* Create sound output files with .tmp suffixes; we will rename
74 them if and when we complete.
76 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
78 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
79 /* We write mono files */
81 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
82 SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
84 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
86 _sound_files.push_back (f);
94 terminate_worker_threads ();
95 finish_writer_thread ();
99 Encoder::process_begin ()
101 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
102 #ifdef HAVE_SWRESAMPLE
105 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
106 _film->log()->log (s.str ());
108 /* We will be using planar float data when we call the resampler */
109 _swr_context = swr_alloc_set_opts (
111 _film->audio_stream()->channel_layout(),
113 _film->target_audio_sample_rate(),
114 _film->audio_stream()->channel_layout(),
116 _film->audio_stream()->sample_rate(),
120 swr_init (_swr_context);
122 throw EncodeError ("Cannot resample audio as libswresample is not present");
125 #ifdef HAVE_SWRESAMPLE
130 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
131 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
134 vector<ServerDescription*> servers = Config::instance()->servers ();
136 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
137 for (int j = 0; j < (*i)->threads (); ++j) {
138 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
143 _picture_asset.reset (
144 new libdcp::MonoPictureAsset (
145 _film->dir (_film->dcp_name()),
146 String::compose ("video_%1.mxf", 0),
147 DCPFrameRate (_film->frames_per_second()).frames_per_second,
148 _film->format()->dcp_size()
152 _picture_asset_writer = _picture_asset->start_write ();
153 _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
158 Encoder::process_end ()
161 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
163 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
166 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
169 throw EncodeError ("could not run sample-rate converter");
176 out->set_frames (frames);
180 swr_free (&_swr_context);
184 if (_film->audio_stream()) {
185 close_sound_files ();
187 /* Rename .wav.tmp files to .wav */
188 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
189 if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
190 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
192 boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
196 boost::mutex::scoped_lock lock (_worker_mutex);
198 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
200 /* Keep waking workers until the queue is empty */
201 while (!_encode_queue.empty ()) {
202 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
203 _worker_condition.notify_all ();
204 _worker_condition.wait (lock);
209 terminate_worker_threads ();
211 _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
213 /* The following sequence of events can occur in the above code:
214 1. a remote worker takes the last image off the queue
215 2. the loop above terminates
216 3. the remote worker fails to encode the image and puts it back on the queue
217 4. the remote worker is then terminated by terminate_worker_threads
219 So just mop up anything left in the queue here.
222 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
223 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
225 shared_ptr<EncodedData> e = (*i)->encode_locally ();
227 boost::mutex::scoped_lock lock2 (_writer_mutex);
228 _write_queue.push_back (make_pair (e, (*i)->frame ()));
229 _writer_condition.notify_all ();
232 } catch (std::exception& e) {
233 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
237 finish_writer_thread ();
238 _picture_asset_writer->finalize ();
241 /** @return an estimate of the current number of frames we are encoding per second,
245 Encoder::current_frames_per_second () const
247 boost::mutex::scoped_lock lock (_history_mutex);
248 if (int (_time_history.size()) < _history_size) {
253 gettimeofday (&now, 0);
255 return _history_size / (seconds (now) - seconds (_time_history.back ()));
258 /** @return true if the last frame to be processed was skipped as it already existed */
260 Encoder::skipping () const
262 boost::mutex::scoped_lock (_history_mutex);
263 return _just_skipped;
266 /** @return Number of video frames that have been sent out */
268 Encoder::video_frames_out () const
270 boost::mutex::scoped_lock (_history_mutex);
271 return _video_frames_out;
274 /** Should be called when a frame has been encoded successfully.
275 * @param n Source frame index.
278 Encoder::frame_done ()
280 boost::mutex::scoped_lock lock (_history_mutex);
281 _just_skipped = false;
284 gettimeofday (&tv, 0);
285 _time_history.push_front (tv);
286 if (int (_time_history.size()) > _history_size) {
287 _time_history.pop_back ();
291 /** Called by a subclass when it has just skipped the processing
292 of a frame because it has already been done.
295 Encoder::frame_skipped ()
297 boost::mutex::scoped_lock lock (_history_mutex);
298 _just_skipped = true;
302 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
304 DCPFrameRate dfr (_film->frames_per_second ());
306 if (dfr.skip && (_video_frames_in % 2)) {
311 boost::mutex::scoped_lock lock (_worker_mutex);
313 /* Wait until the queue has gone down a bit */
314 while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
315 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
316 _worker_condition.wait (lock);
317 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
320 if (_terminate_encoder) {
324 /* Only do the processing if we don't already have a file for this frame */
325 if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
330 if (same && _have_a_real_frame) {
331 /* Use the last frame that we encoded. We do this by putting a null encoded
332 frame straight onto the writer's queue. It will know to duplicate the previous frame
335 boost::mutex::scoped_lock lock2 (_writer_mutex);
336 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
338 /* Queue this new frame for encoding */
339 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
340 TIMING ("adding to queue of %1", _encode_queue.size ());
341 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
343 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
344 _film->subtitle_offset(), _film->subtitle_scale(),
345 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
346 _film->colour_lut(), _film->j2k_bandwidth(),
351 _worker_condition.notify_all ();
352 _have_a_real_frame = true;
359 boost::mutex::scoped_lock lock2 (_writer_mutex);
360 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
366 Encoder::process_audio (shared_ptr<AudioBuffers> data)
369 /* Maybe sample-rate convert */
372 /* Compute the resampled frames count and add 32 for luck */
373 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
375 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
378 int const resampled_frames = swr_convert (
379 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
382 if (resampled_frames < 0) {
383 throw EncodeError ("could not run sample-rate converter");
386 resampled->set_frames (resampled_frames);
388 /* And point our variables at the resampled audio */
393 if (_film->audio_channels() == 1) {
394 /* We need to switch things around so that the mono channel is on
395 the centre channel of a 5.1 set (with other channels silent).
398 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
399 b->make_silent (libdcp::LEFT);
400 b->make_silent (libdcp::RIGHT);
401 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
402 b->make_silent (libdcp::LFE);
403 b->make_silent (libdcp::LS);
404 b->make_silent (libdcp::RS);
411 _audio_frames_in += data->frames ();
415 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
417 for (int i = 0; i < audio->channels(); ++i) {
418 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
421 _audio_frames_out += audio->frames ();
425 Encoder::close_sound_files ()
427 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
431 _sound_files.clear ();
435 Encoder::terminate_worker_threads ()
437 boost::mutex::scoped_lock lock (_worker_mutex);
438 _terminate_encoder = true;
439 _worker_condition.notify_all ();
442 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
449 Encoder::finish_writer_thread ()
451 if (!_writer_thread) {
455 boost::mutex::scoped_lock lock (_writer_mutex);
456 _finish_writer = true;
457 _writer_condition.notify_all ();
460 _writer_thread->join ();
461 delete _writer_thread;
466 Encoder::encoder_thread (ServerDescription* server)
468 /* Number of seconds that we currently wait between attempts
469 to connect to the server; not relevant for localhost
472 int remote_backoff = 0;
476 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
477 boost::mutex::scoped_lock lock (_worker_mutex);
478 while (_encode_queue.empty () && !_terminate_encoder) {
479 _worker_condition.wait (lock);
482 if (_terminate_encoder) {
486 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
487 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
488 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
489 _encode_queue.pop_front ();
493 shared_ptr<EncodedData> encoded;
497 encoded = vf->encode_remotely (server);
499 if (remote_backoff > 0) {
500 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
503 /* This job succeeded, so remove any backoff */
506 } catch (std::exception& e) {
507 if (remote_backoff < 60) {
509 remote_backoff += 10;
513 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
514 vf->frame(), server->host_name(), e.what(), remote_backoff)
520 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
521 encoded = vf->encode_locally ();
522 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
523 } catch (std::exception& e) {
524 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
529 boost::mutex::scoped_lock lock2 (_writer_mutex);
530 _write_queue.push_back (make_pair (encoded, vf->frame ()));
531 _writer_condition.notify_all ();
535 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
537 _encode_queue.push_front (vf);
541 if (remote_backoff > 0) {
542 dvdomatic_sleep (remote_backoff);
546 _worker_condition.notify_all ();
551 Encoder::link (string a, string b) const
553 #ifdef DVDOMATIC_POSIX
554 int const r = symlink (a.c_str(), b.c_str());
556 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
560 #ifdef DVDOMATIC_WINDOWS
561 boost::filesystem::copy_file (a, b);
565 struct WriteQueueSorter
567 bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
568 return a.second < b.second;
573 Encoder::writer_thread ()
577 boost::mutex::scoped_lock lock (_writer_mutex);
580 if (_finish_writer ||
581 _write_queue.size() > _maximum_frames_in_memory ||
582 (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
587 TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
588 _writer_condition.wait (lock);
589 TIMING ("writer wakes with a queue of %1", _write_queue.size());
591 _write_queue.sort (WriteQueueSorter ());
594 if (_finish_writer && _write_queue.empty() && _pending.empty()) {
598 /* Write any frames that we can write; i.e. those that are in sequence */
599 while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
600 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
601 _write_queue.pop_front ();
604 _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
606 _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
607 _last_written = encoded.first;
609 _picture_asset_writer->write (_last_written->data(), _last_written->size());
613 ++_last_written_frame;
616 while (_write_queue.size() > _maximum_frames_in_memory) {
617 /* Too many frames in memory which can't yet be written to the stream.
621 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.back ();
622 _write_queue.pop_back ();
623 if (!encoded.first) {
624 /* This is a `repeat-last' frame, so no need to write it to disk */
629 _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
630 encoded.first->write (_film, encoded.second);
633 _pending.push_back (encoded.second);
636 while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
637 /* We have some space in memory. Fetch some frames back off disk. */
640 int const fetch = _pending.front ();
643 _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
644 shared_ptr<EncodedData> encoded;
645 if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
646 /* It's an actual frame (not a repeat-last); load it in */
647 encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
651 _write_queue.push_back (make_pair (encoded, fetch));
652 _pending.remove (fetch);