Try to fix intermittent deadlocks with encoding servers.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/lib/encoder.cc
22  *  @brief Parent class for classes which can encode video and audio frames.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 200;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
60         : _film (film)
61         , _writer (writer)
62 {
63         servers_list_changed ();
64 }
65
66 Encoder::~Encoder ()
67 {
68         try {
69                 terminate_threads ();
70         } catch (...) {
71                 /* Destructors must not throw exceptions; anything bad
72                    happening now is too late to worry about anyway,
73                    I think.
74                 */
75         }
76 }
77
78 void
79 Encoder::begin ()
80 {
81         if (!EncodeServerFinder::instance()->disabled ()) {
82                 weak_ptr<Encoder> wp = shared_from_this ();
83                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84                         boost::bind (&Encoder::call_servers_list_changed, wp)
85                         );
86         }
87 }
88
89 /* We don't want the servers-list-changed callback trying to do things
90    during destruction of Encoder, and I think this is the neatest way
91    to achieve that.
92 */
93 void
94 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
95 {
96         shared_ptr<Encoder> e = encoder.lock ();
97         if (e) {
98                 e->servers_list_changed ();
99         }
100 }
101
102 void
103 Encoder::end ()
104 {
105         boost::mutex::scoped_lock lock (_queue_mutex);
106
107         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
108
109         /* Keep waking workers until the queue is empty */
110         while (!_queue.empty ()) {
111                 rethrow ();
112                 _empty_condition.notify_all ();
113                 _full_condition.wait (lock);
114         }
115
116         lock.unlock ();
117
118         LOG_GENERAL_NC (N_("Terminating encoder threads"));
119
120         terminate_threads ();
121
122         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
123
124         /* The following sequence of events can occur in the above code:
125              1. a remote worker takes the last image off the queue
126              2. the loop above terminates
127              3. the remote worker fails to encode the image and puts it back on the queue
128              4. the remote worker is then terminated by terminate_threads
129
130              So just mop up anything left in the queue here.
131         */
132
133         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
134                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
135                 try {
136                         _writer->write (
137                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
138                                 (*i)->index (),
139                                 (*i)->eyes ()
140                                 );
141                         frame_done ();
142                 } catch (std::exception& e) {
143                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
144                 }
145         }
146 }
147
148 /** @return an estimate of the current number of frames we are encoding per second,
149  *  or 0 if not known.
150  */
151 float
152 Encoder::current_encoding_rate () const
153 {
154         boost::mutex::scoped_lock lock (_state_mutex);
155         if (int (_time_history.size()) < _history_size) {
156                 return 0;
157         }
158
159         struct timeval now;
160         gettimeofday (&now, 0);
161
162         return _history_size / (seconds (now) - seconds (_time_history.back ()));
163 }
164
165 /** @return Number of video frames that have been queued for encoding */
166 int
167 Encoder::video_frames_enqueued () const
168 {
169         if (!_last_player_video) {
170                 return 0;
171         }
172
173         return _last_player_video->time().frames_floor (_film->video_frame_rate ());
174 }
175
176 /** Should be called when a frame has been encoded successfully.
177  *  @param n Source frame index.
178  */
179 void
180 Encoder::frame_done ()
181 {
182         boost::mutex::scoped_lock lock (_state_mutex);
183
184         struct timeval tv;
185         gettimeofday (&tv, 0);
186         _time_history.push_front (tv);
187         if (int (_time_history.size()) > _history_size) {
188                 _time_history.pop_back ();
189         }
190 }
191
192 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
193  *  so each time the supplied frame is the one after the previous one.
194  *  pv represents one video frame, and could be empty if there is nothing to encode
195  *  for this DCP frame.
196  */
197 void
198 Encoder::encode (shared_ptr<PlayerVideo> pv)
199 {
200         _waker.nudge ();
201
202         size_t threads = 0;
203         {
204                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
205                 threads = _threads.size ();
206         }
207
208         boost::mutex::scoped_lock queue_lock (_queue_mutex);
209
210         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
211            when there are no threads.
212         */
213         while (_queue.size() >= (threads * 2) + 1) {
214                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
215                 _full_condition.wait (queue_lock);
216                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
217         }
218
219         _writer->rethrow ();
220         /* Re-throw any exception raised by one of our threads.  If more
221            than one has thrown an exception, only one will be rethrown, I think;
222            but then, if that happens something has gone badly wrong.
223         */
224         rethrow ();
225
226         Frame const position = pv->time().frames_floor(_film->video_frame_rate());
227
228         if (_writer->can_fake_write (position)) {
229                 /* We can fake-write this frame */
230                 _writer->fake_write (position, pv->eyes ());
231                 frame_done ();
232         } else if (pv->has_j2k ()) {
233                 /* This frame already has JPEG2000 data, so just write it */
234                 _writer->write (pv->j2k(), position, pv->eyes ());
235         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
236                 _writer->repeat (position, pv->eyes ());
237         } else {
238                 /* Queue this new frame for encoding */
239                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
240                 _queue.push_back (shared_ptr<DCPVideo> (
241                                           new DCPVideo (
242                                                   pv,
243                                                   position,
244                                                   _film->video_frame_rate(),
245                                                   _film->j2k_bandwidth(),
246                                                   _film->resolution(),
247                                                   _film->log()
248                                                   )
249                                           ));
250
251                 /* The queue might not be empty any more, so notify anything which is
252                    waiting on that.
253                 */
254                 _empty_condition.notify_all ();
255         }
256
257         _last_player_video = pv;
258 }
259
260 void
261 Encoder::terminate_threads ()
262 {
263         boost::mutex::scoped_lock threads_lock (_threads_mutex);
264
265         int n = 0;
266         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
267                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
268                 (*i)->interrupt ();
269                 DCPOMATIC_ASSERT ((*i)->joinable ());
270                 try {
271                         (*i)->join ();
272                 } catch (boost::thread_interrupted& e) {
273                         /* This is to be expected */
274                 }
275                 delete *i;
276                 LOG_GENERAL_NC ("Thread terminated");
277                 ++n;
278         }
279
280         _threads.clear ();
281 }
282
283 void
284 Encoder::encoder_thread (optional<EncodeServerDescription> server)
285 try
286 {
287         if (server) {
288                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
289         } else {
290                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
291         }
292
293         /* Number of seconds that we currently wait between attempts
294            to connect to the server; not relevant for localhost
295            encodings.
296         */
297         int remote_backoff = 0;
298
299         while (true) {
300
301                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
302                 boost::mutex::scoped_lock lock (_queue_mutex);
303                 while (_queue.empty ()) {
304                         _empty_condition.wait (lock);
305                 }
306
307                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
308                 shared_ptr<DCPVideo> vf = _queue.front ();
309                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
310                 _queue.pop_front ();
311
312                 lock.unlock ();
313
314                 optional<Data> encoded;
315
316                 /* We need to encode this input */
317                 if (server) {
318                         try {
319                                 encoded = vf->encode_remotely (server.get ());
320
321                                 if (remote_backoff > 0) {
322                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
323                                 }
324
325                                 /* This job succeeded, so remove any backoff */
326                                 remote_backoff = 0;
327
328                         } catch (std::exception& e) {
329                                 if (remote_backoff < 60) {
330                                         /* back off more */
331                                         remote_backoff += 10;
332                                 }
333                                 LOG_ERROR (
334                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
335                                         vf->index(), server->host_name(), e.what(), remote_backoff
336                                         );
337                         }
338
339                 } else {
340                         try {
341                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
343                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
344                         } catch (std::exception& e) {
345                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
346                                 throw;
347                         }
348                 }
349
350                 if (encoded) {
351                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
352                         frame_done ();
353                 } else {
354                         lock.lock ();
355                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
356                         _queue.push_front (vf);
357                         lock.unlock ();
358                 }
359
360                 if (remote_backoff > 0) {
361                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
362                 }
363
364                 /* The queue might not be full any more, so notify anything that is waiting on that */
365                 lock.lock ();
366                 _full_condition.notify_all ();
367         }
368 }
369 catch (boost::thread_interrupted& e) {
370         /* Ignore these and just stop the thread */
371         _full_condition.notify_all ();
372 }
373 catch (...)
374 {
375         store_current ();
376         /* Wake anything waiting on _full_condition so it can see the exception */
377         _full_condition.notify_all ();
378 }
379
380 void
381 Encoder::servers_list_changed ()
382 {
383         terminate_threads ();
384
385         /* XXX: could re-use threads */
386
387         boost::mutex::scoped_lock lm (_threads_mutex);
388
389 #ifdef BOOST_THREAD_PLATFORM_WIN32
390         OSVERSIONINFO info;
391         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
392         GetVersionEx (&info);
393         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
394         if (windows_xp) {
395                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
396         }
397 #endif
398
399         if (!Config::instance()->only_servers_encode ()) {
400                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
401                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
402                         _threads.push_back (t);
403 #ifdef BOOST_THREAD_PLATFORM_WIN32
404                         if (windows_xp) {
405                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
406                         }
407 #endif
408                 }
409         }
410
411         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
412                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
413                 for (int j = 0; j < i.threads(); ++j) {
414                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
415                 }
416         }
417
418         _writer->set_encoder_threads (_threads.size ());
419 }