/*
    Copyright (C) 2012 Carl Hetherington <cth@carlh.net>

    This program is free software; you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation; either version 2 of the License, or
    (at your option) any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program; if not, write to the Free Software
    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.

*/
/** @file  src/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
33 #include "exceptions.h"
36 #include "dcp_video_frame.h"
44 using std::stringstream;
49 using namespace boost;
51 int const Encoder::_history_size = 25;
53 /** @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<Film> f)
56 , _video_frames_in (0)
57 , _video_frames_out (0)
58 #ifdef HAVE_SWRESAMPLE
61 , _have_a_real_frame (false)
76 Encoder::process_begin ()
78 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
79 #ifdef HAVE_SWRESAMPLE
82 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
83 _film->log()->log (s.str ());
85 /* We will be using planar float data when we call the resampler */
86 _swr_context = swr_alloc_set_opts (
88 _film->audio_stream()->channel_layout(),
90 _film->target_audio_sample_rate(),
91 _film->audio_stream()->channel_layout(),
93 _film->audio_stream()->sample_rate(),
97 swr_init (_swr_context);
99 throw EncodeError ("Cannot resample audio as libswresample is not present");
102 #ifdef HAVE_SWRESAMPLE
107 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
108 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
111 vector<ServerDescription*> servers = Config::instance()->servers ();
113 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
114 for (int j = 0; j < (*i)->threads (); ++j) {
115 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
119 _writer.reset (new Writer (_film));
124 Encoder::process_end ()
127 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
129 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
132 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
135 throw EncodeError ("could not run sample-rate converter");
142 out->set_frames (frames);
143 _writer->write (out);
146 swr_free (&_swr_context);
150 boost::mutex::scoped_lock lock (_mutex);
152 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
154 /* Keep waking workers until the queue is empty */
155 while (!_queue.empty ()) {
156 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
157 _condition.notify_all ();
158 _condition.wait (lock);
163 terminate_threads ();
165 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
167 /* The following sequence of events can occur in the above code:
168 1. a remote worker takes the last image off the queue
169 2. the loop above terminates
170 3. the remote worker fails to encode the image and puts it back on the queue
171 4. the remote worker is then terminated by terminate_threads
173 So just mop up anything left in the queue here.
176 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
177 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
179 _writer->write ((*i)->encode_locally(), (*i)->frame ());
181 } catch (std::exception& e) {
182 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
190 /** @return an estimate of the current number of frames we are encoding per second,
194 Encoder::current_frames_per_second () const
196 boost::mutex::scoped_lock lock (_history_mutex);
197 if (int (_time_history.size()) < _history_size) {
202 gettimeofday (&now, 0);
204 return _history_size / (seconds (now) - seconds (_time_history.back ()));
207 /** @return Number of video frames that have been sent out */
209 Encoder::video_frames_out () const
211 boost::mutex::scoped_lock (_history_mutex);
212 return _video_frames_out;
215 /** Should be called when a frame has been encoded successfully.
216 * @param n Source frame index.
219 Encoder::frame_done ()
221 boost::mutex::scoped_lock lock (_history_mutex);
224 gettimeofday (&tv, 0);
225 _time_history.push_front (tv);
226 if (int (_time_history.size()) > _history_size) {
227 _time_history.pop_back ();
232 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
234 DCPFrameRate dfr (_film->frames_per_second ());
236 if (dfr.skip && (_video_frames_in % 2)) {
241 boost::mutex::scoped_lock lock (_mutex);
243 /* Wait until the queue has gone down a bit */
244 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
245 TIMING ("decoder sleeps with queue of %1", _queue.size());
246 _condition.wait (lock);
247 TIMING ("decoder wakes with queue of %1", _queue.size());
254 if (_writer->can_fake_write (_video_frames_out)) {
255 _writer->fake_write (_video_frames_out);
256 _have_a_real_frame = false;
258 } else if (same && _have_a_real_frame) {
259 /* Use the last frame that we encoded. */
260 _writer->repeat (_video_frames_out);
263 /* Queue this new frame for encoding */
264 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
265 TIMING ("adding to queue of %1", _queue.size ());
266 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
268 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
269 _film->subtitle_offset(), _film->subtitle_scale(),
270 _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
271 _film->colour_lut(), _film->j2k_bandwidth(),
276 _condition.notify_all ();
277 _have_a_real_frame = true;
284 _writer->repeat (_video_frames_out);
291 Encoder::process_audio (shared_ptr<AudioBuffers> data)
294 /* Maybe sample-rate convert */
297 /* Compute the resampled frames count and add 32 for luck */
298 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
300 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
303 int const resampled_frames = swr_convert (
304 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
307 if (resampled_frames < 0) {
308 throw EncodeError ("could not run sample-rate converter");
311 resampled->set_frames (resampled_frames);
313 /* And point our variables at the resampled audio */
318 if (_film->audio_channels() == 1) {
319 /* We need to switch things around so that the mono channel is on
320 the centre channel of a 5.1 set (with other channels silent).
323 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
324 b->make_silent (libdcp::LEFT);
325 b->make_silent (libdcp::RIGHT);
326 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
327 b->make_silent (libdcp::LFE);
328 b->make_silent (libdcp::LS);
329 b->make_silent (libdcp::RS);
334 _writer->write (data);
338 Encoder::terminate_threads ()
340 boost::mutex::scoped_lock lock (_mutex);
342 _condition.notify_all ();
345 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
352 Encoder::encoder_thread (ServerDescription* server)
354 /* Number of seconds that we currently wait between attempts
355 to connect to the server; not relevant for localhost
358 int remote_backoff = 0;
362 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
363 boost::mutex::scoped_lock lock (_mutex);
364 while (_queue.empty () && !_terminate) {
365 _condition.wait (lock);
372 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
373 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
374 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
379 shared_ptr<EncodedData> encoded;
383 encoded = vf->encode_remotely (server);
385 if (remote_backoff > 0) {
386 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
389 /* This job succeeded, so remove any backoff */
392 } catch (std::exception& e) {
393 if (remote_backoff < 60) {
395 remote_backoff += 10;
399 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
400 vf->frame(), server->host_name(), e.what(), remote_backoff)
406 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
407 encoded = vf->encode_locally ();
408 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
409 } catch (std::exception& e) {
410 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
415 _writer->write (encoded, vf->frame ());
420 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
422 _queue.push_front (vf);
426 if (remote_backoff > 0) {
427 dvdomatic_sleep (remote_backoff);
431 _condition.notify_all ();