2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
32 #include "j2k_wav_encoder.h"
34 #include "film_state.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
44 using namespace boost;
46 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48 , _deinterleave_buffer_size (8192)
49 , _deinterleave_buffer (0)
50 , _process_end (false)
52 /* Create sound output files with .tmp suffixes; we will rename
53 them if and when we complete.
55 for (int i = 0; i < _fs->audio_channels; ++i) {
57 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
58 /* We write mono files */
60 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
61 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
63 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
65 _sound_files.push_back (f);
68 /* Create buffer for deinterleaving audio */
69 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
72 J2KWAVEncoder::~J2KWAVEncoder ()
74 terminate_worker_threads ();
75 delete[] _deinterleave_buffer;
80 J2KWAVEncoder::terminate_worker_threads ()
82 boost::mutex::scoped_lock lock (_worker_mutex);
84 _worker_condition.notify_all ();
87 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
94 J2KWAVEncoder::close_sound_files ()
96 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
100 _sound_files.clear ();
104 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
106 boost::mutex::scoped_lock lock (_worker_mutex);
108 /* Wait until the queue has gone down a bit */
109 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
110 _worker_condition.wait (lock);
117 /* Only do the processing if we don't already have a file for this frame */
118 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
119 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
120 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
122 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
123 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
128 _worker_condition.notify_all ();
133 J2KWAVEncoder::encoder_thread (Server* server)
135 /* Number of seconds that we currently wait between attempts
136 to connect to the server; not relevant for localhost
139 int remote_backoff = 0;
142 boost::mutex::scoped_lock lock (_worker_mutex);
143 while (_queue.empty () && !_process_end) {
144 _worker_condition.wait (lock);
151 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
156 shared_ptr<EncodedData> encoded;
160 encoded = vf->encode_remotely (server);
162 if (remote_backoff > 0) {
164 s << server->host_name() << " was lost, but now she is found; removing backoff";
165 _log->log (s.str ());
168 /* This job succeeded, so remove any backoff */
171 } catch (std::exception& e) {
172 if (remote_backoff < 60) {
174 remote_backoff += 10;
177 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
178 _log->log (s.str ());
183 encoded = vf->encode_locally ();
184 } catch (std::exception& e) {
186 s << "Local encode failed " << e.what() << ".";
187 _log->log (s.str ());
192 encoded->write (_opt, vf->frame ());
196 _queue.push_front (vf);
200 if (remote_backoff > 0) {
201 dvdomatic_sleep (remote_backoff);
205 _worker_condition.notify_all ();
210 J2KWAVEncoder::process_begin ()
212 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
213 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0)));
216 vector<Server*> servers = Config::instance()->servers ();
218 for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
219 for (int j = 0; j < (*i)->threads (); ++j) {
220 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
226 J2KWAVEncoder::process_end ()
228 boost::mutex::scoped_lock lock (_worker_mutex);
230 /* Keep waking workers until the queue is empty */
231 while (!_queue.empty ()) {
232 _worker_condition.notify_all ();
233 _worker_condition.wait (lock);
238 terminate_worker_threads ();
240 /* The following sequence of events can occur in the above code:
241 1. a remote worker takes the last image off the queue
242 2. the loop above terminates
243 3. the remote worker fails to encode the image and puts it back on the queue
244 4. the remote worker is then terminated by terminate_worker_threads
246 So just mop up anything left in the queue here.
249 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
251 s << "Encode left-over frame " << (*i)->frame();
252 _log->log (s.str ());
254 shared_ptr<EncodedData> e = (*i)->encode_locally ();
255 e->write (_opt, (*i)->frame ());
257 } catch (std::exception& e) {
259 s << "Local encode failed " << e.what() << ".";
260 _log->log (s.str ());
264 close_sound_files ();
266 /* Rename .wav.tmp files to .wav */
267 for (int i = 0; i < _fs->audio_channels; ++i) {
268 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
269 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
271 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
276 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
278 /* Size of a sample in bytes */
279 int const sample_size = 2;
281 /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
282 of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
285 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
287 /* Number of bytes left to read this time */
288 int remaining = data_size;
289 /* Our position in the output buffers, in bytes */
291 while (remaining > 0) {
292 /* How many bytes of the deinterleaved data to do this time */
293 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
294 for (int i = 0; i < _fs->audio_channels; ++i) {
295 for (int j = 0; j < this_time; j += sample_size) {
296 for (int k = 0; k < sample_size; ++k) {
297 int const to = j + k;
298 int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
299 _deinterleave_buffer[to] = data[from];
303 switch (_fs->audio_sample_format) {
304 case AV_SAMPLE_FMT_S16:
305 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
308 throw DecodeError ("unknown audio sample format");
312 position += this_time;
313 remaining -= this_time * _fs->audio_channels;