Some tidying up. Do encode progress in the writer to improve progress bar movement...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/filesystem.hpp>
26 #include <boost/lexical_cast.hpp>
27 #include <libdcp/picture_asset.h>
28 #include "encoder.h"
29 #include "util.h"
30 #include "film.h"
31 #include "log.h"
32 #include "exceptions.h"
33 #include "filter.h"
34 #include "config.h"
35 #include "dcp_video_frame.h"
36 #include "server.h"
37 #include "format.h"
38 #include "cross.h"
39 #include "writer.h"
40 #include "player.h"
41 #include "audio_mapping.h"
42
43 #include "i18n.h"
44
45 using std::pair;
46 using std::string;
47 using std::stringstream;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::make_pair;
52 using boost::shared_ptr;
53 using boost::optional;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<Film> f, shared_ptr<Job> j)
59         : _film (f)
60         , _job (j)
61         , _video_frames_in (0)
62         , _video_frames_out (0)
63 #ifdef HAVE_SWRESAMPLE    
64         , _swr_context (0)
65 #endif
66         , _have_a_real_frame (false)
67         , _terminate (false)
68 {
69         
70 }
71
72 Encoder::~Encoder ()
73 {
74         terminate_threads ();
75         if (_writer) {
76                 _writer->finish ();
77         }
78 }
79
80 void
81 Encoder::process_begin ()
82 {
83         if (_film->has_audio() && _film->audio_frame_rate() != _film->target_audio_sample_rate()) {
84 #ifdef HAVE_SWRESAMPLE
85
86                 stringstream s;
87                 s << String::compose (N_("Will resample audio from %1 to %2"), _film->audio_frame_rate(), _film->target_audio_sample_rate());
88                 _film->log()->log (s.str ());
89
90                 /* We will be using planar float data when we call the
91                    resampler.  As far as I can see, the audio channel
92                    layout is not necessary for our purposes; it seems
93                    only to be used get the number of channels and
94                    decide if rematrixing is needed.  It won't be, since
95                    input and output layouts are the same.
96                 */
97
98                 _swr_context = swr_alloc_set_opts (
99                         0,
100                         av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
101                         AV_SAMPLE_FMT_FLTP,
102                         _film->target_audio_sample_rate(),
103                         av_get_default_channel_layout (_film->audio_mapping().dcp_channels ()),
104                         AV_SAMPLE_FMT_FLTP,
105                         _film->audio_frame_rate(),
106                         0, 0
107                         );
108                 
109                 swr_init (_swr_context);
110 #else
111                 throw EncodeError (_("Cannot resample audio as libswresample is not present"));
112 #endif
113         } else {
114 #ifdef HAVE_SWRESAMPLE
115                 _swr_context = 0;
116 #endif          
117         }
118
119         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
120                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
121         }
122
123         vector<ServerDescription*> servers = Config::instance()->servers ();
124
125         for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
126                 for (int j = 0; j < (*i)->threads (); ++j) {
127                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
128                 }
129         }
130
131         _writer.reset (new Writer (_film, _job));
132 }
133
134
135 void
136 Encoder::process_end ()
137 {
138 #if HAVE_SWRESAMPLE     
139         if (_film->has_audio() && _swr_context) {
140
141                 shared_ptr<AudioBuffers> out (new AudioBuffers (_film->audio_mapping().dcp_channels(), 256));
142                         
143                 while (1) {
144                         int const frames = swr_convert (_swr_context, (uint8_t **) out->data(), 256, 0, 0);
145
146                         if (frames < 0) {
147                                 throw EncodeError (_("could not run sample-rate converter"));
148                         }
149
150                         if (frames == 0) {
151                                 break;
152                         }
153
154                         out->set_frames (frames);
155                         _writer->write (out);
156                 }
157
158                 swr_free (&_swr_context);
159         }
160 #endif
161
162         boost::mutex::scoped_lock lock (_mutex);
163
164         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
165
166         /* Keep waking workers until the queue is empty */
167         while (!_queue.empty ()) {
168                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
169                 _condition.notify_all ();
170                 _condition.wait (lock);
171         }
172
173         lock.unlock ();
174         
175         terminate_threads ();
176
177         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
178
179         /* The following sequence of events can occur in the above code:
180              1. a remote worker takes the last image off the queue
181              2. the loop above terminates
182              3. the remote worker fails to encode the image and puts it back on the queue
183              4. the remote worker is then terminated by terminate_threads
184
185              So just mop up anything left in the queue here.
186         */
187
188         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
189                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
190                 try {
191                         _writer->write ((*i)->encode_locally(), (*i)->frame ());
192                         frame_done ();
193                 } catch (std::exception& e) {
194                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
195                 }
196         }
197
198         _writer->finish ();
199         _writer.reset ();
200 }       
201
202 /** @return an estimate of the current number of frames we are encoding per second,
203  *  or 0 if not known.
204  */
205 float
206 Encoder::current_encoding_rate () const
207 {
208         boost::mutex::scoped_lock lock (_history_mutex);
209         if (int (_time_history.size()) < _history_size) {
210                 return 0;
211         }
212
213         struct timeval now;
214         gettimeofday (&now, 0);
215
216         return _history_size / (seconds (now) - seconds (_time_history.back ()));
217 }
218
219 /** @return Number of video frames that have been sent out */
220 int
221 Encoder::video_frames_out () const
222 {
223         boost::mutex::scoped_lock (_history_mutex);
224         return _video_frames_out;
225 }
226
227 /** Should be called when a frame has been encoded successfully.
228  *  @param n Source frame index.
229  */
230 void
231 Encoder::frame_done ()
232 {
233         boost::mutex::scoped_lock lock (_history_mutex);
234         
235         struct timeval tv;
236         gettimeofday (&tv, 0);
237         _time_history.push_front (tv);
238         if (int (_time_history.size()) > _history_size) {
239                 _time_history.pop_back ();
240         }
241 }
242
243 void
244 Encoder::process_video (shared_ptr<const Image> image, bool same, shared_ptr<Subtitle> sub)
245 {
246         FrameRateConversion frc (_film->video_frame_rate(), _film->dcp_frame_rate());
247         
248         if (frc.skip && (_video_frames_in % 2)) {
249                 ++_video_frames_in;
250                 return;
251         }
252
253         boost::mutex::scoped_lock lock (_mutex);
254
255         /* Wait until the queue has gone down a bit */
256         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
257                 TIMING ("decoder sleeps with queue of %1", _queue.size());
258                 _condition.wait (lock);
259                 TIMING ("decoder wakes with queue of %1", _queue.size());
260         }
261
262         if (_terminate) {
263                 return;
264         }
265
266         if (_writer->thrown ()) {
267                 _writer->rethrow ();
268         }
269
270         if (_writer->can_fake_write (_video_frames_out)) {
271                 _writer->fake_write (_video_frames_out);
272                 _have_a_real_frame = false;
273                 frame_done ();
274         } else if (same && _have_a_real_frame) {
275                 /* Use the last frame that we encoded. */
276                 _writer->repeat (_video_frames_out);
277                 frame_done ();
278         } else {
279                 /* Queue this new frame for encoding */
280                 pair<string, string> const s = Filter::ffmpeg_strings (_film->filters());
281                 TIMING ("adding to queue of %1", _queue.size ());
282                 _queue.push_back (shared_ptr<DCPVideoFrame> (
283                                           new DCPVideoFrame (
284                                                   image, sub, _film->format()->dcp_size(), _film->format()->dcp_padding (_film),
285                                                   _film->subtitle_offset(), _film->subtitle_scale(),
286                                                   _film->scaler(), _video_frames_out, _film->dcp_frame_rate(), s.second,
287                                                   _film->colour_lut(), _film->j2k_bandwidth(),
288                                                   _film->log()
289                                                   )
290                                           ));
291                 
292                 _condition.notify_all ();
293                 _have_a_real_frame = true;
294         }
295
296         ++_video_frames_in;
297         ++_video_frames_out;
298
299         if (frc.repeat) {
300                 _writer->repeat (_video_frames_out);
301                 ++_video_frames_out;
302                 frame_done ();
303         }
304 }
305
306 void
307 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
308 {
309 #if HAVE_SWRESAMPLE
310         /* Maybe sample-rate convert */
311         if (_swr_context) {
312
313                 /* Compute the resampled frames count and add 32 for luck */
314                 int const max_resampled_frames = ceil ((int64_t) data->frames() * _film->target_audio_sample_rate() / _film->audio_frame_rate()) + 32;
315
316                 shared_ptr<AudioBuffers> resampled (new AudioBuffers (_film->audio_mapping().dcp_channels(), max_resampled_frames));
317
318                 /* Resample audio */
319                 int const resampled_frames = swr_convert (
320                         _swr_context, (uint8_t **) resampled->data(), max_resampled_frames, (uint8_t const **) data->data(), data->frames()
321                         );
322                 
323                 if (resampled_frames < 0) {
324                         throw EncodeError (_("could not run sample-rate converter"));
325                 }
326
327                 resampled->set_frames (resampled_frames);
328                 
329                 /* And point our variables at the resampled audio */
330                 data = resampled;
331         }
332 #endif
333
334         _writer->write (data);
335 }
336
337 void
338 Encoder::terminate_threads ()
339 {
340         boost::mutex::scoped_lock lock (_mutex);
341         _terminate = true;
342         _condition.notify_all ();
343         lock.unlock ();
344
345         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
346                 if ((*i)->joinable ()) {
347                         (*i)->join ();
348                 }
349                 delete *i;
350         }
351 }
352
353 void
354 Encoder::encoder_thread (ServerDescription* server)
355 {
356         /* Number of seconds that we currently wait between attempts
357            to connect to the server; not relevant for localhost
358            encodings.
359         */
360         int remote_backoff = 0;
361         
362         while (1) {
363
364                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
365                 boost::mutex::scoped_lock lock (_mutex);
366                 while (_queue.empty () && !_terminate) {
367                         _condition.wait (lock);
368                 }
369
370                 if (_terminate) {
371                         return;
372                 }
373
374                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
375                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
376                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
377                 _queue.pop_front ();
378                 
379                 lock.unlock ();
380
381                 shared_ptr<EncodedData> encoded;
382
383                 if (server) {
384                         try {
385                                 encoded = vf->encode_remotely (server);
386
387                                 if (remote_backoff > 0) {
388                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
389                                 }
390                                 
391                                 /* This job succeeded, so remove any backoff */
392                                 remote_backoff = 0;
393                                 
394                         } catch (std::exception& e) {
395                                 if (remote_backoff < 60) {
396                                         /* back off more */
397                                         remote_backoff += 10;
398                                 }
399                                 _film->log()->log (
400                                         String::compose (
401                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
402                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
403                                         );
404                         }
405                                 
406                 } else {
407                         try {
408                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
409                                 encoded = vf->encode_locally ();
410                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
411                         } catch (std::exception& e) {
412                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
413                         }
414                 }
415
416                 if (encoded) {
417                         _writer->write (encoded, vf->frame ());
418                         frame_done ();
419                 } else {
420                         lock.lock ();
421                         _film->log()->log (
422                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
423                                 );
424                         _queue.push_front (vf);
425                         lock.unlock ();
426                 }
427
428                 if (remote_backoff > 0) {
429                         dcpomatic_sleep (remote_backoff);
430                 }
431
432                 lock.lock ();
433                 _condition.notify_all ();
434         }
435 }