Some fixes and logs.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "options.h"
31 #include "film.h"
32 #include "log.h"
33 #include "exceptions.h"
34 #include "filter.h"
35 #include "config.h"
36 #include "dcp_video_frame.h"
37 #include "server.h"
38 #include "format.h"
39 #include "cross.h"
40
41 using std::pair;
42 using std::string;
43 using std::stringstream;
44 using std::vector;
45 using std::list;
46 using std::cout;
47 using std::make_pair;
48 using namespace boost;
49
/** Number of frame-completion timestamps kept in _time_history; frame_done()
 *  trims the history to this length and current_frames_per_second() reports 0
 *  until this many have been collected.
 */
int const Encoder::_history_size = 25;
/** Number of encoded frames that writer_thread() allows in _write_queue before
 *  it starts putting frames on disk until they can be written in sequence.
 */
unsigned int const Encoder::_maximum_frames_in_memory = 8;
52
53 /** @param f Film that we are encoding.
54  *  @param o Options.
55  */
56 Encoder::Encoder (shared_ptr<const Film> f)
57         : _film (f)
58         , _just_skipped (false)
59         , _video_frames_in (0)
60         , _audio_frames_in (0)
61         , _video_frames_out (0)
62         , _audio_frames_out (0)
63 #ifdef HAVE_SWRESAMPLE    
64         , _swr_context (0)
65 #endif
66         , _have_a_real_frame (false)
67         , _terminate_encoder (false)
68         , _writer_thread (0)
69         , _finish_writer (false)
70         , _last_written_frame (-1)
71 {
72         if (_film->audio_stream()) {
73                 /* Create sound output files with .tmp suffixes; we will rename
74                    them if and when we complete.
75                 */
76                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
77                         SF_INFO sf_info;
78                         sf_info.samplerate = dcp_audio_sample_rate (_film->audio_stream()->sample_rate());
79                         /* We write mono files */
80                         sf_info.channels = 1;
81                         sf_info.format = SF_FORMAT_WAV | SF_FORMAT_PCM_24;
82                         SNDFILE* f = sf_open (_film->multichannel_audio_out_path (i, true).c_str (), SFM_WRITE, &sf_info);
83                         if (f == 0) {
84                                 throw CreateFileError (_film->multichannel_audio_out_path (i, true));
85                         }
86                         _sound_files.push_back (f);
87                 }
88         }
89 }
90
/** Close any sound files that are still open, then stop and join the
 *  encoder worker threads and the writer thread.
 */
Encoder::~Encoder ()
{
        close_sound_files ();
        terminate_worker_threads ();
        finish_writer_thread ();
}
97
98 void
99 Encoder::process_begin ()
100 {
101         if (_film->audio_stream() && _film->audio_stream()->sample_rate() != _film->target_audio_sample_rate()) {
102 #ifdef HAVE_SWRESAMPLE
103
104                 stringstream s;
105                 s << "Will resample audio from " << _film->audio_stream()->sample_rate() << " to " << _film->target_audio_sample_rate();
106                 _film->log()->log (s.str ());
107
108                 /* We will be using planar float data when we call the resampler */
109                 _swr_context = swr_alloc_set_opts (
110                         0,
111                         _film->audio_stream()->channel_layout(),
112                         AV_SAMPLE_FMT_FLTP,
113                         _film->target_audio_sample_rate(),
114                         _film->audio_stream()->channel_layout(),
115                         AV_SAMPLE_FMT_FLTP,
116                         _film->audio_stream()->sample_rate(),
117                         0, 0
118                         );
119                 
120                 swr_init (_swr_context);
121 #else
122                 throw EncodeError ("Cannot resample audio as libswresample is not present");
123 #endif
124         } else {
125 #ifdef HAVE_SWRESAMPLE
126                 _swr_context = 0;
127 #endif          
128         }
129
130         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
131                 _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
132         }
133
134         vector<ServerDescription*> servers = Config::instance()->servers ();
135
136         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
137                 for (int j = 0; j < (*i)->threads (); ++j) {
138                         _worker_threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
139                 }
140         }
141
142         /* XXX! */
143         _picture_asset.reset (
144                 new libdcp::MonoPictureAsset (
145                         _film->dir (_film->dcp_name()),
146                         String::compose ("video_%1.mxf", 0),
147                         DCPFrameRate (_film->frames_per_second()).frames_per_second,
148                         _film->format()->dcp_size()
149                         )
150                 );
151         
152         _picture_asset_writer = _picture_asset->start_write ();
153         _writer_thread = new boost::thread (boost::bind (&Encoder::writer_thread, this));
154 }
155
156
157 void
158 Encoder::process_end ()
159 {
160 #if HAVE_SWRESAMPLE     
161         if (_film->audio_stream() && _film->audio_stream()->channels() && _swr_context) {
162
163                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_stream()->channels(), 256));
164                         
165                 while (1) {
166                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
167
168                         if (frames < 0) {
169                                 throw EncodeError ("could not run sample-rate converter");
170                         }
171
172                         if (frames == 0) {
173                                 break;
174                         }
175
176                         out->set_frames (frames);
177                         write_audio (out);
178                 }
179
180                 swr_free (&_swr_context);
181         }
182 #endif
183
184         if (_film->audio_stream()) {
185                 close_sound_files ();
186                 
187                 /* Rename .wav.tmp files to .wav */
188                 for (int i = 0; i < dcp_audio_channels (_film->audio_channels()); ++i) {
189                         if (boost::filesystem::exists (_film->multichannel_audio_out_path (i, false))) {
190                                 boost::filesystem::remove (_film->multichannel_audio_out_path (i, false));
191                         }
192                         boost::filesystem::rename (_film->multichannel_audio_out_path (i, true), _film->multichannel_audio_out_path (i, false));
193                 }
194         }
195
196         boost::mutex::scoped_lock lock (_worker_mutex);
197
198         _film->log()->log ("Clearing queue of " + lexical_cast<string> (_encode_queue.size ()));
199
200         /* Keep waking workers until the queue is empty */
201         while (!_encode_queue.empty ()) {
202                 _film->log()->log ("Waking with " + lexical_cast<string> (_encode_queue.size ()), Log::VERBOSE);
203                 _worker_condition.notify_all ();
204                 _worker_condition.wait (lock);
205         }
206
207         lock.unlock ();
208         
209         terminate_worker_threads ();
210
211         _film->log()->log ("Mopping up " + lexical_cast<string> (_encode_queue.size()));
212
213         /* The following sequence of events can occur in the above code:
214              1. a remote worker takes the last image off the queue
215              2. the loop above terminates
216              3. the remote worker fails to encode the image and puts it back on the queue
217              4. the remote worker is then terminated by terminate_worker_threads
218
219              So just mop up anything left in the queue here.
220         */
221
222         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _encode_queue.begin(); i != _encode_queue.end(); ++i) {
223                 _film->log()->log (String::compose ("Encode left-over frame %1", (*i)->frame ()));
224                 try {
225                         shared_ptr<EncodedData> e = (*i)->encode_locally ();
226                         {
227                                 boost::mutex::scoped_lock lock2 (_writer_mutex);
228                                 _write_queue.push_back (make_pair (e, (*i)->frame ()));
229                                 _writer_condition.notify_all ();
230                         }
231                         frame_done ();
232                 } catch (std::exception& e) {
233                         _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
234                 }
235         }
236
237         finish_writer_thread ();
238         _picture_asset_writer->finalize ();
239 }       
240
241 /** @return an estimate of the current number of frames we are encoding per second,
242  *  or 0 if not known.
243  */
244 float
245 Encoder::current_frames_per_second () const
246 {
247         boost::mutex::scoped_lock lock (_history_mutex);
248         if (int (_time_history.size()) < _history_size) {
249                 return 0;
250         }
251
252         struct timeval now;
253         gettimeofday (&now, 0);
254
255         return _history_size / (seconds (now) - seconds (_time_history.back ()));
256 }
257
258 /** @return true if the last frame to be processed was skipped as it already existed */
259 bool
260 Encoder::skipping () const
261 {
262         boost::mutex::scoped_lock (_history_mutex);
263         return _just_skipped;
264 }
265
266 /** @return Number of video frames that have been sent out */
267 int
268 Encoder::video_frames_out () const
269 {
270         boost::mutex::scoped_lock (_history_mutex);
271         return _video_frames_out;
272 }
273
274 /** Should be called when a frame has been encoded successfully.
275  *  @param n Source frame index.
276  */
277 void
278 Encoder::frame_done ()
279 {
280         boost::mutex::scoped_lock lock (_history_mutex);
281         _just_skipped = false;
282         
283         struct timeval tv;
284         gettimeofday (&tv, 0);
285         _time_history.push_front (tv);
286         if (int (_time_history.size()) > _history_size) {
287                 _time_history.pop_back ();
288         }
289 }
290
/** Called when the processing of a frame has just been skipped because
    it has already been done (process_video() does this when the frame's
    output file already exists).  Sets the flag that skipping() reports.
*/
void
Encoder::frame_skipped ()
{
        boost::mutex::scoped_lock lock (_history_mutex);
        _just_skipped = true;
}
300
301 void
302 Encoder::process_video (shared_ptr<Image> image, bool same, boost::shared_ptr<Subtitle> sub)
303 {
304         DCPFrameRate dfr (_film->frames_per_second ());
305         
306         if (dfr.skip && (_video_frames_in % 2)) {
307                 ++_video_frames_in;
308                 return;
309         }
310
311         boost::mutex::scoped_lock lock (_worker_mutex);
312
313         /* Wait until the queue has gone down a bit */
314         while (_encode_queue.size() >= _worker_threads.size() * 2 && !_terminate_encoder) {
315                 TIMING ("decoder sleeps with queue of %1", _encode_queue.size());
316                 _worker_condition.wait (lock);
317                 TIMING ("decoder wakes with queue of %1", _encode_queue.size());
318         }
319
320         if (_terminate_encoder) {
321                 return;
322         }
323
324         /* Only do the processing if we don't already have a file for this frame */
325         if (boost::filesystem::exists (_film->frame_out_path (_video_frames_out, false))) {
326                 frame_skipped ();
327                 return;
328         }
329
330         if (same && _have_a_real_frame) {
331                 /* Use the last frame that we encoded.  We do this by putting a null encoded
332                    frame straight onto the writer's queue.  It will know to duplicate the previous frame
333                    in this case.
334                 */
335                 boost::mutex::scoped_lock lock2 (_writer_mutex);
336                 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
337         } else {
338                 /* Queue this new frame for encoding */
339                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
340                 TIMING ("adding to queue of %1", _encode_queue.size ());
341                 _encode_queue.push_back (boost::shared_ptr<DCPVideoFrame> (
342                                           new DCPVideoFrame (
343                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
344                                                   _film->subtitle_offset(), _film->subtitle_scale(),
345                                                   _film->scaler(), _video_frames_out, _film->frames_per_second(), s.second,
346                                                   _film->colour_lut(), _film->j2k_bandwidth(),
347                                                   _film->log()
348                                                   )
349                                           ));
350                 
351                 _worker_condition.notify_all ();
352                 _have_a_real_frame = true;
353         }
354
355         ++_video_frames_in;
356         ++_video_frames_out;
357
358         if (dfr.repeat) {
359                 boost::mutex::scoped_lock lock2 (_writer_mutex);
360                 _write_queue.push_back (make_pair (shared_ptr<EncodedData> (), _video_frames_out));
361                 ++_video_frames_out;
362         }
363 }
364
365 void
366 Encoder::process_audio (shared_ptr<AudioBuffers> data)
367 {
368 #if HAVE_SWRESAMPLE
369         /* Maybe sample-rate convert */
370         if (_swr_context) {
371
372                 /* Compute the resampled frames count and add 32 for luck */
373                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_stream()->sample_rate()) + 32;
374
375                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_stream()->channels(), max_resampled_frames));
376
377                 /* Resample audio */
378                 int const resampled_frames = swr_convert (
379                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
380                         );
381                 
382                 if (resampled_frames < 0) {
383                         throw EncodeError ("could not run sample-rate converter");
384                 }
385
386                 resampled->set_frames (resampled_frames);
387                 
388                 /* And point our variables at the resampled audio */
389                 data = resampled;
390         }
391 #endif
392
393         if (_film->audio_channels() == 1) {
394                 /* We need to switch things around so that the mono channel is on
395                    the centre channel of a 5.1 set (with other channels silent).
396                 */
397
398                 shared_ptr<AudioBuffers> b (new AudioBuffers (6, data->frames ()));
399                 b->make_silent (libdcp::LEFT);
400                 b->make_silent (libdcp::RIGHT);
401                 memcpy (b->data()[libdcp::CENTRE], data->data()[0], data->frames() * sizeof(float));
402                 b->make_silent (libdcp::LFE);
403                 b->make_silent (libdcp::LS);
404                 b->make_silent (libdcp::RS);
405
406                 data = b;
407         }
408
409         write_audio (data);
410         
411         _audio_frames_in += data->frames ();
412 }
413
414 void
415 Encoder::write_audio (shared_ptr<const AudioBuffers> audio)
416 {
417         for (int i = 0; i < audio->channels(); ++i) {
418                 sf_write_float (_sound_files[i], audio->data(i), audio->frames());
419         }
420
421         _audio_frames_out += audio->frames ();
422 }
423
424 void
425 Encoder::close_sound_files ()
426 {
427         for (vector<SNDFILE*>::iterator i = _sound_files.begin(); i != _sound_files.end(); ++i) {
428                 sf_close (*i);
429         }
430
431         _sound_files.clear ();
432 }       
433
434 void
435 Encoder::terminate_worker_threads ()
436 {
437         boost::mutex::scoped_lock lock (_worker_mutex);
438         _terminate_encoder = true;
439         _worker_condition.notify_all ();
440         lock.unlock ();
441
442         for (list<boost::thread *>::iterator i = _worker_threads.begin(); i != _worker_threads.end(); ++i) {
443                 (*i)->join ();
444                 delete *i;
445         }
446 }
447
448 void
449 Encoder::finish_writer_thread ()
450 {
451         if (!_writer_thread) {
452                 return;
453         }
454         
455         boost::mutex::scoped_lock lock (_writer_mutex);
456         _finish_writer = true;
457         _writer_condition.notify_all ();
458         lock.unlock ();
459
460         _writer_thread->join ();
461         delete _writer_thread;
462         _writer_thread = 0;
463 }
464
465 void
466 Encoder::encoder_thread (ServerDescription* server)
467 {
468         /* Number of seconds that we currently wait between attempts
469            to connect to the server; not relevant for localhost
470            encodings.
471         */
472         int remote_backoff = 0;
473         
474         while (1) {
475
476                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
477                 boost::mutex::scoped_lock lock (_worker_mutex);
478                 while (_encode_queue.empty () && !_terminate_encoder) {
479                         _worker_condition.wait (lock);
480                 }
481
482                 if (_terminate_encoder) {
483                         return;
484                 }
485
486                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _encode_queue.size());
487                 boost::shared_ptr<DCPVideoFrame> vf = _encode_queue.front ();
488                 _film->log()->log (String::compose ("Encoder thread %1 pops frame %2 from queue", boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
489                 _encode_queue.pop_front ();
490                 
491                 lock.unlock ();
492
493                 shared_ptr<EncodedData> encoded;
494
495                 if (server) {
496                         try {
497                                 encoded = vf->encode_remotely (server);
498
499                                 if (remote_backoff > 0) {
500                                         _film->log()->log (String::compose ("%1 was lost, but now she is found; removing backoff", server->host_name ()));
501                                 }
502                                 
503                                 /* This job succeeded, so remove any backoff */
504                                 remote_backoff = 0;
505                                 
506                         } catch (std::exception& e) {
507                                 if (remote_backoff < 60) {
508                                         /* back off more */
509                                         remote_backoff += 10;
510                                 }
511                                 _film->log()->log (
512                                         String::compose (
513                                                 "Remote encode of %1 on %2 failed (%3); thread sleeping for %4s",
514                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
515                                         );
516                         }
517                                 
518                 } else {
519                         try {
520                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
521                                 encoded = vf->encode_locally ();
522                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
523                         } catch (std::exception& e) {
524                                 _film->log()->log (String::compose ("Local encode failed (%1)", e.what ()));
525                         }
526                 }
527
528                 if (encoded) {
529                         boost::mutex::scoped_lock lock2 (_writer_mutex);
530                         _write_queue.push_back (make_pair (encoded, vf->frame ()));
531                         _writer_condition.notify_all ();
532                 } else {
533                         lock.lock ();
534                         _film->log()->log (
535                                 String::compose ("Encoder thread %1 pushes frame %2 back onto queue after failure", boost::this_thread::get_id(), vf->frame())
536                                 );
537                         _encode_queue.push_front (vf);
538                         lock.unlock ();
539                 }
540
541                 if (remote_backoff > 0) {
542                         dvdomatic_sleep (remote_backoff);
543                 }
544
545                 lock.lock ();
546                 _worker_condition.notify_all ();
547         }
548 }
549
550 void
551 Encoder::link (string a, string b) const
552 {
553 #ifdef DVDOMATIC_POSIX                  
554         int const r = symlink (a.c_str(), b.c_str());
555         if (r) {
556                 throw EncodeError (String::compose ("could not create symlink from %1 to %2", a, b));
557         }
558 #endif
559         
560 #ifdef DVDOMATIC_WINDOWS
561         boost::filesystem::copy_file (a, b);
562 #endif                  
563 }
564
565 struct WriteQueueSorter
566 {
567         bool operator() (pair<shared_ptr<EncodedData>, int> const & a, pair<shared_ptr<EncodedData>, int> const & b) {
568                 return a.second < b.second;
569         }
570 };
571
572 void
573 Encoder::writer_thread ()
574 {
575         while (1)
576         {
577                 boost::mutex::scoped_lock lock (_writer_mutex);
578
579                 while (1) {
580                         if (_finish_writer ||
581                             _write_queue.size() > _maximum_frames_in_memory ||
582                             (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1))) {
583                                     
584                                     break;
585                             }
586
587                             TIMING ("writer sleeps with a queue of %1; %2 pending", _write_queue.size(), _pending.size());
588                             _writer_condition.wait (lock);
589                             TIMING ("writer wakes with a queue of %1", _write_queue.size());
590
591                             _write_queue.sort (WriteQueueSorter ());
592                 }
593
594                 if (_finish_writer && _write_queue.empty() && _pending.empty()) {
595                         return;
596                 }
597
598                 /* Write any frames that we can write; i.e. those that are in sequence */
599                 while (!_write_queue.empty() && _write_queue.front().second == (_last_written_frame + 1)) {
600                         pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.front ();
601                         _write_queue.pop_front ();
602
603                         lock.unlock ();
604                         _film->log()->log (String::compose ("Writer writes %1 to MXF", encoded.second));
605                         if (encoded.first) {
606                                 _picture_asset_writer->write (encoded.first->data(), encoded.first->size());
607                                 _last_written = encoded.first;
608                         } else {
609                                 _picture_asset_writer->write (_last_written->data(), _last_written->size());
610                         }
611                         lock.lock ();
612
613                         ++_last_written_frame;
614                 }
615
616                 while (_write_queue.size() > _maximum_frames_in_memory) {
617                         /* Too many frames in memory which can't yet be written to the stream.
618                            Put some to disk.
619                         */
620
621                         pair<boost::shared_ptr<EncodedData>, int> encoded = _write_queue.back ();
622                         _write_queue.pop_back ();
623                         if (!encoded.first) {
624                                 /* This is a `repeat-last' frame, so no need to write it to disk */
625                                 continue;
626                         }
627
628                         lock.unlock ();
629                         _film->log()->log (String::compose ("Writer full (awaiting %1); pushes %2 to disk", _last_written_frame + 1, encoded.second));
630                         encoded.first->write (_film, encoded.second);
631                         lock.lock ();
632
633                         _pending.push_back (encoded.second);
634                 }
635
636                 while (_write_queue.size() < _maximum_frames_in_memory && !_pending.empty()) {
637                         /* We have some space in memory.  Fetch some frames back off disk. */
638
639                         _pending.sort ();
640                         int const fetch = _pending.front ();
641
642                         lock.unlock ();
643                         _film->log()->log (String::compose ("Writer pulls %1 back from disk", fetch));
644                         shared_ptr<EncodedData> encoded;
645                         if (boost::filesystem::exists (_film->frame_out_path (fetch, false))) {
646                                 /* It's an actual frame (not a repeat-last); load it in */
647                                 encoded.reset (new EncodedData (_film->frame_out_path (fetch, false)));
648                         }
649                         lock.lock ();
650
651                         _write_queue.push_back (make_pair (encoded, fetch));
652                         _pending.remove (fetch);
653                 }
654         }
655 }