2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
41 #include "audio_mapping.h"
47 using std::stringstream;
52 using boost::shared_ptr;
53 using boost::optional;
// Number of frame-completion timestamps kept in _time_history: frame_done() trims the
// history to this length, and current_encoding_rate() divides this count by the time
// spanned by the history to estimate frames per second.
55 int const Encoder::_history_size = 25;
57 /** Construct an Encoder with both video frame counters at zero and no real frame seen yet.
 *  @param f Film that we are encoding
 *  @param j Job for this encode.  NOTE(review): the initialisers that store f and j are not
 *  visible in this view; process_begin() later hands `_film' and `_job' to the Writer -- confirm.
 */
58 Encoder::Encoder (shared_ptr<Film> f, shared_ptr<Job> j)
// Frames received from the decoder and frames sent on to the writer, respectively.
61 , _video_frames_in (0)
62 , _video_frames_out (0)
// The initialiser guarded by HAVE_SWRESAMPLE is not visible in this view
// (presumably the resampler context -- confirm).
63 #ifdef HAVE_SWRESAMPLE
// Set to true once a frame has been queued for encoding; process_video() only repeats
// the previous frame for a `same' input when this is true.
66 , _have_a_real_frame (false)
// Prepare for encoding: set up audio resampling when the film's audio rate differs from
// the target rate, start the local and remote encoder threads, and create the Writer.
// Throws EncodeError if resampling is required but libswresample is not available.
81 Encoder::process_begin ()
// A resampler is only needed when there is audio and its rate is not already the target rate.
83 if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
84 #ifdef HAVE_SWRESAMPLE
87 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
88 _film->log()->log (s.str ());
90 /* We will be using planar float data when we call the
91 resampler. As far as I can see, the audio channel
92 layout is not necessary for our purposes; it seems
93 only to be used to get the number of channels and
94 decide if rematrixing is needed. It won't be, since
95 input and output layouts are the same.
98 _swr_context = swr_alloc_set_opts (
// Output side: default layout for the DCP channel count, at the target sample rate.
100 av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
102 _film->target_audio_sample_rate(),
// Input side: the same layout, at the film's own audio rate.
103 av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
105 _film->audio_frame_rate(),
// NOTE(review): the return values of swr_alloc_set_opts() and swr_init() are not checked
// in the lines visible here; a failed initialisation would go unnoticed until swr_convert().
109 swr_init (_swr_context);
// Resampling is needed but this build has no libswresample.
111 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
// The contents of this second HAVE_SWRESAMPLE section are not visible in this view.
114 #ifdef HAVE_SWRESAMPLE
// One worker per configured local encoding thread; a null server pointer marks the worker as local.
// NOTE(review): threads are allocated with raw `new'; where they are deleted is not visible here.
119 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
120 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
123 vector<ServerDescription*> servers = Config::instance()->servers ();
// For each configured server, start as many workers as that server reports threads,
// each bound to that server's description.
125 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
126 for (int j = 0; j < (*i)->threads (); ++j) {
127 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// Encoded frames and audio are handed to this Writer by the rest of the class.
131 _writer.reset (new Writer (_film, _job));
// Finish the encode: flush audio still held by the resampler, wait for the video queue
// to drain, stop the worker threads, then encode locally any frame left on the queue.
// Throws EncodeError if the resampler reports an error while flushing.
136 Encoder::process_end ()
139 if (_film->has_audio() && _swr_context) {
// Scratch buffer of 256 frames per DCP channel for the flushed output.
141 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_mapping().dcp_channels(), 256));
// Null input with a zero count asks the resampler for audio it still has buffered.
// NOTE(review): the control flow around this call (loop / exit condition) is not visible here.
144 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
147 throw EncodeError (_("could not run sample-rate converter"));
154 out->set_frames (frames);
155 _writer->write (out);
158 swr_free (&_swr_context);
162 boost::mutex::scoped_lock lock (_mutex);
164 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
166 /* Keep waking workers until the queue is empty */
167 while (!_queue.empty ()) {
168 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
169 _condition.notify_all ();
// wait() releases _mutex while blocked, so workers can take frames off the queue.
170 _condition.wait (lock);
// NOTE(review): terminate_threads() locks _mutex itself, so the lock taken above must have
// been released before this call; the release is not visible in this view -- confirm.
175 terminate_threads ();
177 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
179 /* The following sequence of events can occur in the above code:
180 1. a remote worker takes the last image off the queue
181 2. the loop above terminates
182 3. the remote worker fails to encode the image and puts it back on the queue
183 4. the remote worker is then terminated by terminate_threads
185 So just mop up anything left in the queue here.
// Left-over frames are encoded on this thread and written directly; a failure is
// logged by the handler below.
188 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
189 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
191 _writer->write ((*i)->encode_locally(), (*i)->frame ());
193 } catch (std::exception& e) {
194 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
202 /** @return an estimate of the current number of frames we are encoding per second,
206 Encoder::current_encoding_rate () const
// _time_history is shared with frame_done(), which worker threads may call.
208 boost::mutex::scoped_lock lock (_history_mutex);
// Not enough completed frames yet for an estimate (the value returned in this case
// is not visible in this view).
209 if (int (_time_history.size()) < _history_size) {
214 gettimeofday (&now, 0);
// frame_done() pushes new timestamps at the front, so back() is the oldest one kept:
// the rate is the history length divided by the time elapsed since that oldest entry.
// NOTE(review): assumes seconds() converts a timeval to a floating-point number of seconds -- confirm.
216 return _history_size / (seconds (now) - seconds (_time_history.back ()));
219 /** @return Number of video frames that have been sent out */
221 Encoder::video_frames_out () const
// The guard must be a named object.  `boost::mutex::scoped_lock (_history_mutex);' is parsed
// as the declaration of a default-constructed local called _history_mutex, which shadows the
// member and locks nothing; naming the guard `lock' actually holds the mutex until return.
223 boost::mutex::scoped_lock lock (_history_mutex);
224 return _video_frames_out;
227 /** Should be called when a frame has been encoded successfully.
228 * Takes no parameters (a stale `@param n' entry for a source frame index used to be documented here).
231 Encoder::frame_done ()
// Guards _time_history, which current_encoding_rate() reads under the same mutex.
233 boost::mutex::scoped_lock lock (_history_mutex);
236 gettimeofday (&tv, 0);
// Newest timestamp goes at the front; the oldest is dropped from the back once the
// history exceeds _history_size entries.
237 _time_history.push_front (tv);
238 if (int (_time_history.size()) > _history_size) {
239 _time_history.pop_back ();
// Accept one decoded video frame and either fake-write it, repeat the previous frame,
// or queue it for the encoder threads.
// @param image Frame image to encode.
// @param same True if this frame is the same as the previous one; when a real frame has
//        already been encoded, the previous result is repeated instead of encoding again.
// @param sub Subtitle passed to the DCPVideoFrame along with the image.
244 Encoder::process_video (shared_ptr<const Image> image, bool same, shared_ptr<Subtitle> sub)
246 FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
// When the rate conversion calls for skipping, odd-numbered input frames take this branch
// (its body is not visible in this view; presumably the frame is dropped -- confirm).
248 if (frc.skip && (_video_frames_in % 2)) {
253 boost::mutex::scoped_lock lock (_mutex);
255 /* Wait until the queue has gone down a bit */
// Back-pressure: block while the queue holds at least two frames per worker thread,
// unless termination has been requested.
256 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
257 TIMING ("decoder sleeps with queue of %1", _queue.size());
258 _condition.wait (lock);
259 TIMING ("decoder wakes with queue of %1", _queue.size());
// Checks whether the writer has recorded an exception (the handling is not visible in this view).
266 if (_writer->thrown ()) {
// A fake write produces no real encode, so there is no "last real frame" to repeat afterwards.
270 if (_writer->can_fake_write (_video_frames_out)) {
271 _writer->fake_write (_video_frames_out);
272 _have_a_real_frame = false;
274 } else if (same && _have_a_real_frame) {
275 /* Use the last frame that we encoded. */
276 _writer->repeat (_video_frames_out);
279 /* Queue this new frame for encoding */
280 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
281 TIMING ("adding to queue of %1", _queue.size ());
282 _queue.push_back (shared_ptr<DCPVideoFrame> (
284 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
285 _film->subtitle_offset(), _film->subtitle_scale(),
286 _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
287 _film->colour_lut(), _film->j2k_bandwidth(),
// Wake any encoder thread waiting for work.
292 _condition.notify_all ();
293 _have_a_real_frame = true;
// NOTE(review): the condition guarding this extra repeat is not visible in this view
// (presumably the frame-rate conversion asks for a repeated frame -- confirm).
300 _writer->repeat (_video_frames_out);
// Accept a block of audio, resampling it when a resampler is in use, and pass it to the Writer.
// @param data Audio to process.
// Throws EncodeError if the sample-rate converter reports an error.
307 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
310 /* Maybe sample-rate convert */
313 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if both sample rates are integers, the division inside ceil() has already
// truncated, so ceil() does nothing; the +32 margin is what covers the rounding.
314 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
316 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_mapping().dcp_channels(), max_resampled_frames));
// swr_convert returns the number of frames written per channel, or a negative value on error.
319 int const resampled_frames = swr_convert (
320 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
323 if (resampled_frames < 0) {
324 throw EncodeError (_("could not run sample-rate converter"));
// Record how many of the allocated frames actually hold resampled audio.
327 resampled->set_frames (resampled_frames);
329 /* And point our variables at the resampled audio */
334 _writer->write (data);
// Stop the encoder threads: wake everything waiting on _condition, then visit each
// thread in _threads.  process_end() calls this once the queue has drained.
// NOTE(review): the lines that set the termination flag, release the lock and join or
// delete the threads are not visible in this view.
338 Encoder::terminate_threads ()
340 boost::mutex::scoped_lock lock (_mutex);
// Workers blocked in encoder_thread() re-check `_terminate' when woken.
342 _condition.notify_all ();
345 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
346 if ((*i)->joinable ()) {
// Body of each worker thread: wait for a frame on the queue, encode it on the given
// server or locally, and hand the result to the Writer; a frame that could not be
// encoded is pushed back onto the queue.
// @param server Server to encode on; process_begin() passes a null pointer for local workers.
354 Encoder::encoder_thread (ServerDescription* server)
356 /* Number of seconds that we currently wait between attempts
357 to connect to the server; not relevant for localhost
360 int remote_backoff = 0;
364 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
365 boost::mutex::scoped_lock lock (_mutex);
// Sleep until there is work or termination is requested; wait() releases _mutex while blocked.
366 while (_queue.empty () && !_terminate) {
367 _condition.wait (lock);
374 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
// Take a reference to the oldest queued frame (its removal from the queue is not visible in this view).
375 shared_ptr<DCPVideoFrame> vf = _queue.front ();
376 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Stays null if encoding fails, which is how the failure path below is selected
// (NOTE(review): the test on `encoded' is not visible in this view -- confirm).
381 shared_ptr<EncodedData> encoded;
385 encoded = vf->encode_remotely (server);
387 if (remote_backoff > 0) {
388 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
391 /* This job succeeded, so remove any backoff */
394 } catch (std::exception& e) {
// Each remote failure adds 10 seconds of back-off while it is still below 60 seconds.
395 if (remote_backoff < 60) {
397 remote_backoff += 10;
401 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
402 vf->frame(), server->host_name(), e.what(), remote_backoff)
408 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
409 encoded = vf->encode_locally ();
410 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
411 } catch (std::exception& e) {
412 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
// Success: pass the encoded data to the writer under the frame's index.
417 _writer->write (encoded, vf->frame ());
422 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
// Failure: put the frame back at the head of the queue so it is the next one taken.
424 _queue.push_front (vf);
// Back off before this worker contacts the server again.
428 if (remote_backoff > 0) {
429 dcpomatic_sleep (remote_backoff);
// Wake the other waiters on _condition: process_video() (queue space) and
// process_end() (queue drain) both wait on it.
433 _condition.notify_all ();