Merge branch '2.0' of ssh://git.carlh.net/home/carl/git/dcpomatic2 into 2.0
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
/* Logging helpers for code which has a `_film' member in scope.  Each one expands
   to a single expression with no trailing semicolon, so that the call site supplies
   its own `;' and the macros stay safe as the only statement of an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
59         : _film (film)
60         , _job (j)
61         , _video_frames_enqueued (0)
62         , _left_done (false)
63         , _right_done (false)
64         , _terminate (false)
65         , _writer (writer)
66 {
67         servers_list_changed ();
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73 }
74
75 void
76 Encoder::begin ()
77 {
78         if (!ServerFinder::instance()->disabled ()) {
79                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
80         }
81 }
82
83 void
84 Encoder::end ()
85 {
86         boost::mutex::scoped_lock lock (_queue_mutex);
87
88         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
89
90         /* Keep waking workers until the queue is empty */
91         while (!_queue.empty ()) {
92                 _empty_condition.notify_all ();
93                 _full_condition.wait (lock);
94         }
95
96         lock.unlock ();
97
98         terminate_threads ();
99
100         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
101
102         /* The following sequence of events can occur in the above code:
103              1. a remote worker takes the last image off the queue
104              2. the loop above terminates
105              3. the remote worker fails to encode the image and puts it back on the queue
106              4. the remote worker is then terminated by terminate_threads
107
108              So just mop up anything left in the queue here.
109         */
110
111         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
113                 try {
114                         _writer->write (
115                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
116                                 (*i)->index (),
117                                 (*i)->eyes ()
118                                 );
119                         frame_done ();
120                 } catch (std::exception& e) {
121                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
122                 }
123         }
124 }
125
126 /** @return an estimate of the current number of frames we are encoding per second,
127  *  or 0 if not known.
128  */
129 float
130 Encoder::current_encoding_rate () const
131 {
132         boost::mutex::scoped_lock lock (_state_mutex);
133         if (int (_time_history.size()) < _history_size) {
134                 return 0;
135         }
136
137         struct timeval now;
138         gettimeofday (&now, 0);
139
140         return _history_size / (seconds (now) - seconds (_time_history.back ()));
141 }
142
143 /** @return Number of video frames that have been sent out */
144 int
145 Encoder::video_frames_out () const
146 {
147         boost::mutex::scoped_lock (_state_mutex);
148         return _video_frames_enqueued;
149 }
150
151 /** Should be called when a frame has been encoded successfully.
152  *  @param n Source frame index.
153  */
154 void
155 Encoder::frame_done ()
156 {
157         boost::mutex::scoped_lock lock (_state_mutex);
158
159         struct timeval tv;
160         gettimeofday (&tv, 0);
161         _time_history.push_front (tv);
162         if (int (_time_history.size()) > _history_size) {
163                 _time_history.pop_back ();
164         }
165 }
166
167 /** Called in order, so each time this is called the supplied frame is the one
168  *  after the previous one.
169  */
170 void
171 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
172 {
173         _waker.nudge ();
174
175         size_t threads = 0;
176         {
177                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
178                 threads = _threads.size ();
179         }
180
181         boost::mutex::scoped_lock queue_lock (_queue_mutex);
182
183         /* XXX: discard 3D here if required */
184
185         /* Wait until the queue has gone down a bit */
186         while (_queue.size() >= threads * 2 && !_terminate) {
187                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
188                 _full_condition.wait (queue_lock);
189                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
190         }
191
192         if (_terminate) {
193                 return;
194         }
195
196         _writer->rethrow ();
197         /* Re-throw any exception raised by one of our threads.  If more
198            than one has thrown an exception, only one will be rethrown, I think;
199            but then, if that happens something has gone badly wrong.
200         */
201         rethrow ();
202
203         if (_writer->can_fake_write (_video_frames_enqueued)) {
204                 /* We can fake-write this frame */
205                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
206                 frame_done ();
207         } else if (pv->has_j2k ()) {
208                 /* This frame already has JPEG2000 data, so just write it */
209                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
210         } else if (_last_player_video && pv->same (_last_player_video)) {
211                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
212         } else {
213                 /* Queue this new frame for encoding */
214                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
215                 _queue.push_back (shared_ptr<DCPVideo> (
216                                           new DCPVideo (
217                                                   pv,
218                                                   _video_frames_enqueued,
219                                                   _film->video_frame_rate(),
220                                                   _film->j2k_bandwidth(),
221                                                   _film->resolution(),
222                                                   _film->log()
223                                                   )
224                                           ));
225
226                 /* The queue might not be empty any more, so notify anything which is
227                    waiting on that.
228                 */
229                 _empty_condition.notify_all ();
230         }
231
232         switch (pv->eyes ()) {
233         case EYES_BOTH:
234                 ++_video_frames_enqueued;
235                 break;
236         case EYES_LEFT:
237                 _left_done = true;
238                 break;
239         case EYES_RIGHT:
240                 _right_done = true;
241                 break;
242         default:
243                 break;
244         }
245
246         if (_left_done && _right_done) {
247                 ++_video_frames_enqueued;
248                 _left_done = _right_done = false;
249         }
250
251         _last_player_video = pv;
252 }
253
254 void
255 Encoder::terminate_threads ()
256 {
257         {
258                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
259                 _terminate = true;
260                 _full_condition.notify_all ();
261                 _empty_condition.notify_all ();
262         }
263
264         boost::mutex::scoped_lock threads_lock (_threads_mutex);
265
266         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
267                 if ((*i)->joinable ()) {
268                         (*i)->join ();
269                 }
270                 delete *i;
271         }
272
273         _threads.clear ();
274         _terminate = false;
275 }
276
277 void
278 Encoder::encoder_thread (optional<ServerDescription> server)
279 try
280 {
281         if (server) {
282                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
283         } else {
284                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
285         }
286
287         /* Number of seconds that we currently wait between attempts
288            to connect to the server; not relevant for localhost
289            encodings.
290         */
291         int remote_backoff = 0;
292
293         while (true) {
294
295                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
296                 boost::mutex::scoped_lock lock (_queue_mutex);
297                 while (_queue.empty () && !_terminate) {
298                         _empty_condition.wait (lock);
299                 }
300
301                 if (_terminate) {
302                         return;
303                 }
304
305                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
306                 shared_ptr<DCPVideo> vf = _queue.front ();
307                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
308                 _queue.pop_front ();
309
310                 lock.unlock ();
311
312                 optional<Data> encoded;
313
314                 /* We need to encode this input */
315                 if (server) {
316                         try {
317                                 encoded = vf->encode_remotely (server.get ());
318
319                                 if (remote_backoff > 0) {
320                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
321                                 }
322
323                                 /* This job succeeded, so remove any backoff */
324                                 remote_backoff = 0;
325
326                         } catch (std::exception& e) {
327                                 if (remote_backoff < 60) {
328                                         /* back off more */
329                                         remote_backoff += 10;
330                                 }
331                                 LOG_ERROR (
332                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
333                                         vf->index(), server->host_name(), e.what(), remote_backoff
334                                         );
335                         }
336
337                 } else {
338                         try {
339                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
340                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
341                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342                         } catch (std::exception& e) {
343                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
344                         }
345                 }
346
347                 if (encoded) {
348                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
349                         frame_done ();
350                 } else {
351                         lock.lock ();
352                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
353                         _queue.push_front (vf);
354                         lock.unlock ();
355                 }
356
357                 if (remote_backoff > 0) {
358                         dcpomatic_sleep (remote_backoff);
359                 }
360
361                 /* The queue might not be full any more, so notify anything that is waiting on that */
362                 lock.lock ();
363                 _full_condition.notify_all ();
364         }
365 }
366 catch (...)
367 {
368         store_current ();
369 }
370
371 void
372 Encoder::servers_list_changed ()
373 {
374         terminate_threads ();
375
376         /* XXX: could re-use threads */
377
378         boost::mutex::scoped_lock lm (_threads_mutex);
379
380         if (!Config::instance()->only_servers_encode ()) {
381                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
382                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
383                 }
384         }
385
386         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
387                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
388                 for (int j = 0; j < i.threads(); ++j) {
389                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
390                 }
391         }
392
393         _writer->set_encoder_threads (_threads.size ());
394 }