Implemented but faulty.
[dcpomatic.git] / src / lib / j2k_wav_encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file  src/j2k_wav_encoder.cc
21  *  @brief An encoder which writes JPEG2000 and WAV files.
22  */
23
24 #include <sstream>
25 #include <stdexcept>
26 #include <iomanip>
27 #include <iostream>
28 #include <boost/thread.hpp>
29 #include <boost/filesystem.hpp>
30 #include <sndfile.h>
31 #include <openjpeg.h>
32 #include "j2k_wav_encoder.h"
33 #include "config.h"
34 #include "film_state.h"
35 #include "options.h"
36 #include "exceptions.h"
37 #include "dcp_video_frame.h"
38 #include "server.h"
39 #include "filter.h"
40 #include "log.h"
41 #include "cross.h"
42
43 using namespace std;
44 using namespace boost;
45
46 J2KWAVEncoder::J2KWAVEncoder (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
47         : Encoder (s, o, l)
48         , _deinterleave_buffer_size (8192)
49         , _deinterleave_buffer (0)
50         , _process_end (false)
51 {
52         /* Create sound output files with .tmp suffixes; we will rename
53            them if and when we complete.
54         */
55         for (int i = 0; i < _fs->audio_channels; ++i) {
56                 SF_INFO sf_info;
57                 sf_info.samplerate = dcp_audio_sample_rate (_fs->audio_sample_rate);
58                 /* We write mono files */
59                 sf_info.channels = 1;
60                 sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
61                 SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
62                 if (f == 0) {
63                         throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
64                 }
65                 _sound_files.push_back (f);
66         }
67
68         /* Create buffer for deinterleaving audio */
69         _deinterleave_buffer = new uint8_t[_deinterleave_buffer_size];
70 }
71
72 J2KWAVEncoder::~J2KWAVEncoder ()
73 {
74         terminate_worker_threads ();
75         delete[] _deinterleave_buffer;
76         close_sound_files ();
77 }
78
79 void
80 J2KWAVEncoder::terminate_worker_threads ()
81 {
82         boost::mutex::scoped_lock lock (_worker_mutex);
83         _process_end = true;
84         _worker_condition.notify_all ();
85         lock.unlock ();
86
87         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
88                 (*i)->join ();
89                 delete *i;
90         }
91 }
92
93 void
94 J2KWAVEncoder::close_sound_files ()
95 {
96         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
97                 sf_close (*i);
98         }
99
100         _sound_files.clear ();
101 }       
102
103 void
104 J2KWAVEncoder::process_video (shared_ptr<Image> yuv, int frame)
105 {
106         boost::mutex::scoped_lock lock (_worker_mutex);
107
108         /* Wait until the queue has gone down a bit */
109         while (_queue.size() >= _worker_threads.size() * 2 && !_process_end) {
110                 _worker_condition.wait (lock);
111         }
112
113         if (_process_end) {
114                 return;
115         }
116
117         /* Only do the processing if we don't already have a file for this frame */
118         if (!boost::filesystem::exists (_opt->frame_out_path (frame, false))) {
119                 pair<string, string> const s = Filter::ffmpeg_strings (_fs->filters);
120                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
121                                           new DCPVideoFrame (
122                                                   yuv, _opt->out_size, _opt->padding, _fs->scaler, frame, _fs->frames_per_second, s.second,
123                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
124                                                   _log
125                                                   )
126                                           ));
127                 
128                 _worker_condition.notify_all ();
129         }
130 }
131
132 void
133 J2KWAVEncoder::encoder_thread (Server* server)
134 {
135         /* Number of seconds that we currently wait between attempts
136            to connect to the server; not relevant for localhost
137            encodings.
138         */
139         int remote_backoff = 0;
140         
141         while (1) {
142                 boost::mutex::scoped_lock lock (_worker_mutex);
143                 while (_queue.empty () && !_process_end) {
144                         _worker_condition.wait (lock);
145                 }
146
147                 if (_process_end) {
148                         return;
149                 }
150
151                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
152                 _queue.pop_front ();
153                 
154                 lock.unlock ();
155
156                 shared_ptr<EncodedData> encoded;
157
158                 if (server) {
159                         try {
160                                 encoded = vf->encode_remotely (server);
161
162                                 if (remote_backoff > 0) {
163                                         stringstream s;
164                                         s << server->host_name() << " was lost, but now she is found; removing backoff";
165                                         _log->log (s.str ());
166                                 }
167                                 
168                                 /* This job succeeded, so remove any backoff */
169                                 remote_backoff = 0;
170                                 
171                         } catch (std::exception& e) {
172                                 if (remote_backoff < 60) {
173                                         /* back off more */
174                                         remote_backoff += 10;
175                                 }
176                                 stringstream s;
177                                 s << "Remote encode on " << server->host_name() << " failed (" << e.what() << "); thread sleeping for " << remote_backoff << "s.";
178                                 _log->log (s.str ());
179                         }
180                                 
181                 } else {
182                         try {
183                                 encoded = vf->encode_locally ();
184                         } catch (std::exception& e) {
185                                 stringstream s;
186                                 s << "Local encode failed " << e.what() << ".";
187                                 _log->log (s.str ());
188                         }
189                 }
190
191                 if (encoded) {
192                         encoded->write (_opt, vf->frame ());
193                         frame_done ();
194                 } else {
195                         lock.lock ();
196                         _queue.push_front (vf);
197                         lock.unlock ();
198                 }
199
200                 if (remote_backoff > 0) {
201                         dvdomatic_sleep (remote_backoff);
202                 }
203
204                 lock.lock ();
205                 _worker_condition.notify_all ();
206         }
207 }
208
209 void
210 J2KWAVEncoder::process_begin ()
211 {
212         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
213                 _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, (Server *) 0)));
214         }
215
216         vector<Server*> servers = Config::instance()->servers ();
217
218         for (vector<Server*>::iterator i = servers.begin(); i != servers.end(); ++i) {
219                 for (int j = 0; j < (*i)->threads (); ++j) {
220                         _worker_threads.push_back (new boost::thread (boost::bind (&J2KWAVEncoder::encoder_thread, this, *i)));
221                 }
222         }
223 }
224
225 void
226 J2KWAVEncoder::process_end ()
227 {
228         boost::mutex::scoped_lock lock (_worker_mutex);
229
230         /* Keep waking workers until the queue is empty */
231         while (!_queue.empty ()) {
232                 _worker_condition.notify_all ();
233                 _worker_condition.wait (lock);
234         }
235
236         lock.unlock ();
237         
238         terminate_worker_threads ();
239         close_sound_files ();
240
241         /* Rename .wav.tmp files to .wav */
242         for (int i = 0; i < _fs->audio_channels; ++i) {
243                 if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
244                         boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
245                 }
246                 boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
247         }
248 }
249
250 void
251 J2KWAVEncoder::process_audio (uint8_t* data, int data_size)
252 {
253         /* Size of a sample in bytes */
254         int const sample_size = 2;
255         
256         /* XXX: we are assuming that sample_size is right, the _deinterleave_buffer_size is a multiple
257            of the sample size and that data_size is a multiple of _fs->audio_channels * sample_size.
258         */
259         
260         /* XXX: this code is very tricksy and it must be possible to make it simpler ... */
261         
262         /* Number of bytes left to read this time */
263         int remaining = data_size;
264         /* Our position in the output buffers, in bytes */
265         int position = 0;
266         while (remaining > 0) {
267                 /* How many bytes of the deinterleaved data to do this time */
268                 int this_time = min (remaining / _fs->audio_channels, _deinterleave_buffer_size);
269                 for (int i = 0; i < _fs->audio_channels; ++i) {
270                         for (int j = 0; j < this_time; j += sample_size) {
271                                 for (int k = 0; k < sample_size; ++k) {
272                                         int const to = j + k;
273                                         int const from = position + (i * sample_size) + (j * _fs->audio_channels) + k;
274                                         _deinterleave_buffer[to] = data[from];
275                                 }
276                         }
277                         
278                         switch (_fs->audio_sample_format) {
279                         case AV_SAMPLE_FMT_S16:
280                                 sf_write_short (_sound_files[i], (const short *) _deinterleave_buffer, this_time / sample_size);
281                                 break;
282                         default:
283                                 throw DecodeError ("unknown audio sample format");
284                         }
285                 }
286                 
287                 position += this_time;
288                 remaining -= this_time * _fs->audio_channels;
289         }
290 }