2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 #include "audio_mapping.h"
47 using std::stringstream;
52 using boost::shared_ptr;
53 using boost::optional;
// Cap on the length of _time_history: frame_done() trims the history to this many
// entries and current_encoding_rate() compares the history length against it.
55 int const Encoder::_history_size = 25;
57 /** @param f Film that we are encoding */
// Constructor. The initialisers visible here zero both video frame counters and
// clear _have_a_real_frame.
// NOTE(review): the initialiser guarded by HAVE_SWRESAMPLE is not visible in this excerpt.
58 Encoder::Encoder (shared_ptr<Film> f)
60 , _video_frames_in (0)
61 , _video_frames_out (0)
62 #ifdef HAVE_SWRESAMPLE
65 , _have_a_real_frame (false)
// Prepares an encode run: sets up audio resampling when the film's audio rate differs
// from the target rate, starts the encoder worker threads and creates the Writer.
80 Encoder::process_begin ()
// Resampling is only set up when the film has audio whose rate is not already the target rate.
82 if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
83 #ifdef HAVE_SWRESAMPLE
86 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
87 _film->log()->log (s.str ());
89 /* We will be using planar float data when we call the
90 resampler. As far as I can see, the audio channel
91 layout is not necessary for our purposes; it seems
92 only to be used to get the number of channels and
93 decide if rematrixing is needed. It won't be, since
94 input and output layouts are the same.
// The target (output) rate is passed before the film's (input) rate; both layouts are
// the default layout for the DCP channel count.
97 _swr_context = swr_alloc_set_opts (
99 av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
101 _film->target_audio_sample_rate(),
102 av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
104 _film->audio_frame_rate(),
// NOTE(review): the return value of swr_init is not checked on this line.
108 swr_init (_swr_context);
// NOTE(review): presumably the branch compiled when HAVE_SWRESAMPLE is not defined - confirm.
110 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
113 #ifdef HAVE_SWRESAMPLE
// One worker per configured local encoding thread; these are given a null ServerDescription.
118 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
119 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
122 vector<ServerDescription*> servers = Config::instance()->servers ();
// For every configured server, one worker per thread that the server reports.
124 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
125 for (int j = 0; j < (*i)->threads (); ++j) {
126 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
130 _writer.reset (new Writer (_film));
// Finishes an encode run: flushes the resampler if one is in use, waits for the frame
// queue to empty, stops the worker threads and locally encodes any frames left behind.
135 Encoder::process_end ()
138 if (_film->has_audio() && _swr_context) {
140 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_mapping().dcp_channels(), 256));
// No input is supplied (null pointer, zero count), so this only collects samples that
// the resampler is still holding, up to the 256 frames that `out` can take.
143 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
// NOTE(review): the condition guarding this throw is not visible in this excerpt.
146 throw EncodeError (_("could not run sample-rate converter"));
153 out->set_frames (frames);
154 _writer->write (out);
157 swr_free (&_swr_context);
161 boost::mutex::scoped_lock lock (_mutex);
163 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
165 /* Keep waking workers until the queue is empty */
166 while (!_queue.empty ()) {
167 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
168 _condition.notify_all ();
169 _condition.wait (lock);
// NOTE(review): terminate_threads() locks _mutex itself, so `lock` has presumably been
// released before this call - confirm against the full file.
174 terminate_threads ();
176 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
178 /* The following sequence of events can occur in the above code:
179 1. a remote worker takes the last image off the queue
180 2. the loop above terminates
181 3. the remote worker fails to encode the image and puts it back on the queue
182 4. the remote worker is then terminated by terminate_threads
184 So just mop up anything left in the queue here.
// Each left-over frame is encoded locally and written; a failure is logged.
187 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
188 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
190 _writer->write ((*i)->encode_locally(), (*i)->frame ());
192 } catch (std::exception& e) {
193 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
201 /** @return an estimate of the current number of frames we are encoding per second,
// Estimates the encoding rate from the completion times that frame_done() records.
205 Encoder::current_encoding_rate () const
207 boost::mutex::scoped_lock lock (_history_mutex);
// Until the history holds _history_size entries an early-exit branch is taken
// (NOTE(review): its body is not visible in this excerpt).
208 if (int (_time_history.size()) < _history_size) {
213 gettimeofday (&now, 0);
// frame_done() pushes new times at the front, so back() is the oldest time held:
// the result is _history_size divided by the seconds elapsed since that time.
215 return _history_size / (seconds (now) - seconds (_time_history.back ()));
218 /** @return Number of video frames that have been sent out */
// Returns _video_frames_out, read while _history_mutex is held.
220 Encoder::video_frames_out () const
// The lock object must be named. An unnamed `scoped_lock (_history_mutex);` is parsed as
// the declaration of a new, default-constructed lock called _history_mutex, which leaves
// the member mutex unlocked.
222 boost::mutex::scoped_lock lock (_history_mutex);
223 return _video_frames_out;
226 /** Should be called when a frame has been encoded successfully.
227 * Records the completion time in the history used to estimate the encoding rate.
// Pushes the current time onto the front of _time_history and drops the oldest entry
// once the history holds more than _history_size times.
230 Encoder::frame_done ()
232 boost::mutex::scoped_lock lock (_history_mutex);
235 gettimeofday (&tv, 0);
236 _time_history.push_front (tv);
237 if (int (_time_history.size()) > _history_size) {
238 _time_history.pop_back ();
// Handles one incoming video frame: waits for room in the queue, then fake-writes,
// repeats the previous frame, or queues the image for encoding by a worker thread.
// @param image frame to encode.
// @param same when true and a real frame has already been queued, the Writer is asked
//        to repeat instead of encoding `image`.
// @param sub subtitle handed to the DCPVideoFrame together with the image.
243 Encoder::process_video (shared_ptr<const Image> image, bool same, shared_ptr<Subtitle> sub)
245 FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
// Applies to odd-numbered input frames when the rate conversion asks for skipping.
// NOTE(review): the body of this branch is not visible in this excerpt.
247 if (frc.skip && (_video_frames_in % 2)) {
252 boost::mutex::scoped_lock lock (_mutex);
254 /* Wait until the queue has gone down a bit */
// "A bit" means fewer than two queued frames per worker thread; the wait also ends
// when _terminate is set.
255 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
256 TIMING ("decoder sleeps with queue of %1", _queue.size());
257 _condition.wait (lock);
258 TIMING ("decoder wakes with queue of %1", _queue.size());
// NOTE(review): what happens when the writer reports a thrown exception is not visible here.
265 if (_writer->thrown ()) {
// If the Writer can fake this frame, do that; the next `same` frame must then be encoded
// for real, hence _have_a_real_frame is cleared.
269 if (_writer->can_fake_write (_video_frames_out)) {
270 _writer->fake_write (_video_frames_out);
271 _have_a_real_frame = false;
273 } else if (same && _have_a_real_frame) {
274 /* Use the last frame that we encoded. */
275 _writer->repeat (_video_frames_out);
278 /* Queue this new frame for encoding */
279 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
280 TIMING ("adding to queue of %1", _queue.size ());
281 _queue.push_back (shared_ptr<DCPVideoFrame> (
283 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
284 _film->subtitle_offset(), _film->subtitle_scale(),
285 _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
286 _film->colour_lut(), _film->j2k_bandwidth(),
// Wake any worker waiting for a frame.
291 _condition.notify_all ();
292 _have_a_real_frame = true;
// NOTE(review): presumably reached only when the rate conversion requires a repeated
// frame; the guarding condition is not visible in this excerpt.
299 _writer->repeat (_video_frames_out);
// Passes a block of audio to the Writer, sample-rate converting it first when needed.
// @param data audio to write.
// Throws EncodeError if swr_convert reports a negative frame count.
306 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
309 /* Maybe sample-rate convert */
312 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if both rates are integers the division truncates before ceil() sees the
// value, so the rounding-up comes only from the extra 32 frames - confirm the rate types.
313 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
315 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_mapping().dcp_channels(), max_resampled_frames));
318 int const resampled_frames = swr_convert (
319 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
322 if (resampled_frames < 0) {
323 throw EncodeError (_("could not run sample-rate converter"));
// Record how many frames the converter actually produced.
326 resampled->set_frames (resampled_frames);
328 /* And point our variables at the resampled audio */
333 _writer->write (data);
// Wakes every thread waiting on _condition and then visits each worker thread that is
// still joinable.
// NOTE(review): the line that sets _terminate and the action taken on joinable threads
// (presumably join) are not visible in this excerpt.
337 Encoder::terminate_threads ()
339 boost::mutex::scoped_lock lock (_mutex);
341 _condition.notify_all ();
344 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
345 if ((*i)->joinable ()) {
// Body of each encoder worker thread: takes frames from _queue, encodes them remotely
// on `server` or locally, and hands the result to the Writer.
// @param server passed to encode_remotely(); process_begin() passes a null pointer for
//        the local workers.
353 Encoder::encoder_thread (ServerDescription* server)
355 /* Number of seconds that we currently wait between attempts
356 to connect to the server; not relevant for localhost
359 int remote_backoff = 0;
363 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
364 boost::mutex::scoped_lock lock (_mutex);
// Sleep until there is a frame to encode or termination has been requested.
365 while (_queue.empty () && !_terminate) {
366 _condition.wait (lock);
373 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
374 shared_ptr<DCPVideoFrame> vf = _queue.front ();
375 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Stays empty if encoding fails.
380 shared_ptr<EncodedData> encoded;
384 encoded = vf->encode_remotely (server);
386 if (remote_backoff > 0) {
387 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
390 /* This job succeeded, so remove any backoff */
393 } catch (std::exception& e) {
// A failed remote encode lengthens the back-off by 10 seconds while it is below 60.
394 if (remote_backoff < 60) {
396 remote_backoff += 10;
400 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
401 vf->frame(), server->host_name(), e.what(), remote_backoff)
407 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
408 encoded = vf->encode_locally ();
409 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
410 } catch (std::exception& e) {
411 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
// NOTE(review): presumably only reached when `encoded` is set; the guard is not visible here.
416 _writer->write (encoded, vf->frame ());
// The frame goes back on the front of the queue, which is where frames are taken from.
421 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
423 _queue.push_front (vf);
// Wait out any back-off accumulated from failed remote encodes.
427 if (remote_backoff > 0) {
428 dcpomatic_sleep (remote_backoff);
// Wake other threads waiting on _condition.
432 _condition.notify_all ();