Debugging for encode decisions.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/encoder.cc
22  *  @brief Parent class for classes which can encode video and audio frames.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers for this file; they require a member _film to be in scope.
   Each expands to a single expression with no trailing semicolon, so that the
   call site supplies the `;' itself.  This means a call can safely be the body of
   an unbraced if/else; with a built-in `;' the expansion would end in `;;',
   which breaks a following `else'.
   LOG_GENERAL_NC passes its argument straight through without String::compose.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
#define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE)
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55 using dcp::Data;
56
57 int const Encoder::_history_size = 200;
58
59 /** @param f Film that we are encoding */
60 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
61         : _film (film)
62         , _writer (writer)
63 {
64         servers_list_changed ();
65 }
66
67 Encoder::~Encoder ()
68 {
69         try {
70                 terminate_threads ();
71         } catch (...) {
72                 /* Destructors must not throw exceptions; anything bad
73                    happening now is too late to worry about anyway,
74                    I think.
75                 */
76         }
77 }
78
79 void
80 Encoder::begin ()
81 {
82         weak_ptr<Encoder> wp = shared_from_this ();
83         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84                 boost::bind (&Encoder::call_servers_list_changed, wp)
85                 );
86 }
87
88 /* We don't want the servers-list-changed callback trying to do things
89    during destruction of Encoder, and I think this is the neatest way
90    to achieve that.
91 */
92 void
93 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
94 {
95         shared_ptr<Encoder> e = encoder.lock ();
96         if (e) {
97                 e->servers_list_changed ();
98         }
99 }
100
101 void
102 Encoder::end ()
103 {
104         boost::mutex::scoped_lock lock (_queue_mutex);
105
106         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
107
108         /* Keep waking workers until the queue is empty */
109         while (!_queue.empty ()) {
110                 rethrow ();
111                 _empty_condition.notify_all ();
112                 _full_condition.wait (lock);
113         }
114
115         lock.unlock ();
116
117         LOG_GENERAL_NC (N_("Terminating encoder threads"));
118
119         terminate_threads ();
120
121         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
122
123         /* The following sequence of events can occur in the above code:
124              1. a remote worker takes the last image off the queue
125              2. the loop above terminates
126              3. the remote worker fails to encode the image and puts it back on the queue
127              4. the remote worker is then terminated by terminate_threads
128
129              So just mop up anything left in the queue here.
130         */
131
132         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
133                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
134                 try {
135                         _writer->write (
136                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
137                                 (*i)->index (),
138                                 (*i)->eyes ()
139                                 );
140                         frame_done ();
141                 } catch (std::exception& e) {
142                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
143                 }
144         }
145 }
146
147 /** @return an estimate of the current number of frames we are encoding per second,
148  *  or 0 if not known.
149  */
150 float
151 Encoder::current_encoding_rate () const
152 {
153         boost::mutex::scoped_lock lock (_state_mutex);
154         if (int (_time_history.size()) < _history_size) {
155                 return 0;
156         }
157
158         struct timeval now;
159         gettimeofday (&now, 0);
160
161         return _history_size / (seconds (now) - seconds (_time_history.back ()));
162 }
163
164 /** @return Number of video frames that have been queued for encoding */
165 int
166 Encoder::video_frames_enqueued () const
167 {
168         if (!_last_player_video) {
169                 return 0;
170         }
171
172         return _last_player_video->time().frames_floor (_film->video_frame_rate ());
173 }
174
175 /** Should be called when a frame has been encoded successfully.
176  *  @param n Source frame index.
177  */
178 void
179 Encoder::frame_done ()
180 {
181         boost::mutex::scoped_lock lock (_state_mutex);
182
183         struct timeval tv;
184         gettimeofday (&tv, 0);
185         _time_history.push_front (tv);
186         if (int (_time_history.size()) > _history_size) {
187                 _time_history.pop_back ();
188         }
189 }
190
191 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
192  *  so each time the supplied frame is the one after the previous one.
193  *  pv represents one video frame, and could be empty if there is nothing to encode
194  *  for this DCP frame.
195  */
196 void
197 Encoder::encode (shared_ptr<PlayerVideo> pv)
198 {
199         _waker.nudge ();
200
201         size_t threads = 0;
202         {
203                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
204                 threads = _threads.size ();
205         }
206
207         boost::mutex::scoped_lock queue_lock (_queue_mutex);
208
209         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
210            when there are no threads.
211         */
212         while (_queue.size() >= (threads * 2) + 1) {
213                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
214                 _full_condition.wait (queue_lock);
215                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
216         }
217
218         _writer->rethrow ();
219         /* Re-throw any exception raised by one of our threads.  If more
220            than one has thrown an exception, only one will be rethrown, I think;
221            but then, if that happens something has gone badly wrong.
222         */
223         rethrow ();
224
225         Frame const position = pv->time().frames_floor(_film->video_frame_rate());
226
227         if (_writer->can_fake_write (position)) {
228                 /* We can fake-write this frame */
229                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(pv->time()));
230                 _writer->fake_write (position, pv->eyes ());
231                 frame_done ();
232         } else if (pv->has_j2k ()) {
233                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(pv->time()));
234                 /* This frame already has JPEG2000 data, so just write it */
235                 _writer->write (pv->j2k(), position, pv->eyes ());
236         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
237                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(pv->time()));
238                 _writer->repeat (position, pv->eyes ());
239         } else {
240                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(pv->time()));
241                 /* Queue this new frame for encoding */
242                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
243                 _queue.push_back (shared_ptr<DCPVideo> (
244                                           new DCPVideo (
245                                                   pv,
246                                                   position,
247                                                   _film->video_frame_rate(),
248                                                   _film->j2k_bandwidth(),
249                                                   _film->resolution(),
250                                                   _film->log()
251                                                   )
252                                           ));
253
254                 /* The queue might not be empty any more, so notify anything which is
255                    waiting on that.
256                 */
257                 _empty_condition.notify_all ();
258         }
259
260         _last_player_video = pv;
261 }
262
263 void
264 Encoder::terminate_threads ()
265 {
266         boost::mutex::scoped_lock threads_lock (_threads_mutex);
267
268         int n = 0;
269         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
270                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
271                 (*i)->interrupt ();
272                 DCPOMATIC_ASSERT ((*i)->joinable ());
273                 try {
274                         (*i)->join ();
275                 } catch (boost::thread_interrupted& e) {
276                         /* This is to be expected */
277                 }
278                 delete *i;
279                 LOG_GENERAL_NC ("Thread terminated");
280                 ++n;
281         }
282
283         _threads.clear ();
284 }
285
286 void
287 Encoder::encoder_thread (optional<EncodeServerDescription> server)
288 try
289 {
290         if (server) {
291                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
292         } else {
293                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
294         }
295
296         /* Number of seconds that we currently wait between attempts
297            to connect to the server; not relevant for localhost
298            encodings.
299         */
300         int remote_backoff = 0;
301
302         while (true) {
303
304                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
305                 boost::mutex::scoped_lock lock (_queue_mutex);
306                 while (_queue.empty ()) {
307                         _empty_condition.wait (lock);
308                 }
309
310                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
311                 shared_ptr<DCPVideo> vf = _queue.front ();
312
313                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
314                    so we must not be interrupted until one or other of these things have happened.  This
315                    block has thread interruption disabled.
316                 */
317                 {
318                         boost::this_thread::disable_interruption dis;
319
320                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
321                         _queue.pop_front ();
322
323                         lock.unlock ();
324
325                         optional<Data> encoded;
326
327                         /* We need to encode this input */
328                         if (server) {
329                                 try {
330                                         encoded = vf->encode_remotely (server.get ());
331
332                                         if (remote_backoff > 0) {
333                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
334                                         }
335
336                                         /* This job succeeded, so remove any backoff */
337                                         remote_backoff = 0;
338
339                                 } catch (std::exception& e) {
340                                         if (remote_backoff < 60) {
341                                                 /* back off more */
342                                                 remote_backoff += 10;
343                                         }
344                                         LOG_ERROR (
345                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
346                                                 vf->index(), server->host_name(), e.what(), remote_backoff
347                                                 );
348                                 }
349
350                         } else {
351                                 try {
352                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
353                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
354                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
355                                 } catch (std::exception& e) {
356                                         /* This is very bad, so don't cope with it, just pass it on */
357                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
358                                         throw;
359                                 }
360                         }
361
362                         if (encoded) {
363                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
364                                 frame_done ();
365                         } else {
366                                 lock.lock ();
367                                 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
368                                 _queue.push_front (vf);
369                                 lock.unlock ();
370                         }
371                 }
372
373                 if (remote_backoff > 0) {
374                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
375                 }
376
377                 /* The queue might not be full any more, so notify anything that is waiting on that */
378                 lock.lock ();
379                 _full_condition.notify_all ();
380         }
381 }
382 catch (boost::thread_interrupted& e) {
383         /* Ignore these and just stop the thread */
384         _full_condition.notify_all ();
385 }
386 catch (...)
387 {
388         store_current ();
389         /* Wake anything waiting on _full_condition so it can see the exception */
390         _full_condition.notify_all ();
391 }
392
393 void
394 Encoder::servers_list_changed ()
395 {
396         terminate_threads ();
397
398         /* XXX: could re-use threads */
399
400         boost::mutex::scoped_lock lm (_threads_mutex);
401
402 #ifdef BOOST_THREAD_PLATFORM_WIN32
403         OSVERSIONINFO info;
404         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
405         GetVersionEx (&info);
406         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
407         if (windows_xp) {
408                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
409         }
410 #endif
411
412         if (!Config::instance()->only_servers_encode ()) {
413                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
414                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
415                         _threads.push_back (t);
416 #ifdef BOOST_THREAD_PLATFORM_WIN32
417                         if (windows_xp) {
418                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
419                         }
420 #endif
421                 }
422         }
423
424         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
425                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
426                 for (int j = 0; j < i.threads(); ++j) {
427                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
428                 }
429         }
430
431         _writer->set_encoder_threads (_threads.size ());
432 }