Make sure exceptions during local encodes are propagated up to the front end (#694).
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
/* Logging helpers: each composes its arguments with String::compose and sends
   the result to the film's log with the appropriate entry type.  They expand to
   a single expression with no trailing semicolon, so that `LOG_X (...);' is
   exactly one statement and is safe inside an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54
/* Maximum number of frame-completion timestamps kept in _time_history by
   frame_done(); current_encoding_rate() reports 0 until this many are present.
*/
int const Encoder::_history_size = 25;
56
/** Construct an Encoder and start its initial set of encoding threads.
 *  @param film Film that we are encoding.
 *  @param j Job that is stored in _job.
 *  @param writer Writer that receives encoded (or faked / repeated) frames.
 */
Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
        : _film (film)
        , _job (j)
        , _video_frames_enqueued (0)
        , _left_done (false)
        , _right_done (false)
        , _terminate_enqueue (false)
        , _terminate_encoding (false)
        , _writer (writer)
{
        /* Creates the worker threads and tells the writer how many there are */
        servers_list_changed ();
}
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74
75         boost::mutex::scoped_lock lm (_queue_mutex);
76         _terminate_enqueue = true;
77         _full_condition.notify_all ();
78         _empty_condition.notify_all ();
79 }
80
81 void
82 Encoder::begin ()
83 {
84         if (!ServerFinder::instance()->disabled ()) {
85                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
86         }
87 }
88
89 void
90 Encoder::end ()
91 {
92         boost::mutex::scoped_lock lock (_queue_mutex);
93
94         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
95
96         /* Keep waking workers until the queue is empty */
97         while (!_queue.empty ()) {
98                 _empty_condition.notify_all ();
99                 _full_condition.wait (lock);
100         }
101
102         lock.unlock ();
103
104         terminate_threads ();
105
106         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
107
108         /* The following sequence of events can occur in the above code:
109              1. a remote worker takes the last image off the queue
110              2. the loop above terminates
111              3. the remote worker fails to encode the image and puts it back on the queue
112              4. the remote worker is then terminated by terminate_threads
113
114              So just mop up anything left in the queue here.
115         */
116
117         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
118                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
119                 try {
120                         _writer->write (
121                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
122                                 (*i)->index (),
123                                 (*i)->eyes ()
124                                 );
125                         frame_done ();
126                 } catch (std::exception& e) {
127                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
128                 }
129         }
130 }
131
132 /** @return an estimate of the current number of frames we are encoding per second,
133  *  or 0 if not known.
134  */
135 float
136 Encoder::current_encoding_rate () const
137 {
138         boost::mutex::scoped_lock lock (_state_mutex);
139         if (int (_time_history.size()) < _history_size) {
140                 return 0;
141         }
142
143         struct timeval now;
144         gettimeofday (&now, 0);
145
146         return _history_size / (seconds (now) - seconds (_time_history.back ()));
147 }
148
149 /** @return Number of video frames that have been sent out */
150 int
151 Encoder::video_frames_out () const
152 {
153         boost::mutex::scoped_lock (_state_mutex);
154         return _video_frames_enqueued;
155 }
156
157 /** Should be called when a frame has been encoded successfully.
158  *  @param n Source frame index.
159  */
160 void
161 Encoder::frame_done ()
162 {
163         boost::mutex::scoped_lock lock (_state_mutex);
164
165         struct timeval tv;
166         gettimeofday (&tv, 0);
167         _time_history.push_front (tv);
168         if (int (_time_history.size()) > _history_size) {
169                 _time_history.pop_back ();
170         }
171 }
172
173 /** Called in order, so each time this is called the supplied frame is the one
174  *  after the previous one.
175  */
176 void
177 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
178 {
179         _waker.nudge ();
180
181         size_t threads = 0;
182         {
183                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
184                 threads = _threads.size ();
185         }
186
187         boost::mutex::scoped_lock queue_lock (_queue_mutex);
188
189         /* XXX: discard 3D here if required */
190
191         /* Wait until the queue has gone down a bit */
192         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
193                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
194                 _full_condition.wait (queue_lock);
195                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
196         }
197
198         if (_terminate_enqueue) {
199                 return;
200         }
201
202         _writer->rethrow ();
203         /* Re-throw any exception raised by one of our threads.  If more
204            than one has thrown an exception, only one will be rethrown, I think;
205            but then, if that happens something has gone badly wrong.
206         */
207         rethrow ();
208
209         if (_writer->can_fake_write (_video_frames_enqueued)) {
210                 /* We can fake-write this frame */
211                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
212                 frame_done ();
213         } else if (pv->has_j2k ()) {
214                 /* This frame already has JPEG2000 data, so just write it */
215                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
216         } else if (_last_player_video && pv->same (_last_player_video)) {
217                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
218         } else {
219                 /* Queue this new frame for encoding */
220                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
221                 _queue.push_back (shared_ptr<DCPVideo> (
222                                           new DCPVideo (
223                                                   pv,
224                                                   _video_frames_enqueued,
225                                                   _film->video_frame_rate(),
226                                                   _film->j2k_bandwidth(),
227                                                   _film->resolution(),
228                                                   _film->log()
229                                                   )
230                                           ));
231
232                 /* The queue might not be empty any more, so notify anything which is
233                    waiting on that.
234                 */
235                 _empty_condition.notify_all ();
236         }
237
238         switch (pv->eyes ()) {
239         case EYES_BOTH:
240                 ++_video_frames_enqueued;
241                 break;
242         case EYES_LEFT:
243                 _left_done = true;
244                 break;
245         case EYES_RIGHT:
246                 _right_done = true;
247                 break;
248         default:
249                 break;
250         }
251
252         if (_left_done && _right_done) {
253                 ++_video_frames_enqueued;
254                 _left_done = _right_done = false;
255         }
256
257         _last_player_video = pv;
258 }
259
260 void
261 Encoder::terminate_threads ()
262 {
263         {
264                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
265                 _terminate_encoding = true;
266                 _full_condition.notify_all ();
267                 _empty_condition.notify_all ();
268         }
269
270         boost::mutex::scoped_lock threads_lock (_threads_mutex);
271
272         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
273                 if ((*i)->joinable ()) {
274                         (*i)->join ();
275                 }
276                 delete *i;
277         }
278
279         _threads.clear ();
280         _terminate_encoding = false;
281 }
282
/** Body of an encoding worker thread.  Repeatedly takes the frame at the front
 *  of the queue, encodes it and passes the result to the writer, until
 *  _terminate_encoding is set.
 *
 *  A failed remote encode is logged, the frame is put back on the front of the
 *  queue and the thread sleeps for a back-off period before carrying on.  A
 *  failed local encode is logged and re-thrown; like any other exception that
 *  escapes the loop it is caught by the function-level handler and passed to
 *  store_current(), and the thread then ends.
 *
 *  @param server Server to encode on, or an empty optional to encode locally.
 */
void
Encoder::encoder_thread (optional<ServerDescription> server)
try
{
        if (server) {
                LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
        } else {
                LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
        }

        /* Number of seconds that we currently wait between attempts
           to connect to the server; not relevant for localhost
           encodings.
        */
        int remote_backoff = 0;

        while (true) {

                LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
                boost::mutex::scoped_lock lock (_queue_mutex);
                /* Wait for either some work or a request to stop */
                while (_queue.empty () && !_terminate_encoding) {
                        _empty_condition.wait (lock);
                }

                if (_terminate_encoding) {
                        return;
                }

                LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
                shared_ptr<DCPVideo> vf = _queue.front ();
                LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
                _queue.pop_front ();

                /* Do the encode itself without holding the queue lock */
                lock.unlock ();

                /* Stays empty if a remote encode fails */
                optional<Data> encoded;

                /* We need to encode this input */
                if (server) {
                        try {
                                encoded = vf->encode_remotely (server.get ());

                                if (remote_backoff > 0) {
                                        LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
                                }

                                /* This job succeeded, so remove any backoff */
                                remote_backoff = 0;

                        } catch (std::exception& e) {
                                /* Increase the back-off in steps of 10 until it reaches 60 */
                                if (remote_backoff < 60) {
                                        /* back off more */
                                        remote_backoff += 10;
                                }
                                LOG_ERROR (
                                        N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
                                        vf->index(), server->host_name(), e.what(), remote_backoff
                                        );
                        }

                } else {
                        try {
                                LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
                                encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
                                LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
                        } catch (std::exception& e) {
                                LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
                                /* Pass the failure on to the function-level handler below */
                                throw;
                        }
                }

                if (encoded) {
                        _writer->write (encoded.get(), vf->index (), vf->eyes ());
                        frame_done ();
                } else {
                        /* The encode failed, so put the frame back at the front of the queue */
                        lock.lock ();
                        LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
                        _queue.push_front (vf);
                        lock.unlock ();
                }

                if (remote_backoff > 0) {
                        dcpomatic_sleep (remote_backoff);
                }

                /* The queue might not be full any more, so notify anything that is waiting on that */
                lock.lock ();
                _full_condition.notify_all ();
                /* lock is released by its destructor at the end of this iteration */
        }
}
catch (...)
{
        /* NOTE(review): presumably the stored exception is what rethrow() in enqueue() re-raises; confirm */
        store_current ();
}
377
378 void
379 Encoder::servers_list_changed ()
380 {
381         terminate_threads ();
382
383         /* XXX: could re-use threads */
384
385         boost::mutex::scoped_lock lm (_threads_mutex);
386
387         if (!Config::instance()->only_servers_encode ()) {
388                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
389                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
390                 }
391         }
392
393         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
394                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
395                 for (int j = 0; j < i.threads(); ++j) {
396                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
397                 }
398         }
399
400         _writer->set_encoder_threads (_threads.size ());
401 }