Improve transcode job progress reporting.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <sndfile.h>
31 #include <openjpeg.h>
32 #include "j2k_wav_encoder.h"
33 #include "config.h"
34 #include "film_state.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41 #include "cross.h"
42
43 using namespace std;
44 using namespace boost;
45
46 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
47         : Encoder (s, o, l)
48         , _deinterleave_buffer_size (8192)
49         , _deinterleave_buffer (0)
50         , _process_end (false)
51 {
52         /* Create sound output files with .tmp suffixes; we will rename
53            them if and when we complete.
54         */
55         for (int i = 0; i < _fs->audio_channels; ++i) {
56                 SF_INFO sf_info;
57                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
58                 /* We write mono files */
59                 sf_info.channels = 1;
60                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
61                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
62                 if (f == 0) {
63                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
64                 }
65                 _sound_files.push_back (f);
66         }
67
68         /* Create buffer for deinterleaving audio */
69         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
70 }
71
72 J2KWAVEncoder::~J2KWAVEncoder ()
73 {
74         terminate_worker_threads ();
75         delete[] _deinterleave_buffer;
76         close_sound_files ();
77 }
78
79 void
80 J2KWAVEncoder::terminate_worker_threads ()
81 {
82         boost::mutex::scoped_lock lock (_worker_mutex);
83         _process_end = true;
84         _worker_condition.notify_all ();
85         lock.unlock ();
86
87         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
88                 (*i)->join ();
89                 delete *i;
90         }
91 }
92
93 void
94 J2KWAVEncoder::close_sound_files ()
95 {
96         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
97                 sf_close (*i);
98         }
99
100         _sound_files.clear ();
101 }       
102
103 void
104 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
105 {
106         boost::mutex::scoped_lock lock (_worker_mutex);
107
108         /* Wait until the queue has gone down a bit */
109         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
110                 _worker_condition.wait (lock);
111         }
112
113         if (_process_end) {
114                 return;
115         }
116
117         /* Only do the processing if we don't already have a file for this frame */
118         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
119                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
120                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
121                                           new DCPVideoFrame (
122                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
123                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
124                                                   _log
125                                                   )
126                                           ));
127                 
128                 _worker_condition.notify_all ();
129         } else {
130                 frame_skipped ();
131         }
132 }
133
134 void
135 J2KWAVEncoder::encoder_thread (ServerDescription* server)
136 {
137         /* Number of seconds that we currently wait between attempts
138            to connect to the server; not relevant for localhost
139            encodings.
140         */
141         int remote_backoff = 0;
142         
143         while (1) {
144                 boost::mutex::scoped_lock lock (_worker_mutex);
145                 while (_queue.empty () && !_process_end) {
146                         _worker_condition.wait (lock);
147                 }
148
149                 if (_process_end) {
150                         return;
151                 }
152
153                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
154                 _queue.pop_front ();
155                 
156                 lock.unlock ();
157
158                 shared_ptr<EncodedData> encoded;
159
160                 if (server) {
161                         try {
162                                 encoded = vf->encode_remotely (server);
163
164                                 if (remote_backoff > 0) {
165                                         stringstream s;
166                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
167                                         _log->log (s.str ());
168                                 }
169                                 
170                                 /* This job succeeded, so remove any backoff */
171                                 remote_backoff = 0;
172                                 
173                         } catch (std::exception& e) {
174                                 if (remote_backoff < 60) {
175                                         /* back off more */
176                                         remote_backoff += 10;
177                                 }
178                                 stringstream s;
179                                 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
180                                 _log->log (s.str ());
181                         }
182                                 
183                 } else {
184                         try {
185                                 encoded = vf->encode_locally ();
186                         } catch (std::exception& e) {
187                                 stringstream s;
188                                 s << "Local encode failed " << e.what() << ".";
189                                 _log->log (s.str ());
190                         }
191                 }
192
193                 if (encoded) {
194                         encoded->write (_opt, vf->frame ());
195                         frame_done (vf->frame ());
196                 } else {
197                         lock.lock ();
198                         _queue.push_front (vf);
199                         lock.unlock ();
200                 }
201
202                 if (remote_backoff > 0) {
203                         dvdomatic_sleep (remote_backoff);
204                 }
205
206                 lock.lock ();
207                 _worker_condition.notify_all ();
208         }
209 }
210
211 void
212 J2KWAVEncoder::process_begin ()
213 {
214         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
215                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (ServerDescription *) 0)));
216         }
217
218         vector<ServerDescription*> servers = Config::instance()->servers ();
219
220         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
221                 for (int j = 0; j < (*i)->threads (); ++j) {
222                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
223                 }
224         }
225 }
226
227 void
228 J2KWAVEncoder::process_end ()
229 {
230         boost::mutex::scoped_lock lock (_worker_mutex);
231
232         /* Keep waking workers until the queue is empty */
233         while (!_queue.empty ()) {
234                 _worker_condition.notify_all ();
235                 _worker_condition.wait (lock);
236         }
237
238         lock.unlock ();
239         
240         terminate_worker_threads ();
241
242         /* The following sequence of events can occur in the above code:
243              1. a remote worker takes the last image off the queue
244              2. the loop above terminates
245              3. the remote worker fails to encode the image and puts it back on the queue
246              4. the remote worker is then terminated by terminate_worker_threads
247
248              So just mop up anything left in the queue here.
249         */
250
251         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
252                 stringstream s;
253                 s << "Encode left-over frame " << (*i)->frame();
254                 _log->log (s.str ());
255                 try {
256                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
257                         e->write (_opt, (*i)->frame ());
258                         frame_done ((*i)->frame ());
259                 } catch (std::exception& e) {
260                         stringstream s;
261                         s << "Local encode failed " << e.what() << ".";
262                         _log->log (s.str ());
263                 }
264         }
265         
266         close_sound_files ();
267
268         /* Rename .wav.tmp files to .wav */
269         for (int i = 0; i < _fs->audio_channels; ++i) {
270                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
271                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
272                 }
273                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
274         }
275 }
276
277 void
278 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
279 {
280         /* Size of a sample in bytes */
281         int const sample_size = 2;
282         
283         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
284            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
285         */
286         
287         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
288         
289         /* Number of bytes left to read this time */
290         int remaining = data_size;
291         /* Our position in the output buffers, in bytes */
292         int position = 0;
293         while (remaining > 0) {
294                 /* How many bytes of the deinterleaved data to do this time */
295                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
296                 for (int i = 0; i < _fs->audio_channels; ++i) {
297                         for (int j = 0; j < this_time; j += sample_size) {
298                                 for (int k = 0; k < sample_size; ++k) {
299                                         int const to = j + k;
300                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
301                                         _deinterleave_buffer[to] = data[from];
302                                 }
303                         }
304                         
305                         switch (_fs->audio_sample_format) {
306                         case AV_SAMPLE_FMT_S16:
307                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
308                                 break;
309                         default:
310                                 throw DecodeError ("unknown audio sample format");
311                         }
312                 }
313                 
314                 position += this_time;
315                 remaining -= this_time * _fs->audio_channels;
316         }
317 }