ef58ca09e6e71b85fb7c0c6e29d192fb3dfb468f
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, Log::TYPE_GENERAL);
47 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
48 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
60         : _film (film)
61         , _job (j)
62         , _video_frames_enqueued (0)
63         , _left_done (false)
64         , _right_done (false)
65         , _terminate_enqueue (false)
66         , _terminate_encoding (false)
67         , _writer (writer)
68 {
69         servers_list_changed ();
70 }
71
72 Encoder::~Encoder ()
73 {
74         terminate_threads ();
75
76         boost::mutex::scoped_lock lm (_queue_mutex);
77         _terminate_enqueue = true;
78         _full_condition.notify_all ();
79         _empty_condition.notify_all ();
80 }
81
82 void
83 Encoder::begin ()
84 {
85         if (!ServerFinder::instance()->disabled ()) {
86                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
87         }
88 }
89
90 void
91 Encoder::end ()
92 {
93         boost::mutex::scoped_lock lock (_queue_mutex);
94
95         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
96
97         /* Keep waking workers until the queue is empty */
98         while (!_queue.empty ()) {
99                 rethrow ();
100                 _empty_condition.notify_all ();
101                 _full_condition.wait (lock);
102         }
103
104         lock.unlock ();
105
106         LOG_GENERAL_NC (N_("Terminating encoder threads"));
107
108         terminate_threads ();
109
110         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
111
112         /* The following sequence of events can occur in the above code:
113              1. a remote worker takes the last image off the queue
114              2. the loop above terminates
115              3. the remote worker fails to encode the image and puts it back on the queue
116              4. the remote worker is then terminated by terminate_threads
117
118              So just mop up anything left in the queue here.
119         */
120
121         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
122                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
123                 try {
124                         _writer->write (
125                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
126                                 (*i)->index (),
127                                 (*i)->eyes ()
128                                 );
129                         frame_done ();
130                 } catch (std::exception& e) {
131                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
132                 }
133         }
134 }
135
136 /** @return an estimate of the current number of frames we are encoding per second,
137  *  or 0 if not known.
138  */
139 float
140 Encoder::current_encoding_rate () const
141 {
142         boost::mutex::scoped_lock lock (_state_mutex);
143         if (int (_time_history.size()) < _history_size) {
144                 return 0;
145         }
146
147         struct timeval now;
148         gettimeofday (&now, 0);
149
150         return _history_size / (seconds (now) - seconds (_time_history.back ()));
151 }
152
153 /** @return Number of video frames that have been sent out */
154 int
155 Encoder::video_frames_out () const
156 {
157         boost::mutex::scoped_lock (_state_mutex);
158         return _video_frames_enqueued;
159 }
160
161 /** Should be called when a frame has been encoded successfully.
162  *  @param n Source frame index.
163  */
164 void
165 Encoder::frame_done ()
166 {
167         boost::mutex::scoped_lock lock (_state_mutex);
168
169         struct timeval tv;
170         gettimeofday (&tv, 0);
171         _time_history.push_front (tv);
172         if (int (_time_history.size()) > _history_size) {
173                 _time_history.pop_back ();
174         }
175 }
176
177 /** Called in order, so each time this is called the supplied frame is the one
178  *  after the previous one.
179  */
180 void
181 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
182 {
183         _waker.nudge ();
184
185         size_t threads = 0;
186         {
187                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
188                 threads = _threads.size ();
189         }
190
191         boost::mutex::scoped_lock queue_lock (_queue_mutex);
192
193         /* XXX: discard 3D here if required */
194
195         /* Wait until the queue has gone down a bit */
196         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
197                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
198                 _full_condition.wait (queue_lock);
199                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
200         }
201
202         if (_terminate_enqueue) {
203                 return;
204         }
205
206         _writer->rethrow ();
207         /* Re-throw any exception raised by one of our threads.  If more
208            than one has thrown an exception, only one will be rethrown, I think;
209            but then, if that happens something has gone badly wrong.
210         */
211         rethrow ();
212
213         if (_writer->can_fake_write (_video_frames_enqueued)) {
214                 /* We can fake-write this frame */
215                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
216                 frame_done ();
217         } else if (pv->has_j2k ()) {
218                 /* This frame already has JPEG2000 data, so just write it */
219                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
220         } else if (_last_player_video && pv->same (_last_player_video)) {
221                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
222         } else {
223                 /* Queue this new frame for encoding */
224                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
225                 _queue.push_back (shared_ptr<DCPVideo> (
226                                           new DCPVideo (
227                                                   pv,
228                                                   _video_frames_enqueued,
229                                                   _film->video_frame_rate(),
230                                                   _film->j2k_bandwidth(),
231                                                   _film->resolution(),
232                                                   _film->log()
233                                                   )
234                                           ));
235
236                 /* The queue might not be empty any more, so notify anything which is
237                    waiting on that.
238                 */
239                 _empty_condition.notify_all ();
240         }
241
242         switch (pv->eyes ()) {
243         case EYES_BOTH:
244                 ++_video_frames_enqueued;
245                 break;
246         case EYES_LEFT:
247                 _left_done = true;
248                 break;
249         case EYES_RIGHT:
250                 _right_done = true;
251                 break;
252         default:
253                 break;
254         }
255
256         if (_left_done && _right_done) {
257                 ++_video_frames_enqueued;
258                 _left_done = _right_done = false;
259         }
260
261         _last_player_video = pv;
262 }
263
264 void
265 Encoder::terminate_threads ()
266 {
267         {
268                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
269                 _terminate_encoding = true;
270                 _full_condition.notify_all ();
271                 _empty_condition.notify_all ();
272         }
273
274         boost::mutex::scoped_lock threads_lock (_threads_mutex);
275
276         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
277                 if ((*i)->joinable ()) {
278                         (*i)->join ();
279                 }
280                 delete *i;
281         }
282
283         _threads.clear ();
284         _terminate_encoding = false;
285 }
286
287 void
288 Encoder::encoder_thread (optional<ServerDescription> server)
289 try
290 {
291         if (server) {
292                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
293         } else {
294                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
295         }
296
297         /* Number of seconds that we currently wait between attempts
298            to connect to the server; not relevant for localhost
299            encodings.
300         */
301         int remote_backoff = 0;
302
303         while (true) {
304
305                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
306                 boost::mutex::scoped_lock lock (_queue_mutex);
307                 while (_queue.empty () && !_terminate_encoding) {
308                         _empty_condition.wait (lock);
309                 }
310
311                 if (_terminate_encoding) {
312                         return;
313                 }
314
315                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
316                 shared_ptr<DCPVideo> vf = _queue.front ();
317                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
318                 _queue.pop_front ();
319
320                 lock.unlock ();
321
322                 optional<Data> encoded;
323
324                 /* We need to encode this input */
325                 if (server) {
326                         try {
327                                 encoded = vf->encode_remotely (server.get ());
328
329                                 if (remote_backoff > 0) {
330                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
331                                 }
332
333                                 /* This job succeeded, so remove any backoff */
334                                 remote_backoff = 0;
335
336                         } catch (std::exception& e) {
337                                 if (remote_backoff < 60) {
338                                         /* back off more */
339                                         remote_backoff += 10;
340                                 }
341                                 LOG_ERROR (
342                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
343                                         vf->index(), server->host_name(), e.what(), remote_backoff
344                                         );
345                         }
346
347                 } else {
348                         try {
349                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
350                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
351                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
352                         } catch (std::exception& e) {
353                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
354                                 throw;
355                         }
356                 }
357
358                 if (encoded) {
359                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
360                         frame_done ();
361                 } else {
362                         lock.lock ();
363                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
364                         _queue.push_front (vf);
365                         lock.unlock ();
366                 }
367
368                 if (remote_backoff > 0) {
369                         dcpomatic_sleep (remote_backoff);
370                 }
371
372                 /* The queue might not be full any more, so notify anything that is waiting on that */
373                 lock.lock ();
374                 _full_condition.notify_all ();
375         }
376 }
377 catch (...)
378 {
379         store_current ();
380         /* Wake anything waiting on _full_condition so it can see the exception */
381         _full_condition.notify_all ();
382 }
383
384 void
385 Encoder::servers_list_changed ()
386 {
387         terminate_threads ();
388
389         /* XXX: could re-use threads */
390
391         boost::mutex::scoped_lock lm (_threads_mutex);
392
393         if (!Config::instance()->only_servers_encode ()) {
394                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
395                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
396                 }
397         }
398
399         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
400                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
401                 for (int j = 0; j < i.threads(); ++j) {
402                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
403                 }
404         }
405
406         _writer->set_encoder_threads (_threads.size ());
407 }