2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.cc
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
42 using std::stringstream;
47 using namespace boost;
// Maximum number of timestamps kept in _time_history (see frame_done); also the
// frame count used as the numerator in current_frames_per_second.
49 int const Encoder::_history_size = 25;
// Constructor: initialises the frame counters and termination flags and, when
// the film has an audio stream, opens one WAV output file per DCP audio channel.
// NOTE(review): several lines of this definition are absent from this listing
// (e.g. the first member initialiser and the SF_INFO declaration).
51 /** @param f Film that we are encoding.
54 Encoder::Encoder (shared_ptr<const Film> f)
56 , _just_skipped (false)
57 , _video_frames_in (0)
58 , _audio_frames_in (0)
59 , _video_frames_out (0)
60 , _audio_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE
64 , _terminate_encoder (false)
66 , _terminate_writer (false)
68 if (_film->audio_stream()) {
69 /* Create sound output files with .tmp suffixes; we will rename
70 them if and when we complete.
72 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
74 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
75 /* We write mono files */
// WAV container holding 24-bit PCM samples.
77 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// The `true` argument selects the temporary path; process_end renames it to
// the path obtained with `false`.
78 SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
// Raised when the open failed (the guarding condition is not visible in this listing).
80 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
// Handles are indexed by channel; write_audio relies on that ordering.
82 _sound_files.push_back (f);
// NOTE(review): the signature enclosing these two calls is not visible in this
// listing; presumably the destructor -- confirm. The same two helpers are also
// called from process_end.
90 terminate_worker_threads ();
91 terminate_writer_thread ();
// Prepares for encoding: sets up audio resampling when the source and target
// sample rates differ, then starts the encoder worker threads (local and
// per-server) and the writer thread.
// Throws EncodeError when resampling is required but libswresample is not available.
95 Encoder::process_begin ()
97 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
98 #ifdef HAVE_SWRESAMPLE
101 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
102 _film->log()->log (s.str ());
104 /* We will be using planar float data when we call the resampler */
105 _swr_context = swr_alloc_set_opts (
107 _film->audio_stream()->channel_layout(),
109 _film->target_audio_sample_rate(),
110 _film->audio_stream()->channel_layout(),
112 _film->audio_stream()->sample_rate(),
116 swr_init (_swr_context);
// Reached in builds without HAVE_SWRESAMPLE (the preprocessor branch line is
// not visible in this listing).
118 throw EncodeError ("Cannot resample audio as libswresample is not present");
121 #ifdef HAVE_SWRESAMPLE
// One worker per configured local encoding thread; these are given a null
// ServerDescription.
126 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
127 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
// Plus, for every configured server, as many workers as that server has threads.
130 vector<ServerDescription*> servers = Config::instance()->servers ();
132 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
133 for (int j = 0; j < (*i)->threads (); ++j) {
134 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// A single thread runs writer_thread to drain _write_queue.
138 _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
// Finishes encoding: drains the resampler, closes and renames the audio files,
// waits for the encode queue to empty, stops the worker and writer threads,
// encodes any left-over frames locally, writes anything still queued for the
// writer, and finally creates the links for duplicate frames.
// Throws EncodeError if the final resampler call fails.
143 Encoder::process_end ()
146 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
148 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
// Null input with a zero count: asks the resampler for samples it still has
// buffered, at most 256 frames per call (see the libswresample documentation).
151 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
154 throw EncodeError ("could not run sample-rate converter");
161 out->set_frames (frames);
165 swr_free (&_swr_context);
169 if (_film->audio_stream()) {
170 close_sound_files ();
172 /* Rename .wav.tmp files to .wav */
173 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
// Remove any existing final file before renaming the temporary one onto it.
174 if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
175 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
177 boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
181 boost::mutex::scoped_lock lock (_worker_mutex);
183 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
185 /* Keep waking workers until the queue is empty */
186 while (!_encode_queue.empty ()) {
187 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
188 _worker_condition.notify_all ();
189 _worker_condition.wait (lock);
194 terminate_worker_threads ();
195 terminate_writer_thread ();
197 _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
199 /* The following sequence of events can occur in the above code:
200 1. a remote worker takes the last image off the queue
201 2. the loop above terminates
202 3. the remote worker fails to encode the image and puts it back on the queue
203 4. the remote worker is then terminated by terminate_worker_threads
205 So just mop up anything left in the queue here.
208 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
209 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
211 shared_ptr<EncodedData> e = (*i)->encode_locally ();
212 e->write (_film, (*i)->frame ());
// A failed local encode is logged and the loop carries on with the next frame.
214 } catch (std::exception& e) {
215 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
219 /* Mop up any unwritten things in the writer's queue */
220 for (list<pair<shared_ptr<EncodedData>, int> >::iterator i = _write_queue.begin(); i != _write_queue.end(); ++i) {
// NOTE(review): this passes _opt whereas the left-over write above passes
// _film -- confirm which argument write() expects.
221 i->first->write (_opt, i->second);
224 /* Now do links (or copies on windows) to duplicate frames */
225 for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
// Both the frame file and its hash file are linked for each recorded pair.
226 link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false));
227 link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false));
// Estimates encoding speed from the timestamps recorded by frame_done.
231 /** @return an estimate of the current number of frames we are encoding per second,
235 Encoder::current_frames_per_second () const
237 boost::mutex::scoped_lock lock (_history_mutex);
// Fewer than _history_size timestamps recorded so far (the statement taken in
// this case is not visible in this listing).
238 if (int (_time_history.size()) < _history_size) {
243 gettimeofday (&now, 0);
// _time_history.back () is the oldest retained timestamp, since frame_done
// pushes new entries at the front and pops from the back.
// NOTE(review): the divisor is zero if no time has elapsed since that
// timestamp -- confirm this is acceptable for the return type.
245 return _history_size / (seconds (now) - seconds (_time_history.back ()));
248 /** @return true if the last frame to be processed was skipped as it already existed */
250 Encoder::skipping () const
252 boost::mutex::scoped_lock (_history_mutex);
253 return _just_skipped;
256 /** @return Number of video frames that have been sent out */
258 Encoder::video_frames_out () const
260 boost::mutex::scoped_lock (_history_mutex);
261 return _video_frames_out;
264 /** Should be called when a frame has been encoded successfully.
265 * Records the current time in _time_history and clears the skipped flag.
268 Encoder::frame_done ()
270 boost::mutex::scoped_lock lock (_history_mutex);
271 _just_skipped = false;
274 gettimeofday (&tv, 0);
// Newest timestamp goes at the front; the history is capped at _history_size
// entries by dropping the oldest from the back.
275 _time_history.push_front (tv);
276 if (int (_time_history.size()) > _history_size) {
277 _time_history.pop_back ();
281 /** Called by a subclass when it has just skipped the processing
282 of a frame because it has already been done.
285 Encoder::frame_skipped ()
// Sets the flag reported by skipping (), under the same mutex as frame_done.
287 boost::mutex::scoped_lock lock (_history_mutex);
288 _just_skipped = true;
// Accepts one decoded video frame and either queues it for encoding or records
// that it should be linked to an earlier frame.
// @param image Frame image, handed to the DCPVideoFrame that is queued.
// @param same  When true and a previous real frame exists, no encode is queued;
//              a link from that previous frame is recorded instead.
// @param sub   Subtitle handed to the DCPVideoFrame along with the image.
292 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
294 DCPFrameRate dfr (_film->frames_per_second ());
// With dfr.skip set, odd-numbered input frames take this branch (its body is
// not visible in this listing).
296 if (dfr.skip && (_video_frames_in % 2)) {
301 boost::mutex::scoped_lock lock (_worker_mutex);
303 /* Wait until the queue has gone down a bit */
304 while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
305 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
306 _worker_condition.wait (lock);
307 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
310 if (_terminate_encoder) {
314 /* Only do the processing if we don't already have a file for this frame */
315 if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
320 if (same && _last_real_frame) {
321 /* Use the last frame that we encoded. We need to postpone doing the actual link,
322 as on windows the link is really a copy and the reference frame might not have
323 finished encoding yet.
// Pair is (existing frame, new output frame); process_end performs the link.
325 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frames_out));
327 /* Queue this new frame for encoding */
328 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
329 TIMING ("adding to queue of %1", _encode_queue.size ());
330 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
332 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
333 _film->subtitle_offset(), _film->subtitle_scale(),
334 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
335 _film->colour_lut(), _film->j2k_bandwidth(),
340 _worker_condition.notify_all ();
341 _last_real_frame = _video_frames_out;
// NOTE(review): the pair order here (_video_frames_out first) is the opposite
// of the push_back above, and the lines that update _video_frames_out are not
// visible -- confirm which index is the existing frame.
348 _links_required.push_back (make_pair (_video_frames_out, _video_frames_out - 1));
// Accepts a block of audio, optionally resampling it and remapping mono input
// onto the centre channel of a six-channel buffer, and adds its frame count to
// _audio_frames_in.
// @param data Audio to process.
// Throws EncodeError if the resampler reports an error.
354 Encoder::process_audio (shared_ptr<AudioBuffers> data)
357 /* Maybe sample-rate convert */
360 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if both sample rates are integers the quotient is already
// truncated before ceil sees it, so only the +32 provides headroom -- confirm.
361 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
363 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
366 int const resampled_frames = swr_convert (
367 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
370 if (resampled_frames < 0) {
371 throw EncodeError ("could not run sample-rate converter");
// Shrink the buffer's frame count to what the resampler actually produced.
374 resampled->set_frames (resampled_frames);
376 /* And point our variables at the resampled audio */
381 if (_film->audio_channels() == 1) {
382 /* We need to switch things around so that the mono channel is on
383 the centre channel of a 5.1 set (with other channels silent).
386 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
387 b->make_silent (libdcp::LEFT);
388 b->make_silent (libdcp::RIGHT);
// The copy size is computed with sizeof(float), i.e. samples are treated as floats.
389 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
390 b->make_silent (libdcp::LFE);
391 b->make_silent (libdcp::LS);
392 b->make_silent (libdcp::RS);
399 _audio_frames_in += data->frames ();
// Writes each channel of `audio` to the sound file with the same index and adds
// the frame count to _audio_frames_out.
// @param audio Audio to write.
// NOTE(review): no check is visible that audio->channels() stays within
// _sound_files.size() -- confirm callers guarantee it.
403 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
405 for (int i = 0; i < audio->channels(); ++i) {
406 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
409 _audio_frames_out += audio->frames ();
// Visits every handle in _sound_files and then empties the vector.
// NOTE(review): the per-file statement inside the loop is not visible in this
// listing; presumably it closes the file -- confirm.
413 Encoder::close_sound_files ()
415 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
419 _sound_files.clear ();
// Asks all encoder worker threads to stop: sets _terminate_encoder under
// _worker_mutex and wakes every thread waiting on _worker_condition, then
// iterates over _worker_threads (the loop body is not visible in this listing).
423 Encoder::terminate_worker_threads ()
425 boost::mutex::scoped_lock lock (_worker_mutex);
426 _terminate_encoder = true;
427 _worker_condition.notify_all ();
430 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Stops the writer thread: sets _terminate_writer under _writer_mutex, wakes
// the thread, waits for it to finish and deletes the thread object.
437 Encoder::terminate_writer_thread ()
// Checked first when there is no writer thread (the statement taken in that
// case is not visible in this listing).
439 if (!_writer_thread) {
443 boost::mutex::scoped_lock lock (_writer_mutex);
444 _terminate_writer = true;
445 _writer_condition.notify_all ();
448 _writer_thread->join ();
// NOTE(review): no reset of _writer_thread is visible after the delete --
// confirm it is cleared, since the guard above tests the pointer.
449 delete _writer_thread;
// Body of one encoder worker thread: repeatedly takes a frame from
// _encode_queue, encodes it (remotely or locally), and hands the result to the
// writer via _write_queue; a frame that was not handed on is put back at the
// front of the queue.
// @param server Server to encode on; process_begin passes a null pointer for
//               local workers.
454 Encoder::encoder_thread (ServerDescription* server)
456 /* Number of seconds that we currently wait between attempts
457 to connect to the server; not relevant for localhost
460 int remote_backoff = 0;
464 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
465 boost::mutex::scoped_lock lock (_worker_mutex);
// Sleep until there is work or termination has been requested.
466 while (_encode_queue.empty () && !_terminate_encoder) {
467 _worker_condition.wait (lock);
470 if (_terminate_encoder) {
474 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
475 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
476 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
477 _encode_queue.pop_front ();
481 shared_ptr<EncodedData> encoded;
485 encoded = vf->encode_remotely (server);
487 if (remote_backoff > 0) {
488 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
491 /* This job succeeded, so remove any backoff */
494 } catch (std::exception& e) {
// Each remote failure lengthens the sleep by 10 seconds while it is below 60.
495 if (remote_backoff < 60) {
497 remote_backoff += 10;
501 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
502 vf->frame(), server->host_name(), e.what(), remote_backoff)
508 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
509 encoded = vf->encode_locally ();
510 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
511 } catch (std::exception& e) {
512 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
// Hand the encoded data and its frame index to the writer thread.
517 boost::mutex::scoped_lock lock2 (_writer_mutex);
518 _write_queue.push_back (make_pair (encoded, vf->frame ()));
519 _writer_condition.notify_all ();
523 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
// Front of the queue, so the failed frame is the next one to be picked up.
525 _encode_queue.push_front (vf);
529 if (remote_backoff > 0) {
530 dvdomatic_sleep (remote_backoff);
534 _worker_condition.notify_all ();
// Makes `b` a duplicate of `a`: a symbolic link named b pointing at a when
// DVDOMATIC_POSIX is defined, a file copy from a to b when DVDOMATIC_WINDOWS is.
// @param a Existing file.
// @param b Path to create.
// Throws EncodeError on the POSIX path (the guarding condition is not visible
// in this listing).
539 Encoder::link (string a, string b) const
541 #ifdef DVDOMATIC_POSIX
542 int const r = symlink (a.c_str(), b.c_str());
544 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
548 #ifdef DVDOMATIC_WINDOWS
549 boost::filesystem::copy_file (a, b);
// Body of the writer thread: waits for entries in _write_queue, removes the
// front entry and writes it out.
// NOTE(review): the listing ends inside or just after this function, so any
// trailing statements are not visible.
554 Encoder::writer_thread ()
558 boost::mutex::scoped_lock lock (_writer_mutex);
559 TIMING ("writer sleeps with a queue of %1", _write_queue.size());
// Sleep until there is something to write or termination has been requested.
560 while (_write_queue.empty() && !_terminate_writer) {
561 _writer_condition.wait (lock);
563 TIMING ("writer wakes with a queue of %1", _write_queue.size());
// Entries still queued when termination is requested are written by process_end.
565 if (_terminate_writer) {
569 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
570 _write_queue.pop_front ();
// NOTE(review): passes _opt, whereas the left-over write in process_end passes
// _film -- confirm which argument write() expects.
573 encoded.first->write (_opt, encoded.second);