Try again to sort out the audio padding / alignment.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49 #ifdef HAVE_SWRESAMPLE    
50         , _swr_context (0)
51 #endif    
52         , _audio_frames_written (0)
53         , _process_end (false)
54 {
55         /* Create sound output files with .tmp suffixes; we will rename
56            them if and when we complete.
57         */
58         for (int i = 0; i < _fs->audio_channels(); ++i) {
59                 SF_INFO sf_info;
60                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate());
61                 /* We write mono files */
62                 sf_info.channels = 1;
63                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
64                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
65                 if (f == 0) {
66                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
67                 }
68                 _sound_files.push_back (f);
69         }
70 }
71
72 J2KWAVEncoder::~J2KWAVEncoder ()
73 {
74         terminate_worker_threads ();
75         close_sound_files ();
76 }
77
78 void
79 J2KWAVEncoder::terminate_worker_threads ()
80 {
81         boost::mutex::scoped_lock lock (_worker_mutex);
82         _process_end = true;
83         _worker_condition.notify_all ();
84         lock.unlock ();
85
86         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
87                 (*i)->join ();
88                 delete *i;
89         }
90 }
91
92 void
93 J2KWAVEncoder::close_sound_files ()
94 {
95         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
96                 sf_close (*i);
97         }
98
99         _sound_files.clear ();
100 }       
101
102 void
103 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame, shared_ptr<Subtitle> sub)
104 {
105         boost::mutex::scoped_lock lock (_worker_mutex);
106
107         /* Wait until the queue has gone down a bit */
108         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
109                 TIMING ("decoder sleeps with queue of %1", _queue.size());
110                 _worker_condition.wait (lock);
111                 TIMING ("decoder wakes with queue of %1", _queue.size());
112         }
113
114         if (_process_end) {
115                 return;
116         }
117
118         /* Only do the processing if we don't already have a file for this frame */
119         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
120                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters());
121                 TIMING ("adding to queue of %1", _queue.size ());
122                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
123                                           new DCPVideoFrame (
124                                                   yuv, sub, _opt->out_size, _opt->padding, _fs->subtitle_offset(), _fs->subtitle_scale(),
125                                                   _fs->scaler(), frame, _fs->frames_per_second(), s.second,
126                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
127                                                   _log
128                                                   )
129                                           ));
130                 
131                 _worker_condition.notify_all ();
132         } else {
133                 frame_skipped ();
134         }
135 }
136
137 void
138 J2KWAVEncoder::encoder_thread (ServerDescription* server)
139 {
140         /* Number of seconds that we currently wait between attempts
141            to connect to the server; not relevant for localhost
142            encodings.
143         */
144         int remote_backoff = 0;
145         
146         while (1) {
147
148                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
149                 boost::mutex::scoped_lock lock (_worker_mutex);
150                 while (_queue.empty () && !_process_end) {
151                         _worker_condition.wait (lock);
152                 }
153
154                 if (_process_end) {
155                         return;
156                 }
157
158                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
159                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
160                 _log->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()));
161                 _queue.pop_front ();
162                 
163                 lock.unlock ();
164
165                 shared_ptr<EncodedData> encoded;
166
167                 if (server) {
168                         try {
169                                 encoded = vf->encode_remotely (server);
170
171                                 if (remote_backoff > 0) {
172                                         _log->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
173                                 }
174                                 
175                                 /* This job succeeded, so remove any backoff */
176                                 remote_backoff = 0;
177                                 
178                         } catch (std::exception& e) {
179                                 if (remote_backoff < 60) {
180                                         /* back off more */
181                                         remote_backoff += 10;
182                                 }
183                                 _log->log (
184                                         String::compose (
185                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
186                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
187                                         );
188                         }
189                                 
190                 } else {
191                         try {
192                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
193                                 encoded = vf->encode_locally ();
194                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
195                         } catch (std::exception& e) {
196                                 _log->log (String::compose ("Local encode failed (%1)", e.what ()));
197                         }
198                 }
199
200                 if (encoded) {
201                         encoded->write (_opt, vf->frame ());
202                         frame_done (vf->frame ());
203                 } else {
204                         lock.lock ();
205                         _log->log (String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame()));
206                         _queue.push_front (vf);
207                         lock.unlock ();
208                 }
209
210                 if (remote_backoff > 0) {
211                         dvdomatic_sleep (remote_backoff);
212                 }
213
214                 lock.lock ();
215                 _worker_condition.notify_all ();
216         }
217 }
218
219 void
220 J2KWAVEncoder::process_begin (int64_t audio_channel_layout)
221 {
222         if (_fs->audio_sample_rate() != _fs->target_audio_sample_rate()) {
223 #ifdef HAVE_SWRESAMPLE
224
225                 stringstream s;
226                 s << "Will resample audio from " << _fs->audio_sample_rate() << " to " << _fs->target_audio_sample_rate();
227                 _log->log (s.str ());
228
229                 /* We will be using planar float data when we call the resampler */
230                 _swr_context = swr_alloc_set_opts (
231                         0,
232                         audio_channel_layout,
233                         AV_SAMPLE_FMT_FLTP,
234                         _fs->target_audio_sample_rate(),
235                         audio_channel_layout,
236                         AV_SAMPLE_FMT_FLTP,
237                         _fs->audio_sample_rate(),
238                         0, 0
239                         );
240                 
241                 swr_init (_swr_context);
242 #else
243                 throw EncodeError ("Cannot resample audio as libswresample is not present");
244 #endif
245         } else {
246 #ifdef HAVE_SWRESAMPLE
247                 _swr_context = 0;
248 #endif          
249         }
250         
251         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
252                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
253         }
254
255         vector<ServerDescription*> servers = Config::instance()->servers ();
256
257         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
258                 for (int j = 0; j < (*i)->threads (); ++j) {
259                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
260                 }
261         }
262 }
263
264 void
265 J2KWAVEncoder::process_end ()
266 {
267         boost::mutex::scoped_lock lock (_worker_mutex);
268
269         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
270
271         /* Keep waking workers until the queue is empty */
272         while (!_queue.empty ()) {
273                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
274                 _worker_condition.notify_all ();
275                 _worker_condition.wait (lock);
276         }
277
278         lock.unlock ();
279         
280         terminate_worker_threads ();
281
282         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
283
284         /* The following sequence of events can occur in the above code:
285              1. a remote worker takes the last image off the queue
286              2. the loop above terminates
287              3. the remote worker fails to encode the image and puts it back on the queue
288              4. the remote worker is then terminated by terminate_worker_threads
289
290              So just mop up anything left in the queue here.
291         */
292
293         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
294                 _log->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
295                 try {
296                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
297                         e->write (_opt, (*i)->frame ());
298                         frame_done ((*i)->frame ());
299                 } catch (std::exception& e) {
300                         _log->log (String::compose ("Local encode failed (%1)", e.what ()));
301                 }
302         }
303
304 #if HAVE_SWRESAMPLE     
305         if (_swr_context) {
306
307                 shared_ptr<AudioBuffers> out (new AudioBuffers (_fs->audio_channels(), 256));
308                         
309                 while (1) {
310                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
311
312                         if (frames < 0) {
313                                 throw EncodeError ("could not run sample-rate converter");
314                         }
315
316                         if (frames == 0) {
317                                 break;
318                         }
319
320                         write_audio (out);
321                 }
322
323                 swr_free (&_swr_context);
324         }
325 #endif
326
327         int const dcp_sr = dcp_audio_sample_rate (_fs->audio_sample_rate ());
328         int64_t const extra_audio_frames = dcp_sr - (_audio_frames_written % dcp_sr);
329         shared_ptr<AudioBuffers> silence (new AudioBuffers (_fs->audio_channels(), extra_audio_frames));
330         silence->make_silent ();
331         write_audio (silence);
332         
333         close_sound_files ();
334
335         /* Rename .wav.tmp files to .wav */
336         for (int i = 0; i < _fs->audio_channels(); ++i) {
337                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
338                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
339                 }
340                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
341         }
342 }
343
344 void
345 J2KWAVEncoder::process_audio (shared_ptr<const AudioBuffers> audio)
346 {
347         shared_ptr<AudioBuffers> resampled;
348         
349 #if HAVE_SWRESAMPLE
350         /* Maybe sample-rate convert */
351         if (_swr_context) {
352
353                 /* Compute the resampled frames count and add 32 for luck */
354                 int const max_resampled_frames = ceil (audio->frames() * _fs->target_audio_sample_rate() / _fs->audio_sample_rate()) + 32;
355
356                 resampled.reset (new AudioBuffers (_fs->audio_channels(), max_resampled_frames));
357
358                 /* Resample audio */
359                 int const resampled_frames = swr_convert (
360                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
361                         );
362                 
363                 if (resampled_frames < 0) {
364                         throw EncodeError ("could not run sample-rate converter");
365                 }
366
367                 resampled->set_frames (resampled_frames);
368                 
369                 /* And point our variables at the resampled audio */
370                 audio = resampled;
371         }
372 #endif
373
374         write_audio (audio);
375 }
376
377 void
378 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
379 {
380         for (int i = 0; i < _fs->audio_channels(); ++i) {
381                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
382         }
383
384         _audio_frames_written += audio->frames ();
385 }
386