2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
32 #include "j2k_wav_encoder.h"
34 #include "film_state.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
44 using namespace boost;
// Builds the encoder: opens one WAV output file per audio channel and
// allocates the scratch buffer used when deinterleaving audio.
// @param s Film state (presumably what _fs refers to below - TODO confirm).
// @param o Options (presumably what _opt refers to below - TODO confirm).
// @param l Log; not referenced in the lines shown here.
// @throws CreateFileError carrying the output path. Presumably raised when
//         sf_open returns a null handle; the guarding condition is not
//         visible in this listing - TODO confirm.
// NOTE(review): the embedded line numbers have gaps, so some original lines
// (for example whatever declares sf_info) are missing from this listing.
46 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
// Scratch buffer is 8192 bytes; its pointer stays null until the allocation
// at the end of this constructor.
48 , _deinterleave_buffer_size (8192)
49 , _deinterleave_buffer (0)
// Flag read by the wait loops in process_video and encoder_thread.
50 , _process_end (false)
52 /* Create sound output files with .tmp suffixes; we will rename
53 them if and when we complete.
// One output file per channel of the film.
55 for (int i = 0; i < _fs->audio_channels; ++i) {
// The output rate is whatever dcp_audio_sample_rate returns for the source rate.
57 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
58 /* We write mono files */
// WAV container holding 24-bit PCM samples.
60 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// Passing true as the second path argument selects the temporary name;
// process_end later renames the true path to the false path.
61 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
63 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
// Handles are indexed by channel; process_audio writes to _sound_files[i].
65 _sound_files.push_back (f);
68 /* Create buffer for deinterleaving audio */
69 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
// Destructor: calls terminate_worker_threads and then releases the
// deinterleave buffer.
72 J2KWAVEncoder::~J2KWAVEncoder ()
74 terminate_worker_threads ();
// The buffer was allocated with new[] in the constructor, hence delete[].
75 delete[] _deinterleave_buffer;
// Wakes every thread waiting on _worker_condition and then walks the list of
// worker threads.
// NOTE(review): original lines 83, 85 and 88 onwards are missing from this
// listing. Presumably they set _process_end, release the lock and join or
// delete each thread - TODO confirm against the full file.
80 J2KWAVEncoder::terminate_worker_threads ()
// Take _worker_mutex before signalling the workers.
82 boost::mutex::scoped_lock lock (_worker_mutex);
84 _worker_condition.notify_all ();
// Visit each worker thread; the loop body is not visible here.
87 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
// Visits every handle in _sound_files and then empties the vector.
// NOTE(review): the loop body (original lines 97 to 99) is missing from this
// listing; presumably it closes each handle - TODO confirm.
94 J2KWAVEncoder::close_sound_files ()
96 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
// Drop the stored handles so that they are not visited again.
100 _sound_files.clear ();
// Queues one video frame for encoding by the worker threads.
// @param yuv Image for the frame; passed to the DCPVideoFrame that is queued.
// @param frame Frame index; selects the output path and is passed on to the
//        queued DCPVideoFrame.
// Blocks while the queue holds at least two frames per worker thread, unless
// _process_end is set.
// NOTE(review): original lines 111 to 116, 121 and 124 to 127 are missing from
// this listing, so what happens immediately after the wait loop should be
// confirmed against the full file.
104 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
// The queue and the condition variable are both guarded by _worker_mutex.
106 boost::mutex::scoped_lock lock (_worker_mutex);
108 /* Wait until the queue has gone down a bit */
109 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
110 _worker_condition.wait (lock);
117 /* Only do the processing if we don't already have a file for this frame */
118 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
// Only the second string of this pair is used in the lines shown below.
119 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
120 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
122 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
123 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
// Wake workers that are waiting for the queue to become non-empty.
128 _worker_condition.notify_all ();
// Worker thread body: takes the frame at the front of _queue, encodes it
// remotely or locally, writes the result and reports the frame as done.
// @param server Server to encode on. process_begin passes a null pointer for
//        the local threads and a configured ServerDescription for the others.
// NOTE(review): most of the control-flow skeleton (the outer loop, the try and
// if heads, the queue pop and the lock release) is missing from this listing.
// The comments below describe only the visible statements; confirm the exact
// ordering against the full file.
135 J2KWAVEncoder::encoder_thread (ServerDescription* server)
137 /* Number of seconds that we currently wait between attempts
138 to connect to the server; not relevant for localhost
141 int remote_backoff = 0;
// The queue and _process_end are examined while holding _worker_mutex.
144 boost::mutex::scoped_lock lock (_worker_mutex);
145 while (_queue.empty () && !_process_end) {
146 _worker_condition.wait (lock);
// Take a reference to the oldest queued frame.
153 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
// Receives the result of whichever encode call runs below.
158 shared_ptr<EncodedData> encoded;
162 encoded = vf->encode_remotely (server);
// A positive back-off here means that an earlier attempt on this server failed.
164 if (remote_backoff > 0) {
166 s << server->host_name() << " was lost, but now she is found; removing backoff";
167 _log->log (s.str ());
170 /* This job succeeded, so remove any backoff */
173 } catch (std::exception& e) {
// The back-off grows by 10 seconds per failure while it is below 60 seconds.
174 if (remote_backoff < 60) {
176 remote_backoff += 10;
179 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
180 _log->log (s.str ());
185 encoded = vf->encode_locally ();
186 } catch (std::exception& e) {
188 s << "Local encode failed " << e.what() << ".";
189 _log->log (s.str ());
// Write the encoded data out and report the frame as finished.
194 encoded->write (_opt, vf->frame ());
195 frame_done (vf->frame ());
// Put the frame back at the front of the queue. Presumably this is the
// branch taken when no encoded data was produced - TODO confirm, the
// condition is not visible here.
198 _queue.push_front (vf);
// Sleep for the current back-off before the next attempt.
202 if (remote_backoff > 0) {
203 dvdomatic_sleep (remote_backoff);
// Wake other threads waiting on the condition (process_video waits on it for
// queue space and process_end waits on it for the queue to empty).
207 _worker_condition.notify_all ();
// Starts the worker threads. One encoder_thread is created for each configured
// local encoding thread, and threads() more for every server that Config
// returns. Each thread object is allocated with new and stored in
// _worker_threads.
212 J2KWAVEncoder::process_begin ()
214 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
// Local threads are given a null ServerDescription pointer.
215 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
218 vector<ServerDescription*> servers = Config::instance()->servers ();
220 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
// Each server gets as many threads as its threads() call reports.
221 for (int j = 0; j < (*i)->threads (); ++j) {
222 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
// Finishes the encode. It waits for the queue to drain, terminates the worker
// threads, encodes locally any frame still left in the queue, closes the sound
// files and renames each temporary audio file to its final name.
// NOTE(review): original lines such as 236 to 239, 252 and 255 are missing from
// this listing. terminate_worker_threads takes _worker_mutex itself, so the
// lock acquired below must be released before that call; the release is not
// visible here - TODO confirm.
228 J2KWAVEncoder::process_end ()
230 boost::mutex::scoped_lock lock (_worker_mutex);
232 /* Keep waking workers until the queue is empty */
233 while (!_queue.empty ()) {
234 _worker_condition.notify_all ();
235 _worker_condition.wait (lock);
240 terminate_worker_threads ();
242 /* The following sequence of events can occur in the above code:
243 1. a remote worker takes the last image off the queue
244 2. the loop above terminates
245 3. the remote worker fails to encode the image and puts it back on the queue
246 4. the remote worker is then terminated by terminate_worker_threads
248 So just mop up anything left in the queue here.
// Every left-over frame is encoded in this thread with encode_locally.
251 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
253 s << "Encode left-over frame " << (*i)->frame();
254 _log->log (s.str ());
256 shared_ptr<EncodedData> e = (*i)->encode_locally ();
257 e->write (_opt, (*i)->frame ());
258 frame_done ((*i)->frame ());
// A failed local encode is logged; no rethrow is visible in this listing.
259 } catch (std::exception& e) {
261 s << "Local encode failed " << e.what() << ".";
262 _log->log (s.str ());
// The audio files are closed before they are renamed.
266 close_sound_files ();
268 /* Rename .wav.tmp files to .wav */
269 for (int i = 0; i < _fs->audio_channels; ++i) {
// Remove any existing file at the final path before renaming onto it.
270 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
271 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
273 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
278 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
280 /* Size of a sample in bytes */
281 int const sample_size = 2;
283 /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
284 of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
287 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
289 /* Number of bytes left to read this time */
290 int remaining = data_size;
291 /* Our position in the output buffers, in bytes */
293 while (remaining > 0) {
294 /* How many bytes of the deinterleaved data to do this time */
295 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
296 for (int i = 0; i < _fs->audio_channels; ++i) {
297 for (int j = 0; j < this_time; j += sample_size) {
298 for (int k = 0; k < sample_size; ++k) {
299 int const to = j + k;
300 int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
301 _deinterleave_buffer[to] = data[from];
305 switch (_fs->audio_sample_format) {
306 case AV_SAMPLE_FMT_S16:
307 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
310 throw DecodeError ("unknown audio sample format");
314 position += this_time;
315 remaining -= this_time * _fs->audio_channels;