2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.cc
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
46 using std::stringstream;
51 using namespace boost;
/** Number of frame-completion timestamps kept in _time_history.
 *  frame_done() drops the oldest timestamp once more than this many are held,
 *  and current_frames_per_second() uses it as the numerator of its rate estimate.
 */
53 int const Encoder::_history_size = 25;
55 /** Construct an Encoder with its frame counters at zero and no real frame seen yet.
 *  @param f Film that we are encoding
 *  @param p Playlist to encode (NOTE(review): the initialiser that stores it is not
 *  visible in this excerpt; the other methods read it through _playlist -- confirm)
 */
56 Encoder::Encoder (shared_ptr<Film> f, shared_ptr<Playlist> p)
59 , _video_frames_in (0)
60 , _video_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE
64 , _have_a_real_frame (false)
/** Prepare for encoding.
 *
 *  If the playlist has audio whose frame rate differs from the film's target audio
 *  sample rate, a libswresample context is set up when HAVE_SWRESAMPLE is defined;
 *  the EncodeError about libswresample not being present belongs to the alternative
 *  branch (NOTE(review): the preprocessor lines separating the two are not visible
 *  in this excerpt).
 *
 *  Worker threads running encoder_thread() are then created: one per local encoding
 *  thread configured in Config, plus threads() per configured server.  Finally the
 *  Writer is created from the film and playlist.
 */
79 Encoder::process_begin ()
81 if (_playlist->has_audio() && _playlist->audio_frame_rate() != _film->target_audio_sample_rate()) {
82 #ifdef HAVE_SWRESAMPLE
85 s << String::compose (N_("Will resample audio from %1 to %2"), _playlist->audio_frame_rate(), _film->target_audio_sample_rate());
86 _film->log()->log (s.str ());
88 /* We will be using planar float data when we call the resampler */
89 _swr_context = swr_alloc_set_opts (
/* Output side: same channel layout as the source, at the film's target rate */
91 _playlist->audio_channel_layout(),
93 _film->target_audio_sample_rate(),
/* Input side: the playlist's own layout and rate */
94 _playlist->audio_channel_layout(),
96 _playlist->audio_frame_rate(),
100 swr_init (_swr_context);
102 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
105 #ifdef HAVE_SWRESAMPLE
/* Local worker threads are given a null ServerDescription */
110 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
111 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
114 vector<ServerDescription*> servers = Config::instance()->servers ();
/* Each configured server gets as many worker threads as it reports via threads() */
116 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
117 for (int j = 0; j < (*i)->threads (); ++j) {
118 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
122 _writer.reset (new Writer (_film, _playlist));
/** Finish encoding.
 *
 *  When a resampler context exists, asks it for up to 256 buffered output frames
 *  (swr_convert is called with no input) and then frees the context; the
 *  "could not run sample-rate converter" EncodeError belongs to this step.
 *  After that, waits until the worker threads have emptied _queue, terminates
 *  them, and encodes locally any frame a worker put back on the queue in the
 *  meantime, passing each result to the Writer.
 *
 *  NOTE(review): several lines of this function (loop structure around
 *  swr_convert, closing braces, the try block of the mop-up loop) are not
 *  visible in this excerpt.
 */
127 Encoder::process_end ()
130 if (_playlist->has_audio() && _playlist->audio_channels() && _swr_context) {
/* 256-frame scratch buffer for whatever the resampler still holds */
132 shared_ptr<AudioBuffers> out (new AudioBuffers (_playlist->audio_channels(), 256));
/* Null input with a count of 0: only already-buffered data is converted */
135 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
138 throw EncodeError (_("could not run sample-rate converter"));
145 out->set_frames (frames);
149 swr_free (&_swr_context);
/* _mutex is held while _queue is examined; _condition.wait releases it while blocked */
153 boost::mutex::scoped_lock lock (_mutex);
155 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
157 /* Keep waking workers until the queue is empty */
158 while (!_queue.empty ()) {
159 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
160 _condition.notify_all ();
161 _condition.wait (lock);
166 terminate_threads ();
168 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
170 /* The following sequence of events can occur in the above code:
171 1. a remote worker takes the last image off the queue
172 2. the loop above terminates
173 3. the remote worker fails to encode the image and puts it back on the queue
174 4. the remote worker is then terminated by terminate_threads
176 So just mop up anything left in the queue here.
179 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
180 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
182 _writer->write ((*i)->encode_locally(), (*i)->frame ());
184 } catch (std::exception& e) {
185 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
193 /** @return an estimate of the current number of frames we are encoding per second,
 *  computed as _history_size divided by the number of seconds between now and
 *  the oldest timestamp held in _time_history (its back element).  The history
 *  is read with _history_mutex held.
 *  NOTE(review): the value returned while fewer than _history_size timestamps
 *  have been recorded is not visible in this excerpt.
197 Encoder::current_frames_per_second () const
199 boost::mutex::scoped_lock lock (_history_mutex);
200 if (int (_time_history.size()) < _history_size) {
205 gettimeofday (&now, 0);
207 return _history_size / (seconds (now) - seconds (_time_history.back ()));
210 /** @return Number of video frames that have been sent out.
 *
 *  The counter is read with _history_mutex held.  The lock object must be
 *  named: written as `boost::mutex::scoped_lock (_history_mutex);` the statement
 *  declares a default-constructed local called _history_mutex that owns no
 *  mutex, so nothing is locked.
 *
 *  NOTE(review): the code that updates _video_frames_out is not visible in
 *  this excerpt; confirm that it takes the same mutex.
 */
212 Encoder::video_frames_out () const
214 boost::mutex::scoped_lock lock (_history_mutex);
215 return _video_frames_out;
218 /** Should be called when a frame has been encoded successfully.
219 * Takes no parameters.  With _history_mutex held, the current time is pushed
 * onto the front of _time_history and, once more than _history_size
 * timestamps are held, the oldest one (at the back) is removed.
222 Encoder::frame_done ()
224 boost::mutex::scoped_lock lock (_history_mutex);
227 gettimeofday (&tv, 0);
228 _time_history.push_front (tv);
229 if (int (_time_history.size()) > _history_size) {
230 _time_history.pop_back ();
/** Accept one video frame and arrange for it to reach the Writer.
 *
 *  Depending on the Writer's state and on `same`, the frame is either
 *  fake-written, written as a repeat of the previous frame, or wrapped in a
 *  DCPVideoFrame and queued for the worker threads.
 *
 *  @param image Image for this frame; passed to the DCPVideoFrame when the frame is queued.
 *  @param same When true and a real frame has already been queued, the previous
 *  frame is repeated instead of encoding this one (presumably the caller sets it
 *  when the image is unchanged from the last one -- confirm).
 *  @param sub Subtitle passed to the DCPVideoFrame when the frame is queued.
 *
 *  NOTE(review): the statements executed for a skipped frame, after
 *  _writer->thrown(), and the condition guarding the final repeat are not
 *  visible in this excerpt.
 */
235 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
237 FrameRateConversion frc (_playlist->video_frame_rate(), _film->dcp_frame_rate());
/* When the conversion asks for skipping, this branch is taken for odd-numbered input frames */
239 if (frc.skip && (_video_frames_in % 2)) {
244 boost::mutex::scoped_lock lock (_mutex);
246 /* Wait until the queue has gone down a bit */
/* "A bit" means fewer than two queued frames per worker thread; _terminate also ends the wait */
247 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
248 TIMING ("decoder sleeps with queue of %1", _queue.size());
249 _condition.wait (lock);
250 TIMING ("decoder wakes with queue of %1", _queue.size());
257 if (_writer->thrown ()) {
261 if (_writer->can_fake_write (_video_frames_out)) {
262 _writer->fake_write (_video_frames_out);
/* A fake write leaves no encoded frame available to repeat */
263 _have_a_real_frame = false;
265 } else if (same && _have_a_real_frame) {
266 /* Use the last frame that we encoded. */
267 _writer->repeat (_video_frames_out);
270 /* Queue this new frame for encoding */
271 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
272 TIMING ("adding to queue of %1", _queue.size ());
273 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
275 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_playlist),
276 _film->subtitle_offset(), _film->subtitle_scale(),
277 _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
278 _film->colour_lut(), _film->j2k_bandwidth(),
/* Wake any worker thread waiting for the queue to become non-empty */
283 _condition.notify_all ();
284 _have_a_real_frame = true;
291 _writer->repeat (_video_frames_out);
/** Accept a block of audio, sample-rate converting it through _swr_context
 *  where the visible code calls swr_convert.
 *
 *  @param data Audio to process; its frames are the resampler's input.
 *  @throws EncodeError if swr_convert returns a negative frame count.
 *
 *  NOTE(review): the conditions guarding the resampling code and the statements
 *  that hand the audio on afterwards are not visible in this excerpt.
 */
298 Encoder::process_audio (shared_ptr<AudioBuffers> data)
301 /* Maybe sample-rate convert */
304 /* Compute the resampled frames count and add 32 for luck */
/* Divide in floating point so that ceil() really rounds up; with integer
   operands the quotient is truncated before ceil() sees it. */
305 int const max_resampled_frames = ceil (static_cast<double> (data->frames()) * _film->target_audio_sample_rate() / _playlist->audio_frame_rate()) + 32;
/* Output buffer sized for the worst case computed above */
307 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_playlist->audio_channels(), max_resampled_frames));
310 int const resampled_frames = swr_convert (
311 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
/* A negative return from swr_convert is treated as an error */
314 if (resampled_frames < 0) {
315 throw EncodeError (_("could not run sample-rate converter"));
/* Record how many frames the resampler actually produced */
318 resampled->set_frames (resampled_frames);
320 /* And point our variables at the resampled audio */
/** Stop the worker threads: with _mutex held, wake everything waiting on
 *  _condition, then walk the list of threads.
 *  NOTE(review): the statement that sets the termination flag and the body of
 *  the loop over _threads are not visible in this excerpt.
 */
329 Encoder::terminate_threads ()
331 boost::mutex::scoped_lock lock (_mutex);
333 _condition.notify_all ();
336 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
/** Body of each worker thread started by process_begin().
 *
 *  Waits on _condition until _queue is non-empty or _terminate is set, takes
 *  the frame at the front of the queue, encodes it (encode_remotely on the
 *  given server, or encode_locally) and passes the result to the Writer.
 *  A failed remote encode raises remote_backoff by 10 while it is below 60;
 *  a frame that was not encoded is pushed back onto the front of the queue,
 *  and the thread sleeps for remote_backoff before carrying on.
 *
 *  @param server Server used for remote encodes; process_begin() passes a null
 *  pointer for the local threads.
 *
 *  NOTE(review): the enclosing loop, the test that chooses between remote and
 *  local encoding and the removal of the frame from the queue are not visible
 *  in this excerpt.
 */
343 Encoder::encoder_thread (ServerDescription* server)
345 /* Number of seconds that we currently wait between attempts
346 to connect to the server; not relevant for localhost
349 int remote_backoff = 0;
353 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
354 boost::mutex::scoped_lock lock (_mutex);
355 while (_queue.empty () && !_terminate) {
356 _condition.wait (lock);
363 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
364 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
365 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
370 shared_ptr<EncodedData> encoded;
374 encoded = vf->encode_remotely (server);
376 if (remote_backoff > 0) {
377 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
380 /* This job succeeded, so remove any backoff */
383 } catch (std::exception& e) {
/* Back off in steps of 10; no further increase once remote_backoff has reached 60 */
384 if (remote_backoff < 60) {
386 remote_backoff += 10;
390 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
391 vf->frame(), server->host_name(), e.what(), remote_backoff)
397 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
398 encoded = vf->encode_locally ();
399 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
400 } catch (std::exception& e) {
401 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
406 _writer->write (encoded, vf->frame ());
411 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
/* Front of the queue, so this frame is the next one a worker picks up */
413 _queue.push_front (vf);
/* Wait out the back-off period before this thread tries again */
417 if (remote_backoff > 0) {
418 dvdomatic_sleep (remote_backoff);
/* Wake anything waiting on _condition */
422 _condition.notify_all ();
/** Pass audio to the Writer, remapping channels first when the AudioMapping for
 *  the playlist's channel count has a different number of DCP channels.
 *
 *  @param data Audio to write.
 *
 *  NOTE(review): the handling of DCP channels that have no source channel, and
 *  the statement that substitutes the remapped buffer for `data`, are not
 *  visible in this excerpt.
 */
427 Encoder::write_audio (shared_ptr<const AudioBuffers> data)
429 AudioMapping m (_playlist->audio_channels ());
430 if (m.dcp_channels() != _playlist->audio_channels()) {
432 /* Remap (currently just for mono -> 5.1) */
/* New buffer with one channel per DCP channel and the same number of frames */
434 shared_ptr<AudioBuffers> b (new AudioBuffers (m.dcp_channels(), data->frames ()));
435 for (int i = 0; i < m.dcp_channels(); ++i) {
/* Source channel feeding DCP channel i, if any */
436 optional<int> s = m.dcp_to_source (static_cast<libdcp::Channel> (i));
/* Copy the whole source channel (float samples) into DCP channel i */
440 memcpy (b->data()[i], data->data()[s.get()], data->frames() * sizeof(float));
447 _writer->write (data);