Some comments.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40 #include "writer.h"
41
42 using std::pair;
43 using std::string;
44 using std::stringstream;
45 using std::vector;
46 using std::list;
47 using std::cout;
48 using std::make_pair;
49 using namespace boost;
50
51 int const Encoder::_history_size = 25;
52
53 /** @param f Film that we are encoding */
54 Encoder::Encoder (shared_ptr<Film> f)
55         : _film (f)
56         , _video_frames_in (0)
57         , _video_frames_out (0)
58 #ifdef HAVE_SWRESAMPLE    
59         , _swr_context (0)
60 #endif
61         , _have_a_real_frame (false)
62         , _terminate (false)
63 {
64         
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70         if (_writer) {
71                 _writer->finish ();
72         }
73 }
74
75 void
76 Encoder::process_begin ()
77 {
78         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
79 #ifdef HAVE_SWRESAMPLE
80
81                 stringstream s;
82                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
83                 _film->log()->log (s.str ());
84
85                 /* We will be using planar float data when we call the resampler */
86                 _swr_context = swr_alloc_set_opts (
87                         0,
88                         _film->audio_stream()->channel_layout(),
89                         AV_SAMPLE_FMT_FLTP,
90                         _film->target_audio_sample_rate(),
91                         _film->audio_stream()->channel_layout(),
92                         AV_SAMPLE_FMT_FLTP,
93                         _film->audio_stream()->sample_rate(),
94                         0, 0
95                         );
96                 
97                 swr_init (_swr_context);
98 #else
99                 throw EncodeError ("Cannot resample audio as libswresample is not present");
100 #endif
101         } else {
102 #ifdef HAVE_SWRESAMPLE
103                 _swr_context = 0;
104 #endif          
105         }
106
107         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
108                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
109         }
110
111         vector<ServerDescription*> servers = Config::instance()->servers ();
112
113         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
114                 for (int j = 0; j < (*i)->threads (); ++j) {
115                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
116                 }
117         }
118
119         _writer.reset (new Writer (_film));
120 }
121
122
123 void
124 Encoder::process_end ()
125 {
126 #if HAVE_SWRESAMPLE     
127         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
128
129                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
130                         
131                 while (1) {
132                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
133
134                         if (frames < 0) {
135                                 throw EncodeError ("could not run sample-rate converter");
136                         }
137
138                         if (frames == 0) {
139                                 break;
140                         }
141
142                         out->set_frames (frames);
143                         _writer->write (out);
144                 }
145
146                 swr_free (&_swr_context);
147         }
148 #endif
149
150         boost::mutex::scoped_lock lock (_mutex);
151
152         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_queue.size ()));
153
154         /* Keep waking workers until the queue is empty */
155         while (!_queue.empty ()) {
156                 _film->log()->log ("Waking with " + lexical_cast<string> (_queue.size ()), Log::VERBOSE);
157                 _condition.notify_all ();
158                 _condition.wait (lock);
159         }
160
161         lock.unlock ();
162         
163         terminate_threads ();
164
165         _film->log()->log ("Mopping up " + lexical_cast<string> (_queue.size()));
166
167         /* The following sequence of events can occur in the above code:
168              1. a remote worker takes the last image off the queue
169              2. the loop above terminates
170              3. the remote worker fails to encode the image and puts it back on the queue
171              4. the remote worker is then terminated by terminate_threads
172
173              So just mop up anything left in the queue here.
174         */
175
176         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
177                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
178                 try {
179                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
180                         frame_done ();
181                 } catch (std::exception& e) {
182                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
183                 }
184         }
185
186         _writer->finish ();
187         _writer.reset ();
188 }       
189
190 /** @return an estimate of the current number of frames we are encoding per second,
191  *  or 0 if not known.
192  */
193 float
194 Encoder::current_frames_per_second () const
195 {
196         boost::mutex::scoped_lock lock (_history_mutex);
197         if (int (_time_history.size()) < _history_size) {
198                 return 0;
199         }
200
201         struct timeval now;
202         gettimeofday (&now, 0);
203
204         return _history_size / (seconds (now) - seconds (_time_history.back ()));
205 }
206
207 /** @return Number of video frames that have been sent out */
208 int
209 Encoder::video_frames_out () const
210 {
211         boost::mutex::scoped_lock (_history_mutex);
212         return _video_frames_out;
213 }
214
215 /** Should be called when a frame has been encoded successfully.
216  *  @param n Source frame index.
217  */
218 void
219 Encoder::frame_done ()
220 {
221         boost::mutex::scoped_lock lock (_history_mutex);
222         
223         struct timeval tv;
224         gettimeofday (&tv, 0);
225         _time_history.push_front (tv);
226         if (int (_time_history.size()) > _history_size) {
227                 _time_history.pop_back ();
228         }
229 }
230
231 void
232 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
233 {
234         DCPFrameRate dfr (_film->frames_per_second ());
235         
236         if (dfr.skip && (_video_frames_in % 2)) {
237                 ++_video_frames_in;
238                 return;
239         }
240
241         boost::mutex::scoped_lock lock (_mutex);
242
243         /* Wait until the queue has gone down a bit */
244         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
245                 TIMING ("decoder sleeps with queue of %1", _queue.size());
246                 _condition.wait (lock);
247                 TIMING ("decoder wakes with queue of %1", _queue.size());
248         }
249
250         if (_terminate) {
251                 return;
252         }
253
254         if (_writer->can_fake_write (_video_frames_out)) {
255                 _writer->fake_write (_video_frames_out);
256                 _have_a_real_frame = false;
257                 frame_done ();
258         } else if (same && _have_a_real_frame) {
259                 /* Use the last frame that we encoded. */
260                 _writer->repeat (_video_frames_out);
261                 frame_done ();
262         } else {
263                 /* Queue this new frame for encoding */
264                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
265                 TIMING ("adding to queue of %1", _queue.size ());
266                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
267                                           new DCPVideoFrame (
268                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
269                                                   _film->subtitle_offset(), _film->subtitle_scale(),
270                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
271                                                   _film->colour_lut(), _film->j2k_bandwidth(),
272                                                   _film->log()
273                                                   )
274                                           ));
275                 
276                 _condition.notify_all ();
277                 _have_a_real_frame = true;
278         }
279
280         ++_video_frames_in;
281         ++_video_frames_out;
282
283         if (dfr.repeat) {
284                 _writer->repeat (_video_frames_out);
285                 ++_video_frames_out;
286                 frame_done ();
287         }
288 }
289
290 void
291 Encoder::process_audio (shared_ptr<AudioBuffers> data)
292 {
293 #if HAVE_SWRESAMPLE
294         /* Maybe sample-rate convert */
295         if (_swr_context) {
296
297                 /* Compute the resampled frames count and add 32 for luck */
298                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
299
300                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
301
302                 /* Resample audio */
303                 int const resampled_frames = swr_convert (
304                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
305                         );
306                 
307                 if (resampled_frames < 0) {
308                         throw EncodeError ("could not run sample-rate converter");
309                 }
310
311                 resampled->set_frames (resampled_frames);
312                 
313                 /* And point our variables at the resampled audio */
314                 data = resampled;
315         }
316 #endif
317
318         if (_film->audio_channels() == 1) {
319                 /* We need to switch things around so that the mono channel is on
320                    the centre channel of a 5.1 set (with other channels silent).
321                 */
322
323                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
324                 b->make_silent (libdcp::LEFT);
325                 b->make_silent (libdcp::RIGHT);
326                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
327                 b->make_silent (libdcp::LFE);
328                 b->make_silent (libdcp::LS);
329                 b->make_silent (libdcp::RS);
330
331                 data = b;
332         }
333
334         _writer->write (data);
335 }
336
337 void
338 Encoder::terminate_threads ()
339 {
340         boost::mutex::scoped_lock lock (_mutex);
341         _terminate = true;
342         _condition.notify_all ();
343         lock.unlock ();
344
345         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
346                 (*i)->join ();
347                 delete *i;
348         }
349 }
350
351 void
352 Encoder::encoder_thread (ServerDescription* server)
353 {
354         /* Number of seconds that we currently wait between attempts
355            to connect to the server; not relevant for localhost
356            encodings.
357         */
358         int remote_backoff = 0;
359         
360         while (1) {
361
362                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
363                 boost::mutex::scoped_lock lock (_mutex);
364                 while (_queue.empty () && !_terminate) {
365                         _condition.wait (lock);
366                 }
367
368                 if (_terminate) {
369                         return;
370                 }
371
372                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
373                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
374                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
375                 _queue.pop_front ();
376                 
377                 lock.unlock ();
378
379                 shared_ptr<EncodedData> encoded;
380
381                 if (server) {
382                         try {
383                                 encoded = vf->encode_remotely (server);
384
385                                 if (remote_backoff > 0) {
386                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
387                                 }
388                                 
389                                 /* This job succeeded, so remove any backoff */
390                                 remote_backoff = 0;
391                                 
392                         } catch (std::exception& e) {
393                                 if (remote_backoff < 60) {
394                                         /* back off more */
395                                         remote_backoff += 10;
396                                 }
397                                 _film->log()->log (
398                                         String::compose (
399                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
400                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
401                                         );
402                         }
403                                 
404                 } else {
405                         try {
406                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
407                                 encoded = vf->encode_locally ();
408                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
409                         } catch (std::exception& e) {
410                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
411                         }
412                 }
413
414                 if (encoded) {
415                         _writer->write (encoded, vf->frame ());
416                         frame_done ();
417                 } else {
418                         lock.lock ();
419                         _film->log()->log (
420                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
421                                 );
422                         _queue.push_front (vf);
423                         lock.unlock ();
424                 }
425
426                 if (remote_backoff > 0) {
427                         dvdomatic_sleep (remote_backoff);
428                 }
429
430                 lock.lock ();
431                 _condition.notify_all ();
432         }
433 }