Merge master; fix crash on new film.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39 #include "writer.h"
40 #include "player.h"
41 #include "audio_mapping.h"
42
43 #include "i18n.h"
44
45 using std::pair;
46 using std::string;
47 using std::stringstream;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::make_pair;
52 using boost::shared_ptr;
53 using boost::optional;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<Film> f)
59         : _film (f)
60         , _video_frames_in (0)
61         , _video_frames_out (0)
62 #ifdef HAVE_SWRESAMPLE    
63         , _swr_context (0)
64 #endif
65         , _have_a_real_frame (false)
66         , _terminate (false)
67 {
68         
69 }
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74         if (_writer) {
75                 _writer->finish ();
76         }
77 }
78
79 void
80 Encoder::process_begin ()
81 {
82         if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
83 #ifdef HAVE_SWRESAMPLE
84
85                 stringstream s;
86                 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
87                 _film->log()->log (s.str ());
88
89                 /* We will be using planar float data when we call the
90                    resampler.  As far as I can see, the audio channel
91                    layout is not necessary for our purposes; it seems
92                    only to be used get the number of channels and
93                    decide if rematrixing is needed.  It won't be, since
94                    input and output layouts are the same.
95                 */
96
97                 _swr_context = swr_alloc_set_opts (
98                         0,
99                         av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
100                         AV_SAMPLE_FMT_FLTP,
101                         _film->target_audio_sample_rate(),
102                         av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
103                         AV_SAMPLE_FMT_FLTP,
104                         _film->audio_frame_rate(),
105                         0, 0
106                         );
107                 
108                 swr_init (_swr_context);
109 #else
110                 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
111 #endif
112         } else {
113 #ifdef HAVE_SWRESAMPLE
114                 _swr_context = 0;
115 #endif          
116         }
117
118         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
119                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
120         }
121
122         vector<ServerDescription*> servers = Config::instance()->servers ();
123
124         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
125                 for (int j = 0; j < (*i)->threads (); ++j) {
126                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
127                 }
128         }
129
130         _writer.reset (new Writer (_film));
131 }
132
133
134 void
135 Encoder::process_end ()
136 {
137 #if HAVE_SWRESAMPLE     
138         if (_film->has_audio() && _swr_context) {
139
140                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_mapping().dcp_channels(), 256));
141                         
142                 while (1) {
143                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
144
145                         if (frames < 0) {
146                                 throw EncodeError (_("could not run sample-rate converter"));
147                         }
148
149                         if (frames == 0) {
150                                 break;
151                         }
152
153                         out->set_frames (frames);
154                         _writer->write (out);
155                 }
156
157                 swr_free (&_swr_context);
158         }
159 #endif
160
161         boost::mutex::scoped_lock lock (_mutex);
162
163         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
164
165         /* Keep waking workers until the queue is empty */
166         while (!_queue.empty ()) {
167                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
168                 _condition.notify_all ();
169                 _condition.wait (lock);
170         }
171
172         lock.unlock ();
173         
174         terminate_threads ();
175
176         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
177
178         /* The following sequence of events can occur in the above code:
179              1. a remote worker takes the last image off the queue
180              2. the loop above terminates
181              3. the remote worker fails to encode the image and puts it back on the queue
182              4. the remote worker is then terminated by terminate_threads
183
184              So just mop up anything left in the queue here.
185         */
186
187         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
188                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
189                 try {
190                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
191                         frame_done ();
192                 } catch (std::exception& e) {
193                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
194                 }
195         }
196
197         _writer->finish ();
198         _writer.reset ();
199 }       
200
201 /** @return an estimate of the current number of frames we are encoding per second,
202  *  or 0 if not known.
203  */
204 float
205 Encoder::current_encoding_rate () const
206 {
207         boost::mutex::scoped_lock lock (_history_mutex);
208         if (int (_time_history.size()) < _history_size) {
209                 return 0;
210         }
211
212         struct timeval now;
213         gettimeofday (&now, 0);
214
215         return _history_size / (seconds (now) - seconds (_time_history.back ()));
216 }
217
218 /** @return Number of video frames that have been sent out */
219 int
220 Encoder::video_frames_out () const
221 {
222         boost::mutex::scoped_lock (_history_mutex);
223         return _video_frames_out;
224 }
225
226 /** Should be called when a frame has been encoded successfully.
227  *  @param n Source frame index.
228  */
229 void
230 Encoder::frame_done ()
231 {
232         boost::mutex::scoped_lock lock (_history_mutex);
233         
234         struct timeval tv;
235         gettimeofday (&tv, 0);
236         _time_history.push_front (tv);
237         if (int (_time_history.size()) > _history_size) {
238                 _time_history.pop_back ();
239         }
240 }
241
242 void
243 Encoder::process_video (shared_ptr<const Image> image, bool same, shared_ptr<Subtitle> sub)
244 {
245         FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
246         
247         if (frc.skip && (_video_frames_in % 2)) {
248                 ++_video_frames_in;
249                 return;
250         }
251
252         boost::mutex::scoped_lock lock (_mutex);
253
254         /* Wait until the queue has gone down a bit */
255         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
256                 TIMING ("decoder sleeps with queue of %1", _queue.size());
257                 _condition.wait (lock);
258                 TIMING ("decoder wakes with queue of %1", _queue.size());
259         }
260
261         if (_terminate) {
262                 return;
263         }
264
265         if (_writer->thrown ()) {
266                 _writer->rethrow ();
267         }
268
269         if (_writer->can_fake_write (_video_frames_out)) {
270                 _writer->fake_write (_video_frames_out);
271                 _have_a_real_frame = false;
272                 frame_done ();
273         } else if (same && _have_a_real_frame) {
274                 /* Use the last frame that we encoded. */
275                 _writer->repeat (_video_frames_out);
276                 frame_done ();
277         } else {
278                 /* Queue this new frame for encoding */
279                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
280                 TIMING ("adding to queue of %1", _queue.size ());
281                 _queue.push_back (shared_ptr<DCPVideoFrame> (
282                                           new DCPVideoFrame (
283                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
284                                                   _film->subtitle_offset(), _film->subtitle_scale(),
285                                                   _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
286                                                   _film->colour_lut(), _film->j2k_bandwidth(),
287                                                   _film->log()
288                                                   )
289                                           ));
290                 
291                 _condition.notify_all ();
292                 _have_a_real_frame = true;
293         }
294
295         ++_video_frames_in;
296         ++_video_frames_out;
297
298         if (frc.repeat) {
299                 _writer->repeat (_video_frames_out);
300                 ++_video_frames_out;
301                 frame_done ();
302         }
303 }
304
305 void
306 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
307 {
308 #if HAVE_SWRESAMPLE
309         /* Maybe sample-rate convert */
310         if (_swr_context) {
311
312                 /* Compute the resampled frames count and add 32 for luck */
313                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
314
315                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_mapping().dcp_channels(), max_resampled_frames));
316
317                 /* Resample audio */
318                 int const resampled_frames = swr_convert (
319                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
320                         );
321                 
322                 if (resampled_frames < 0) {
323                         throw EncodeError (_("could not run sample-rate converter"));
324                 }
325
326                 resampled->set_frames (resampled_frames);
327                 
328                 /* And point our variables at the resampled audio */
329                 data = resampled;
330         }
331 #endif
332
333         _writer->write (data);
334 }
335
336 void
337 Encoder::terminate_threads ()
338 {
339         boost::mutex::scoped_lock lock (_mutex);
340         _terminate = true;
341         _condition.notify_all ();
342         lock.unlock ();
343
344         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
345                 if ((*i)->joinable ()) {
346                         (*i)->join ();
347                 }
348                 delete *i;
349         }
350 }
351
352 void
353 Encoder::encoder_thread (ServerDescription* server)
354 {
355         /* Number of seconds that we currently wait between attempts
356            to connect to the server; not relevant for localhost
357            encodings.
358         */
359         int remote_backoff = 0;
360         
361         while (1) {
362
363                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
364                 boost::mutex::scoped_lock lock (_mutex);
365                 while (_queue.empty () && !_terminate) {
366                         _condition.wait (lock);
367                 }
368
369                 if (_terminate) {
370                         return;
371                 }
372
373                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
374                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
375                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
376                 _queue.pop_front ();
377                 
378                 lock.unlock ();
379
380                 shared_ptr<EncodedData> encoded;
381
382                 if (server) {
383                         try {
384                                 encoded = vf->encode_remotely (server);
385
386                                 if (remote_backoff > 0) {
387                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
388                                 }
389                                 
390                                 /* This job succeeded, so remove any backoff */
391                                 remote_backoff = 0;
392                                 
393                         } catch (std::exception& e) {
394                                 if (remote_backoff < 60) {
395                                         /* back off more */
396                                         remote_backoff += 10;
397                                 }
398                                 _film->log()->log (
399                                         String::compose (
400                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
401                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
402                                         );
403                         }
404                                 
405                 } else {
406                         try {
407                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
408                                 encoded = vf->encode_locally ();
409                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
410                         } catch (std::exception& e) {
411                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
412                         }
413                 }
414
415                 if (encoded) {
416                         _writer->write (encoded, vf->frame ());
417                         frame_done ();
418                 } else {
419                         lock.lock ();
420                         _film->log()->log (
421                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
422                                 );
423                         _queue.push_front (vf);
424                         lock.unlock ();
425                 }
426
427                 if (remote_backoff > 0) {
428                         dcpomatic_sleep (remote_backoff);
429                 }
430
431                 lock.lock ();
432                 _condition.notify_all ();
433         }
434 }