2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
/// Construct the encoder: open one temporary sound file per audio channel
/// and allocate the scratch buffer used when deinterleaving audio.
/// @param s Film state. NOTE(review): presumably stored as _fs by an initializer that is not shown here - confirm.
/// @param o Options. NOTE(review): presumably stored as _opt by an initializer that is not shown here - confirm.
/// @param l Log. NOTE(review): presumably stored as _log by an initializer that is not shown here - confirm.
/// @throws CreateFileError carrying the temporary path of the channel whose file
///         could not be created. NOTE(review): the guarding condition is not shown
///         here; presumably a null return from sf_open - confirm.
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 , _deinterleave_buffer_size (8192)
50 , _deinterleave_buffer (0)
51 , _process_end (false)
53 /* Create sound output files with .tmp suffixes; we will rename
54 them if and when we complete.
56 for (int i = 0; i < _fs->audio_channels; ++i) {
58 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59 /* We write mono files */
// Output container is WAV holding 24-bit PCM.
61 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
// The second argument (true) asks for the temporary path; process_end() later
// renames path (i, true) to path (i, false).
62 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
64 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
// Handles are kept in channel order so _sound_files[i] is the file for channel i.
66 _sound_files.push_back (f);
69 /* Create buffer for deinterleaving audio */
// Released with delete[] in the destructor.
70 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
/// Destructor: asks the worker threads to stop via terminate_worker_threads ()
/// and then frees the deinterleave buffer allocated by the constructor.
/// NOTE(review): process_end () also calls terminate_worker_threads (); confirm
/// that calling it a second time from here is harmless.
73 J2KWAVEncoder::~J2KWAVEncoder ()
75 terminate_worker_threads ();
// Matches the new uint8_t[] in the constructor.
76 delete[] _deinterleave_buffer;
/// Wake all worker threads and then walk the list of worker thread objects.
/// Takes _worker_mutex before notifying every waiter on _worker_condition.
/// NOTE(review): presumably _process_end is set to true under the lock before the
/// notify, and each thread is joined and deleted inside the loop; those lines are
/// not shown here - confirm.
/// NOTE(review): no clearing of _worker_threads is visible; if the loop deletes the
/// thread objects, a repeat call would touch dangling pointers - confirm.
81 J2KWAVEncoder::terminate_worker_threads ()
// Callers must not already hold _worker_mutex when calling this.
83 boost::mutex::scoped_lock lock (_worker_mutex);
// Wakes both encoder threads waiting for work and a producer waiting in process_video ().
85 _worker_condition.notify_all ();
88 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
/// Visit every SNDFILE handle held in _sound_files and then empty the vector.
/// NOTE(review): the loop body is not shown here; presumably each handle is
/// closed with sf_close - confirm.
95 J2KWAVEncoder::close_sound_files ()
97 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
// Emptying the vector leaves nothing to visit if this is called again.
101 _sound_files.clear ();
/// Hand one video frame to the worker threads for encoding.
/// @param yuv Image for the frame; passed on to the DCPVideoFrame that is queued.
/// @param frame Frame index; used to look up the output path and passed on to the DCPVideoFrame.
/// Blocks on _worker_condition while the queue is at its limit, so the caller
/// is throttled to the speed of the encoder threads.
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
// _queue and _process_end are only read or changed with this mutex held.
107 boost::mutex::scoped_lock lock (_worker_mutex);
109 /* Wait until the queue has gone down a bit */
// Limit is two queued frames per worker thread; the wait is abandoned once _process_end is set.
110 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111 _log->microsecond_log ("Decoder sleeps", Log::TIMING);
112 _worker_condition.wait (lock);
113 _log->microsecond_log ("Decoder wakes", Log::TIMING);
// NOTE(review): the lines between the wait loop and the check below are not shown
// here; presumably the function returns early when _process_end is set - confirm.
120 /* Only do the processing if we don't already have a file for this frame */
// The path is requested with false as the second argument, i.e. the final name rather than a temporary one.
121 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
// Only the second string of the pair is used below.
122 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
123 _log->microsecond_log ("Adding to queue of " + boost::lexical_cast<string> (_queue.size ()), Log::TIMING);
124 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
126 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
// Let waiting encoder threads know that there may be new work.
132 _worker_condition.notify_all ();
/// Body of one worker thread: wait for a frame on _queue, encode it, and write
/// the encoded data out.
/// @param server Server to encode on. process_begin () passes a null pointer for
///        the local encoding threads and a ServerDescription for remote ones.
/// NOTE(review): the statements that choose between the remote and local paths,
/// remove the frame from the queue, and release or re-take the lock are not shown
/// here; the notes below describe only what the visible lines do.
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
141 /* Number of seconds that we currently wait between attempts
142 to connect to the server; not relevant for localhost
145 int remote_backoff = 0;
151 s << "Encoder thread " << pthread_self() << " sleeps.";
152 _log->microsecond_log (s.str(), Log::TIMING);
// Sleep on the condition until there is a frame to take or shutdown has been requested.
155 boost::mutex::scoped_lock lock (_worker_mutex);
156 while (_queue.empty () && !_process_end) {
157 _worker_condition.wait (lock);
// Take the oldest queued frame; new frames are added at the back by process_video ().
164 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
168 s << "Encoder thread " << pthread_self() << " wakes with queue of " << _queue.size();
169 _log->microsecond_log (s.str(), Log::TIMING);
// Stays null if the encode attempt below throws.
176 shared_ptr<EncodedData> encoded;
180 encoded = vf->encode_remotely (server);
// A non-zero backoff here means an earlier attempt on this server failed.
182 if (remote_backoff > 0) {
184 s << server->host_name() << " was lost, but now she is found; removing backoff";
185 _log->log (s.str ());
188 /* This job succeeded, so remove any backoff */
191 } catch (std::exception& e) {
// The backoff grows in steps of 10 seconds and stops growing once it reaches 60.
192 if (remote_backoff < 60) {
194 remote_backoff += 10;
197 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
198 _log->log (s.str ());
205 s << "Encoder thread " << pthread_self() << " begins local encode of " << vf->frame();
206 _log->microsecond_log (s.str(), Log::TIMING);
209 encoded = vf->encode_locally ();
213 s << "Encoder thread " << pthread_self() << " finishes local encode of " << vf->frame();
214 _log->microsecond_log (s.str(), Log::TIMING);
// A failed local encode is logged and not rethrown by the visible lines.
216 } catch (std::exception& e) {
218 s << "Local encode failed " << e.what() << ".";
219 _log->log (s.str ());
// Write the encoded frame and report the frame as done.
// NOTE(review): presumably guarded by a check that encoded is non-null - confirm.
224 encoded->write (_opt, vf->frame ());
225 frame_done (vf->frame ());
// Put the frame back at the head of the queue so that it is the next one taken.
// NOTE(review): presumably the failure path, executed with _worker_mutex held - confirm.
228 _queue.push_front (vf);
// Sleep for the current backoff, which is a number of seconds.
232 if (remote_backoff > 0) {
233 dvdomatic_sleep (remote_backoff);
// Wake other waiters: the queue has changed size.
237 _worker_condition.notify_all ();
/// Start the worker threads. Each one runs encoder_thread () and its
/// boost::thread object, allocated with new, is stored in _worker_threads.
242 J2KWAVEncoder::process_begin ()
// Local workers: one per configured local encoding thread, started with a null server pointer.
244 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
245 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
248 vector<ServerDescription*> servers = Config::instance()->servers ();
// Remote workers: each configured server gets as many threads as its threads () value.
250 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
251 for (int j = 0; j < (*i)->threads (); ++j) {
252 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
/// Finish the encode: wait for the workers to empty the queue, stop them,
/// encode any frames still queued on the calling thread, then close the
/// per-channel sound files and rename them from temporary to final names.
258 J2KWAVEncoder::process_end ()
260 boost::mutex::scoped_lock lock (_worker_mutex);
262 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
264 /* Keep waking workers until the queue is empty */
265 while (!_queue.empty ()) {
266 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
267 _worker_condition.notify_all ();
// Releases the mutex while waiting so that workers can take frames.
268 _worker_condition.wait (lock);
// NOTE(review): terminate_worker_threads () takes _worker_mutex itself, so the lock
// taken above must be released first; presumably done on a line not shown here - confirm.
273 terminate_worker_threads ();
275 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
277 /* The following sequence of events can occur in the above code:
278 1. a remote worker takes the last image off the queue
279 2. the loop above terminates
280 3. the remote worker fails to encode the image and puts it back on the queue
281 4. the remote worker is then terminated by terminate_worker_threads
283 So just mop up anything left in the queue here.
286 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
288 s << "Encode left-over frame " << (*i)->frame();
289 _log->log (s.str ());
// Left-over frames are always encoded locally, whichever worker had them before.
291 shared_ptr<EncodedData> e = (*i)->encode_locally ();
292 e->write (_opt, (*i)->frame ());
293 frame_done ((*i)->frame ());
// A failure is logged and the loop moves on to the next left-over frame.
294 } catch (std::exception& e) {
296 s << "Local encode failed " << e.what() << ".";
297 _log->log (s.str ());
301 close_sound_files ();
303 /* Rename .wav.tmp files to .wav */
304 for (int i = 0; i < _fs->audio_channels; ++i) {
// Remove any existing final file first so that the rename does not collide with it.
305 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
306 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
308 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
/// Split a block of interleaved audio into one stream per channel and write
/// each channel to its own sound file.
/// @param data Interleaved audio bytes; channel i of a sample group sits i * sample_size bytes into the group.
/// @param data_size Number of bytes in data.
/// @throws DecodeError with the message "unknown audio sample format".
///         NOTE(review): presumably raised for any format other than AV_SAMPLE_FMT_S16;
///         the rest of the switch is not shown here - confirm.
/// NOTE(review): position is advanced by this_time, a per-channel byte count, but is
/// added unscaled into the interleaved offset named from. If a call needs more than
/// one pass (data_size / audio_channels larger than _deinterleave_buffer_size) the
/// later passes look as if they read from the wrong place in data; the offset
/// probably needs position * _fs->audio_channels. Confirm and fix.
313 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
315 /* Size of a sample in bytes */
316 int const sample_size = 2;
318 /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
319 of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
322 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
324 /* Number of bytes left to read this time */
325 int remaining = data_size;
326 /* Our position in the output buffers, in bytes */
// NOTE(review): the declaration of position is not shown here; presumably an int starting at zero - confirm.
328 while (remaining > 0) {
329 /* How many bytes of the deinterleaved data to do this time */
// Per-channel share of what is left, capped at the size of the scratch buffer.
330 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
331 for (int i = 0; i < _fs->audio_channels; ++i) {
// j steps through the scratch buffer one sample at a time; k steps through the bytes of that sample.
332 for (int j = 0; j < this_time; j += sample_size) {
333 for (int k = 0; k < sample_size; ++k) {
334 int const to = j + k;
// Offset into data: base position, plus the slot of channel i, plus j scaled up by the channel count.
335 int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
336 _deinterleave_buffer[to] = data[from];
340 switch (_fs->audio_sample_format) {
341 case AV_SAMPLE_FMT_S16:
// The count passed is in samples, hence the division by sample_size.
342 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
345 throw DecodeError ("unknown audio sample format");
349 position += this_time;
// this_time bytes were used for each of the channels.
350 remaining -= this_time * _fs->audio_channels;