Add only-servers-encode option for debugging / optimisation / testing of servers.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
47
48 using std::list;
49 using std::cout;
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
58         : _film (film)
59         , _job (j)
60         , _video_frames_enqueued (0)
61         , _left_done (false)
62         , _right_done (false)
63         , _terminate (false)
64         , _writer (writer)
65 {
66         servers_list_changed ();
67 }
68
69 Encoder::~Encoder ()
70 {
71         terminate_threads ();
72 }
73
74 /** Add a worker thread for a each thread on a remote server.  Caller must hold
75  *  a lock on _mutex, or know that one is not currently required to
76  *  safely modify _threads.
77  */
78 void
79 Encoder::add_worker_threads (ServerDescription d)
80 {
81         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
82         for (int i = 0; i < d.threads(); ++i) {
83                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
84         }
85
86         _writer->set_encoder_threads (_threads.size ());
87 }
88
89 void
90 Encoder::begin ()
91 {
92         if (!Config::instance()->only_servers_encode ()) {
93                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
94                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
95                 }
96         }
97
98         _writer->set_encoder_threads (_threads.size ());
99
100         if (!ServerFinder::instance()->disabled ()) {
101                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
102         }
103 }
104
105 void
106 Encoder::end ()
107 {
108         boost::mutex::scoped_lock lock (_mutex);
109
110         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
111
112         /* Keep waking workers until the queue is empty */
113         while (!_queue.empty ()) {
114                 _empty_condition.notify_all ();
115                 _full_condition.wait (lock);
116         }
117
118         lock.unlock ();
119
120         terminate_threads ();
121
122         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
123
124         /* The following sequence of events can occur in the above code:
125              1. a remote worker takes the last image off the queue
126              2. the loop above terminates
127              3. the remote worker fails to encode the image and puts it back on the queue
128              4. the remote worker is then terminated by terminate_threads
129
130              So just mop up anything left in the queue here.
131         */
132
133         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
134                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
135                 try {
136                         _writer->write (
137                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
138                                 (*i)->index (),
139                                 (*i)->eyes ()
140                                 );
141                         frame_done ();
142                 } catch (std::exception& e) {
143                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
144                 }
145         }
146 }
147
148 /** @return an estimate of the current number of frames we are encoding per second,
149  *  or 0 if not known.
150  */
151 float
152 Encoder::current_encoding_rate () const
153 {
154         boost::mutex::scoped_lock lock (_state_mutex);
155         if (int (_time_history.size()) < _history_size) {
156                 return 0;
157         }
158
159         struct timeval now;
160         gettimeofday (&now, 0);
161
162         return _history_size / (seconds (now) - seconds (_time_history.back ()));
163 }
164
165 /** @return Number of video frames that have been sent out */
166 int
167 Encoder::video_frames_out () const
168 {
169         boost::mutex::scoped_lock (_state_mutex);
170         return _video_frames_enqueued;
171 }
172
173 /** Should be called when a frame has been encoded successfully.
174  *  @param n Source frame index.
175  */
176 void
177 Encoder::frame_done ()
178 {
179         boost::mutex::scoped_lock lock (_state_mutex);
180
181         struct timeval tv;
182         gettimeofday (&tv, 0);
183         _time_history.push_front (tv);
184         if (int (_time_history.size()) > _history_size) {
185                 _time_history.pop_back ();
186         }
187 }
188
189 /** Called in order, so each time this is called the supplied frame is the one
190  *  after the previous one.
191  */
192 void
193 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
194 {
195         _waker.nudge ();
196
197         boost::mutex::scoped_lock lock (_mutex);
198
199         /* XXX: discard 3D here if required */
200
201         /* Wait until the queue has gone down a bit */
202         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
203                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
204                 _full_condition.wait (lock);
205                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
206         }
207
208         if (_terminate) {
209                 return;
210         }
211
212         _writer->rethrow ();
213         /* Re-throw any exception raised by one of our threads.  If more
214            than one has thrown an exception, only one will be rethrown, I think;
215            but then, if that happens something has gone badly wrong.
216         */
217         rethrow ();
218
219         if (_writer->can_fake_write (_video_frames_enqueued)) {
220                 /* We can fake-write this frame */
221                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
222                 frame_done ();
223         } else if (pv->has_j2k ()) {
224                 /* This frame already has JPEG2000 data, so just write it */
225                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
226         } else if (_last_player_video && pv->same (_last_player_video)) {
227                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
228         } else {
229                 /* Queue this new frame for encoding */
230                 LOG_TIMING ("adding to queue of %1", _queue.size ());
231                 _queue.push_back (shared_ptr<DCPVideo> (
232                                           new DCPVideo (
233                                                   pv,
234                                                   _video_frames_enqueued,
235                                                   _film->video_frame_rate(),
236                                                   _film->j2k_bandwidth(),
237                                                   _film->resolution(),
238                                                   _film->log()
239                                                   )
240                                           ));
241
242                 /* The queue might not be empty any more, so notify anything which is
243                    waiting on that.
244                 */
245                 _empty_condition.notify_all ();
246         }
247
248         switch (pv->eyes ()) {
249         case EYES_BOTH:
250                 ++_video_frames_enqueued;
251                 break;
252         case EYES_LEFT:
253                 _left_done = true;
254                 break;
255         case EYES_RIGHT:
256                 _right_done = true;
257                 break;
258         default:
259                 break;
260         }
261
262         if (_left_done && _right_done) {
263                 ++_video_frames_enqueued;
264                 _left_done = _right_done = false;
265         }
266
267         _last_player_video = pv;
268 }
269
270 void
271 Encoder::terminate_threads ()
272 {
273         {
274                 boost::mutex::scoped_lock lock (_mutex);
275                 _terminate = true;
276                 _full_condition.notify_all ();
277                 _empty_condition.notify_all ();
278         }
279
280         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
281                 if ((*i)->joinable ()) {
282                         (*i)->join ();
283                 }
284                 delete *i;
285         }
286
287         _threads.clear ();
288         _terminate = false;
289 }
290
291 void
292 Encoder::encoder_thread (optional<ServerDescription> server)
293 try
294 {
295         /* Number of seconds that we currently wait between attempts
296            to connect to the server; not relevant for localhost
297            encodings.
298         */
299         int remote_backoff = 0;
300
301         while (true) {
302
303                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
304                 boost::mutex::scoped_lock lock (_mutex);
305                 while (_queue.empty () && !_terminate) {
306                         _empty_condition.wait (lock);
307                 }
308
309                 if (_terminate) {
310                         return;
311                 }
312
313                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
314                 shared_ptr<DCPVideo> vf = _queue.front ();
315                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
316                 _queue.pop_front ();
317
318                 lock.unlock ();
319
320                 optional<Data> encoded;
321
322                 /* We need to encode this input */
323                 if (server) {
324                         try {
325                                 encoded = vf->encode_remotely (server.get ());
326
327                                 if (remote_backoff > 0) {
328                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
329                                 }
330
331                                 /* This job succeeded, so remove any backoff */
332                                 remote_backoff = 0;
333
334                         } catch (std::exception& e) {
335                                 if (remote_backoff < 60) {
336                                         /* back off more */
337                                         remote_backoff += 10;
338                                 }
339                                 LOG_ERROR (
340                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
341                                         vf->index(), server->host_name(), e.what(), remote_backoff
342                                         );
343                         }
344
345                 } else {
346                         try {
347                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
348                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
349                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
350                         } catch (std::exception& e) {
351                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
352                         }
353                 }
354
355                 if (encoded) {
356                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
357                         frame_done ();
358                 } else {
359                         lock.lock ();
360                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
361                         _queue.push_front (vf);
362                         lock.unlock ();
363                 }
364
365                 if (remote_backoff > 0) {
366                         dcpomatic_sleep (remote_backoff);
367                 }
368
369                 /* The queue might not be full any more, so notify anything that is waiting on that */
370                 lock.lock ();
371                 _full_condition.notify_all ();
372         }
373 }
374 catch (...)
375 {
376         store_current ();
377 }
378
379 void
380 Encoder::servers_list_changed ()
381 {
382         terminate_threads ();
383         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
384                 add_worker_threads (i);
385         }
386 }