2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 #ifdef HAVE_SWRESAMPLE
52 , _audio_frames_written (0)
53 , _process_end (false)
55 /* Create sound output files with .tmp suffixes; we will rename
56 them if and when we complete.
58 for (int i = 0; i < _fs->audio_channels(); ++i) {
60 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate());
61 /* We write mono files */
63 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
64 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
66 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
68 _sound_files.push_back (f);
72 J2KWAVEncoder::~J2KWAVEncoder ()
74 terminate_worker_threads ();
79 J2KWAVEncoder::terminate_worker_threads ()
81 boost::mutex::scoped_lock lock (_worker_mutex);
83 _worker_condition.notify_all ();
86 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
93 J2KWAVEncoder::close_sound_files ()
95 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
99 _sound_files.clear ();
103 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame, shared_ptr<Subtitle> sub)
105 boost::mutex::scoped_lock lock (_worker_mutex);
107 /* Wait until the queue has gone down a bit */
108 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
109 TIMING ("decoder sleeps with queue of %1", _queue.size());
110 _worker_condition.wait (lock);
111 TIMING ("decoder wakes with queue of %1", _queue.size());
118 /* Only do the processing if we don't already have a file for this frame */
119 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
120 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters());
121 TIMING ("adding to queue of %1", _queue.size ());
122 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
124 yuv, sub, _opt->out_size, _opt->padding, _fs->subtitle_offset(), _fs->subtitle_scale(),
125 _fs->scaler(), frame, _fs->frames_per_second(), s.second,
126 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
131 _worker_condition.notify_all ();
138 J2KWAVEncoder::encoder_thread (ServerDescription* server)
140 /* Number of seconds that we currently wait between attempts
141 to connect to the server; not relevant for localhost
144 int remote_backoff = 0;
148 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
149 boost::mutex::scoped_lock lock (_worker_mutex);
150 while (_queue.empty () && !_process_end) {
151 _worker_condition.wait (lock);
158 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
159 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
160 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
165 shared_ptr<EncodedData> encoded;
169 encoded = vf->encode_remotely (server);
171 if (remote_backoff > 0) {
172 _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
175 /* This job succeeded, so remove any backoff */
178 } catch (std::exception& e) {
179 if (remote_backoff < 60) {
181 remote_backoff += 10;
185 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
186 vf->frame(), server->host_name(), e.what(), remote_backoff)
192 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
193 encoded = vf->encode_locally ();
194 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
195 } catch (std::exception& e) {
196 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
201 encoded->write (_opt, vf->frame ());
202 frame_done (vf->frame ());
205 _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
206 _queue.push_front (vf);
210 if (remote_backoff > 0) {
211 dvdomatic_sleep (remote_backoff);
215 _worker_condition.notify_all ();
220 J2KWAVEncoder::process_begin (int64_t audio_channel_layout)
222 if (_fs->audio_sample_rate() != _fs->target_audio_sample_rate()) {
223 #ifdef HAVE_SWRESAMPLE
226 s << "Will resample audio from " << _fs->audio_sample_rate() << " to " << _fs->target_audio_sample_rate();
227 _log->log (s.str ());
229 /* We will be using planar float data when we call the resampler */
230 _swr_context = swr_alloc_set_opts (
232 audio_channel_layout,
234 _fs->target_audio_sample_rate(),
235 audio_channel_layout,
237 _fs->audio_sample_rate(),
241 swr_init (_swr_context);
243 throw EncodeError ("Cannot resample audio as libswresample is not present");
246 #ifdef HAVE_SWRESAMPLE
251 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
252 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
255 vector<ServerDescription*> servers = Config::instance()->servers ();
257 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
258 for (int j = 0; j < (*i)->threads (); ++j) {
259 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
265 J2KWAVEncoder::process_end ()
267 boost::mutex::scoped_lock lock (_worker_mutex);
269 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
271 /* Keep waking workers until the queue is empty */
272 while (!_queue.empty ()) {
273 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
274 _worker_condition.notify_all ();
275 _worker_condition.wait (lock);
280 terminate_worker_threads ();
282 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
284 /* The following sequence of events can occur in the above code:
285 1. a remote worker takes the last image off the queue
286 2. the loop above terminates
287 3. the remote worker fails to encode the image and puts it back on the queue
288 4. the remote worker is then terminated by terminate_worker_threads
290 So just mop up anything left in the queue here.
293 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
294 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
296 shared_ptr<EncodedData> e = (*i)->encode_locally ();
297 e->write (_opt, (*i)->frame ());
298 frame_done ((*i)->frame ());
299 } catch (std::exception& e) {
300 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
307 shared_ptr<AudioBuffers> out (new AudioBuffers (_fs->audio_channels(), 256));
310 int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
313 throw EncodeError ("could not run sample-rate converter");
323 swr_free (&_swr_context);
327 int const dcp_sr = dcp_audio_sample_rate (_fs->audio_sample_rate ());
328 int64_t const extra_audio_frames = dcp_sr - (_audio_frames_written % dcp_sr);
329 shared_ptr<AudioBuffers> silence (new AudioBuffers (_fs->audio_channels(), extra_audio_frames));
330 silence->make_silent ();
331 write_audio (silence);
333 close_sound_files ();
335 /* Rename .wav.tmp files to .wav */
336 for (int i = 0; i < _fs->audio_channels(); ++i) {
337 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
338 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
340 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
345 J2KWAVEncoder::process_audio (shared_ptr<const AudioBuffers> audio)
347 shared_ptr<AudioBuffers> resampled;
350 /* Maybe sample-rate convert */
353 /* Compute the resampled frames count and add 32 for luck */
354 int const max_resampled_frames = ceil (audio->frames() * _fs->target_audio_sample_rate() / _fs->audio_sample_rate()) + 32;
356 resampled.reset (new AudioBuffers (_fs->audio_channels(), max_resampled_frames));
359 int const resampled_frames = swr_convert (
360 _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
363 if (resampled_frames < 0) {
364 throw EncodeError ("could not run sample-rate converter");
367 resampled->set_frames (resampled_frames);
369 /* And point our variables at the resampled audio */
378 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
380 for (int i = 0; i < _fs->audio_channels(); ++i) {
381 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
384 _audio_frames_written += audio->frames ();