Tidy up timing code a bit.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <boost/lexical_cast.hpp>
31 #include <sndfile.h>
32 #include <openjpeg.h>
33 #include "j2k_wav_encoder.h"
34 #include "config.h"
35 #include "film_state.h"
36 #include "options.h"
37 #include "exceptions.h"
38 #include "dcp_video_frame.h"
39 #include "server.h"
40 #include "filter.h"
41 #include "log.h"
42 #include "cross.h"
43
44 using namespace std;
45 using namespace boost;
46
47 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
48         : Encoder (s, o, l)
49         , _deinterleave_buffer_size (8192)
50         , _deinterleave_buffer (0)
51         , _process_end (false)
52 {
53         /* Create sound output files with .tmp suffixes; we will rename
54            them if and when we complete.
55         */
56         for (int i = 0; i < _fs->audio_channels; ++i) {
57                 SF_INFO sf_info;
58                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
59                 /* We write mono files */
60                 sf_info.channels = 1;
61                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
62                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
63                 if (f == 0) {
64                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
65                 }
66                 _sound_files.push_back (f);
67         }
68
69         /* Create buffer for deinterleaving audio */
70         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
71 }
72
73 J2KWAVEncoder::~J2KWAVEncoder ()
74 {
75         terminate_worker_threads ();
76         delete[] _deinterleave_buffer;
77         close_sound_files ();
78 }
79
80 void
81 J2KWAVEncoder::terminate_worker_threads ()
82 {
83         boost::mutex::scoped_lock lock (_worker_mutex);
84         _process_end = true;
85         _worker_condition.notify_all ();
86         lock.unlock ();
87
88         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
89                 (*i)->join ();
90                 delete *i;
91         }
92 }
93
94 void
95 J2KWAVEncoder::close_sound_files ()
96 {
97         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
98                 sf_close (*i);
99         }
100
101         _sound_files.clear ();
102 }       
103
104 void
105 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
106 {
107         boost::mutex::scoped_lock lock (_worker_mutex);
108
109         /* Wait until the queue has gone down a bit */
110         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
111                 TIMING ("decoder sleeps with queue of %1", _queue.size());
112                 _worker_condition.wait (lock);
113                 TIMING ("decoder wakes with queue of %1", _queue.size());
114         }
115
116         if (_process_end) {
117                 return;
118         }
119
120         /* Only do the processing if we don't already have a file for this frame */
121         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
122                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
123                 TIMING ("adding to queue of %1", _queue.size ());
124                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
125                                           new DCPVideoFrame (
126                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
127                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
128                                                   _log
129                                                   )
130                                           ));
131                 
132                 _worker_condition.notify_all ();
133         } else {
134                 frame_skipped ();
135         }
136 }
137
138 void
139 J2KWAVEncoder::encoder_thread (ServerDescription* server)
140 {
141         /* Number of seconds that we currently wait between attempts
142            to connect to the server; not relevant for localhost
143            encodings.
144         */
145         int remote_backoff = 0;
146         
147         while (1) {
148
149                 TIMING ("encoder thread %1 sleeps", pthread_self ());
150                 boost::mutex::scoped_lock lock (_worker_mutex);
151                 while (_queue.empty () && !_process_end) {
152                         _worker_condition.wait (lock);
153                 }
154
155                 if (_process_end) {
156                         return;
157                 }
158
159                 TIMING ("encoder thread %1 wakes with queue of %2", pthread_self(), _queue.size());
160                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
161                 _queue.pop_front ();
162                 
163                 lock.unlock ();
164
165                 shared_ptr<EncodedData> encoded;
166
167                 if (server) {
168                         try {
169                                 encoded = vf->encode_remotely (server);
170
171                                 if (remote_backoff > 0) {
172                                         stringstream s;
173                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
174                                         _log->log (s.str ());
175                                 }
176                                 
177                                 /* This job succeeded, so remove any backoff */
178                                 remote_backoff = 0;
179                                 
180                         } catch (std::exception& e) {
181                                 if (remote_backoff < 60) {
182                                         /* back off more */
183                                         remote_backoff += 10;
184                                 }
185                                 stringstream s;
186                                 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
187                                 _log->log (s.str ());
188                         }
189                                 
190                 } else {
191                         try {
192                                 TIMING ("encoder thread %1 begins local encode of %2", pthread_self(), vf->frame());
193                                 encoded = vf->encode_locally ();
194                                 TIMING ("encoder thread %1 finishes local encode of %2", pthread_self(), vf->frame());
195                         } catch (std::exception& e) {
196                                 stringstream s;
197                                 s << "Local encode failed " << e.what() << ".";
198                                 _log->log (s.str ());
199                         }
200                 }
201
202                 if (encoded) {
203                         encoded->write (_opt, vf->frame ());
204                         frame_done (vf->frame ());
205                 } else {
206                         lock.lock ();
207                         _queue.push_front (vf);
208                         lock.unlock ();
209                 }
210
211                 if (remote_backoff > 0) {
212                         dvdomatic_sleep (remote_backoff);
213                 }
214
215                 lock.lock ();
216                 _worker_condition.notify_all ();
217         }
218 }
219
220 void
221 J2KWAVEncoder::process_begin ()
222 {
223         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
224                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
225         }
226
227         vector<ServerDescription*> servers = Config::instance()->servers ();
228
229         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
230                 for (int j = 0; j < (*i)->threads (); ++j) {
231                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
232                 }
233         }
234 }
235
236 void
237 J2KWAVEncoder::process_end ()
238 {
239         boost::mutex::scoped_lock lock (_worker_mutex);
240
241         _log->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
242
243         /* Keep waking workers until the queue is empty */
244         while (!_queue.empty ()) {
245                 _log->log ("Waking with " + lexical_cast<string> (_queue.size ()));
246                 _worker_condition.notify_all ();
247                 _worker_condition.wait (lock);
248         }
249
250         lock.unlock ();
251         
252         terminate_worker_threads ();
253
254         _log->log ("Mopping up " + lexical_cast<string> (_queue.size()));
255
256         /* The following sequence of events can occur in the above code:
257              1. a remote worker takes the last image off the queue
258              2. the loop above terminates
259              3. the remote worker fails to encode the image and puts it back on the queue
260              4. the remote worker is then terminated by terminate_worker_threads
261
262              So just mop up anything left in the queue here.
263         */
264
265         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
266                 stringstream s;
267                 s << "Encode left-over frame " << (*i)->frame();
268                 _log->log (s.str ());
269                 try {
270                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
271                         e->write (_opt, (*i)->frame ());
272                         frame_done ((*i)->frame ());
273                 } catch (std::exception& e) {
274                         stringstream s;
275                         s << "Local encode failed " << e.what() << ".";
276                         _log->log (s.str ());
277                 }
278         }
279         
280         close_sound_files ();
281
282         /* Rename .wav.tmp files to .wav */
283         for (int i = 0; i < _fs->audio_channels; ++i) {
284                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
285                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
286                 }
287                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
288         }
289 }
290
291 void
292 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
293 {
294         /* Size of a sample in bytes */
295         int const sample_size = 2;
296         
297         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
298            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
299         */
300         
301         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
302         
303         /* Number of bytes left to read this time */
304         int remaining = data_size;
305         /* Our position in the output buffers, in bytes */
306         int position = 0;
307         while (remaining > 0) {
308                 /* How many bytes of the deinterleaved data to do this time */
309                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
310                 for (int i = 0; i < _fs->audio_channels; ++i) {
311                         for (int j = 0; j < this_time; j += sample_size) {
312                                 for (int k = 0; k < sample_size; ++k) {
313                                         int const to = j + k;
314                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
315                                         _deinterleave_buffer[to] = data[from];
316                                 }
317                         }
318                         
319                         switch (_fs->audio_sample_format) {
320                         case AV_SAMPLE_FMT_S16:
321                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
322                                 break;
323                         default:
324                                 throw DecodeError ("unknown audio sample format");
325                         }
326                 }
327                 
328                 position += this_time;
329                 remaining -= this_time * _fs->audio_channels;
330         }
331 }