Move things round a bit.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/lib/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <sndfile.h>
31 #include <openjpeg.h>
32 #include "j2k_wav_encoder.h"
33 #include "config.h"
34 #include "film_state.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41
42 using namespace std;
43 using namespace boost;
44
45 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
46         : Encoder (s, o, l)
47         , _deinterleave_buffer_size (8192)
48         , _deinterleave_buffer (0)
49         , _process_end (false)
50 {
51         /* Create sound output files with .tmp suffixes; we will rename
52            them if and when we complete.
53         */
54         for (int i = 0; i < _fs->audio_channels; ++i) {
55                 SF_INFO sf_info;
56                 sf_info.samplerate = _fs->audio_sample_rate;
57                 /* We write mono files */
58                 sf_info.channels = 1;
59                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
60                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
61                 if (f == 0) {
62                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
63                 }
64                 _sound_files.push_back (f);
65         }
66
67         /* Create buffer for deinterleaving audio */
68         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
69 }
70
71 J2KWAVEncoder::~J2KWAVEncoder ()
72 {
73         terminate_worker_threads ();
74         delete[] _deinterleave_buffer;
75         close_sound_files ();
76 }
77
78 void
79 J2KWAVEncoder::terminate_worker_threads ()
80 {
81         boost::mutex::scoped_lock lock (_worker_mutex);
82         _process_end = true;
83         _worker_condition.notify_all ();
84         lock.unlock ();
85
86         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
87                 (*i)->join ();
88                 delete *i;
89         }
90 }
91
92 void
93 J2KWAVEncoder::close_sound_files ()
94 {
95         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
96                 sf_close (*i);
97         }
98
99         _sound_files.clear ();
100 }       
101
102 void
103 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
104 {
105         boost::mutex::scoped_lock lock (_worker_mutex);
106
107         /* Wait until the queue has gone down a bit */
108         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
109                 _worker_condition.wait (lock);
110         }
111
112         if (_process_end) {
113                 return;
114         }
115
116         /* Only do the processing if we don't already have a file for this frame */
117         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
118                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
119                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
120                                           new DCPVideoFrame (
121                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
122                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
123                                                   _log
124                                                   )
125                                           ));
126                 
127                 _worker_condition.notify_all ();
128         }
129 }
130
131 void
132 J2KWAVEncoder::encoder_thread (Server* server)
133 {
134         /* Number of seconds that we currently wait between attempts
135            to connect to the server; not relevant for localhost
136            encodings.
137         */
138         int remote_backoff = 0;
139         
140         while (1) {
141                 boost::mutex::scoped_lock lock (_worker_mutex);
142                 while (_queue.empty () && !_process_end) {
143                         _worker_condition.wait (lock);
144                 }
145
146                 if (_process_end) {
147                         return;
148                 }
149
150                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
151                 _queue.pop_front ();
152                 
153                 lock.unlock ();
154
155                 shared_ptr<EncodedData> encoded;
156
157                 if (server) {
158                         try {
159                                 encoded = vf->encode_remotely (server);
160
161                                 if (remote_backoff > 0) {
162                                         stringstream s;
163                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
164                                         _log->log (s.str ());
165                                 }
166                                 
167                                 /* This job succeeded, so remove any backoff */
168                                 remote_backoff = 0;
169                                 
170                         } catch (std::exception& e) {
171                                 if (remote_backoff < 60) {
172                                         /* back off more */
173                                         remote_backoff += 10;
174                                 }
175                                 stringstream s;
176                                 s << "Remote encode on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
177                                 _log->log (s.str ());
178                         }
179                                 
180                 } else {
181                         try {
182                                 encoded = vf->encode_locally ();
183                         } catch (std::exception& e) {
184                                 stringstream s;
185                                 s << "Local encode failed " << e.what() << ".";
186                                 _log->log (s.str ());
187                         }
188                 }
189
190                 if (encoded) {
191                         encoded->write (_opt, vf->frame ());
192                         frame_done ();
193                 } else {
194                         lock.lock ();
195                         _queue.push_front (vf);
196                         lock.unlock ();
197                 }
198
199                 if (remote_backoff > 0) {
200                         sleep (remote_backoff);
201                 }
202
203                 lock.lock ();
204                 _worker_condition.notify_all ();
205         }
206 }
207
208 void
209 J2KWAVEncoder::process_begin ()
210 {
211         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
212                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0)));
213         }
214
215         vector<Server*> servers = Config::instance()->servers ();
216
217         for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
218                 for (int j = 0; j < (*i)->threads (); ++j) {
219                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
220                 }
221         }
222 }
223
224 void
225 J2KWAVEncoder::process_end ()
226 {
227         boost::mutex::scoped_lock lock (_worker_mutex);
228
229         /* Keep waking workers until the queue is empty */
230         while (!_queue.empty ()) {
231                 _worker_condition.notify_all ();
232                 _worker_condition.wait (lock);
233         }
234
235         lock.unlock ();
236         
237         terminate_worker_threads ();
238         close_sound_files ();
239
240         /* Rename .wav.tmp files to .wav */
241         for (int i = 0; i < _fs->audio_channels; ++i) {
242                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
243                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
244                 }
245                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
246         }
247 }
248
249 void
250 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
251 {
252         /* Size of a sample in bytes */
253         int const sample_size = 2;
254         
255         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
256            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
257         */
258         
259         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
260         
261         /* Number of bytes left to read this time */
262         int remaining = data_size;
263         /* Our position in the output buffers, in bytes */
264         int position = 0;
265         while (remaining > 0) {
266                 /* How many bytes of the deinterleaved data to do this time */
267                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
268                 for (int i = 0; i < _fs->audio_channels; ++i) {
269                         for (int j = 0; j < this_time; j += sample_size) {
270                                 for (int k = 0; k < sample_size; ++k) {
271                                         int const to = j + k;
272                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
273                                         _deinterleave_buffer[to] = data[from];
274                                 }
275                         }
276                         
277                         switch (_fs->audio_sample_format) {
278                         case AV_SAMPLE_FMT_S16:
279                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
280                                 break;
281                         default:
282                                 throw DecodeError ("unknown audio sample format");
283                         }
284                 }
285                 
286                 position += this_time;
287                 remaining -= this_time * _fs->audio_channels;
288         }
289 }