Fix loss of host's encoding threads when servers list changes.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers for Encoder member functions; they use the enclosing object's
   _film member.  Each expansion already ends in a semicolon.
*/
#define LOG_GENERAL(...) \
	_film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
#define LOG_ERROR(...) \
	_film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
#define LOG_TIMING(...) \
	_film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
47
48 using std::list;
49 using std::cout;
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
58         : _film (film)
59         , _job (j)
60         , _video_frames_enqueued (0)
61         , _left_done (false)
62         , _right_done (false)
63         , _terminate (false)
64         , _writer (writer)
65 {
66         servers_list_changed ();
67 }
68
69 Encoder::~Encoder ()
70 {
71         terminate_threads ();
72 }
73
74 void
75 Encoder::begin ()
76 {
77         if (!ServerFinder::instance()->disabled ()) {
78                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
79         }
80 }
81
82 void
83 Encoder::end ()
84 {
85         boost::mutex::scoped_lock lock (_mutex);
86
87         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
88
89         /* Keep waking workers until the queue is empty */
90         while (!_queue.empty ()) {
91                 _empty_condition.notify_all ();
92                 _full_condition.wait (lock);
93         }
94
95         lock.unlock ();
96
97         terminate_threads ();
98
99         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
100
101         /* The following sequence of events can occur in the above code:
102              1. a remote worker takes the last image off the queue
103              2. the loop above terminates
104              3. the remote worker fails to encode the image and puts it back on the queue
105              4. the remote worker is then terminated by terminate_threads
106
107              So just mop up anything left in the queue here.
108         */
109
110         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
111                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
112                 try {
113                         _writer->write (
114                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
115                                 (*i)->index (),
116                                 (*i)->eyes ()
117                                 );
118                         frame_done ();
119                 } catch (std::exception& e) {
120                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
121                 }
122         }
123 }
124
125 /** @return an estimate of the current number of frames we are encoding per second,
126  *  or 0 if not known.
127  */
128 float
129 Encoder::current_encoding_rate () const
130 {
131         boost::mutex::scoped_lock lock (_state_mutex);
132         if (int (_time_history.size()) < _history_size) {
133                 return 0;
134         }
135
136         struct timeval now;
137         gettimeofday (&now, 0);
138
139         return _history_size / (seconds (now) - seconds (_time_history.back ()));
140 }
141
142 /** @return Number of video frames that have been sent out */
143 int
144 Encoder::video_frames_out () const
145 {
146         boost::mutex::scoped_lock (_state_mutex);
147         return _video_frames_enqueued;
148 }
149
150 /** Should be called when a frame has been encoded successfully.
151  *  @param n Source frame index.
152  */
153 void
154 Encoder::frame_done ()
155 {
156         boost::mutex::scoped_lock lock (_state_mutex);
157
158         struct timeval tv;
159         gettimeofday (&tv, 0);
160         _time_history.push_front (tv);
161         if (int (_time_history.size()) > _history_size) {
162                 _time_history.pop_back ();
163         }
164 }
165
166 /** Called in order, so each time this is called the supplied frame is the one
167  *  after the previous one.
168  */
169 void
170 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
171 {
172         _waker.nudge ();
173
174         boost::mutex::scoped_lock lock (_mutex);
175
176         /* XXX: discard 3D here if required */
177
178         /* Wait until the queue has gone down a bit */
179         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
180                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
181                 _full_condition.wait (lock);
182                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
183         }
184
185         if (_terminate) {
186                 return;
187         }
188
189         _writer->rethrow ();
190         /* Re-throw any exception raised by one of our threads.  If more
191            than one has thrown an exception, only one will be rethrown, I think;
192            but then, if that happens something has gone badly wrong.
193         */
194         rethrow ();
195
196         if (_writer->can_fake_write (_video_frames_enqueued)) {
197                 /* We can fake-write this frame */
198                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
199                 frame_done ();
200         } else if (pv->has_j2k ()) {
201                 /* This frame already has JPEG2000 data, so just write it */
202                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
203         } else if (_last_player_video && pv->same (_last_player_video)) {
204                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
205         } else {
206                 /* Queue this new frame for encoding */
207                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
208                 _queue.push_back (shared_ptr<DCPVideo> (
209                                           new DCPVideo (
210                                                   pv,
211                                                   _video_frames_enqueued,
212                                                   _film->video_frame_rate(),
213                                                   _film->j2k_bandwidth(),
214                                                   _film->resolution(),
215                                                   _film->log()
216                                                   )
217                                           ));
218
219                 /* The queue might not be empty any more, so notify anything which is
220                    waiting on that.
221                 */
222                 _empty_condition.notify_all ();
223         }
224
225         switch (pv->eyes ()) {
226         case EYES_BOTH:
227                 ++_video_frames_enqueued;
228                 break;
229         case EYES_LEFT:
230                 _left_done = true;
231                 break;
232         case EYES_RIGHT:
233                 _right_done = true;
234                 break;
235         default:
236                 break;
237         }
238
239         if (_left_done && _right_done) {
240                 ++_video_frames_enqueued;
241                 _left_done = _right_done = false;
242         }
243
244         _last_player_video = pv;
245 }
246
247 void
248 Encoder::terminate_threads ()
249 {
250         {
251                 boost::mutex::scoped_lock lock (_mutex);
252                 _terminate = true;
253                 _full_condition.notify_all ();
254                 _empty_condition.notify_all ();
255         }
256
257         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
258                 if ((*i)->joinable ()) {
259                         (*i)->join ();
260                 }
261                 delete *i;
262         }
263
264         _threads.clear ();
265         _terminate = false;
266 }
267
268 void
269 Encoder::encoder_thread (optional<ServerDescription> server)
270 try
271 {
272         if (server) {
273                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
274         } else {
275                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
276         }
277
278         /* Number of seconds that we currently wait between attempts
279            to connect to the server; not relevant for localhost
280            encodings.
281         */
282         int remote_backoff = 0;
283
284         while (true) {
285
286                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
287                 boost::mutex::scoped_lock lock (_mutex);
288                 while (_queue.empty () && !_terminate) {
289                         _empty_condition.wait (lock);
290                 }
291
292                 if (_terminate) {
293                         return;
294                 }
295
296                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
297                 shared_ptr<DCPVideo> vf = _queue.front ();
298                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
299                 _queue.pop_front ();
300
301                 lock.unlock ();
302
303                 optional<Data> encoded;
304
305                 /* We need to encode this input */
306                 if (server) {
307                         try {
308                                 encoded = vf->encode_remotely (server.get ());
309
310                                 if (remote_backoff > 0) {
311                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
312                                 }
313
314                                 /* This job succeeded, so remove any backoff */
315                                 remote_backoff = 0;
316
317                         } catch (std::exception& e) {
318                                 if (remote_backoff < 60) {
319                                         /* back off more */
320                                         remote_backoff += 10;
321                                 }
322                                 LOG_ERROR (
323                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
324                                         vf->index(), server->host_name(), e.what(), remote_backoff
325                                         );
326                         }
327
328                 } else {
329                         try {
330                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
331                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
332                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
333                         } catch (std::exception& e) {
334                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
335                         }
336                 }
337
338                 if (encoded) {
339                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
340                         frame_done ();
341                 } else {
342                         lock.lock ();
343                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
344                         _queue.push_front (vf);
345                         lock.unlock ();
346                 }
347
348                 if (remote_backoff > 0) {
349                         dcpomatic_sleep (remote_backoff);
350                 }
351
352                 /* The queue might not be full any more, so notify anything that is waiting on that */
353                 lock.lock ();
354                 _full_condition.notify_all ();
355         }
356 }
357 catch (...)
358 {
359         store_current ();
360 }
361
362 void
363 Encoder::servers_list_changed ()
364 {
365         terminate_threads ();
366
367         /* XXX: could re-use threads */
368
369         if (!Config::instance()->only_servers_encode ()) {
370                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
371                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
372                 }
373         }
374
375         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
376                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
377                 for (int j = 0; j < i.threads(); ++j) {
378                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
379                 }
380         }
381
382         _writer->set_encoder_threads (_threads.size ());
383 }