Don't overlap simultaneous video content in the timeline. Fix keep-aligned for separ...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37 #include "player.h"
38
39 #include "i18n.h"
40
41 using std::pair;
42 using std::string;
43 using std::stringstream;
44 using std::vector;
45 using std::list;
46 using std::cout;
47 using std::min;
48 using std::make_pair;
49 using boost::shared_ptr;
50 using boost::weak_ptr;
51 using boost::optional;
52 using boost::scoped_array;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j)
58         : _film (f)
59         , _job (j)
60         , _video_frames_out (0)
61         , _terminate (false)
62 {
63         _have_a_real_frame[EYES_BOTH] = false;
64         _have_a_real_frame[EYES_LEFT] = false;
65         _have_a_real_frame[EYES_RIGHT] = false;
66 }
67
68 Encoder::~Encoder ()
69 {
70         terminate_threads ();
71 }
72
73 /** Add a worker thread for a each thread on a remote server.  Caller must hold
74  *  a lock on _mutex, or know that one is not currently required to
75  *  safely modify _threads.
76  */
77 void
78 Encoder::add_worker_threads (ServerDescription d)
79 {
80         _film->log()->log (String::compose (N_("Adding %1 worker threads for remote %2"), d.host_name ()));
81         for (int i = 0; i < d.threads(); ++i) {
82                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
83         }
84 }
85
86 void
87 Encoder::process_begin ()
88 {
89         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
90                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
91         }
92
93         _writer.reset (new Writer (_film, _job));
94         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
95 }
96
97 void
98 Encoder::process_end ()
99 {
100         boost::mutex::scoped_lock lock (_mutex);
101
102         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
103
104         /* Keep waking workers until the queue is empty */
105         while (!_queue.empty ()) {
106                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
107                 _condition.notify_all ();
108                 _condition.wait (lock);
109         }
110
111         lock.unlock ();
112         
113         terminate_threads ();
114
115         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
116
117         /* The following sequence of events can occur in the above code:
118              1. a remote worker takes the last image off the queue
119              2. the loop above terminates
120              3. the remote worker fails to encode the image and puts it back on the queue
121              4. the remote worker is then terminated by terminate_threads
122
123              So just mop up anything left in the queue here.
124         */
125
126         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
127                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
128                 try {
129                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
130                         frame_done ();
131                 } catch (std::exception& e) {
132                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
133                 }
134         }
135                 
136         _writer->finish ();
137         _writer.reset ();
138 }       
139
140 /** @return an estimate of the current number of frames we are encoding per second,
141  *  or 0 if not known.
142  */
143 float
144 Encoder::current_encoding_rate () const
145 {
146         boost::mutex::scoped_lock lock (_state_mutex);
147         if (int (_time_history.size()) < _history_size) {
148                 return 0;
149         }
150
151         struct timeval now;
152         gettimeofday (&now, 0);
153
154         return _history_size / (seconds (now) - seconds (_time_history.back ()));
155 }
156
157 /** @return Number of video frames that have been sent out */
158 int
159 Encoder::video_frames_out () const
160 {
161         boost::mutex::scoped_lock (_state_mutex);
162         return _video_frames_out;
163 }
164
165 /** Should be called when a frame has been encoded successfully.
166  *  @param n Source frame index.
167  */
168 void
169 Encoder::frame_done ()
170 {
171         boost::mutex::scoped_lock lock (_state_mutex);
172         
173         struct timeval tv;
174         gettimeofday (&tv, 0);
175         _time_history.push_front (tv);
176         if (int (_time_history.size()) > _history_size) {
177                 _time_history.pop_back ();
178         }
179 }
180
181 void
182 Encoder::process_video (shared_ptr<PlayerImage> image, Eyes eyes, ColourConversion conversion, bool same)
183 {
184         _waker.nudge ();
185         
186         boost::mutex::scoped_lock lock (_mutex);
187
188         /* XXX: discard 3D here if required */
189
190         /* Wait until the queue has gone down a bit */
191         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
192                 TIMING ("decoder sleeps with queue of %1", _queue.size());
193                 _condition.wait (lock);
194                 TIMING ("decoder wakes with queue of %1", _queue.size());
195         }
196
197         if (_terminate) {
198                 return;
199         }
200
201         _writer->rethrow ();
202         /* Re-throw any exception raised by one of our threads.  If more
203            than one has thrown an exception, only one will be rethrown, I think;
204            but then, if that happens something has gone badly wrong.
205         */
206         rethrow ();
207
208         if (_writer->can_fake_write (_video_frames_out)) {
209                 _writer->fake_write (_video_frames_out, eyes);
210                 _have_a_real_frame[eyes] = false;
211                 frame_done ();
212         } else if (same && _have_a_real_frame[eyes]) {
213                 /* Use the last frame that we encoded. */
214                 _writer->repeat (_video_frames_out, eyes);
215                 frame_done ();
216         } else {
217                 /* Queue this new frame for encoding */
218                 TIMING ("adding to queue of %1", _queue.size ());
219                 _queue.push_back (shared_ptr<DCPVideoFrame> (
220                                           new DCPVideoFrame (
221                                                   image->image(), _video_frames_out, eyes, conversion, _film->video_frame_rate(),
222                                                   _film->j2k_bandwidth(), _film->resolution(), _film->log()
223                                                   )
224                                           ));
225                 
226                 _condition.notify_all ();
227                 _have_a_real_frame[eyes] = true;
228         }
229
230         if (eyes != EYES_LEFT) {
231                 ++_video_frames_out;
232         }
233 }
234
235 void
236 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
237 {
238         _writer->write (data);
239 }
240
241 void
242 Encoder::terminate_threads ()
243 {
244         {
245                 boost::mutex::scoped_lock lock (_mutex);
246                 _terminate = true;
247                 _condition.notify_all ();
248         }
249
250         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
251                 if ((*i)->joinable ()) {
252                         (*i)->join ();
253                 }
254                 delete *i;
255         }
256
257         _threads.clear ();
258 }
259
260 void
261 Encoder::encoder_thread (optional<ServerDescription> server)
262 try
263 {
264         /* Number of seconds that we currently wait between attempts
265            to connect to the server; not relevant for localhost
266            encodings.
267         */
268         int remote_backoff = 0;
269         
270         while (1) {
271
272                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
273                 boost::mutex::scoped_lock lock (_mutex);
274                 while (_queue.empty () && !_terminate) {
275                         _condition.wait (lock);
276                 }
277
278                 if (_terminate) {
279                         return;
280                 }
281
282                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
283                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
284                 TIMING ("encoder thread %1 pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->frame(), vf->eyes ());
285                 _queue.pop_front ();
286                 
287                 lock.unlock ();
288
289                 shared_ptr<EncodedData> encoded;
290
291                 if (server) {
292                         try {
293                                 encoded = vf->encode_remotely (server.get ());
294
295                                 if (remote_backoff > 0) {
296                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
297                                 }
298                                 
299                                 /* This job succeeded, so remove any backoff */
300                                 remote_backoff = 0;
301                                 
302                         } catch (std::exception& e) {
303                                 if (remote_backoff < 60) {
304                                         /* back off more */
305                                         remote_backoff += 10;
306                                 }
307                                 _film->log()->log (
308                                         String::compose (
309                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
310                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
311                                         );
312                         }
313                                 
314                 } else {
315                         try {
316                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
317                                 encoded = vf->encode_locally ();
318                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
319                         } catch (std::exception& e) {
320                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
321                         }
322                 }
323
324                 if (encoded) {
325                         _writer->write (encoded, vf->frame (), vf->eyes ());
326                         frame_done ();
327                 } else {
328                         lock.lock ();
329                         _film->log()->log (
330                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
331                                 );
332                         _queue.push_front (vf);
333                         lock.unlock ();
334                 }
335
336                 if (remote_backoff > 0) {
337                         dcpomatic_sleep (remote_backoff);
338                 }
339
340                 lock.lock ();
341                 _condition.notify_all ();
342         }
343 }
344 catch (...)
345 {
346         store_current ();
347 }
348
349 void
350 Encoder::server_found (ServerDescription s)
351 {
352         add_worker_threads (s);
353 }