93a364fedbdb74ce92824b763aa69231c112b285
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include "encoder.h"
28 #include "util.h"
29 #include "options.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "cross.h"
38
39 using std::pair;
40 using std::string;
41 using std::stringstream;
42 using std::vector;
43 using std::list;
44 using std::cout;
45 using std::make_pair;
46 using namespace boost;
47
48 int const Encoder::_history_size = 25;
49
50 /** @param f Film that we are encoding.
51  *  @param o Options.
52  */
53 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<const EncodeOptions> o)
54         : _film (f)
55         , _opt (o)
56         , _just_skipped (false)
57         , _video_frame (0)
58         , _audio_frame (0)
59 #ifdef HAVE_SWRESAMPLE    
60         , _swr_context (0)
61 #endif    
62         , _audio_frames_written (0)
63         , _terminate_encoder (false)
64         , _writer_thread (0)
65         , _terminate_writer (false)
66 {
67         if (_film->audio_stream()) {
68                 /* Create sound output files with .tmp suffixes; we will rename
69                    them if and when we complete.
70                 */
71                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
72                         SF_INFO sf_info;
73                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
74                         /* We write mono files */
75                         sf_info.channels = 1;
76                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
77                         SNDFILE* f = sf_open (_opt->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
78                         if (f == 0) {
79                                 throw CreateFileError (_opt->multichannel_audio_out_path (i, true));
80                         }
81                         _sound_files.push_back (f);
82                 }
83         }
84 }
85
86 Encoder::~Encoder ()
87 {
88         close_sound_files ();
89         terminate_worker_threads ();
90         terminate_writer_thread ();
91 }
92
93 void
94 Encoder::process_begin ()
95 {
96         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
97 #ifdef HAVE_SWRESAMPLE
98
99                 stringstream s;
100                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
101                 _film->log()->log (s.str ());
102
103                 /* We will be using planar float data when we call the resampler */
104                 _swr_context = swr_alloc_set_opts (
105                         0,
106                         _film->audio_stream()->channel_layout(),
107                         AV_SAMPLE_FMT_FLTP,
108                         _film->target_audio_sample_rate(),
109                         _film->audio_stream()->channel_layout(),
110                         AV_SAMPLE_FMT_FLTP,
111                         _film->audio_stream()->sample_rate(),
112                         0, 0
113                         );
114                 
115                 swr_init (_swr_context);
116 #else
117                 throw EncodeError ("Cannot resample audio as libswresample is not present");
118 #endif
119         } else {
120 #ifdef HAVE_SWRESAMPLE
121                 _swr_context = 0;
122 #endif          
123         }
124
125         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
126                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
127         }
128
129         vector<ServerDescription*> servers = Config::instance()->servers ();
130
131         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
132                 for (int j = 0; j < (*i)->threads (); ++j) {
133                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
134                 }
135         }
136
137         _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
138 }
139
140
141 void
142 Encoder::process_end ()
143 {
144 #if HAVE_SWRESAMPLE     
145         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
146
147                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
148                         
149                 while (1) {
150                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
151
152                         if (frames < 0) {
153                                 throw EncodeError ("could not run sample-rate converter");
154                         }
155
156                         if (frames == 0) {
157                                 break;
158                         }
159
160                         out->set_frames (frames);
161                         write_audio (out);
162                 }
163
164                 swr_free (&_swr_context);
165         }
166 #endif
167
168         if (_film->audio_stream()) {
169                 close_sound_files ();
170                 
171                 /* Rename .wav.tmp files to .wav */
172                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
173                         if (boost::filesystem::exists (_opt->multichannel_audio_out_path (i, false))) {
174                                 boost::filesystem::remove (_opt->multichannel_audio_out_path (i, false));
175                         }
176                         boost::filesystem::rename (_opt->multichannel_audio_out_path (i, true), _opt->multichannel_audio_out_path (i, false));
177                 }
178         }
179
180         boost::mutex::scoped_lock lock (_worker_mutex);
181
182         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
183
184         /* Keep waking workers until the queue is empty */
185         while (!_encode_queue.empty ()) {
186                 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
187                 _worker_condition.notify_all ();
188                 _worker_condition.wait (lock);
189         }
190
191         lock.unlock ();
192         
193         terminate_worker_threads ();
194         terminate_writer_thread ();
195
196         _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
197
198         /* The following sequence of events can occur in the above code:
199              1. a remote worker takes the last image off the queue
200              2. the loop above terminates
201              3. the remote worker fails to encode the image and puts it back on the queue
202              4. the remote worker is then terminated by terminate_worker_threads
203
204              So just mop up anything left in the queue here.
205         */
206
207         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
208                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
209                 try {
210                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
211                         e->write (_opt, (*i)->frame ());
212                         frame_done ();
213                 } catch (std::exception& e) {
214                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
215                 }
216         }
217
218         /* Mop up any unwritten things in the writer's queue */
219         for (list<pair<shared_ptr<EncodedData>, int> >::iterator i = _write_queue.begin(); i != _write_queue.end(); ++i) {
220                 i->first->write (_opt, i->second);
221         }
222
223         /* Now do links (or copies on windows) to duplicate frames */
224         for (list<pair<int, int> >::iterator i = _links_required.begin(); i != _links_required.end(); ++i) {
225                 link (_opt->frame_out_path (i->first, false), _opt->frame_out_path (i->second, false));
226                 link (_opt->hash_out_path (i->first, false), _opt->hash_out_path (i->second, false));
227         }
228 }       
229
230 /** @return an estimate of the current number of frames we are encoding per second,
231  *  or 0 if not known.
232  */
233 float
234 Encoder::current_frames_per_second () const
235 {
236         boost::mutex::scoped_lock lock (_history_mutex);
237         if (int (_time_history.size()) < _history_size) {
238                 return 0;
239         }
240
241         struct timeval now;
242         gettimeofday (&now, 0);
243
244         return _history_size / (seconds (now) - seconds (_time_history.back ()));
245 }
246
247 /** @return true if the last frame to be processed was skipped as it already existed */
248 bool
249 Encoder::skipping () const
250 {
251         boost::mutex::scoped_lock (_history_mutex);
252         return _just_skipped;
253 }
254
255 /** @return Number of video frames that have been received */
256 SourceFrame
257 Encoder::video_frame () const
258 {
259         boost::mutex::scoped_lock (_history_mutex);
260         return _video_frame;
261 }
262
263 /** Should be called when a frame has been encoded successfully.
264  *  @param n Source frame index.
265  */
266 void
267 Encoder::frame_done ()
268 {
269         boost::mutex::scoped_lock lock (_history_mutex);
270         _just_skipped = false;
271         
272         struct timeval tv;
273         gettimeofday (&tv, 0);
274         _time_history.push_front (tv);
275         if (int (_time_history.size()) > _history_size) {
276                 _time_history.pop_back ();
277         }
278 }
279
280 /** Called by a subclass when it has just skipped the processing
281     of a frame because it has already been done.
282 */
283 void
284 Encoder::frame_skipped ()
285 {
286         boost::mutex::scoped_lock lock (_history_mutex);
287         _just_skipped = true;
288 }
289
290 void
291 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
292 {
293         if (_opt->video_skip != 0 && (_video_frame % _opt->video_skip) != 0) {
294                 ++_video_frame;
295                 return;
296         }
297
298         if (_opt->video_range) {
299                 pair<SourceFrame, SourceFrame> const r = _opt->video_range.get();
300                 if (_video_frame < r.first || _video_frame >= r.second) {
301                         ++_video_frame;
302                         return;
303                 }
304         }
305
306         boost::mutex::scoped_lock lock (_worker_mutex);
307
308         /* Wait until the queue has gone down a bit */
309         while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
310                 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
311                 _worker_condition.wait (lock);
312                 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
313         }
314
315         if (_terminate_encoder) {
316                 return;
317         }
318
319         /* Only do the processing if we don't already have a file for this frame */
320         if (boost::filesystem::exists (_opt->frame_out_path (_video_frame, false))) {
321                 frame_skipped ();
322                 return;
323         }
324
325         if (same && _last_real_frame) {
326                 /* Use the last frame that we encoded.  We need to postpone doing the actual link,
327                    as on windows the link is really a copy and the reference frame might not have
328                    finished encoding yet.
329                 */
330                 _links_required.push_back (make_pair (_last_real_frame.get(), _video_frame));
331         } else {
332                 /* Queue this new frame for encoding */
333                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
334                 TIMING ("adding to queue of %1", _encode_queue.size ());
335                 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
336                                           new DCPVideoFrame (
337                                                   image, sub, _opt->out_size, _opt->padding, _film->subtitle_offset(), _film->subtitle_scale(),
338                                                   _film->scaler(), _video_frame, _film->frames_per_second(), s.second,
339                                                   Config::instance()->colour_lut_index (), Config::instance()->j2k_bandwidth (),
340                                                   _film->log()
341                                                   )
342                                           ));
343                 
344                 _worker_condition.notify_all ();
345                 _last_real_frame = _video_frame;
346         }
347
348         ++_video_frame;
349 }
350
351 void
352 Encoder::process_audio (shared_ptr<AudioBuffers> data)
353 {
354         if (_opt->audio_range) {
355                 shared_ptr<AudioBuffers> trimmed (new AudioBuffers (*data.get ()));
356                 
357                 /* Range that we are encoding */
358                 pair<int64_t, int64_t> required_range = _opt->audio_range.get();
359                 /* Range of this block of data */
360                 pair<int64_t, int64_t> this_range (_audio_frame, _audio_frame + trimmed->frames());
361
362                 if (this_range.second < required_range.first || required_range.second < this_range.first) {
363                         /* No part of this audio is within the required range */
364                         return;
365                 } else if (required_range.first >= this_range.first && required_range.first < this_range.second) {
366                         /* Trim start */
367                         int64_t const shift = required_range.first - this_range.first;
368                         trimmed->move (shift, 0, trimmed->frames() - shift);
369                         trimmed->set_frames (trimmed->frames() - shift);
370                 } else if (required_range.second >= this_range.first && required_range.second < this_range.second) {
371                         /* Trim end */
372                         trimmed->set_frames (required_range.second - this_range.first);
373                 }
374
375                 data = trimmed;
376         }
377
378 #if HAVE_SWRESAMPLE
379         /* Maybe sample-rate convert */
380         if (_swr_context) {
381
382                 /* Compute the resampled frames count and add 32 for luck */
383                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
384
385                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
386
387                 /* Resample audio */
388                 int const resampled_frames = swr_convert (
389                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
390                         );
391                 
392                 if (resampled_frames < 0) {
393                         throw EncodeError ("could not run sample-rate converter");
394                 }
395
396                 resampled->set_frames (resampled_frames);
397                 
398                 /* And point our variables at the resampled audio */
399                 data = resampled;
400         }
401 #endif
402
403         if (_film->audio_channels() == 1) {
404                 /* We need to switch things around so that the mono channel is on
405                    the centre channel of a 5.1 set (with other channels silent).
406                 */
407
408                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
409                 b->make_silent (libdcp::LEFT);
410                 b->make_silent (libdcp::RIGHT);
411                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
412                 b->make_silent (libdcp::LFE);
413                 b->make_silent (libdcp::LS);
414                 b->make_silent (libdcp::RS);
415
416                 data = b;
417         }
418
419         write_audio (data);
420         
421         _audio_frame += data->frames ();
422 }
423
424 void
425 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
426 {
427         for (int i = 0; i < audio->channels(); ++i) {
428                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
429         }
430
431         _audio_frames_written += audio->frames ();
432 }
433
434 void
435 Encoder::close_sound_files ()
436 {
437         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
438                 sf_close (*i);
439         }
440
441         _sound_files.clear ();
442 }       
443
444 void
445 Encoder::terminate_worker_threads ()
446 {
447         boost::mutex::scoped_lock lock (_worker_mutex);
448         _terminate_encoder = true;
449         _worker_condition.notify_all ();
450         lock.unlock ();
451
452         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
453                 (*i)->join ();
454                 delete *i;
455         }
456 }
457
458 void
459 Encoder::terminate_writer_thread ()
460 {
461         if (!_writer_thread) {
462                 return;
463         }
464         
465         boost::mutex::scoped_lock lock (_writer_mutex);
466         _terminate_writer = true;
467         _writer_condition.notify_all ();
468         lock.unlock ();
469
470         _writer_thread->join ();
471         delete _writer_thread;
472         _writer_thread = 0;
473 }
474
475 void
476 Encoder::encoder_thread (ServerDescription* server)
477 {
478         /* Number of seconds that we currently wait between attempts
479            to connect to the server; not relevant for localhost
480            encodings.
481         */
482         int remote_backoff = 0;
483         
484         while (1) {
485
486                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
487                 boost::mutex::scoped_lock lock (_worker_mutex);
488                 while (_encode_queue.empty () && !_terminate_encoder) {
489                         _worker_condition.wait (lock);
490                 }
491
492                 if (_terminate_encoder) {
493                         return;
494                 }
495
496                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
497                 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
498                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
499                 _encode_queue.pop_front ();
500                 
501                 lock.unlock ();
502
503                 shared_ptr<EncodedData> encoded;
504
505                 if (server) {
506                         try {
507                                 encoded = vf->encode_remotely (server);
508
509                                 if (remote_backoff > 0) {
510                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
511                                 }
512                                 
513                                 /* This job succeeded, so remove any backoff */
514                                 remote_backoff = 0;
515                                 
516                         } catch (std::exception& e) {
517                                 if (remote_backoff < 60) {
518                                         /* back off more */
519                                         remote_backoff += 10;
520                                 }
521                                 _film->log()->log (
522                                         String::compose (
523                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
524                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
525                                         );
526                         }
527                                 
528                 } else {
529                         try {
530                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
531                                 encoded = vf->encode_locally ();
532                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
533                         } catch (std::exception& e) {
534                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
535                         }
536                 }
537
538                 if (encoded) {
539                         boost::mutex::scoped_lock lock (_writer_mutex);
540                         _write_queue.push_back (make_pair (encoded, vf->frame ()));
541                         _writer_condition.notify_all ();
542                 } else {
543                         lock.lock ();
544                         _film->log()->log (
545                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
546                                 );
547                         _encode_queue.push_front (vf);
548                         lock.unlock ();
549                 }
550
551                 if (remote_backoff > 0) {
552                         dvdomatic_sleep (remote_backoff);
553                 }
554
555                 lock.lock ();
556                 _worker_condition.notify_all ();
557         }
558 }
559
560 void
561 Encoder::link (string a, string b) const
562 {
563 #ifdef DVDOMATIC_POSIX                  
564         int const r = symlink (a.c_str(), b.c_str());
565         if (r) {
566                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
567         }
568 #endif
569         
570 #ifdef DVDOMATIC_WINDOWS
571         boost::filesystem::copy_file (a, b);
572 #endif                  
573 }
574
575 void
576 Encoder::writer_thread ()
577 {
578         while (1)
579         {
580                 boost::mutex::scoped_lock lock (_writer_mutex);
581                 while (_write_queue.empty() && !_terminate_writer) {
582                         _writer_condition.wait (lock);
583                 }
584
585                 if (_terminate_writer) {
586                         return;
587                 }
588
589                 pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
590                 _write_queue.pop_front ();
591
592                 lock.unlock ();
593                 encoded.first->write (_opt, encoded.second);
594                 lock.lock ();
595         }
596 }