Attempt to tidy up internal APIs slightly.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/encoder.h
22  *  @brief Parent class for classes which can encode video and audio frames.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
48 #define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE);
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55 using dcp::Data;
56
57 int const Encoder::_history_size = 200;
58
59 /** @param f Film that we are encoding */
60 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
61         : _film (film)
62         , _writer (writer)
63 {
64         servers_list_changed ();
65 }
66
67 Encoder::~Encoder ()
68 {
69         try {
70                 terminate_threads ();
71         } catch (...) {
72                 /* Destructors must not throw exceptions; anything bad
73                    happening now is too late to worry about anyway,
74                    I think.
75                 */
76         }
77 }
78
79 void
80 Encoder::begin ()
81 {
82         weak_ptr<Encoder> wp = shared_from_this ();
83         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84                 boost::bind (&Encoder::call_servers_list_changed, wp)
85                 );
86 }
87
88 /* We don't want the servers-list-changed callback trying to do things
89    during destruction of Encoder, and I think this is the neatest way
90    to achieve that.
91 */
92 void
93 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
94 {
95         shared_ptr<Encoder> e = encoder.lock ();
96         if (e) {
97                 e->servers_list_changed ();
98         }
99 }
100
101 void
102 Encoder::end ()
103 {
104         boost::mutex::scoped_lock lock (_queue_mutex);
105
106         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
107
108         /* Keep waking workers until the queue is empty */
109         while (!_queue.empty ()) {
110                 rethrow ();
111                 _empty_condition.notify_all ();
112                 _full_condition.wait (lock);
113         }
114
115         lock.unlock ();
116
117         LOG_GENERAL_NC (N_("Terminating encoder threads"));
118
119         terminate_threads ();
120
121         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
122
123         /* The following sequence of events can occur in the above code:
124              1. a remote worker takes the last image off the queue
125              2. the loop above terminates
126              3. the remote worker fails to encode the image and puts it back on the queue
127              4. the remote worker is then terminated by terminate_threads
128
129              So just mop up anything left in the queue here.
130         */
131
132         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
133                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
134                 try {
135                         _writer->write (
136                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
137                                 (*i)->index (),
138                                 (*i)->eyes ()
139                                 );
140                         frame_done ();
141                 } catch (std::exception& e) {
142                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
143                 }
144         }
145 }
146
147 /** @return an estimate of the current number of frames we are encoding per second,
148  *  or 0 if not known.
149  */
150 float
151 Encoder::current_encoding_rate () const
152 {
153         boost::mutex::scoped_lock lock (_state_mutex);
154         if (int (_time_history.size()) < _history_size) {
155                 return 0;
156         }
157
158         struct timeval now;
159         gettimeofday (&now, 0);
160
161         return _history_size / (seconds (now) - seconds (_time_history.back ()));
162 }
163
164 /** @return Number of video frames that have been queued for encoding */
165 int
166 Encoder::video_frames_enqueued () const
167 {
168         if (!_last_player_video_time) {
169                 return 0;
170         }
171
172         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
173 }
174
175 /** Should be called when a frame has been encoded successfully.
176  *  @param n Source frame index.
177  */
178 void
179 Encoder::frame_done ()
180 {
181         boost::mutex::scoped_lock lock (_state_mutex);
182
183         struct timeval tv;
184         gettimeofday (&tv, 0);
185         _time_history.push_front (tv);
186         if (int (_time_history.size()) > _history_size) {
187                 _time_history.pop_back ();
188         }
189 }
190
191 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
192  *  so each time the supplied frame is the one after the previous one.
193  *  pv represents one video frame, and could be empty if there is nothing to encode
194  *  for this DCP frame.
195  */
196 void
197 Encoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
198 {
199         _waker.nudge ();
200
201         size_t threads = 0;
202         {
203                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
204                 threads = _threads.size ();
205         }
206
207         boost::mutex::scoped_lock queue_lock (_queue_mutex);
208
209         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
210            when there are no threads.
211         */
212         while (_queue.size() >= (threads * 2) + 1) {
213                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
214                 _full_condition.wait (queue_lock);
215                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
216         }
217
218         _writer->rethrow ();
219         /* Re-throw any exception raised by one of our threads.  If more
220            than one has thrown an exception, only one will be rethrown, I think;
221            but then, if that happens something has gone badly wrong.
222         */
223         rethrow ();
224
225         Frame const position = time.frames_floor(_film->video_frame_rate());
226
227         if (_writer->can_fake_write (position)) {
228                 /* We can fake-write this frame */
229                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(pv->time()));
230                 _writer->fake_write (position, pv->eyes ());
231                 frame_done ();
232         } else if (pv->has_j2k ()) {
233                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(pv->time()));
234                 /* This frame already has JPEG2000 data, so just write it */
235                 _writer->write (pv->j2k(), position, pv->eyes ());
236         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
237                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(pv->time()));
238                 _writer->repeat (position, pv->eyes ());
239         } else {
240                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(pv->time()));
241                 /* Queue this new frame for encoding */
242                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
243                 _queue.push_back (shared_ptr<DCPVideo> (
244                                           new DCPVideo (
245                                                   pv,
246                                                   position,
247                                                   _film->video_frame_rate(),
248                                                   _film->j2k_bandwidth(),
249                                                   _film->resolution(),
250                                                   _film->log()
251                                                   )
252                                           ));
253
254                 /* The queue might not be empty any more, so notify anything which is
255                    waiting on that.
256                 */
257                 _empty_condition.notify_all ();
258         }
259
260         _last_player_video = pv;
261         _last_player_video_time = time;
262 }
263
264 void
265 Encoder::terminate_threads ()
266 {
267         boost::mutex::scoped_lock threads_lock (_threads_mutex);
268
269         int n = 0;
270         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
271                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
272                 (*i)->interrupt ();
273                 DCPOMATIC_ASSERT ((*i)->joinable ());
274                 try {
275                         (*i)->join ();
276                 } catch (boost::thread_interrupted& e) {
277                         /* This is to be expected */
278                 }
279                 delete *i;
280                 LOG_GENERAL_NC ("Thread terminated");
281                 ++n;
282         }
283
284         _threads.clear ();
285 }
286
287 void
288 Encoder::encoder_thread (optional<EncodeServerDescription> server)
289 try
290 {
291         if (server) {
292                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
293         } else {
294                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
295         }
296
297         /* Number of seconds that we currently wait between attempts
298            to connect to the server; not relevant for localhost
299            encodings.
300         */
301         int remote_backoff = 0;
302
303         while (true) {
304
305                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
306                 boost::mutex::scoped_lock lock (_queue_mutex);
307                 while (_queue.empty ()) {
308                         _empty_condition.wait (lock);
309                 }
310
311                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
312                 shared_ptr<DCPVideo> vf = _queue.front ();
313
314                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
315                    so we must not be interrupted until one or other of these things have happened.  This
316                    block has thread interruption disabled.
317                 */
318                 {
319                         boost::this_thread::disable_interruption dis;
320
321                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
322                         _queue.pop_front ();
323
324                         lock.unlock ();
325
326                         optional<Data> encoded;
327
328                         /* We need to encode this input */
329                         if (server) {
330                                 try {
331                                         encoded = vf->encode_remotely (server.get ());
332
333                                         if (remote_backoff > 0) {
334                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
335                                         }
336
337                                         /* This job succeeded, so remove any backoff */
338                                         remote_backoff = 0;
339
340                                 } catch (std::exception& e) {
341                                         if (remote_backoff < 60) {
342                                                 /* back off more */
343                                                 remote_backoff += 10;
344                                         }
345                                         LOG_ERROR (
346                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
347                                                 vf->index(), server->host_name(), e.what(), remote_backoff
348                                                 );
349                                 }
350
351                         } else {
352                                 try {
353                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
354                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
355                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
356                                 } catch (std::exception& e) {
357                                         /* This is very bad, so don't cope with it, just pass it on */
358                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
359                                         throw;
360                                 }
361                         }
362
363                         if (encoded) {
364                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
365                                 frame_done ();
366                         } else {
367                                 lock.lock ();
368                                 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
369                                 _queue.push_front (vf);
370                                 lock.unlock ();
371                         }
372                 }
373
374                 if (remote_backoff > 0) {
375                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
376                 }
377
378                 /* The queue might not be full any more, so notify anything that is waiting on that */
379                 lock.lock ();
380                 _full_condition.notify_all ();
381         }
382 }
383 catch (boost::thread_interrupted& e) {
384         /* Ignore these and just stop the thread */
385         _full_condition.notify_all ();
386 }
387 catch (...)
388 {
389         store_current ();
390         /* Wake anything waiting on _full_condition so it can see the exception */
391         _full_condition.notify_all ();
392 }
393
394 void
395 Encoder::servers_list_changed ()
396 {
397         terminate_threads ();
398
399         /* XXX: could re-use threads */
400
401         boost::mutex::scoped_lock lm (_threads_mutex);
402
403 #ifdef BOOST_THREAD_PLATFORM_WIN32
404         OSVERSIONINFO info;
405         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
406         GetVersionEx (&info);
407         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
408         if (windows_xp) {
409                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
410         }
411 #endif
412
413         if (!Config::instance()->only_servers_encode ()) {
414                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
415                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
416                         _threads.push_back (t);
417 #ifdef BOOST_THREAD_PLATFORM_WIN32
418                         if (windows_xp) {
419                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
420                         }
421 #endif
422                 }
423         }
424
425         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
426                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
427                 for (int j = 0; j < i.threads(); ++j) {
428                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
429                 }
430         }
431
432         _writer->set_encoder_threads (_threads.size ());
433 }