2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 #include "audio_mapping.h"
47 using std::stringstream;
52 using boost::shared_ptr;
53 using boost::optional;
// Maximum number of frame-completion timestamps kept in _time_history
// (frame_done trims the history to this length); also the frame count used
// by current_encoding_rate when it estimates frames per second.
55 int const Encoder::_history_size = 25;
// Constructor.  Only part of the member-initialiser list is visible in this
// listing; the comments below describe the visible initialisers only.
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<Film> f)
// Both video frame counters start at zero.
60 , _video_frames_in (0)
61 , _video_frames_out (0)
// NOTE(review): the initialiser guarded by this #ifdef (and its #endif) is not
// shown here; presumably it is the resampler context -- confirm.
62 #ifdef HAVE_SWRESAMPLE
// Becomes true once process_video has queued a real frame for encoding; it is
// what allows a later "same" frame to be written as a repeat.
65 , _have_a_real_frame (false)
// Prepare for encoding: set up audio resampling if the film's audio rate
// differs from the target rate, start the encoder worker threads and create
// the Writer.
80 Encoder::process_begin ()
// A resampler is only needed when there is audio and its rate is not already
// the target rate.
82 if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
83 #ifdef HAVE_SWRESAMPLE
86 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
87 _film->log()->log (s.str ());
89 /* We will be using planar float data when we call the resampler */
90 _swr_context = swr_alloc_set_opts (
92 _film->audio_channel_layout(),
94 _film->target_audio_sample_rate(),
95 _film->audio_channel_layout(),
97 _film->audio_frame_rate(),
// NOTE(review): swr_init reports failure through a negative return value,
// which is not checked here.
101 swr_init (_swr_context);
// Taken when resampling is required but HAVE_SWRESAMPLE is not defined
// (presumably the #else branch; the directive itself is not shown here).
103 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
106 #ifdef HAVE_SWRESAMPLE
// Start one worker per configured local encoding thread.  These are given a
// null ServerDescription.
111 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
112 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
// Then, for every configured server, start as many workers as that server's
// threads() reports; each worker is bound to that server's description.
115 vector<ServerDescription*> servers = Config::instance()->servers ();
117 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
118 for (int j = 0; j < (*i)->threads (); ++j) {
119 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// The Writer receives the encoded video frames and the audio.
123 _writer.reset (new Writer (_film));
// Finish encoding: flush the resampler (if one is in use), wait for the worker
// threads to empty the queue, stop them, then encode any left-over frames on
// the calling thread.
128 Encoder::process_end ()
131 if (_film->has_audio() && _film->audio_channels() && _swr_context) {
133 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_channels(), 256));
// A null input with a zero count asks swr_convert to emit the samples it still
// has buffered; up to 256 frames are fetched per call.
136 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
139 throw EncodeError (_("could not run sample-rate converter"));
146 out->set_frames (frames);
147 _writer->write (out);
150 swr_free (&_swr_context);
154 boost::mutex::scoped_lock lock (_mutex);
156 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
158 /* Keep waking workers until the queue is empty */
159 while (!_queue.empty ()) {
160 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
161 _condition.notify_all ();
// wait() releases _mutex while blocked, so the workers can take frames; each
// worker notifies _condition after handling a frame (see encoder_thread).
162 _condition.wait (lock);
// NOTE(review): terminate_threads() acquires _mutex itself, so the lock taken
// above must have been released by this point; the release is not visible in
// this listing.
167 terminate_threads ();
169 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
171 /* The following sequence of events can occur in the above code:
172 1. a remote worker takes the last image off the queue
173 2. the loop above terminates
174 3. the remote worker fails to encode the image and puts it back on the queue
175 4. the remote worker is then terminated by terminate_threads
177 So just mop up anything left in the queue here.
// Every remaining frame is encoded locally on this thread and handed to the
// writer; a failure is caught and logged.
180 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
181 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
183 _writer->write ((*i)->encode_locally(), (*i)->frame ());
185 } catch (std::exception& e) {
186 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
194 /** @return an estimate of the current number of frames we are encoding per second,
198 Encoder::current_encoding_rate () const
// _time_history is shared with frame_done(), hence the lock.
200 boost::mutex::scoped_lock lock (_history_mutex);
// Until _history_size completion times have been recorded there is not enough
// data for an estimate (the value returned in that case is not shown in this
// listing).
201 if (int (_time_history.size()) < _history_size) {
206 gettimeofday (&now, 0);
// frame_done() pushes new times at the front, so back() is the oldest entry:
// rate = frames in the history / seconds elapsed since the oldest of them.
208 return _history_size / (seconds (now) - seconds (_time_history.back ()));
211 /** @return Number of video frames that have been sent out */
213 Encoder::video_frames_out () const
// The lock has to be a named object.  The unnamed spelling
// `boost::mutex::scoped_lock (_history_mutex);` is parsed as the declaration
// of a new, default-constructed local lock called _history_mutex, which
// shadows the member and acquires nothing, leaving the read below unprotected.
// current_encoding_rate() and frame_done() use the same named form.
215 boost::mutex::scoped_lock lock (_history_mutex);
216 return _video_frames_out;
219 /** Should be called when a frame has been encoded successfully.
220 * Records the completion time in the rate history; takes no parameters.
223 Encoder::frame_done ()
// _time_history is also read by current_encoding_rate(), hence the lock.
225 boost::mutex::scoped_lock lock (_history_mutex);
228 gettimeofday (&tv, 0);
// Newest time goes at the front; the oldest is dropped from the back so that
// at most _history_size entries are kept.
229 _time_history.push_front (tv);
230 if (int (_time_history.size()) > _history_size) {
231 _time_history.pop_back ();
// Handle one video frame: it is either fake-written, written as a repeat of
// the previous frame, or queued for the worker threads to encode.
// @param image The frame's image; passed on to the DCPVideoFrame that is queued.
// @param same  When true, and a real frame has already been queued, the
//              previous frame is repeated instead of encoding this one.
// @param sub   Subtitle passed on to the DCPVideoFrame that is queued.
236 Encoder::process_video (shared_ptr<Image> image, bool same, shared_ptr<Subtitle> sub)
238 FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
// When the rate conversion asks for skipping, odd-numbered input frames take
// this branch (its body is not shown in this listing).
240 if (frc.skip && (_video_frames_in % 2)) {
245 boost::mutex::scoped_lock lock (_mutex);
247 /* Wait until the queue has gone down a bit */
// Back-pressure: block while the queue already holds at least two frames per
// worker thread, unless termination has been requested.
248 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
249 TIMING ("decoder sleeps with queue of %1", _queue.size());
250 _condition.wait (lock);
251 TIMING ("decoder wakes with queue of %1", _queue.size());
// NOTE(review): what happens when the writer reports thrown() is not visible
// in this listing.
258 if (_writer->thrown ()) {
// If the writer says this frame index can be fake-written, no encode is done;
// clearing _have_a_real_frame prevents a following "same" frame from being
// written as a repeat of it.
262 if (_writer->can_fake_write (_video_frames_out)) {
263 _writer->fake_write (_video_frames_out);
264 _have_a_real_frame = false;
266 } else if (same && _have_a_real_frame) {
267 /* Use the last frame that we encoded. */
268 _writer->repeat (_video_frames_out);
271 /* Queue this new frame for encoding */
272 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
273 TIMING ("adding to queue of %1", _queue.size ());
274 _queue.push_back (shared_ptr<DCPVideoFrame> (
276 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
277 _film->subtitle_offset(), _film->subtitle_scale(),
278 _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
279 _film->colour_lut(), _film->j2k_bandwidth(),
// Wake the workers waiting in encoder_thread for something to encode.
284 _condition.notify_all ();
285 _have_a_real_frame = true;
// NOTE(review): a further repeat; the condition guarding it and any counter
// updates before it are not shown in this listing -- confirm against the full
// file.
292 _writer->repeat (_video_frames_out);
// Handle a block of audio: sample-rate convert it when resampling is in use,
// then pass it to the writer.
// @param data Audio to process.
299 Encoder::process_audio (shared_ptr<AudioBuffers> data)
302 /* Maybe sample-rate convert */
305 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if the two rate accessors return integers (not visible here),
// the division truncates before ceil() sees the value, so ceil() has no effect
// and the +32 margin is what absorbs the rounding -- confirm.
306 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
308 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_channels(), max_resampled_frames));
311 int const resampled_frames = swr_convert (
312 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
// A negative result from swr_convert is an error.
315 if (resampled_frames < 0) {
316 throw EncodeError (_("could not run sample-rate converter"));
// Record how many frames the converter actually produced.
319 resampled->set_frames (resampled_frames);
321 /* And point our variables at the resampled audio */
// The statement that re-points `data` is not shown in this listing; per the
// comment above, `data` is presumably the resampled buffer here when
// resampling ran.
326 _writer->write (data);
// Stop the worker threads.
// NOTE(review): only part of this function is visible.  The flag the workers
// re-check when woken (_terminate) is presumably set under the lock on a line
// not shown here, and the body of the loop over _threads is likewise not shown.
330 Encoder::terminate_threads ()
332 boost::mutex::scoped_lock lock (_mutex);
// Wake every thread blocked on _condition so that it can re-check its wait
// condition.
334 _condition.notify_all ();
337 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
// Body of each worker thread started by process_begin: repeatedly take a frame
// from the queue, encode it (remotely or locally) and pass the result to the
// writer.
// @param server Server that this thread encodes on; process_begin passes a
//               null pointer for the local encoding threads.
344 Encoder::encoder_thread (ServerDescription* server)
346 /* Number of seconds that we currently wait between attempts
347 to connect to the server; not relevant for localhost
350 int remote_backoff = 0;
354 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
355 boost::mutex::scoped_lock lock (_mutex);
// Sleep until there is something to encode or termination is requested.
356 while (_queue.empty () && !_terminate) {
357 _condition.wait (lock);
364 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
// Take the frame at the head of the queue: new frames are appended by
// process_video and failed frames are re-inserted at the head below.  (The
// removal from the queue is not shown in this listing.)
365 shared_ptr<DCPVideoFrame> vf = _queue.front ();
366 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
371 shared_ptr<EncodedData> encoded;
375 encoded = vf->encode_remotely (server);
377 if (remote_backoff > 0) {
378 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
381 /* This job succeeded, so remove any backoff */
384 } catch (std::exception& e) {
// A failed remote encode lengthens the wait before the next attempt in
// 10-second steps, and no further once it has reached 60 seconds.
385 if (remote_backoff < 60) {
387 remote_backoff += 10;
391 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
392 vf->frame(), server->host_name(), e.what(), remote_backoff)
398 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
399 encoded = vf->encode_locally ();
400 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
401 } catch (std::exception& e) {
402 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
// Hand the encoded frame to the writer.
407 _writer->write (encoded, vf->frame ());
412 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
// The failed frame goes back at the head of the queue so that it is the next
// one to be taken.
414 _queue.push_front (vf);
// After a remote failure, wait for the current backoff before trying again.
418 if (remote_backoff > 0) {
419 dvdomatic_sleep (remote_backoff);
// Wake the other users of _condition: process_video (waiting for queue space)
// and process_end (waiting for the queue to empty).
423 _condition.notify_all ();