2f29f9021d2ba7561e53bb8406df0f5c35f6ba1a
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <sndfile.h>
31 #include <openjpeg.h>
32 #include "j2k_wav_encoder.h"
33 #include "config.h"
34 #include "film_state.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41 #include "cross.h"
42
43 using namespace std;
44 using namespace boost;
45
46 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
47         : Encoder (s, o, l)
48         , _deinterleave_buffer_size (8192)
49         , _deinterleave_buffer (0)
50         , _process_end (false)
51 {
52         /* Create sound output files with .tmp suffixes; we will rename
53            them if and when we complete.
54         */
55         for (int i = 0; i < _fs->audio_channels; ++i) {
56                 SF_INFO sf_info;
57                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
58                 /* We write mono files */
59                 sf_info.channels = 1;
60                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
61                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
62                 if (f == 0) {
63                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
64                 }
65                 _sound_files.push_back (f);
66         }
67
68         /* Create buffer for deinterleaving audio */
69         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
70 }
71
72 J2KWAVEncoder::~J2KWAVEncoder ()
73 {
74         terminate_worker_threads ();
75         delete[] _deinterleave_buffer;
76         close_sound_files ();
77 }
78
79 void
80 J2KWAVEncoder::terminate_worker_threads ()
81 {
82         boost::mutex::scoped_lock lock (_worker_mutex);
83         _process_end = true;
84         _worker_condition.notify_all ();
85         lock.unlock ();
86
87         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
88                 (*i)->join ();
89                 delete *i;
90         }
91 }
92
93 void
94 J2KWAVEncoder::close_sound_files ()
95 {
96         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
97                 sf_close (*i);
98         }
99
100         _sound_files.clear ();
101 }       
102
103 void
104 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
105 {
106         boost::mutex::scoped_lock lock (_worker_mutex);
107
108         /* Wait until the queue has gone down a bit */
109         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
110                 _worker_condition.wait (lock);
111         }
112
113         if (_process_end) {
114                 return;
115         }
116
117         /* Only do the processing if we don't already have a file for this frame */
118         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
119                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
120                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
121                                           new DCPVideoFrame (
122                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
123                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
124                                                   _log
125                                                   )
126                                           ));
127                 
128                 _worker_condition.notify_all ();
129         }
130 }
131
132 void
133 J2KWAVEncoder::encoder_thread (Server* server)
134 {
135         /* Number of seconds that we currently wait between attempts
136            to connect to the server; not relevant for localhost
137            encodings.
138         */
139         int remote_backoff = 0;
140         
141         while (1) {
142                 boost::mutex::scoped_lock lock (_worker_mutex);
143                 while (_queue.empty () && !_process_end) {
144                         _worker_condition.wait (lock);
145                 }
146
147                 if (_process_end) {
148                         return;
149                 }
150
151                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
152                 _queue.pop_front ();
153                 
154                 lock.unlock ();
155
156                 shared_ptr<EncodedData> encoded;
157
158                 if (server) {
159                         try {
160                                 encoded = vf->encode_remotely (server);
161
162                                 if (remote_backoff > 0) {
163                                         stringstream s;
164                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
165                                         _log->log (s.str ());
166                                 }
167                                 
168                                 /* This job succeeded, so remove any backoff */
169                                 remote_backoff = 0;
170                                 
171                         } catch (std::exception& e) {
172                                 if (remote_backoff < 60) {
173                                         /* back off more */
174                                         remote_backoff += 10;
175                                 }
176                                 stringstream s;
177                                 s << "Remote encode of " << vf->frame() << " on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
178                                 _log->log (s.str ());
179                         }
180                                 
181                 } else {
182                         try {
183                                 encoded = vf->encode_locally ();
184                         } catch (std::exception& e) {
185                                 stringstream s;
186                                 s << "Local encode failed " << e.what() << ".";
187                                 _log->log (s.str ());
188                         }
189                 }
190
191                 if (encoded) {
192                         encoded->write (_opt, vf->frame ());
193                         frame_done ();
194                 } else {
195                         lock.lock ();
196                         _queue.push_front (vf);
197                         lock.unlock ();
198                 }
199
200                 if (remote_backoff > 0) {
201                         dvdomatic_sleep (remote_backoff);
202                 }
203
204                 lock.lock ();
205                 _worker_condition.notify_all ();
206         }
207 }
208
209 void
210 J2KWAVEncoder::process_begin ()
211 {
212         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
213                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0)));
214         }
215
216         vector<Server*> servers = Config::instance()->servers ();
217
218         for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
219                 for (int j = 0; j < (*i)->threads (); ++j) {
220                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
221                 }
222         }
223 }
224
225 void
226 J2KWAVEncoder::process_end ()
227 {
228         boost::mutex::scoped_lock lock (_worker_mutex);
229
230         /* Keep waking workers until the queue is empty */
231         while (!_queue.empty ()) {
232                 _worker_condition.notify_all ();
233                 _worker_condition.wait (lock);
234         }
235
236         lock.unlock ();
237         
238         terminate_worker_threads ();
239
240         /* The following sequence of events can occur in the above code:
241              1. a remote worker takes the last image off the queue
242              2. the loop above terminates
243              3. the remote worker fails to encode the image and puts it back on the queue
244              4. the remote worker is then terminated by terminate_worker_threads
245
246              So just mop up anything left in the queue here.
247         */
248
249         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
250                 stringstream s;
251                 s << "Encode left-over frame " << (*i)->frame();
252                 _log->log (s.str ());
253                 try {
254                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
255                         e->write (_opt, (*i)->frame ());
256                         frame_done ();
257                 } catch (std::exception& e) {
258                         stringstream s;
259                         s << "Local encode failed " << e.what() << ".";
260                         _log->log (s.str ());
261                 }
262         }
263         
264         close_sound_files ();
265
266         /* Rename .wav.tmp files to .wav */
267         for (int i = 0; i < _fs->audio_channels; ++i) {
268                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
269                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
270                 }
271                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
272         }
273 }
274
275 void
276 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
277 {
278         /* Size of a sample in bytes */
279         int const sample_size = 2;
280         
281         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
282            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
283         */
284         
285         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
286         
287         /* Number of bytes left to read this time */
288         int remaining = data_size;
289         /* Our position in the output buffers, in bytes */
290         int position = 0;
291         while (remaining > 0) {
292                 /* How many bytes of the deinterleaved data to do this time */
293                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
294                 for (int i = 0; i < _fs->audio_channels; ++i) {
295                         for (int j = 0; j < this_time; j += sample_size) {
296                                 for (int k = 0; k < sample_size; ++k) {
297                                         int const to = j + k;
298                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
299                                         _deinterleave_buffer[to] = data[from];
300                                 }
301                         }
302                         
303                         switch (_fs->audio_sample_format) {
304                         case AV_SAMPLE_FMT_S16:
305                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
306                                 break;
307                         default:
308                                 throw DecodeError ("unknown audio sample format");
309                         }
310                 }
311                 
312                 position += this_time;
313                 remaining -= this_time * _fs->audio_channels;
314         }
315 }