Distinguish master DoM encode threads count from the server count.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging shorthands which write to the log of the Film being encoded (_film),
   so they can only be used where a _film member is in scope.  All but
   LOG_GENERAL_NC format their arguments with String::compose; LOG_GENERAL_NC
   passes its argument to the log unchanged.

   None of these ends with a semicolon: the call site supplies it, so each use
   expands to exactly one statement and stays safe in an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
#define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE)
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55 using dcp::Data;
56
57 int const Encoder::_history_size = 200;
58
59 /** @param film Film that we are encoding.
60  *  @param writer Writer that we are using.
61  */
62 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
63         : _film (film)
64         , _writer (writer)
65 {
66         servers_list_changed ();
67 }
68
69 Encoder::~Encoder ()
70 {
71         try {
72                 terminate_threads ();
73         } catch (...) {
74                 /* Destructors must not throw exceptions; anything bad
75                    happening now is too late to worry about anyway,
76                    I think.
77                 */
78         }
79 }
80
81 void
82 Encoder::begin ()
83 {
84         weak_ptr<Encoder> wp = shared_from_this ();
85         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
86                 boost::bind (&Encoder::call_servers_list_changed, wp)
87                 );
88 }
89
90 /* We don't want the servers-list-changed callback trying to do things
91    during destruction of Encoder, and I think this is the neatest way
92    to achieve that.
93 */
94 void
95 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
96 {
97         shared_ptr<Encoder> e = encoder.lock ();
98         if (e) {
99                 e->servers_list_changed ();
100         }
101 }
102
103 void
104 Encoder::end ()
105 {
106         boost::mutex::scoped_lock lock (_queue_mutex);
107
108         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
109
110         /* Keep waking workers until the queue is empty */
111         while (!_queue.empty ()) {
112                 rethrow ();
113                 _empty_condition.notify_all ();
114                 _full_condition.wait (lock);
115         }
116
117         lock.unlock ();
118
119         LOG_GENERAL_NC (N_("Terminating encoder threads"));
120
121         terminate_threads ();
122
123         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
124
125         /* The following sequence of events can occur in the above code:
126              1. a remote worker takes the last image off the queue
127              2. the loop above terminates
128              3. the remote worker fails to encode the image and puts it back on the queue
129              4. the remote worker is then terminated by terminate_threads
130
131              So just mop up anything left in the queue here.
132         */
133
134         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
135                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
136                 try {
137                         _writer->write (
138                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
139                                 (*i)->index (),
140                                 (*i)->eyes ()
141                                 );
142                         frame_done ();
143                 } catch (std::exception& e) {
144                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
145                 }
146         }
147 }
148
149 /** @return an estimate of the current number of frames we are encoding per second,
150  *  or 0 if not known.
151  */
152 float
153 Encoder::current_encoding_rate () const
154 {
155         boost::mutex::scoped_lock lock (_state_mutex);
156         if (int (_time_history.size()) < _history_size) {
157                 return 0;
158         }
159
160         struct timeval now;
161         gettimeofday (&now, 0);
162
163         return _history_size / (seconds (now) - seconds (_time_history.back ()));
164 }
165
166 /** @return Number of video frames that have been queued for encoding */
167 int
168 Encoder::video_frames_enqueued () const
169 {
170         if (!_last_player_video_time) {
171                 return 0;
172         }
173
174         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
175 }
176
177 /** Should be called when a frame has been encoded successfully */
178 void
179 Encoder::frame_done ()
180 {
181         boost::mutex::scoped_lock lock (_state_mutex);
182
183         struct timeval tv;
184         gettimeofday (&tv, 0);
185         _time_history.push_front (tv);
186         if (int (_time_history.size()) > _history_size) {
187                 _time_history.pop_back ();
188         }
189 }
190
191 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
192  *  so each time the supplied frame is the one after the previous one.
193  *  pv represents one video frame, and could be empty if there is nothing to encode
194  *  for this DCP frame.
195  *
196  *  @param pv PlayerVideo to encode.
197  *  @param time Time of \p pv within the DCP.
198  */
199 void
200 Encoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
201 {
202         _waker.nudge ();
203
204         size_t threads = 0;
205         {
206                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
207                 threads = _threads.size ();
208         }
209
210         boost::mutex::scoped_lock queue_lock (_queue_mutex);
211
212         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
213            when there are no threads.
214         */
215         while (_queue.size() >= (threads * 2) + 1) {
216                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
217                 _full_condition.wait (queue_lock);
218                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
219         }
220
221         _writer->rethrow ();
222         /* Re-throw any exception raised by one of our threads.  If more
223            than one has thrown an exception, only one will be rethrown, I think;
224            but then, if that happens something has gone badly wrong.
225         */
226         rethrow ();
227
228         Frame const position = time.frames_floor(_film->video_frame_rate());
229
230         if (_writer->can_fake_write (position)) {
231                 /* We can fake-write this frame */
232                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
233                 _writer->fake_write (position, pv->eyes ());
234                 frame_done ();
235         } else if (pv->has_j2k ()) {
236                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
237                 /* This frame already has JPEG2000 data, so just write it */
238                 _writer->write (pv->j2k(), position, pv->eyes ());
239         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
240                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
241                 _writer->repeat (position, pv->eyes ());
242         } else {
243                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
244                 /* Queue this new frame for encoding */
245                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
246                 _queue.push_back (shared_ptr<DCPVideo> (
247                                           new DCPVideo (
248                                                   pv,
249                                                   position,
250                                                   _film->video_frame_rate(),
251                                                   _film->j2k_bandwidth(),
252                                                   _film->resolution(),
253                                                   _film->log()
254                                                   )
255                                           ));
256
257                 /* The queue might not be empty any more, so notify anything which is
258                    waiting on that.
259                 */
260                 _empty_condition.notify_all ();
261         }
262
263         _last_player_video = pv;
264         _last_player_video_time = time;
265 }
266
267 void
268 Encoder::terminate_threads ()
269 {
270         boost::mutex::scoped_lock threads_lock (_threads_mutex);
271
272         int n = 0;
273         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
274                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
275                 (*i)->interrupt ();
276                 DCPOMATIC_ASSERT ((*i)->joinable ());
277                 try {
278                         (*i)->join ();
279                 } catch (boost::thread_interrupted& e) {
280                         /* This is to be expected */
281                 }
282                 delete *i;
283                 LOG_GENERAL_NC ("Thread terminated");
284                 ++n;
285         }
286
287         _threads.clear ();
288 }
289
290 void
291 Encoder::encoder_thread (optional<EncodeServerDescription> server)
292 try
293 {
294         if (server) {
295                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
296         } else {
297                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
298         }
299
300         /* Number of seconds that we currently wait between attempts
301            to connect to the server; not relevant for localhost
302            encodings.
303         */
304         int remote_backoff = 0;
305
306         while (true) {
307
308                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
309                 boost::mutex::scoped_lock lock (_queue_mutex);
310                 while (_queue.empty ()) {
311                         _empty_condition.wait (lock);
312                 }
313
314                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
315                 shared_ptr<DCPVideo> vf = _queue.front ();
316
317                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
318                    so we must not be interrupted until one or other of these things have happened.  This
319                    block has thread interruption disabled.
320                 */
321                 {
322                         boost::this_thread::disable_interruption dis;
323
324                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
325                         _queue.pop_front ();
326
327                         lock.unlock ();
328
329                         optional<Data> encoded;
330
331                         /* We need to encode this input */
332                         if (server) {
333                                 try {
334                                         encoded = vf->encode_remotely (server.get ());
335
336                                         if (remote_backoff > 0) {
337                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
338                                         }
339
340                                         /* This job succeeded, so remove any backoff */
341                                         remote_backoff = 0;
342
343                                 } catch (std::exception& e) {
344                                         if (remote_backoff < 60) {
345                                                 /* back off more */
346                                                 remote_backoff += 10;
347                                         }
348                                         LOG_ERROR (
349                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
350                                                 vf->index(), server->host_name(), e.what(), remote_backoff
351                                                 );
352                                 }
353
354                         } else {
355                                 try {
356                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
357                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
358                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
359                                 } catch (std::exception& e) {
360                                         /* This is very bad, so don't cope with it, just pass it on */
361                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
362                                         throw;
363                                 }
364                         }
365
366                         if (encoded) {
367                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
368                                 frame_done ();
369                         } else {
370                                 lock.lock ();
371                                 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
372                                 _queue.push_front (vf);
373                                 lock.unlock ();
374                         }
375                 }
376
377                 if (remote_backoff > 0) {
378                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
379                 }
380
381                 /* The queue might not be full any more, so notify anything that is waiting on that */
382                 lock.lock ();
383                 _full_condition.notify_all ();
384         }
385 }
386 catch (boost::thread_interrupted& e) {
387         /* Ignore these and just stop the thread */
388         _full_condition.notify_all ();
389 }
390 catch (...)
391 {
392         store_current ();
393         /* Wake anything waiting on _full_condition so it can see the exception */
394         _full_condition.notify_all ();
395 }
396
397 void
398 Encoder::servers_list_changed ()
399 {
400         terminate_threads ();
401
402         /* XXX: could re-use threads */
403
404         boost::mutex::scoped_lock lm (_threads_mutex);
405
406 #ifdef BOOST_THREAD_PLATFORM_WIN32
407         OSVERSIONINFO info;
408         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
409         GetVersionEx (&info);
410         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
411         if (windows_xp) {
412                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
413         }
414 #endif
415
416         if (!Config::instance()->only_servers_encode ()) {
417                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
418                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
419                         _threads.push_back (t);
420 #ifdef BOOST_THREAD_PLATFORM_WIN32
421                         if (windows_xp) {
422                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
423                         }
424 #endif
425                 }
426         }
427
428         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
429                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
430                 for (int j = 0; j < i.threads(); ++j) {
431                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
432                 }
433         }
434
435         _writer->set_encoder_threads (_threads.size ());
436 }