00807f863808e3c952fc52a99ab6cc1833153058
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39 #include "writer.h"
40 #include "playlist.h"
41
42 #include "i18n.h"
43
44 using std::pair;
45 using std::string;
46 using std::stringstream;
47 using std::vector;
48 using std::list;
49 using std::cout;
50 using std::make_pair;
51 using namespace boost;
52
53 int const Encoder::_history_size = 25;
54
55 /** @param f Film that we are encoding */
56 Encoder::Encoder (shared_ptr<Film> f, shared_ptr<Playlist> p)
57         : _film (f)
58         , _playlist (p)
59         , _video_frames_in (0)
60         , _video_frames_out (0)
61 #ifdef HAVE_SWRESAMPLE    
62         , _swr_context (0)
63 #endif
64         , _have_a_real_frame (false)
65         , _terminate (false)
66 {
67         
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73         if (_writer) {
74                 _writer->finish ();
75         }
76 }
77
78 void
79 Encoder::process_begin ()
80 {
81         if (_playlist->has_audio() && _playlist->audio_frame_rate() != _film->target_audio_sample_rate()) {
82 #ifdef HAVE_SWRESAMPLE
83
84                 stringstream s;
85                 s << String::compose (N_("Will resample audio from %1 to %2"), _playlist->audio_frame_rate(), _film->target_audio_sample_rate());
86                 _film->log()->log (s.str ());
87
88                 /* We will be using planar float data when we call the resampler */
89                 _swr_context = swr_alloc_set_opts (
90                         0,
91                         _playlist->audio_channel_layout(),
92                         AV_SAMPLE_FMT_FLTP,
93                         _film->target_audio_sample_rate(),
94                         _playlist->audio_channel_layout(),
95                         AV_SAMPLE_FMT_FLTP,
96                         _playlist->audio_frame_rate(),
97                         0, 0
98                         );
99                 
100                 swr_init (_swr_context);
101 #else
102                 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
103 #endif
104         } else {
105 #ifdef HAVE_SWRESAMPLE
106                 _swr_context = 0;
107 #endif          
108         }
109
110         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
111                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
112         }
113
114         vector<ServerDescription*> servers = Config::instance()->servers ();
115
116         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
117                 for (int j = 0; j < (*i)->threads (); ++j) {
118                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
119                 }
120         }
121
122         _writer.reset (new Writer (_film, _playlist));
123 }
124
125
126 void
127 Encoder::process_end ()
128 {
129 #if HAVE_SWRESAMPLE     
130         if (_playlist->has_audio() && _playlist->audio_channels() && _swr_context) {
131
132                 shared_ptr<AudioBuffers> out (new AudioBuffers (_playlist->audio_channels(), 256));
133                         
134                 while (1) {
135                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
136
137                         if (frames < 0) {
138                                 throw EncodeError (_("could not run sample-rate converter"));
139                         }
140
141                         if (frames == 0) {
142                                 break;
143                         }
144
145                         out->set_frames (frames);
146                         write_audio (out);
147                 }
148
149                 swr_free (&_swr_context);
150         }
151 #endif
152
153         boost::mutex::scoped_lock lock (_mutex);
154
155         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
156
157         /* Keep waking workers until the queue is empty */
158         while (!_queue.empty ()) {
159                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
160                 _condition.notify_all ();
161                 _condition.wait (lock);
162         }
163
164         lock.unlock ();
165         
166         terminate_threads ();
167
168         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
169
170         /* The following sequence of events can occur in the above code:
171              1. a remote worker takes the last image off the queue
172              2. the loop above terminates
173              3. the remote worker fails to encode the image and puts it back on the queue
174              4. the remote worker is then terminated by terminate_threads
175
176              So just mop up anything left in the queue here.
177         */
178
179         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
180                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
181                 try {
182                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
183                         frame_done ();
184                 } catch (std::exception& e) {
185                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
186                 }
187         }
188
189         _writer->finish ();
190         _writer.reset ();
191 }       
192
193 /** @return an estimate of the current number of frames we are encoding per second,
194  *  or 0 if not known.
195  */
196 float
197 Encoder::current_frames_per_second () const
198 {
199         boost::mutex::scoped_lock lock (_history_mutex);
200         if (int (_time_history.size()) < _history_size) {
201                 return 0;
202         }
203
204         struct timeval now;
205         gettimeofday (&now, 0);
206
207         return _history_size / (seconds (now) - seconds (_time_history.back ()));
208 }
209
210 /** @return Number of video frames that have been sent out */
211 int
212 Encoder::video_frames_out () const
213 {
214         boost::mutex::scoped_lock (_history_mutex);
215         return _video_frames_out;
216 }
217
218 /** Should be called when a frame has been encoded successfully.
219  *  @param n Source frame index.
220  */
221 void
222 Encoder::frame_done ()
223 {
224         boost::mutex::scoped_lock lock (_history_mutex);
225         
226         struct timeval tv;
227         gettimeofday (&tv, 0);
228         _time_history.push_front (tv);
229         if (int (_time_history.size()) > _history_size) {
230                 _time_history.pop_back ();
231         }
232 }
233
234 void
235 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
236 {
237         FrameRateConversion frc (_playlist->video_frame_rate(), _film->dcp_frame_rate());
238         
239         if (frc.skip && (_video_frames_in % 2)) {
240                 ++_video_frames_in;
241                 return;
242         }
243
244         boost::mutex::scoped_lock lock (_mutex);
245
246         /* Wait until the queue has gone down a bit */
247         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
248                 TIMING ("decoder sleeps with queue of %1", _queue.size());
249                 _condition.wait (lock);
250                 TIMING ("decoder wakes with queue of %1", _queue.size());
251         }
252
253         if (_terminate) {
254                 return;
255         }
256
257         if (_writer->thrown ()) {
258                 _writer->rethrow ();
259         }
260
261         if (_writer->can_fake_write (_video_frames_out)) {
262                 _writer->fake_write (_video_frames_out);
263                 _have_a_real_frame = false;
264                 frame_done ();
265         } else if (same && _have_a_real_frame) {
266                 /* Use the last frame that we encoded. */
267                 _writer->repeat (_video_frames_out);
268                 frame_done ();
269         } else {
270                 /* Queue this new frame for encoding */
271                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
272                 TIMING ("adding to queue of %1", _queue.size ());
273                 _queue.push_back (boost::shared_ptr<DCPVideoFrame> (
274                                           new DCPVideoFrame (
275                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_playlist),
276                                                   _film->subtitle_offset(), _film->subtitle_scale(),
277                                                   _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
278                                                   _film->colour_lut(), _film->j2k_bandwidth(),
279                                                   _film->log()
280                                                   )
281                                           ));
282                 
283                 _condition.notify_all ();
284                 _have_a_real_frame = true;
285         }
286
287         ++_video_frames_in;
288         ++_video_frames_out;
289
290         if (frc.repeat) {
291                 _writer->repeat (_video_frames_out);
292                 ++_video_frames_out;
293                 frame_done ();
294         }
295 }
296
297 void
298 Encoder::process_audio (shared_ptr<AudioBuffers> data)
299 {
300 #if HAVE_SWRESAMPLE
301         /* Maybe sample-rate convert */
302         if (_swr_context) {
303
304                 /* Compute the resampled frames count and add 32 for luck */
305                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _playlist->audio_frame_rate()) + 32;
306
307                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_playlist->audio_channels(), max_resampled_frames));
308
309                 /* Resample audio */
310                 int const resampled_frames = swr_convert (
311                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
312                         );
313                 
314                 if (resampled_frames < 0) {
315                         throw EncodeError (_("could not run sample-rate converter"));
316                 }
317
318                 resampled->set_frames (resampled_frames);
319                 
320                 /* And point our variables at the resampled audio */
321                 data = resampled;
322         }
323 #endif
324
325         write_audio (data);
326 }
327
328 void
329 Encoder::terminate_threads ()
330 {
331         boost::mutex::scoped_lock lock (_mutex);
332         _terminate = true;
333         _condition.notify_all ();
334         lock.unlock ();
335
336         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
337                 (*i)->join ();
338                 delete *i;
339         }
340 }
341
342 void
343 Encoder::encoder_thread (ServerDescription* server)
344 {
345         /* Number of seconds that we currently wait between attempts
346            to connect to the server; not relevant for localhost
347            encodings.
348         */
349         int remote_backoff = 0;
350         
351         while (1) {
352
353                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
354                 boost::mutex::scoped_lock lock (_mutex);
355                 while (_queue.empty () && !_terminate) {
356                         _condition.wait (lock);
357                 }
358
359                 if (_terminate) {
360                         return;
361                 }
362
363                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
364                 boost::shared_ptr<DCPVideoFrame> vf = _queue.front ();
365                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
366                 _queue.pop_front ();
367                 
368                 lock.unlock ();
369
370                 shared_ptr<EncodedData> encoded;
371
372                 if (server) {
373                         try {
374                                 encoded = vf->encode_remotely (server);
375
376                                 if (remote_backoff > 0) {
377                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
378                                 }
379                                 
380                                 /* This job succeeded, so remove any backoff */
381                                 remote_backoff = 0;
382                                 
383                         } catch (std::exception& e) {
384                                 if (remote_backoff < 60) {
385                                         /* back off more */
386                                         remote_backoff += 10;
387                                 }
388                                 _film->log()->log (
389                                         String::compose (
390                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
391                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
392                                         );
393                         }
394                                 
395                 } else {
396                         try {
397                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
398                                 encoded = vf->encode_locally ();
399                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
400                         } catch (std::exception& e) {
401                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
402                         }
403                 }
404
405                 if (encoded) {
406                         _writer->write (encoded, vf->frame ());
407                         frame_done ();
408                 } else {
409                         lock.lock ();
410                         _film->log()->log (
411                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
412                                 );
413                         _queue.push_front (vf);
414                         lock.unlock ();
415                 }
416
417                 if (remote_backoff > 0) {
418                         dvdomatic_sleep (remote_backoff);
419                 }
420
421                 lock.lock ();
422                 _condition.notify_all ();
423         }
424 }
425
426 void
427 Encoder::write_audio (shared_ptr<const AudioBuffers> data)
428 {
429         AudioMapping m (_playlist->audio_channels ());
430         if (m.dcp_channels() != _playlist->audio_channels()) {
431
432                 /* Remap (currently just for mono -> 5.1) */
433
434                 shared_ptr<AudioBuffers> b (new AudioBuffers (m.dcp_channels(), data->frames ()));
435                 for (int i = 0; i < m.dcp_channels(); ++i) {
436                         optional<int> s = m.dcp_to_source (static_cast<libdcp::Channel> (i));
437                         if (!s) {
438                                 b->make_silent (i);
439                         } else {
440                                 memcpy (b->data()[i], data->data()[s.get()], data->frames() * sizeof(float));
441                         }
442                 }
443
444                 data = b;
445         }
446
447         _writer->write (data);
448 }