Remove film player, DVD ripping, alignment, screen configs; never finished and not...
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41 #include "cross.h"
42 #include "film.h"
43
44 using std::string;
45 using std::stringstream;
46 using std::list;
47 using std::vector;
48 using std::pair;
49 using boost::shared_ptr;
50 using boost::thread;
51 using boost::lexical_cast;
52
53 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const Film> f, shared_ptr<const Options> o)
54         : Encoder (f, o)
55 #ifdef HAVE_SWRESAMPLE    
56         , _swr_context (0)
57 #endif    
58         , _audio_frames_written (0)
59         , _process_end (false)
60 {
61         /* Create sound output files with .tmp suffixes; we will rename
62            them if and when we complete.
63         */
64         for (int i = 0; i < _film->audio_channels(); ++i) {
65                 SF_INFO sf_info;
66                 sf_info.samplerate = dcp_audio_sample_rate (_film->audio_sample_rate());
67                 /* We write mono files */
68                 sf_info.channels = 1;
69                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
70                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
71                 if (f == 0) {
72                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
73                 }
74                 _sound_files.push_back (f);
75         }
76 }
77
78 J2KWAVEncoder::~J2KWAVEncoder ()
79 {
80         terminate_worker_threads ();
81         close_sound_files ();
82 }
83
84 void
85 J2KWAVEncoder::terminate_worker_threads ()
86 {
87         boost::mutex::scoped_lock lock (_worker_mutex);
88         _process_end = true;
89         _worker_condition.notify_all ();
90         lock.unlock ();
91
92         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
93                 (*i)->join ();
94                 delete *i;
95         }
96 }
97
98 void
99 J2KWAVEncoder::close_sound_files ()
100 {
101         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
102                 sf_close (*i);
103         }
104
105         _sound_files.clear ();
106 }       
107
108 void
109 J2KWAVEncoder::do_process_video (shared_ptr<const Image> yuv, SourceFrame frame, shared_ptr<Subtitle> sub)
110 {
111         boost::mutex::scoped_lock lock (_worker_mutex);
112
113         /* Wait until the queue has gone down a bit */
114         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
115                 TIMING ("decoder sleeps with queue of %1", _queue.size());
116                 _worker_condition.wait (lock);
117                 TIMING ("decoder wakes with queue of %1", _queue.size());
118         }
119
120         if (_process_end) {
121                 return;
122         }
123
124         /* Only do the processing if we don't already have a file for this frame */
125         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
126                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
127                 TIMING ("adding to queue of %1", _queue.size ());
128                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
129                                           new DCPVideoFrame (
130                                                   yuv, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
131                                                   _film->scaler(), frame, _film->frames_per_second(), s.second,
132                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
133                                                   _film->log()
134                                                   )
135                                           ));
136                 
137                 _worker_condition.notify_all ();
138         } else {
139                 frame_skipped ();
140         }
141 }
142
143 void
144 J2KWAVEncoder::encoder_thread (ServerDescription* server)
145 {
146         /* Number of seconds that we currently wait between attempts
147            to connect to the server; not relevant for localhost
148            encodings.
149         */
150         int remote_backoff = 0;
151         
152         while (1) {
153
154                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
155                 boost::mutex::scoped_lock lock (_worker_mutex);
156                 while (_queue.empty () && !_process_end) {
157                         _worker_condition.wait (lock);
158                 }
159
160                 if (_process_end) {
161                         return;
162                 }
163
164                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
165                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
166                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
167                 _queue.pop_front ();
168                 
169                 lock.unlock ();
170
171                 shared_ptr<EncodedData> encoded;
172
173                 if (server) {
174                         try {
175                                 encoded = vf->encode_remotely (server);
176
177                                 if (remote_backoff > 0) {
178                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
179                                 }
180                                 
181                                 /* This job succeeded, so remove any backoff */
182                                 remote_backoff = 0;
183                                 
184                         } catch (std::exception& e) {
185                                 if (remote_backoff < 60) {
186                                         /* back off more */
187                                         remote_backoff += 10;
188                                 }
189                                 _film->log()->log (
190                                         String::compose (
191                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
192                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
193                                         );
194                         }
195                                 
196                 } else {
197                         try {
198                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
199                                 encoded = vf->encode_locally ();
200                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
201                         } catch (std::exception& e) {
202                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
203                         }
204                 }
205
206                 if (encoded) {
207                         encoded->write (_opt, vf->frame ());
208                         frame_done (vf->frame ());
209                 } else {
210                         lock.lock ();
211                         _film->log()->log (
212                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
213                                 );
214                         _queue.push_front (vf);
215                         lock.unlock ();
216                 }
217
218                 if (remote_backoff > 0) {
219                         dvdomatic_sleep (remote_backoff);
220                 }
221
222                 lock.lock ();
223                 _worker_condition.notify_all ();
224         }
225 }
226
227 void
228 J2KWAVEncoder::process_begin (int64_t audio_channel_layout)
229 {
230         if (_film->audio_sample_rate() != _film->target_audio_sample_rate()) {
231 #ifdef HAVE_SWRESAMPLE
232
233                 stringstream s;
234                 s << "Will resample audio from " << _film->audio_sample_rate() << " to " << _film->target_audio_sample_rate();
235                 _film->log()->log (s.str ());
236
237                 /* We will be using planar float data when we call the resampler */
238                 _swr_context = swr_alloc_set_opts (
239                         0,
240                         audio_channel_layout,
241                         AV_SAMPLE_FMT_FLTP,
242                         _film->target_audio_sample_rate(),
243                         audio_channel_layout,
244                         AV_SAMPLE_FMT_FLTP,
245                         _film->audio_sample_rate(),
246                         0, 0
247                         );
248                 
249                 swr_init (_swr_context);
250 #else
251                 throw EncodeError ("Cannot resample audio as libswresample is not present");
252 #endif
253         } else {
254 #ifdef HAVE_SWRESAMPLE
255                 _swr_context = 0;
256 #endif          
257         }
258         
259         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
260                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
261         }
262
263         vector<ServerDescription*> servers = Config::instance()->servers ();
264
265         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
266                 for (int j = 0; j < (*i)->threads (); ++j) {
267                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
268                 }
269         }
270 }
271
272 void
273 J2KWAVEncoder::process_end ()
274 {
275         boost::mutex::scoped_lock lock (_worker_mutex);
276
277         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
278
279         /* Keep waking workers until the queue is empty */
280         while (!_queue.empty ()) {
281                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
282                 _worker_condition.notify_all ();
283                 _worker_condition.wait (lock);
284         }
285
286         lock.unlock ();
287         
288         terminate_worker_threads ();
289
290         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
291
292         /* The following sequence of events can occur in the above code:
293              1. a remote worker takes the last image off the queue
294              2. the loop above terminates
295              3. the remote worker fails to encode the image and puts it back on the queue
296              4. the remote worker is then terminated by terminate_worker_threads
297
298              So just mop up anything left in the queue here.
299         */
300
301         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
302                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
303                 try {
304                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
305                         e->write (_opt, (*i)->frame ());
306                         frame_done ((*i)->frame ());
307                 } catch (std::exception& e) {
308                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
309                 }
310         }
311
312 #if HAVE_SWRESAMPLE     
313         if (_swr_context) {
314
315                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_channels(), 256));
316                         
317                 while (1) {
318                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
319
320                         if (frames < 0) {
321                                 throw EncodeError ("could not run sample-rate converter");
322                         }
323
324                         if (frames == 0) {
325                                 break;
326                         }
327
328                         out->set_frames (frames);
329                         write_audio (out);
330                 }
331
332                 swr_free (&_swr_context);
333         }
334 #endif
335
336         int const dcp_sr = dcp_audio_sample_rate (_film->audio_sample_rate ());
337         int64_t const extra_audio_frames = dcp_sr - (_audio_frames_written % dcp_sr);
338         shared_ptr<AudioBuffers> silence (new AudioBuffers (_film->audio_channels(), extra_audio_frames));
339         silence->make_silent ();
340         write_audio (silence);
341         
342         close_sound_files ();
343
344         /* Rename .wav.tmp files to .wav */
345         for (int i = 0; i < _film->audio_channels(); ++i) {
346                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
347                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
348                 }
349                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
350         }
351 }
352
353 void
354 J2KWAVEncoder::do_process_audio (shared_ptr<const AudioBuffers> audio)
355 {
356         shared_ptr<AudioBuffers> resampled;
357         
358 #if HAVE_SWRESAMPLE
359         /* Maybe sample-rate convert */
360         if (_swr_context) {
361
362                 /* Compute the resampled frames count and add 32 for luck */
363                 int const max_resampled_frames = ceil (audio->frames() * _film->target_audio_sample_rate() / _film->audio_sample_rate()) + 32;
364
365                 resampled.reset (new AudioBuffers (_film->audio_channels(), max_resampled_frames));
366
367                 /* Resample audio */
368                 int const resampled_frames = swr_convert (
369                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) audio->data(), audio->frames()
370                         );
371                 
372                 if (resampled_frames < 0) {
373                         throw EncodeError ("could not run sample-rate converter");
374                 }
375
376                 resampled->set_frames (resampled_frames);
377                 
378                 /* And point our variables at the resampled audio */
379                 audio = resampled;
380         }
381 #endif
382
383         write_audio (audio);
384 }
385
386 void
387 J2KWAVEncoder::write_audio (shared_ptr<const AudioBuffers> audio)
388 {
389         for (int i = 0; i < _film->audio_channels(); ++i) {
390                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
391         }
392
393         _audio_frames_written += audio->frames ();
394 }
395