Some more timing logging.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49         , _deinterleave_buffer_size (8192)
50         , _deinterleave_buffer (0)
51         , _process_end (false)
52 {
53         /* Create sound output files with .tmp suffixes; we will rename
54            them if and when we complete.
55         */
56         for (int i = 0; i < _fs->audio_channels; ++i) {
57                 SF_INFO sf_info;
58                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59                 /* We write mono files */
60                 sf_info.channels = 1;
61                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
62                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
63                 if (f == 0) {
64                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
65                 }
66                 _sound_files.push_back (f);
67         }
68
69         /* Create buffer for deinterleaving audio */
70         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
71 }
72
73 J2KWAVEncoder::~J2KWAVEncoder ()
74 {
75         terminate_worker_threads ();
76         delete[] _deinterleave_buffer;
77         close_sound_files ();
78 }
79
80 void
81 J2KWAVEncoder::terminate_worker_threads ()
82 {
83         boost::mutex::scoped_lock lock (_worker_mutex);
84         _process_end = true;
85         _worker_condition.notify_all ();
86         lock.unlock ();
87
88         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
89                 (*i)->join ();
90                 delete *i;
91         }
92 }
93
94 void
95 J2KWAVEncoder::close_sound_files ()
96 {
97         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
98                 sf_close (*i);
99         }
100
101         _sound_files.clear ();
102 }       
103
104 void
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
106 {
107         boost::mutex::scoped_lock lock (_worker_mutex);
108
109         /* Wait until the queue has gone down a bit */
110         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111                 _log->microsecond_log ("Decoder sleeps", Log::TIMING);
112                 _worker_condition.wait (lock);
113                 _log->microsecond_log ("Decoder wakes", Log::TIMING);
114         }
115
116         if (_process_end) {
117                 return;
118         }
119
120         /* Only do the processing if we don't already have a file for this frame */
121         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
122                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
123                 _log->microsecond_log ("Adding to queue of " + boost::lexical_cast<string> (_queue.size ()), Log::TIMING);
124                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
125                                           new DCPVideoFrame (
126                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
128                                                   _log
129                                                   )
130                                           ));
131                 
132                 _worker_condition.notify_all ();
133         } else {
134                 frame_skipped ();
135         }
136 }
137
138 void
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
140 {
141         /* Number of seconds that we currently wait between attempts
142            to connect to the server; not relevant for localhost
143            encodings.
144         */
145         int remote_backoff = 0;
146         
147         while (1) {
148
149                 {
150                         stringstream s;
151                         s << "Encoder thread " << pthread_self() << " sleeps.";
152                         _log->microsecond_log (s.str(), Log::TIMING);
153                 }
154                 
155                 boost::mutex::scoped_lock lock (_worker_mutex);
156                 while (_queue.empty () && !_process_end) {
157                         _worker_condition.wait (lock);
158                 }
159
160                 if (_process_end) {
161                         return;
162                 }
163
164                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
165
166                 {
167                         stringstream s;
168                         s << "Encoder thread " << pthread_self() << " wakes with queue of " << _queue.size();
169                         _log->microsecond_log (s.str(), Log::TIMING);
170                 }
171                 
172                 _queue.pop_front ();
173                 
174                 lock.unlock ();
175
176                 shared_ptr<EncodedData> encoded;
177
178                 if (server) {
179                         try {
180                                 encoded = vf->encode_remotely (server);
181
182                                 if (remote_backoff > 0) {
183                                         stringstream s;
184                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
185                                         _log->log (s.str ());
186                                 }
187                                 
188                                 /* This job succeeded, so remove any backoff */
189                                 remote_backoff = 0;
190                                 
191                         } catch (std::exception& e) {
192                                 if (remote_backoff < 60) {
193                                         /* back off more */
194                                         remote_backoff += 10;
195                                 }
196                                 stringstream s;
197                                 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
198                                 _log->log (s.str ());
199                         }
200                                 
201                 } else {
202                         try {
203                                 {
204                                         stringstream s;
205                                         s << "Encoder thread " << pthread_self() << " begins local encode of " << vf->frame();
206                                         _log->microsecond_log (s.str(), Log::TIMING);
207                                 }
208
209                                 encoded = vf->encode_locally ();
210
211                                 {
212                                         stringstream s;
213                                         s << "Encoder thread " << pthread_self() << " finishes local encode of " << vf->frame();
214                                         _log->microsecond_log (s.str(), Log::TIMING);
215                                 }
216                         } catch (std::exception& e) {
217                                 stringstream s;
218                                 s << "Local encode failed " << e.what() << ".";
219                                 _log->log (s.str ());
220                         }
221                 }
222
223                 if (encoded) {
224                         encoded->write (_opt, vf->frame ());
225                         frame_done (vf->frame ());
226                 } else {
227                         lock.lock ();
228                         _queue.push_front (vf);
229                         lock.unlock ();
230                 }
231
232                 if (remote_backoff > 0) {
233                         dvdomatic_sleep (remote_backoff);
234                 }
235
236                 lock.lock ();
237                 _worker_condition.notify_all ();
238         }
239 }
240
241 void
242 J2KWAVEncoder::process_begin ()
243 {
244         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
245                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
246         }
247
248         vector<ServerDescription*> servers = Config::instance()->servers ();
249
250         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
251                 for (int j = 0; j < (*i)->threads (); ++j) {
252                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
253                 }
254         }
255 }
256
257 void
258 J2KWAVEncoder::process_end ()
259 {
260         boost::mutex::scoped_lock lock (_worker_mutex);
261
262         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
263
264         /* Keep waking workers until the queue is empty */
265         while (!_queue.empty ()) {
266                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
267                 _worker_condition.notify_all ();
268                 _worker_condition.wait (lock);
269         }
270
271         lock.unlock ();
272         
273         terminate_worker_threads ();
274
275         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
276
277         /* The following sequence of events can occur in the above code:
278              1. a remote worker takes the last image off the queue
279              2. the loop above terminates
280              3. the remote worker fails to encode the image and puts it back on the queue
281              4. the remote worker is then terminated by terminate_worker_threads
282
283              So just mop up anything left in the queue here.
284         */
285
286         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
287                 stringstream s;
288                 s << "Encode left-over frame " << (*i)->frame();
289                 _log->log (s.str ());
290                 try {
291                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
292                         e->write (_opt, (*i)->frame ());
293                         frame_done ((*i)->frame ());
294                 } catch (std::exception& e) {
295                         stringstream s;
296                         s << "Local encode failed " << e.what() << ".";
297                         _log->log (s.str ());
298                 }
299         }
300         
301         close_sound_files ();
302
303         /* Rename .wav.tmp files to .wav */
304         for (int i = 0; i < _fs->audio_channels; ++i) {
305                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
306                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
307                 }
308                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
309         }
310 }
311
312 void
313 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
314 {
315         /* Size of a sample in bytes */
316         int const sample_size = 2;
317         
318         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
319            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
320         */
321         
322         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
323         
324         /* Number of bytes left to read this time */
325         int remaining = data_size;
326         /* Our position in the output buffers, in bytes */
327         int position = 0;
328         while (remaining > 0) {
329                 /* How many bytes of the deinterleaved data to do this time */
330                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
331                 for (int i = 0; i < _fs->audio_channels; ++i) {
332                         for (int j = 0; j < this_time; j += sample_size) {
333                                 for (int k = 0; k < sample_size; ++k) {
334                                         int const to = j + k;
335                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
336                                         _deinterleave_buffer[to] = data[from];
337                                 }
338                         }
339                         
340                         switch (_fs->audio_sample_format) {
341                         case AV_SAMPLE_FMT_S16:
342                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
343                                 break;
344                         default:
345                                 throw DecodeError ("unknown audio sample format");
346                         }
347                 }
348                 
349                 position += this_time;
350                 remaining -= this_time * _fs->audio_channels;
351         }
352 }