2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
32 #include "exceptions.h"
35 #include "dcp_video_frame.h"
42 using std::stringstream;
47 using namespace boost;
// Number of completion timestamps kept in _time_history: frame_done() trims
// the history to this length, and current_frames_per_second() uses it as the
// number of frames in its rate estimate.
49 int const Encoder::_history_size = 25;
51 /** @param f Film that we are encoding.
// Constructs the encoder for film f.  If the film has an audio stream, one
// WAV file (24-bit PCM) per DCP audio channel is opened for writing and its
// handle is appended to _sound_files.
54 Encoder::Encoder (shared_ptr<const Film> f)
56 , _just_skipped (false)
59 #ifdef HAVE_SWRESAMPLE
62 , _audio_frames_written (0)
63 , _process_end (false)
65 if (_film->audio_stream()) {
66 /* Create sound output files with .tmp suffixes; we will rename
67 them if and when we complete.
69 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
// The output sample rate is whatever dcp_audio_sample_rate() returns for the
// source stream's rate.
71 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
72 /* We write mono files */
74 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// Note that this local f hides the constructor parameter f inside the loop.
75 SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
// NOTE(review): the condition guarding this throw is not visible in this
// extract; presumably it is taken when sf_open fails -- confirm.
77 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
79 _sound_files.push_back (f);
// NOTE(review): the header of the definition containing this call is not
// visible in this extract (presumably the destructor -- confirm).  The call
// invokes Encoder::terminate_worker_threads().
87 terminate_worker_threads ();
// Prepares for encoding.  When the film's audio stream has a sample rate
// different from the film's target rate, a libswresample context is set up
// (or EncodeError is thrown; see the note further down).  Worker threads
// running encoder_thread() are then started.
91 Encoder::process_begin ()
93 if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
94 #ifdef HAVE_SWRESAMPLE
97 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
98 _film->log()->log (s.str ());
100 /* We will be using planar float data when we call the resampler */
// In swr_alloc_set_opts() the output parameters come before the input ones:
// the same channel layout is given for both sides, with the target rate as
// output rate and the stream's rate as input rate.
101 _swr_context = swr_alloc_set_opts (
103 _film->audio_stream()->channel_layout(),
105 _film->target_audio_sample_rate(),
106 _film->audio_stream()->channel_layout(),
108 _film->audio_stream()->sample_rate(),
// NOTE(review): the return value of swr_init() is not checked here.
112 swr_init (_swr_context);
// NOTE(review): the preprocessor lines around this throw are not visible in
// this extract; presumably it is the branch compiled when HAVE_SWRESAMPLE
// is not defined -- confirm.
114 throw EncodeError ("Cannot resample audio as libswresample is not present");
117 #ifdef HAVE_SWRESAMPLE
// One worker per configured local encoding thread; each is handed a null
// ServerDescription pointer as encoder_thread()'s `server' argument.
122 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
123 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
126 vector<ServerDescription*> servers = Config::instance()->servers ();
// Plus, for every configured server, as many workers as that server's
// threads() value, each handed that server's description.
128 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
129 for (int j = 0; j < (*i)->threads (); ++j) {
130 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
// Finishes encoding.  In order, as far as this extract shows: drains the
// resampler, closes and renames the sound files, waits for the video queue
// to empty, stops the worker threads, encodes any frames left in the queue
// on the calling thread, and creates the links recorded in _links_required.
137 Encoder::process_end ()
140 if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
142 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
// A null input with a zero count asks swr_convert() to output samples that
// it is still buffering; at most 256 frames are fetched per call.
145 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
148 throw EncodeError ("could not run sample-rate converter");
155 out->set_frames (frames);
159 swr_free (&_swr_context);
163 if (_film->audio_stream()) {
164 close_sound_files ();
166 /* Rename .wav.tmp files to .wav */
167 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
// Any existing final file is removed first, then the temporary file is
// renamed to the final name.
168 if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
169 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
171 boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
175 boost::mutex::scoped_lock lock (_worker_mutex);
177 _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
179 /* Keep waking workers until the queue is empty */
180 while (!_queue.empty ()) {
181 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
182 _worker_condition.notify_all ();
// Assuming _worker_condition is a Boost condition variable (its declaration
// is not visible here), wait() releases _worker_mutex while blocked, so the
// workers can take frames and notify this thread in turn.
183 _worker_condition.wait (lock);
188 terminate_worker_threads ();
190 _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
192 /* The following sequence of events can occur in the above code:
193 1. a remote worker takes the last image off the queue
194 2. the loop above terminates
195 3. the remote worker fails to encode the image and puts it back on the queue
196 4. the remote worker is then terminated by terminate_worker_threads
198 So just mop up anything left in the queue here.
201 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
202 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
204 shared_ptr<EncodedData> e = (*i)->encode_locally ();
205 e->write (_film, (*i)->frame ());
// A failed left-over encode is logged; no other handling is visible here.
207 } catch (std::exception& e) {
208 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
212 /* Now do links (or copies on windows) to duplicate frames */
// For each recorded pair, both the frame file and its hash file are linked:
// `first' is the existing frame and `second' the duplicate to create.
213 for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
214 link (_film->frame_out_path (i->first, false), _film->frame_out_path (i->second, false));
215 link (_film->hash_out_path (i->first, false), _film->hash_out_path (i->second, false));
219 /** @return an estimate of the current number of frames we are encoding per second,
// Estimates the encoding rate from the completion times that frame_done()
// records in _time_history.
223 Encoder::current_frames_per_second () const
225 boost::mutex::scoped_lock lock (_history_mutex);
// Fewer than _history_size timestamps are handled separately (the body of
// that branch is not visible in this extract).
226 if (int (_time_history.size()) < _history_size) {
231 gettimeofday (&now, 0);
// back() is the oldest retained timestamp, because frame_done() pushes new
// ones at the front.
// NOTE(review): if seconds() yields the same value for both times the
// divisor is zero -- confirm that this cannot matter for callers.
233 return _history_size / (seconds (now) - seconds (_time_history.back ()));
236 /** @return true if the last frame to be processed was skipped as it already existed */
238 Encoder::skipping () const
240 boost::mutex::scoped_lock (_history_mutex);
241 return _just_skipped;
244 /** @return Number of video frames that have been received */
246 Encoder::video_frame () const
248 boost::mutex::scoped_lock (_history_mutex);
252 /** Should be called when a frame has been encoded successfully.
253 * Notes the time of completion so that the encoding rate can be estimated.
// Marks the most recent frame as encoded rather than skipped and records the
// current time in the history read by current_frames_per_second().
256 Encoder::frame_done ()
258 boost::mutex::scoped_lock lock (_history_mutex);
259 _just_skipped = false;
// The newest timestamp goes at the front of the history ...
262 gettimeofday (&tv, 0);
263 _time_history.push_front (tv);
// ... and the oldest is dropped from the back once more than _history_size
// entries are held.
264 if (int (_time_history.size()) > _history_size) {
265 _time_history.pop_back ();
269 /** Called by a subclass when it has just skipped the processing
270 of a frame because it has already been done.
// Sets the flag reported by skipping(), under _history_mutex.
273 Encoder::frame_skipped ()
275 boost::mutex::scoped_lock lock (_history_mutex);
276 _just_skipped = true;
// Handles one decoded video frame.
//   image: picture to encode.
//   same:  when true and a frame has been queued before, a link from that
//          earlier frame is recorded for this one.
//   sub:   subtitle handed on, with the image, when the frame is queued.
280 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
282 DCPFrameRate dfr (_film->frames_per_second ());
// NOTE(review): what happens to odd-numbered frames when dfr.skip is set is
// not visible in this extract; presumably they are dropped -- confirm.
284 if (dfr.skip && (_video_frame % 2)) {
289 boost::mutex::scoped_lock lock (_worker_mutex);
291 /* Wait until the queue has gone down a bit */
// That is, block while the queue holds at least two frames per worker
// thread, unless _process_end has been set.
292 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
293 TIMING ("decoder sleeps with queue of %1", _queue.size());
294 _worker_condition.wait (lock);
295 TIMING ("decoder wakes with queue of %1", _queue.size());
302 /* Only do the processing if we don't already have a file for this frame */
303 if (boost::filesystem::exists (_film->frame_out_path (_video_frame, false))) {
308 if (same && _last_real_frame) {
309 /* Use the last frame that we encoded. We need to postpone doing the actual link,
310 as on windows the link is really a copy and the reference frame might not have
311 finished encoding yet.
313 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frame));
315 /* Queue this new frame for encoding */
316 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
317 TIMING ("adding to queue of %1", _queue.size ());
318 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
320 image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
321 _film->subtitle_offset(), _film->subtitle_scale(),
322 _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
323 _film->colour_lut(), _film->j2k_bandwidth(),
// Wake any threads waiting on the condition and remember this frame as the
// one that later duplicates should link to.
328 _worker_condition.notify_all ();
329 _last_real_frame = _video_frame;
// Handles a block of decoded audio.  The steps visible in this extract are:
// optional sample-rate conversion, re-mapping of mono input onto the centre
// channel of a six-channel buffer, and advancing _audio_frame by the block's
// frame count.
336 Encoder::process_audio (shared_ptr<AudioBuffers> data)
339 /* Maybe sample-rate convert */
342 /* Compute the resampled frames count and add 32 for luck */
// NOTE(review): if both sample rates are integers (their types are not
// visible here) the division truncates before ceil() is applied, so ceil()
// has no effect and only the extra 32 frames provide headroom -- confirm.
343 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
345 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
348 int const resampled_frames = swr_convert (
349 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
// A negative result from swr_convert() is treated as failure.
352 if (resampled_frames < 0) {
353 throw EncodeError ("could not run sample-rate converter");
356 resampled->set_frames (resampled_frames);
358 /* And point our variables at the resampled audio */
363 if (_film->audio_channels() == 1) {
364 /* We need to switch things around so that the mono channel is on
365 the centre channel of a 5.1 set (with other channels silent).
368 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
// Only the centre channel receives the input samples (copied as floats);
// the other five channels are silenced.
369 b->make_silent (libdcp::LEFT);
370 b->make_silent (libdcp::RIGHT);
371 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
372 b->make_silent (libdcp::LFE);
373 b->make_silent (libdcp::LS);
374 b->make_silent (libdcp::RS);
381 _audio_frame += data->frames ();
// Writes each channel of `audio' to the sound file with the same index and
// adds the block's frame count to _audio_frames_written.
// NOTE(review): the return value of sf_write_float() is not checked, and
// nothing here ensures that audio->channels() does not exceed the number of
// entries in _sound_files.
385 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
387 for (int i = 0; i < audio->channels(); ++i) {
388 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
391 _audio_frames_written += audio->frames ();
// Visits every entry of _sound_files and then empties the container.
// NOTE(review): the statement applied to each entry is not visible in this
// extract; presumably it closes the handle -- confirm.
395 Encoder::close_sound_files ()
397 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
401 _sound_files.clear ();
// Wakes every thread waiting on _worker_condition and then goes through
// _worker_threads.
// NOTE(review): the statements that flag termination and deal with each
// thread are not visible in this extract -- presumably _process_end is set
// and the threads are joined and deleted; confirm.
405 Encoder::terminate_worker_threads ()
407 boost::mutex::scoped_lock lock (_worker_mutex);
409 _worker_condition.notify_all ();
412 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Body of each worker thread.  It takes a frame from the front of _queue,
// encodes it -- through `server' (encode_remotely) or on this machine
// (encode_locally) -- and writes the result; a frame whose encode failed is
// pushed back onto the front of the queue.
//   server: description of the remote server this thread uses; the local
//           workers started by process_begin() pass a null pointer.
// NOTE(review): the loop structure and the conditions selecting between the
// branches are not visible in this extract.
419 Encoder::encoder_thread (ServerDescription* server)
421 /* Number of seconds that we currently wait between attempts
422 to connect to the server; not relevant for localhost
425 int remote_backoff = 0;
429 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
430 boost::mutex::scoped_lock lock (_worker_mutex);
// Sleep until there is a frame to take or _process_end has been set.
431 while (_queue.empty () && !_process_end) {
432 _worker_condition.wait (lock);
439 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
440 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
441 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
// Stays null if the encode attempt below throws.
446 shared_ptr<EncodedData> encoded;
450 encoded = vf->encode_remotely (server);
452 if (remote_backoff > 0) {
453 _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
456 /* This job succeeded, so remove any backoff */
459 } catch (std::exception& e) {
// Each remote failure adds 10 seconds of back-off while it is below 60.
460 if (remote_backoff < 60) {
462 remote_backoff += 10;
466 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
467 vf->frame(), server->host_name(), e.what(), remote_backoff)
473 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
474 encoded = vf->encode_locally ();
475 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
476 } catch (std::exception& e) {
477 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
482 encoded->write (_film, vf->frame ());
487 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
// The failed frame goes back to the front so that it is the next one taken.
489 _queue.push_front (vf);
493 if (remote_backoff > 0) {
494 dvdomatic_sleep (remote_backoff);
// Wake other threads waiting on the condition (process_video() and
// process_end() both wait on it) so that they re-check the queue.
498 _worker_condition.notify_all ();
// Makes b a duplicate of a: a symbolic link when DVDOMATIC_POSIX is defined,
// a file copy when DVDOMATIC_WINDOWS is defined.
//   a: existing file.
//   b: path to create.
// NOTE(review): the condition guarding the throw is not visible in this
// extract; presumably it tests the result of symlink() -- confirm.
503 Encoder::link (string a, string b) const
505 #ifdef DVDOMATIC_POSIX
506 int const r = symlink (a.c_str(), b.c_str());
508 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
512 #ifdef DVDOMATIC_WINDOWS
513 boost::filesystem::copy_file (a, b);