Try to speed up cancel of encoder threads if they are sleeping for remote backoff.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Convenience macros for writing to the film's log.  LOG_GENERAL, LOG_ERROR and
   LOG_TIMING format their arguments with String::compose; LOG_GENERAL_NC passes
   its argument to the log as it is.

   The expansions deliberately do not end in a semicolon: the caller writes the
   semicolon, so that each use is exactly one statement (and is safe inside an
   un-braced if/else).
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
60         : _film (film)
61         , _job (j)
62         , _position (0)
63         , _terminate_enqueue (false)
64         , _terminate_encoding (false)
65         , _writer (writer)
66 {
67         servers_list_changed ();
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73
74         boost::mutex::scoped_lock lm (_queue_mutex);
75         _terminate_enqueue = true;
76         _full_condition.notify_all ();
77         _empty_condition.notify_all ();
78 }
79
80 void
81 Encoder::begin ()
82 {
83         if (!ServerFinder::instance()->disabled ()) {
84                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
85         }
86 }
87
88 void
89 Encoder::end ()
90 {
91         boost::mutex::scoped_lock lock (_queue_mutex);
92
93         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
94
95         /* Keep waking workers until the queue is empty */
96         while (!_queue.empty ()) {
97                 rethrow ();
98                 _empty_condition.notify_all ();
99                 _full_condition.wait (lock);
100         }
101
102         lock.unlock ();
103
104         LOG_GENERAL_NC (N_("Terminating encoder threads"));
105
106         terminate_threads ();
107
108         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
109
110         /* The following sequence of events can occur in the above code:
111              1. a remote worker takes the last image off the queue
112              2. the loop above terminates
113              3. the remote worker fails to encode the image and puts it back on the queue
114              4. the remote worker is then terminated by terminate_threads
115
116              So just mop up anything left in the queue here.
117         */
118
119         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
120                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
121                 try {
122                         _writer->write (
123                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
124                                 (*i)->index (),
125                                 (*i)->eyes ()
126                                 );
127                         frame_done ();
128                 } catch (std::exception& e) {
129                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
130                 }
131         }
132 }
133
134 /** @return an estimate of the current number of frames we are encoding per second,
135  *  or 0 if not known.
136  */
137 float
138 Encoder::current_encoding_rate () const
139 {
140         boost::mutex::scoped_lock lock (_state_mutex);
141         if (int (_time_history.size()) < _history_size) {
142                 return 0;
143         }
144
145         struct timeval now;
146         gettimeofday (&now, 0);
147
148         return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 }
150
151 /** @return Number of video frames that have been sent out */
152 int
153 Encoder::video_frames_out () const
154 {
155         boost::mutex::scoped_lock (_state_mutex);
156         return _position;
157 }
158
159 /** Should be called when a frame has been encoded successfully.
160  *  @param n Source frame index.
161  */
162 void
163 Encoder::frame_done ()
164 {
165         boost::mutex::scoped_lock lock (_state_mutex);
166
167         struct timeval tv;
168         gettimeofday (&tv, 0);
169         _time_history.push_front (tv);
170         if (int (_time_history.size()) > _history_size) {
171                 _time_history.pop_back ();
172         }
173 }
174
175 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
176  *  so each time the supplied frame is the one after the previous one.
177  *  pv represents one video frame, and could be empty if there is nothing to encode
178  *  for this DCP frame.
179  */
180 void
181 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
182 {
183         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
184                 enqueue (i);
185         }
186         ++_position;
187 }
188
189 void
190 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
191 {
192         _waker.nudge ();
193
194         size_t threads = 0;
195         {
196                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
197                 threads = _threads.size ();
198         }
199
200         boost::mutex::scoped_lock queue_lock (_queue_mutex);
201
202         /* XXX: discard 3D here if required */
203
204         /* Wait until the queue has gone down a bit */
205         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
206                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
207                 _full_condition.wait (queue_lock);
208                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
209         }
210
211         if (_terminate_enqueue) {
212                 return;
213         }
214
215         _writer->rethrow ();
216         /* Re-throw any exception raised by one of our threads.  If more
217            than one has thrown an exception, only one will be rethrown, I think;
218            but then, if that happens something has gone badly wrong.
219         */
220         rethrow ();
221
222         if (_writer->can_fake_write (_position)) {
223                 /* We can fake-write this frame */
224                 _writer->fake_write (_position, pv->eyes ());
225                 frame_done ();
226         } else if (pv->has_j2k ()) {
227                 /* This frame already has JPEG2000 data, so just write it */
228                 _writer->write (pv->j2k(), _position, pv->eyes ());
229         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
230                 _writer->repeat (_position, pv->eyes ());
231         } else {
232                 /* Queue this new frame for encoding */
233                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
234                 _queue.push_back (shared_ptr<DCPVideo> (
235                                           new DCPVideo (
236                                                   pv,
237                                                   _position,
238                                                   _film->video_frame_rate(),
239                                                   _film->j2k_bandwidth(),
240                                                   _film->resolution(),
241                                                   _film->log()
242                                                   )
243                                           ));
244
245                 /* The queue might not be empty any more, so notify anything which is
246                    waiting on that.
247                 */
248                 _empty_condition.notify_all ();
249         }
250
251         _last_player_video = pv;
252 }
253
254 void
255 Encoder::terminate_threads ()
256 {
257         {
258                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
259                 _terminate_encoding = true;
260         }
261
262         boost::mutex::scoped_lock threads_lock (_threads_mutex);
263
264         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
265                 (*i)->interrupt ();
266                 if ((*i)->joinable ()) {
267                         (*i)->join ();
268                 }
269                 delete *i;
270         }
271
272         _threads.clear ();
273         _terminate_encoding = false;
274 }
275
276 void
277 Encoder::encoder_thread (optional<ServerDescription> server)
278 try
279 {
280         if (server) {
281                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
282         } else {
283                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
284         }
285
286         /* Number of seconds that we currently wait between attempts
287            to connect to the server; not relevant for localhost
288            encodings.
289         */
290         int remote_backoff = 0;
291
292         while (true) {
293
294                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
295                 boost::mutex::scoped_lock lock (_queue_mutex);
296                 while (_queue.empty () && !_terminate_encoding) {
297                         _empty_condition.wait (lock);
298                 }
299
300                 if (_terminate_encoding) {
301                         return;
302                 }
303
304                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
305                 shared_ptr<DCPVideo> vf = _queue.front ();
306                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
307                 _queue.pop_front ();
308
309                 lock.unlock ();
310
311                 optional<Data> encoded;
312
313                 /* We need to encode this input */
314                 if (server) {
315                         try {
316                                 encoded = vf->encode_remotely (server.get ());
317
318                                 if (remote_backoff > 0) {
319                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
320                                 }
321
322                                 /* This job succeeded, so remove any backoff */
323                                 remote_backoff = 0;
324
325                         } catch (std::exception& e) {
326                                 if (remote_backoff < 60) {
327                                         /* back off more */
328                                         remote_backoff += 10;
329                                 }
330                                 LOG_ERROR (
331                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
332                                         vf->index(), server->host_name(), e.what(), remote_backoff
333                                         );
334                         }
335
336                 } else {
337                         try {
338                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
339                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
340                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
341                         } catch (std::exception& e) {
342                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
343                                 throw;
344                         }
345                 }
346
347                 if (encoded) {
348                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
349                         frame_done ();
350                 } else {
351                         lock.lock ();
352                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
353                         _queue.push_front (vf);
354                         lock.unlock ();
355                 }
356
357                 if (remote_backoff > 0) {
358                         boost::this_thread::sleep_for (boost::chrono::seconds (remote_backoff));
359                 }
360
361                 /* The queue might not be full any more, so notify anything that is waiting on that */
362                 lock.lock ();
363                 _full_condition.notify_all ();
364         }
365 }
366 catch (...)
367 {
368         store_current ();
369         /* Wake anything waiting on _full_condition so it can see the exception */
370         _full_condition.notify_all ();
371 }
372
373 void
374 Encoder::servers_list_changed ()
375 {
376         terminate_threads ();
377
378         /* XXX: could re-use threads */
379
380         boost::mutex::scoped_lock lm (_threads_mutex);
381
382         if (!Config::instance()->only_servers_encode ()) {
383                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
384                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
385                 }
386         }
387
388         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
389                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
390                 for (int j = 0; j < i.threads(); ++j) {
391                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
392                 }
393         }
394
395         _writer->set_encoder_threads (_threads.size ());
396 }