2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/j2k_wav_encoder.cc
21 * @brief An encoder which writes JPEG2000 and WAV files.
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
33 #include "j2k_wav_encoder.h"
35 #include "film_state.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
45 using namespace boost;
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
49 , _deinterleave_buffer_size (8192)
50 , _deinterleave_buffer (0)
51 , _process_end (false)
53 /* Create sound output files with .tmp suffixes; we will rename
54 them if and when we complete.
56 for (int i = 0; i < _fs->audio_channels; ++i) {
58 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59 /* We write mono files */
61 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
62 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
64 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
66 _sound_files.push_back (f);
69 /* Create buffer for deinterleaving audio */
70 _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
73 J2KWAVEncoder::~J2KWAVEncoder ()
75 terminate_worker_threads ();
76 delete[] _deinterleave_buffer;
81 J2KWAVEncoder::terminate_worker_threads ()
83 boost::mutex::scoped_lock lock (_worker_mutex);
85 _worker_condition.notify_all ();
88 for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
95 J2KWAVEncoder::close_sound_files ()
97 for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101 _sound_files.clear ();
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
107 boost::mutex::scoped_lock lock (_worker_mutex);
109 /* Wait until the queue has gone down a bit */
110 while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111 _log->microsecond_log ("Decoder sleeps", Log::TIMING);
112 _worker_condition.wait (lock);
113 _log->microsecond_log ("Decoder wakes", Log::TIMING);
120 /* Only do the processing if we don't already have a file for this frame */
121 if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
122 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
123 _log->microsecond_log ("Adding to queue of " + boost::lexical_cast<string> (_queue.size ()), Log::TIMING);
124 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
126 yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127 Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
132 _worker_condition.notify_all ();
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
141 /* Number of seconds that we currently wait between attempts
142 to connect to the server; not relevant for localhost
145 int remote_backoff = 0;
148 boost::mutex::scoped_lock lock (_worker_mutex);
149 while (_queue.empty () && !_process_end) {
150 _log->microsecond_log ("Encoder thread sleeps", Log::TIMING);
151 _worker_condition.wait (lock);
152 _log->microsecond_log ("Encoder thread wakes", Log::TIMING);
159 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
160 _log->microsecond_log ("Encoder thread wakes with queue of " + boost::lexical_cast<string> (_queue.size ()), Log::TIMING);
165 shared_ptr<EncodedData> encoded;
169 encoded = vf->encode_remotely (server);
171 if (remote_backoff > 0) {
173 s << server->host_name() << " was lost, but now she is found; removing backoff";
174 _log->log (s.str ());
177 /* This job succeeded, so remove any backoff */
180 } catch (std::exception& e) {
181 if (remote_backoff < 60) {
183 remote_backoff += 10;
186 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
187 _log->log (s.str ());
192 _log->microsecond_log ("Encoder thread begins local encode of " + lexical_cast<string> (vf->frame ()), Log::TIMING);
193 encoded = vf->encode_locally ();
194 _log->microsecond_log ("Encoder thread finishes local encode of " + lexical_cast<string> (vf->frame ()), Log::TIMING);
195 } catch (std::exception& e) {
197 s << "Local encode failed " << e.what() << ".";
198 _log->log (s.str ());
203 encoded->write (_opt, vf->frame ());
204 frame_done (vf->frame ());
207 _queue.push_front (vf);
211 if (remote_backoff > 0) {
212 dvdomatic_sleep (remote_backoff);
216 _worker_condition.notify_all ();
221 J2KWAVEncoder::process_begin ()
223 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
224 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
227 vector<ServerDescription*> servers = Config::instance()->servers ();
229 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
230 for (int j = 0; j < (*i)->threads (); ++j) {
231 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
237 J2KWAVEncoder::process_end ()
239 boost::mutex::scoped_lock lock (_worker_mutex);
241 _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
243 /* Keep waking workers until the queue is empty */
244 while (!_queue.empty ()) {
245 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
246 _worker_condition.notify_all ();
247 _worker_condition.wait (lock);
252 terminate_worker_threads ();
254 _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
256 /* The following sequence of events can occur in the above code:
257 1. a remote worker takes the last image off the queue
258 2. the loop above terminates
259 3. the remote worker fails to encode the image and puts it back on the queue
260 4. the remote worker is then terminated by terminate_worker_threads
262 So just mop up anything left in the queue here.
265 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
267 s << "Encode left-over frame " << (*i)->frame();
268 _log->log (s.str ());
270 shared_ptr<EncodedData> e = (*i)->encode_locally ();
271 e->write (_opt, (*i)->frame ());
272 frame_done ((*i)->frame ());
273 } catch (std::exception& e) {
275 s << "Local encode failed " << e.what() << ".";
276 _log->log (s.str ());
280 close_sound_files ();
282 /* Rename .wav.tmp files to .wav */
283 for (int i = 0; i < _fs->audio_channels; ++i) {
284 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
285 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
287 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
292 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
294 /* Size of a sample in bytes */
295 int const sample_size = 2;
297 /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
298 of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
301 /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
303 /* Number of bytes left to read this time */
304 int remaining = data_size;
305 /* Our position in the output buffers, in bytes */
307 while (remaining > 0) {
308 /* How many bytes of the deinterleaved data to do this time */
309 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
310 for (int i = 0; i < _fs->audio_channels; ++i) {
311 for (int j = 0; j < this_time; j += sample_size) {
312 for (int k = 0; k < sample_size; ++k) {
313 int const to = j + k;
314 int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
315 _deinterleave_buffer[to] = data[from];
319 switch (_fs->audio_sample_format) {
320 case AV_SAMPLE_FMT_S16:
321 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
324 throw DecodeError ("unknown audio sample format");
328 position += this_time;
329 remaining -= this_time * _fs->audio_channels;