a1f70a08a338f23d480f637ac5c806fc8aa8f196
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file  src/lib/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
#include <sstream>
#include <stdexcept>
#include <iomanip>
#include <iostream>
#include <vector>
#include <boost/thread.hpp>
#include <boost/filesystem.hpp>
#include <boost/lexical_cast.hpp>
#include <sndfile.h>
#include <openjpeg.h>
#include "j2k_wav_encoder.h"
#include "config.h"
#include "film_state.h"
#include "options.h"
#include "exceptions.h"
#include "dcp_video_frame.h"
#include "server.h"
#include "filter.h"
#include "log.h"
#include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _deinterleave_buffer_size (8192)
53         , _deinterleave_buffer (0)
54         , _process_end (false)
55 {
56         /* Create sound output files with .tmp suffixes; we will rename
57            them if and when we complete.
58         */
59         for (int i = 0; i < _fs->audio_channels; ++i) {
60                 SF_INFO sf_info;
61                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
62                 /* We write mono files */
63                 sf_info.channels = 1;
64                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
65                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
66                 if (f == 0) {
67                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
68                 }
69                 _sound_files.push_back (f);
70         }
71
72         /* Create buffer for deinterleaving audio */
73         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
74 }
75
76 J2KWAVEncoder::~J2KWAVEncoder ()
77 {
78         terminate_worker_threads ();
79         delete[] _deinterleave_buffer;
80         close_sound_files ();
81 }
82
83 void
84 J2KWAVEncoder::terminate_worker_threads ()
85 {
86         boost::mutex::scoped_lock lock (_worker_mutex);
87         _process_end = true;
88         _worker_condition.notify_all ();
89         lock.unlock ();
90
91         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
92                 (*i)->join ();
93                 delete *i;
94         }
95 }
96
97 void
98 J2KWAVEncoder::close_sound_files ()
99 {
100         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
101                 sf_close (*i);
102         }
103
104         _sound_files.clear ();
105 }       
106
107 void
108 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
109 {
110         boost::mutex::scoped_lock lock (_worker_mutex);
111
112         /* Wait until the queue has gone down a bit */
113         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
114                 TIMING ("decoder sleeps with queue of %1", _queue.size());
115                 _worker_condition.wait (lock);
116                 TIMING ("decoder wakes with queue of %1", _queue.size());
117         }
118
119         if (_process_end) {
120                 return;
121         }
122
123         /* Only do the processing if we don't already have a file for this frame */
124         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
125                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
126                 TIMING ("adding to queue of %1", _queue.size ());
127                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
128                                           new DCPVideoFrame (
129                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
130                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
131                                                   _log
132                                                   )
133                                           ));
134                 
135                 _worker_condition.notify_all ();
136         } else {
137                 frame_skipped ();
138         }
139 }
140
141 void
142 J2KWAVEncoder::encoder_thread (ServerDescription* server)
143 {
144         /* Number of seconds that we currently wait between attempts
145            to connect to the server; not relevant for localhost
146            encodings.
147         */
148         int remote_backoff = 0;
149         
150         while (1) {
151
152                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
153                 boost::mutex::scoped_lock lock (_worker_mutex);
154                 while (_queue.empty () && !_process_end) {
155                         _worker_condition.wait (lock);
156                 }
157
158                 if (_process_end) {
159                         return;
160                 }
161
162                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
163                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
164                 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
165                 _queue.pop_front ();
166                 
167                 lock.unlock ();
168
169                 shared_ptr<EncodedData> encoded;
170
171                 if (server) {
172                         try {
173                                 encoded = vf->encode_remotely (server);
174
175                                 if (remote_backoff > 0) {
176                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
177                                 }
178                                 
179                                 /* This job succeeded, so remove any backoff */
180                                 remote_backoff = 0;
181                                 
182                         } catch (std::exception& e) {
183                                 if (remote_backoff < 60) {
184                                         /* back off more */
185                                         remote_backoff += 10;
186                                 }
187                                 _log->log (
188                                         String::compose (
189                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
190                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
191                                         );
192                         }
193                                 
194                 } else {
195                         try {
196                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
197                                 encoded = vf->encode_locally ();
198                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
199                         } catch (std::exception& e) {
200                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
201                         }
202                 }
203
204                 if (encoded) {
205                         encoded->write (_opt, vf->frame ());
206                         frame_done (vf->frame ());
207                 } else {
208                         lock.lock ();
209                         _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
210                         _queue.push_front (vf);
211                         lock.unlock ();
212                 }
213
214                 if (remote_backoff > 0) {
215                         dvdomatic_sleep (remote_backoff);
216                 }
217
218                 lock.lock ();
219                 _worker_condition.notify_all ();
220         }
221 }
222
223 void
224 J2KWAVEncoder::process_begin (int64_t audio_channel_layout, AVSampleFormat audio_sample_format)
225 {
226         if (_fs->audio_sample_rate != _fs->target_sample_rate ()) {
227 #ifdef HAVE_SWRESAMPLE
228
229                 stringstream s;
230                 s << "Will resample audio from " << _fs->audio_sample_rate << " to " << _fs->target_sample_rate();
231                 _log->log (s.str ());
232                 
233                 _swr_context = swr_alloc_set_opts (
234                         0,
235                         audio_channel_layout,
236                         audio_sample_format,
237                         _fs->target_sample_rate(),
238                         audio_channel_layout,
239                         audio_sample_format,
240                         _fs->audio_sample_rate,
241                         0, 0
242                         );
243                 
244                 swr_init (_swr_context);
245 #else
246                 throw EncodeError ("Cannot resample audio as libswresample is not present");
247 #endif
248         } else {
249 #ifdef HAVE_SWRESAMPLE
250                 _swr_context = 0;
251 #endif          
252         }
253         
254         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
255                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
256         }
257
258         vector<ServerDescription*> servers = Config::instance()->servers ();
259
260         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
261                 for (int j = 0; j < (*i)->threads (); ++j) {
262                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
263                 }
264         }
265 }
266
267 void
268 J2KWAVEncoder::process_end ()
269 {
270         boost::mutex::scoped_lock lock (_worker_mutex);
271
272         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
273
274         /* Keep waking workers until the queue is empty */
275         while (!_queue.empty ()) {
276                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
277                 _worker_condition.notify_all ();
278                 _worker_condition.wait (lock);
279         }
280
281         lock.unlock ();
282         
283         terminate_worker_threads ();
284
285         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
286
287         /* The following sequence of events can occur in the above code:
288              1. a remote worker takes the last image off the queue
289              2. the loop above terminates
290              3. the remote worker fails to encode the image and puts it back on the queue
291              4. the remote worker is then terminated by terminate_worker_threads
292
293              So just mop up anything left in the queue here.
294         */
295
296         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
297                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
298                 try {
299                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
300                         e->write (_opt, (*i)->frame ());
301                         frame_done ((*i)->frame ());
302                 } catch (std::exception& e) {
303                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
304                 }
305         }
306
307 #if HAVE_SWRESAMPLE     
308         if (_swr_context) {
309
310                 while (1) {
311                         uint8_t buffer[256 * _fs->bytes_per_sample() * _fs->audio_channels];
312                         uint8_t* out[2] = {
313                                 buffer,
314                                 0
315                         };
316
317                         int const frames = swr_convert (_swr_context, out, 256, 0, 0);
318
319                         if (frames < 0) {
320                                 throw EncodeError ("could not run sample-rate converter");
321                         }
322
323                         if (frames == 0) {
324                                 break;
325                         }
326
327                         write_audio (buffer, frames * _fs->bytes_per_sample() * _fs->audio_channels);
328                 }
329
330                 swr_free (&_swr_context);
331         }
332 #endif  
333         
334         close_sound_files ();
335
336         /* Rename .wav.tmp files to .wav */
337         for (int i = 0; i < _fs->audio_channels; ++i) {
338                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
339                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
340                 }
341                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
342         }
343 }
344
345 void
346 J2KWAVEncoder::process_audio (uint8_t* data, int size)
347 {
348         /* This is a buffer we might use if we are sample-rate converting;
349            it will need freeing if so.
350         */
351         uint8_t* out_buffer = 0;
352         
353         /* Maybe sample-rate convert */
354 #if HAVE_SWRESAMPLE     
355         if (_swr_context) {
356
357                 uint8_t const * in[2] = {
358                         data,
359                         0
360                 };
361
362                 /* Here's samples per channel */
363                 int const samples = size / _fs->bytes_per_sample();
364                 
365                 /* And here's frames (where 1 frame is a collection of samples, 1 for each channel,
366                    so for 5.1 a frame would be 6 samples)
367                 */
368                 int const frames = samples / _fs->audio_channels;
369
370                 /* Compute the resampled frame count and add 32 for luck */
371                 int const out_buffer_size_frames = ceil (frames * _fs->target_sample_rate() / _fs->audio_sample_rate) + 32;
372                 int const out_buffer_size_bytes = out_buffer_size_frames * _fs->audio_channels * _fs->bytes_per_sample();
373                 out_buffer = new uint8_t[out_buffer_size_bytes];
374
375                 uint8_t* out[2] = {
376                         out_buffer, 
377                         0
378                 };
379
380                 /* Resample audio */
381                 int out_frames = swr_convert (_swr_context, out, out_buffer_size_frames, in, frames);
382                 if (out_frames < 0) {
383                         throw EncodeError ("could not run sample-rate converter");
384                 }
385
386                 /* And point our variables at the resampled audio */
387                 data = out_buffer;
388                 size = out_frames * _fs->audio_channels * _fs->bytes_per_sample();
389         }
390 #endif
391
392         write_audio (data, size);
393
394         /* Delete the sample-rate conversion buffer, if it exists */
395         delete[] out_buffer;
396 }
397
398 void
399 J2KWAVEncoder::write_audio (uint8_t* data, int size)
400 {
401         /* XXX: we are assuming that the _deinterleave_buffer_size is a multiple
402            of the sample size and that size is a multiple of _fs->audio_channels * sample_size.
403         */
404
405         assert ((size % (_fs->audio_channels * _fs->bytes_per_sample())) == 0);
406         assert ((_deinterleave_buffer_size % _fs->bytes_per_sample()) == 0);
407         
408         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
409         
410         /* Number of bytes left to read this time */
411         int remaining = size;
412         /* Our position in the output buffers, in bytes */
413         int position = 0;
414         while (remaining > 0) {
415                 /* How many bytes of the deinterleaved data to do this time */
416                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
417                 for (int i = 0; i < _fs->audio_channels; ++i) {
418                         for (int j = 0; j < this_time; j += _fs->bytes_per_sample()) {
419                                 for (int k = 0; k < _fs->bytes_per_sample(); ++k) {
420                                         int const to = j + k;
421                                         int const from = position + (i * _fs->bytes_per_sample()) + (j * _fs->audio_channels) + k;
422                                         _deinterleave_buffer[to] = data[from];
423                                 }
424                         }
425                         
426                         switch (_fs->audio_sample_format) {
427                         case AV_SAMPLE_FMT_S16:
428                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / _fs->bytes_per_sample());
429                                 break;
430                         default:
431                                 throw EncodeError ("unknown audio sample format");
432                         }
433                 }
434                 
435                 position += this_time;
436                 remaining -= this_time * _fs->audio_channels;
437         }
438 }
439