Fix crash when repeating video frames across a reel boundary.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
46 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
47 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
48 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
60         : _film (film)
61         , _job (j)
62         , _position (0)
63         , _terminate_enqueue (false)
64         , _terminate_encoding (false)
65         , _writer (writer)
66 {
67         servers_list_changed ();
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73
74         boost::mutex::scoped_lock lm (_queue_mutex);
75         _terminate_enqueue = true;
76         _full_condition.notify_all ();
77         _empty_condition.notify_all ();
78 }
79
80 void
81 Encoder::begin ()
82 {
83         if (!ServerFinder::instance()->disabled ()) {
84                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
85         }
86 }
87
88 void
89 Encoder::end ()
90 {
91         boost::mutex::scoped_lock lock (_queue_mutex);
92
93         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
94
95         /* Keep waking workers until the queue is empty */
96         while (!_queue.empty ()) {
97                 rethrow ();
98                 _empty_condition.notify_all ();
99                 _full_condition.wait (lock);
100         }
101
102         lock.unlock ();
103
104         LOG_GENERAL_NC (N_("Terminating encoder threads"));
105
106         terminate_threads ();
107
108         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
109
110         /* The following sequence of events can occur in the above code:
111              1. a remote worker takes the last image off the queue
112              2. the loop above terminates
113              3. the remote worker fails to encode the image and puts it back on the queue
114              4. the remote worker is then terminated by terminate_threads
115
116              So just mop up anything left in the queue here.
117         */
118
119         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
120                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
121                 try {
122                         _writer->write (
123                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
124                                 (*i)->index (),
125                                 (*i)->eyes ()
126                                 );
127                         frame_done ();
128                 } catch (std::exception& e) {
129                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
130                 }
131         }
132 }
133
134 /** @return an estimate of the current number of frames we are encoding per second,
135  *  or 0 if not known.
136  */
137 float
138 Encoder::current_encoding_rate () const
139 {
140         boost::mutex::scoped_lock lock (_state_mutex);
141         if (int (_time_history.size()) < _history_size) {
142                 return 0;
143         }
144
145         struct timeval now;
146         gettimeofday (&now, 0);
147
148         return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 }
150
151 /** @return Number of video frames that have been sent out */
152 int
153 Encoder::video_frames_out () const
154 {
155         boost::mutex::scoped_lock (_state_mutex);
156         return _position;
157 }
158
159 /** Should be called when a frame has been encoded successfully.
160  *  @param n Source frame index.
161  */
162 void
163 Encoder::frame_done ()
164 {
165         boost::mutex::scoped_lock lock (_state_mutex);
166
167         struct timeval tv;
168         gettimeofday (&tv, 0);
169         _time_history.push_front (tv);
170         if (int (_time_history.size()) > _history_size) {
171                 _time_history.pop_back ();
172         }
173 }
174
175 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
176  *  so each time the supplied frame is the one after the previous one.
177  *  pv represents one video frame, and could be empty if there is nothing to encode
178  *  for this DCP frame.
179  */
180 void
181 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
182 {
183         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
184                 enqueue (i);
185         }
186         ++_position;
187 }
188
189 void
190 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
191 {
192         _waker.nudge ();
193
194         size_t threads = 0;
195         {
196                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
197                 threads = _threads.size ();
198         }
199
200         boost::mutex::scoped_lock queue_lock (_queue_mutex);
201
202         /* XXX: discard 3D here if required */
203
204         /* Wait until the queue has gone down a bit */
205         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
206                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
207                 _full_condition.wait (queue_lock);
208                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
209         }
210
211         if (_terminate_enqueue) {
212                 return;
213         }
214
215         _writer->rethrow ();
216         /* Re-throw any exception raised by one of our threads.  If more
217            than one has thrown an exception, only one will be rethrown, I think;
218            but then, if that happens something has gone badly wrong.
219         */
220         rethrow ();
221
222         if (_writer->can_fake_write (_position)) {
223                 /* We can fake-write this frame */
224                 _writer->fake_write (_position, pv->eyes ());
225                 frame_done ();
226         } else if (pv->has_j2k ()) {
227                 /* This frame already has JPEG2000 data, so just write it */
228                 _writer->write (pv->j2k(), _position, pv->eyes ());
229         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
230                 _writer->repeat (_position, pv->eyes ());
231         } else {
232                 /* Queue this new frame for encoding */
233                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
234                 _queue.push_back (shared_ptr<DCPVideo> (
235                                           new DCPVideo (
236                                                   pv,
237                                                   _position,
238                                                   _film->video_frame_rate(),
239                                                   _film->j2k_bandwidth(),
240                                                   _film->resolution(),
241                                                   _film->log()
242                                                   )
243                                           ));
244
245                 /* The queue might not be empty any more, so notify anything which is
246                    waiting on that.
247                 */
248                 _empty_condition.notify_all ();
249         }
250
251         _last_player_video = pv;
252 }
253
254 void
255 Encoder::terminate_threads ()
256 {
257         {
258                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
259                 _terminate_encoding = true;
260                 _full_condition.notify_all ();
261                 _empty_condition.notify_all ();
262         }
263
264         boost::mutex::scoped_lock threads_lock (_threads_mutex);
265
266         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
267                 if ((*i)->joinable ()) {
268                         (*i)->join ();
269                 }
270                 delete *i;
271         }
272
273         _threads.clear ();
274         _terminate_encoding = false;
275 }
276
277 void
278 Encoder::encoder_thread (optional<ServerDescription> server)
279 try
280 {
281         if (server) {
282                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
283         } else {
284                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
285         }
286
287         /* Number of seconds that we currently wait between attempts
288            to connect to the server; not relevant for localhost
289            encodings.
290         */
291         int remote_backoff = 0;
292
293         while (true) {
294
295                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
296                 boost::mutex::scoped_lock lock (_queue_mutex);
297                 while (_queue.empty () && !_terminate_encoding) {
298                         _empty_condition.wait (lock);
299                 }
300
301                 if (_terminate_encoding) {
302                         return;
303                 }
304
305                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
306                 shared_ptr<DCPVideo> vf = _queue.front ();
307                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
308                 _queue.pop_front ();
309
310                 lock.unlock ();
311
312                 optional<Data> encoded;
313
314                 /* We need to encode this input */
315                 if (server) {
316                         try {
317                                 encoded = vf->encode_remotely (server.get ());
318
319                                 if (remote_backoff > 0) {
320                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
321                                 }
322
323                                 /* This job succeeded, so remove any backoff */
324                                 remote_backoff = 0;
325
326                         } catch (std::exception& e) {
327                                 if (remote_backoff < 60) {
328                                         /* back off more */
329                                         remote_backoff += 10;
330                                 }
331                                 LOG_ERROR (
332                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
333                                         vf->index(), server->host_name(), e.what(), remote_backoff
334                                         );
335                         }
336
337                 } else {
338                         try {
339                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
340                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
341                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342                         } catch (std::exception& e) {
343                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
344                                 throw;
345                         }
346                 }
347
348                 if (encoded) {
349                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
350                         frame_done ();
351                 } else {
352                         lock.lock ();
353                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
354                         _queue.push_front (vf);
355                         lock.unlock ();
356                 }
357
358                 if (remote_backoff > 0) {
359                         dcpomatic_sleep (remote_backoff);
360                 }
361
362                 /* The queue might not be full any more, so notify anything that is waiting on that */
363                 lock.lock ();
364                 _full_condition.notify_all ();
365         }
366 }
367 catch (...)
368 {
369         store_current ();
370         /* Wake anything waiting on _full_condition so it can see the exception */
371         _full_condition.notify_all ();
372 }
373
374 void
375 Encoder::servers_list_changed ()
376 {
377         terminate_threads ();
378
379         /* XXX: could re-use threads */
380
381         boost::mutex::scoped_lock lm (_threads_mutex);
382
383         if (!Config::instance()->only_servers_encode ()) {
384                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
385                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
386                 }
387         }
388
389         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
390                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
391                 for (int j = 0; j < i.threads(); ++j) {
392                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
393                 }
394         }
395
396         _writer->set_encoder_threads (_threads.size ());
397 }