Various multi-reel fixes.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
47 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
48 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
60         : _film (film)
61         , _job (j)
62         , _position (0)
63         , _terminate_enqueue (false)
64         , _terminate_encoding (false)
65         , _writer (writer)
66 {
67         servers_list_changed ();
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73
74         boost::mutex::scoped_lock lm (_queue_mutex);
75         _terminate_enqueue = true;
76         _full_condition.notify_all ();
77         _empty_condition.notify_all ();
78 }
79
80 void
81 Encoder::begin ()
82 {
83         if (!ServerFinder::instance()->disabled ()) {
84                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
85         }
86 }
87
88 void
89 Encoder::end ()
90 {
91         boost::mutex::scoped_lock lock (_queue_mutex);
92
93         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
94
95         /* Keep waking workers until the queue is empty */
96         while (!_queue.empty ()) {
97                 rethrow ();
98                 _empty_condition.notify_all ();
99                 _full_condition.wait (lock);
100         }
101
102         lock.unlock ();
103
104         LOG_GENERAL_NC (N_("Terminating encoder threads"));
105
106         terminate_threads ();
107
108         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
109
110         /* The following sequence of events can occur in the above code:
111              1. a remote worker takes the last image off the queue
112              2. the loop above terminates
113              3. the remote worker fails to encode the image and puts it back on the queue
114              4. the remote worker is then terminated by terminate_threads
115
116              So just mop up anything left in the queue here.
117         */
118
119         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
120                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
121                 try {
122                         _writer->write (
123                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
124                                 (*i)->index (),
125                                 (*i)->eyes ()
126                                 );
127                         frame_done ();
128                 } catch (std::exception& e) {
129                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
130                 }
131         }
132 }
133
134 /** @return an estimate of the current number of frames we are encoding per second,
135  *  or 0 if not known.
136  */
137 float
138 Encoder::current_encoding_rate () const
139 {
140         boost::mutex::scoped_lock lock (_state_mutex);
141         if (int (_time_history.size()) < _history_size) {
142                 return 0;
143         }
144
145         struct timeval now;
146         gettimeofday (&now, 0);
147
148         return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 }
150
151 /** @return Number of video frames that have been sent out */
152 int
153 Encoder::video_frames_out () const
154 {
155         boost::mutex::scoped_lock (_state_mutex);
156         return _position;
157 }
158
159 /** Should be called when a frame has been encoded successfully.
160  *  @param n Source frame index.
161  */
162 void
163 Encoder::frame_done ()
164 {
165         boost::mutex::scoped_lock lock (_state_mutex);
166
167         struct timeval tv;
168         gettimeofday (&tv, 0);
169         _time_history.push_front (tv);
170         if (int (_time_history.size()) > _history_size) {
171                 _time_history.pop_back ();
172         }
173 }
174
175 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
176  *  so each time the supplied frame is the one after the previous one.
177  *  pv represents one video frame, and could be empty if there is nothing to encode
178  *  for this DCP frame.
179  */
180 void
181 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
182 {
183         if (pv.empty ()) {
184                 _writer->ref_write (_position);
185         } else {
186                 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
187                         enqueue (i);
188                 }
189         }
190         ++_position;
191 }
192
193 void
194 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
195 {
196         _waker.nudge ();
197
198         size_t threads = 0;
199         {
200                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
201                 threads = _threads.size ();
202         }
203
204         boost::mutex::scoped_lock queue_lock (_queue_mutex);
205
206         /* XXX: discard 3D here if required */
207
208         /* Wait until the queue has gone down a bit */
209         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
210                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
211                 _full_condition.wait (queue_lock);
212                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
213         }
214
215         if (_terminate_enqueue) {
216                 return;
217         }
218
219         _writer->rethrow ();
220         /* Re-throw any exception raised by one of our threads.  If more
221            than one has thrown an exception, only one will be rethrown, I think;
222            but then, if that happens something has gone badly wrong.
223         */
224         rethrow ();
225
226         if (_writer->can_fake_write (_position)) {
227                 /* We can fake-write this frame */
228                 _writer->fake_write (_position, pv->eyes ());
229                 frame_done ();
230         } else if (pv->has_j2k ()) {
231                 /* This frame already has JPEG2000 data, so just write it */
232                 _writer->write (pv->j2k(), _position, pv->eyes ());
233         } else if (_last_player_video && pv->same (_last_player_video)) {
234                 _writer->repeat (_position, pv->eyes ());
235         } else {
236                 /* Queue this new frame for encoding */
237                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
238                 _queue.push_back (shared_ptr<DCPVideo> (
239                                           new DCPVideo (
240                                                   pv,
241                                                   _position,
242                                                   _film->video_frame_rate(),
243                                                   _film->j2k_bandwidth(),
244                                                   _film->resolution(),
245                                                   _film->log()
246                                                   )
247                                           ));
248
249                 /* The queue might not be empty any more, so notify anything which is
250                    waiting on that.
251                 */
252                 _empty_condition.notify_all ();
253         }
254
255         _last_player_video = pv;
256 }
257
258 void
259 Encoder::terminate_threads ()
260 {
261         {
262                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
263                 _terminate_encoding = true;
264                 _full_condition.notify_all ();
265                 _empty_condition.notify_all ();
266         }
267
268         boost::mutex::scoped_lock threads_lock (_threads_mutex);
269
270         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
271                 if ((*i)->joinable ()) {
272                         (*i)->join ();
273                 }
274                 delete *i;
275         }
276
277         _threads.clear ();
278         _terminate_encoding = false;
279 }
280
281 void
282 Encoder::encoder_thread (optional<ServerDescription> server)
283 try
284 {
285         if (server) {
286                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
287         } else {
288                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
289         }
290
291         /* Number of seconds that we currently wait between attempts
292            to connect to the server; not relevant for localhost
293            encodings.
294         */
295         int remote_backoff = 0;
296
297         while (true) {
298
299                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
300                 boost::mutex::scoped_lock lock (_queue_mutex);
301                 while (_queue.empty () && !_terminate_encoding) {
302                         _empty_condition.wait (lock);
303                 }
304
305                 if (_terminate_encoding) {
306                         return;
307                 }
308
309                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
310                 shared_ptr<DCPVideo> vf = _queue.front ();
311                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
312                 _queue.pop_front ();
313
314                 lock.unlock ();
315
316                 optional<Data> encoded;
317
318                 /* We need to encode this input */
319                 if (server) {
320                         try {
321                                 encoded = vf->encode_remotely (server.get ());
322
323                                 if (remote_backoff > 0) {
324                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
325                                 }
326
327                                 /* This job succeeded, so remove any backoff */
328                                 remote_backoff = 0;
329
330                         } catch (std::exception& e) {
331                                 if (remote_backoff < 60) {
332                                         /* back off more */
333                                         remote_backoff += 10;
334                                 }
335                                 LOG_ERROR (
336                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
337                                         vf->index(), server->host_name(), e.what(), remote_backoff
338                                         );
339                         }
340
341                 } else {
342                         try {
343                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
344                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
345                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
346                         } catch (std::exception& e) {
347                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
348                                 throw;
349                         }
350                 }
351
352                 if (encoded) {
353                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
354                         frame_done ();
355                 } else {
356                         lock.lock ();
357                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
358                         _queue.push_front (vf);
359                         lock.unlock ();
360                 }
361
362                 if (remote_backoff > 0) {
363                         dcpomatic_sleep (remote_backoff);
364                 }
365
366                 /* The queue might not be full any more, so notify anything that is waiting on that */
367                 lock.lock ();
368                 _full_condition.notify_all ();
369         }
370 }
371 catch (...)
372 {
373         store_current ();
374         /* Wake anything waiting on _full_condition so it can see the exception */
375         _full_condition.notify_all ();
376 }
377
378 void
379 Encoder::servers_list_changed ()
380 {
381         terminate_threads ();
382
383         /* XXX: could re-use threads */
384
385         boost::mutex::scoped_lock lm (_threads_mutex);
386
387         if (!Config::instance()->only_servers_encode ()) {
388                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
389                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
390                 }
391         }
392
393         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
394                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
395                 for (int j = 0; j < i.threads(); ++j) {
396                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
397                 }
398         }
399
400         _writer->set_encoder_threads (_threads.size ());
401 }