Merge master.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39 #include "writer.h"
40 #include "player.h"
41 #include "audio_mapping.h"
42
43 #include "i18n.h"
44
45 using std::pair;
46 using std::string;
47 using std::stringstream;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::make_pair;
52 using boost::shared_ptr;
53 using boost::optional;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<Film> f)
59         : _film (f)
60         , _video_frames_in (0)
61         , _video_frames_out (0)
62 #ifdef HAVE_SWRESAMPLE    
63         , _swr_context (0)
64 #endif
65         , _have_a_real_frame (false)
66         , _terminate (false)
67 {
68         
69 }
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74         if (_writer) {
75                 _writer->finish ();
76         }
77 }
78
79 void
80 Encoder::process_begin ()
81 {
82         if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
83 #ifdef HAVE_SWRESAMPLE
84
85                 stringstream s;
86                 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
87                 _film->log()->log (s.str ());
88
89                 /* We will be using planar float data when we call the resampler */
90                 _swr_context = swr_alloc_set_opts (
91                         0,
92                         _film->audio_channel_layout(),
93                         AV_SAMPLE_FMT_FLTP,
94                         _film->target_audio_sample_rate(),
95                         _film->audio_channel_layout(),
96                         AV_SAMPLE_FMT_FLTP,
97                         _film->audio_frame_rate(),
98                         0, 0
99                         );
100                 
101                 swr_init (_swr_context);
102 #else
103                 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
104 #endif
105         } else {
106 #ifdef HAVE_SWRESAMPLE
107                 _swr_context = 0;
108 #endif          
109         }
110
111         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
112                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
113         }
114
115         vector<ServerDescription*> servers = Config::instance()->servers ();
116
117         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
118                 for (int j = 0; j < (*i)->threads (); ++j) {
119                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
120                 }
121         }
122
123         _writer.reset (new Writer (_film));
124 }
125
126
127 void
128 Encoder::process_end ()
129 {
130 #if HAVE_SWRESAMPLE     
131         if (_film->has_audio() && _film->audio_channels() && _swr_context) {
132
133                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_channels(), 256));
134                         
135                 while (1) {
136                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
137
138                         if (frames < 0) {
139                                 throw EncodeError (_("could not run sample-rate converter"));
140                         }
141
142                         if (frames == 0) {
143                                 break;
144                         }
145
146                         out->set_frames (frames);
147                         _writer->write (out);
148                 }
149
150                 swr_free (&_swr_context);
151         }
152 #endif
153
154         boost::mutex::scoped_lock lock (_mutex);
155
156         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
157
158         /* Keep waking workers until the queue is empty */
159         while (!_queue.empty ()) {
160                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
161                 _condition.notify_all ();
162                 _condition.wait (lock);
163         }
164
165         lock.unlock ();
166         
167         terminate_threads ();
168
169         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
170
171         /* The following sequence of events can occur in the above code:
172              1. a remote worker takes the last image off the queue
173              2. the loop above terminates
174              3. the remote worker fails to encode the image and puts it back on the queue
175              4. the remote worker is then terminated by terminate_threads
176
177              So just mop up anything left in the queue here.
178         */
179
180         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
181                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
182                 try {
183                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
184                         frame_done ();
185                 } catch (std::exception& e) {
186                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
187                 }
188         }
189
190         _writer->finish ();
191         _writer.reset ();
192 }       
193
194 /** @return an estimate of the current number of frames we are encoding per second,
195  *  or 0 if not known.
196  */
197 float
198 Encoder::current_encoding_rate () const
199 {
200         boost::mutex::scoped_lock lock (_history_mutex);
201         if (int (_time_history.size()) < _history_size) {
202                 return 0;
203         }
204
205         struct timeval now;
206         gettimeofday (&now, 0);
207
208         return _history_size / (seconds (now) - seconds (_time_history.back ()));
209 }
210
211 /** @return Number of video frames that have been sent out */
212 int
213 Encoder::video_frames_out () const
214 {
215         boost::mutex::scoped_lock (_history_mutex);
216         return _video_frames_out;
217 }
218
219 /** Should be called when a frame has been encoded successfully.
220  *  @param n Source frame index.
221  */
222 void
223 Encoder::frame_done ()
224 {
225         boost::mutex::scoped_lock lock (_history_mutex);
226         
227         struct timeval tv;
228         gettimeofday (&tv, 0);
229         _time_history.push_front (tv);
230         if (int (_time_history.size()) > _history_size) {
231                 _time_history.pop_back ();
232         }
233 }
234
235 void
236 Encoder::process_video (shared_ptr<Image> image, bool same, shared_ptr<Subtitle> sub)
237 {
238         FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
239         
240         if (frc.skip && (_video_frames_in % 2)) {
241                 ++_video_frames_in;
242                 return;
243         }
244
245         boost::mutex::scoped_lock lock (_mutex);
246
247         /* Wait until the queue has gone down a bit */
248         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
249                 TIMING ("decoder sleeps with queue of %1", _queue.size());
250                 _condition.wait (lock);
251                 TIMING ("decoder wakes with queue of %1", _queue.size());
252         }
253
254         if (_terminate) {
255                 return;
256         }
257
258         if (_writer->thrown ()) {
259                 _writer->rethrow ();
260         }
261
262         if (_writer->can_fake_write (_video_frames_out)) {
263                 _writer->fake_write (_video_frames_out);
264                 _have_a_real_frame = false;
265                 frame_done ();
266         } else if (same && _have_a_real_frame) {
267                 /* Use the last frame that we encoded. */
268                 _writer->repeat (_video_frames_out);
269                 frame_done ();
270         } else {
271                 /* Queue this new frame for encoding */
272                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
273                 TIMING ("adding to queue of %1", _queue.size ());
274                 _queue.push_back (shared_ptr<DCPVideoFrame> (
275                                           new DCPVideoFrame (
276                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
277                                                   _film->subtitle_offset(), _film->subtitle_scale(),
278                                                   _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
279                                                   _film->colour_lut(), _film->j2k_bandwidth(),
280                                                   _film->log()
281                                                   )
282                                           ));
283                 
284                 _condition.notify_all ();
285                 _have_a_real_frame = true;
286         }
287
288         ++_video_frames_in;
289         ++_video_frames_out;
290
291         if (frc.repeat) {
292                 _writer->repeat (_video_frames_out);
293                 ++_video_frames_out;
294                 frame_done ();
295         }
296 }
297
298 void
299 Encoder::process_audio (shared_ptr<AudioBuffers> data)
300 {
301 #if HAVE_SWRESAMPLE
302         /* Maybe sample-rate convert */
303         if (_swr_context) {
304
305                 /* Compute the resampled frames count and add 32 for luck */
306                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
307
308                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_channels(), max_resampled_frames));
309
310                 /* Resample audio */
311                 int const resampled_frames = swr_convert (
312                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
313                         );
314                 
315                 if (resampled_frames < 0) {
316                         throw EncodeError (_("could not run sample-rate converter"));
317                 }
318
319                 resampled->set_frames (resampled_frames);
320                 
321                 /* And point our variables at the resampled audio */
322                 data = resampled;
323         }
324 #endif
325
326         _writer->write (data);
327 }
328
329 void
330 Encoder::terminate_threads ()
331 {
332         boost::mutex::scoped_lock lock (_mutex);
333         _terminate = true;
334         _condition.notify_all ();
335         lock.unlock ();
336
337         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
338                 (*i)->join ();
339                 delete *i;
340         }
341 }
342
343 void
344 Encoder::encoder_thread (ServerDescription* server)
345 {
346         /* Number of seconds that we currently wait between attempts
347            to connect to the server; not relevant for localhost
348            encodings.
349         */
350         int remote_backoff = 0;
351         
352         while (1) {
353
354                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
355                 boost::mutex::scoped_lock lock (_mutex);
356                 while (_queue.empty () && !_terminate) {
357                         _condition.wait (lock);
358                 }
359
360                 if (_terminate) {
361                         return;
362                 }
363
364                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
365                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
366                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
367                 _queue.pop_front ();
368                 
369                 lock.unlock ();
370
371                 shared_ptr<EncodedData> encoded;
372
373                 if (server) {
374                         try {
375                                 encoded = vf->encode_remotely (server);
376
377                                 if (remote_backoff > 0) {
378                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
379                                 }
380                                 
381                                 /* This job succeeded, so remove any backoff */
382                                 remote_backoff = 0;
383                                 
384                         } catch (std::exception& e) {
385                                 if (remote_backoff < 60) {
386                                         /* back off more */
387                                         remote_backoff += 10;
388                                 }
389                                 _film->log()->log (
390                                         String::compose (
391                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
392                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
393                                         );
394                         }
395                                 
396                 } else {
397                         try {
398                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
399                                 encoded = vf->encode_locally ();
400                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
401                         } catch (std::exception& e) {
402                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
403                         }
404                 }
405
406                 if (encoded) {
407                         _writer->write (encoded, vf->frame ());
408                         frame_done ();
409                 } else {
410                         lock.lock ();
411                         _film->log()->log (
412                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
413                                 );
414                         _queue.push_front (vf);
415                         lock.unlock ();
416                 }
417
418                 if (remote_backoff > 0) {
419                         dvdomatic_sleep (remote_backoff);
420                 }
421
422                 lock.lock ();
423                 _condition.notify_all ();
424         }
425 }