Merge 1.0
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37 #include "player.h"
38
39 #include "i18n.h"
40
41 using std::pair;
42 using std::string;
43 using std::stringstream;
44 using std::vector;
45 using std::list;
46 using std::cout;
47 using std::min;
48 using std::make_pair;
49 using boost::shared_ptr;
50 using boost::weak_ptr;
51 using boost::optional;
52 using boost::scoped_array;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j)
58         : _film (f)
59         , _job (j)
60         , _video_frames_out (0)
61         , _terminate (false)
62 {
63         _have_a_real_frame[EYES_BOTH] = false;
64         _have_a_real_frame[EYES_LEFT] = false;
65         _have_a_real_frame[EYES_RIGHT] = false;
66 }
67
68 Encoder::~Encoder ()
69 {
70         terminate_threads ();
71         if (_writer) {
72                 _writer->finish ();
73         }
74 }
75
76 /** Add a worker thread for a each thread on a remote server.  Caller must hold
77  *  a lock on _mutex, or know that one is not currently required to
78  *  safely modify _threads.
79  */
80 void
81 Encoder::add_worker_threads (ServerDescription d)
82 {
83         for (int i = 0; i < d.threads(); ++i) {
84                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
85         }
86 }
87
88 void
89 Encoder::process_begin ()
90 {
91         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
92                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
93         }
94
95         _writer.reset (new Writer (_film, _job));
96         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
97 }
98
99 void
100 Encoder::process_end ()
101 {
102         boost::mutex::scoped_lock lock (_mutex);
103
104         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
105
106         /* Keep waking workers until the queue is empty */
107         while (!_queue.empty ()) {
108                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
109                 _condition.notify_all ();
110                 _condition.wait (lock);
111         }
112
113         lock.unlock ();
114         
115         terminate_threads ();
116
117         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
118
119         /* The following sequence of events can occur in the above code:
120              1. a remote worker takes the last image off the queue
121              2. the loop above terminates
122              3. the remote worker fails to encode the image and puts it back on the queue
123              4. the remote worker is then terminated by terminate_threads
124
125              So just mop up anything left in the queue here.
126         */
127
128         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
129                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
130                 try {
131                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
132                         frame_done ();
133                 } catch (std::exception& e) {
134                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
135                 }
136         }
137                 
138         _writer->finish ();
139         _writer.reset ();
140 }       
141
142 /** @return an estimate of the current number of frames we are encoding per second,
143  *  or 0 if not known.
144  */
145 float
146 Encoder::current_encoding_rate () const
147 {
148         boost::mutex::scoped_lock lock (_state_mutex);
149         if (int (_time_history.size()) < _history_size) {
150                 return 0;
151         }
152
153         struct timeval now;
154         gettimeofday (&now, 0);
155
156         return _history_size / (seconds (now) - seconds (_time_history.back ()));
157 }
158
159 /** @return Number of video frames that have been sent out */
160 int
161 Encoder::video_frames_out () const
162 {
163         boost::mutex::scoped_lock (_state_mutex);
164         return _video_frames_out;
165 }
166
167 /** Should be called when a frame has been encoded successfully.
168  *  @param n Source frame index.
169  */
170 void
171 Encoder::frame_done ()
172 {
173         boost::mutex::scoped_lock lock (_state_mutex);
174         
175         struct timeval tv;
176         gettimeofday (&tv, 0);
177         _time_history.push_front (tv);
178         if (int (_time_history.size()) > _history_size) {
179                 _time_history.pop_back ();
180         }
181 }
182
183 void
184 Encoder::process_video (shared_ptr<PlayerImage> image, Eyes eyes, ColourConversion conversion, bool same)
185 {
186         boost::mutex::scoped_lock lock (_mutex);
187
188         /* XXX: discard 3D here if required */
189
190         /* Wait until the queue has gone down a bit */
191         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
192                 TIMING ("decoder sleeps with queue of %1", _queue.size());
193                 _condition.wait (lock);
194                 TIMING ("decoder wakes with queue of %1", _queue.size());
195         }
196
197         if (_terminate) {
198                 return;
199         }
200
201         _writer->rethrow ();
202         /* Re-throw any exception raised by one of our threads.  If more
203            than one has thrown an exception, only one will be rethrown, I think;
204            but then, if that happens something has gone badly wrong.
205         */
206         rethrow ();
207
208         if (_writer->can_fake_write (_video_frames_out)) {
209                 _writer->fake_write (_video_frames_out, eyes);
210                 _have_a_real_frame[eyes] = false;
211                 frame_done ();
212         } else if (same && _have_a_real_frame[eyes]) {
213                 /* Use the last frame that we encoded. */
214                 _writer->repeat (_video_frames_out, eyes);
215                 frame_done ();
216         } else {
217                 /* Queue this new frame for encoding */
218                 TIMING ("adding to queue of %1", _queue.size ());
219                 _queue.push_back (shared_ptr<DCPVideoFrame> (
220                                           new DCPVideoFrame (
221                                                   image->image(PIX_FMT_RGB24, false), _video_frames_out, eyes, conversion, _film->video_frame_rate(),
222                                                   _film->j2k_bandwidth(), _film->resolution(), _film->log()
223                                                   )
224                                           ));
225                 
226                 _condition.notify_all ();
227                 _have_a_real_frame[eyes] = true;
228         }
229
230         if (eyes != EYES_LEFT) {
231                 ++_video_frames_out;
232         }
233 }
234
235 void
236 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
237 {
238         _writer->write (data);
239 }
240
241 void
242 Encoder::terminate_threads ()
243 {
244         {
245                 boost::mutex::scoped_lock lock (_mutex);
246                 _terminate = true;
247                 _condition.notify_all ();
248         }
249
250         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
251                 if ((*i)->joinable ()) {
252                         (*i)->join ();
253                 }
254                 delete *i;
255         }
256
257         _threads.clear ();
258 }
259
260 void
261 Encoder::encoder_thread (optional<ServerDescription> server)
262 try
263 {
264         /* Number of seconds that we currently wait between attempts
265            to connect to the server; not relevant for localhost
266            encodings.
267         */
268         int remote_backoff = 0;
269         
270         while (1) {
271
272                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
273                 boost::mutex::scoped_lock lock (_mutex);
274                 while (_queue.empty () && !_terminate) {
275                         _condition.wait (lock);
276                 }
277
278                 if (_terminate) {
279                         return;
280                 }
281
282                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
283                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
284                 TIMING ("encoder thread %1 pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->frame(), vf->eyes ());
285                 _queue.pop_front ();
286                 
287                 lock.unlock ();
288
289                 shared_ptr<EncodedData> encoded;
290
291                 if (server) {
292                         try {
293                                 encoded = vf->encode_remotely (server.get ());
294
295                                 if (remote_backoff > 0) {
296                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
297                                 }
298                                 
299                                 /* This job succeeded, so remove any backoff */
300                                 remote_backoff = 0;
301                                 
302                         } catch (std::exception& e) {
303                                 if (remote_backoff < 60) {
304                                         /* back off more */
305                                         remote_backoff += 10;
306                                 }
307                                 _film->log()->log (
308                                         String::compose (
309                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
310                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
311                                         );
312                         }
313                                 
314                 } else {
315                         try {
316                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
317                                 encoded = vf->encode_locally ();
318                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
319                         } catch (std::exception& e) {
320                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
321                         }
322                 }
323
324                 if (encoded) {
325                         _writer->write (encoded, vf->frame (), vf->eyes ());
326                         frame_done ();
327                 } else {
328                         lock.lock ();
329                         _film->log()->log (
330                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
331                                 );
332                         _queue.push_front (vf);
333                         lock.unlock ();
334                 }
335
336                 if (remote_backoff > 0) {
337                         dcpomatic_sleep (remote_backoff);
338                 }
339
340                 lock.lock ();
341                 _condition.notify_all ();
342         }
343 }
344 catch (...)
345 {
346         store_current ();
347 }
348
349 void
350 Encoder::server_found (ServerDescription s)
351 {
352         add_worker_threads (s);
353 }