Merge branch 'fix-encoder-threading' of ssh://git.carlh.net/home/carl/git/dcpomatic...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/encoder.cc
 *  @brief Encoder class, which encodes video frames to JPEG2000 using local and remote worker threads.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers for this file.  Each expands to a single expression with no
   trailing semicolon, so that `LOG_GENERAL (...);' is exactly one statement and
   is safe inside an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
60         : _film (film)
61         , _writer (writer)
62 {
63         servers_list_changed ();
64 }
65
66 Encoder::~Encoder ()
67 {
68         try {
69                 terminate_threads ();
70         } catch (...) {
71                 /* Destructors must not throw exceptions; anything bad
72                    happening now is too late to worry about anyway,
73                    I think.
74                 */
75         }
76 }
77
78 void
79 Encoder::begin ()
80 {
81         if (!EncodeServerFinder::instance()->disabled ()) {
82                 weak_ptr<Encoder> wp = shared_from_this ();
83                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
84                         boost::bind (&Encoder::call_servers_list_changed, wp)
85                         );
86         }
87 }
88
89 /* We don't want the servers-list-changed callback trying to do things
90    during destruction of Encoder, and I think this is the neatest way
91    to achieve that.
92 */
93 void
94 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
95 {
96         shared_ptr<Encoder> e = encoder.lock ();
97         if (e) {
98                 e->servers_list_changed ();
99         }
100 }
101
102 void
103 Encoder::end ()
104 {
105         boost::mutex::scoped_lock lock (_queue_mutex);
106
107         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
108
109         /* Keep waking workers until the queue is empty */
110         while (!_queue.empty ()) {
111                 rethrow ();
112                 _empty_condition.notify_all ();
113                 _full_condition.wait (lock);
114         }
115
116         lock.unlock ();
117
118         LOG_GENERAL_NC (N_("Terminating encoder threads"));
119
120         terminate_threads ();
121
122         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
123
124         /* The following sequence of events can occur in the above code:
125              1. a remote worker takes the last image off the queue
126              2. the loop above terminates
127              3. the remote worker fails to encode the image and puts it back on the queue
128              4. the remote worker is then terminated by terminate_threads
129
130              So just mop up anything left in the queue here.
131         */
132
133         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
134                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
135                 try {
136                         _writer->write (
137                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
138                                 (*i)->index (),
139                                 (*i)->eyes ()
140                                 );
141                         frame_done ();
142                 } catch (std::exception& e) {
143                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
144                 }
145         }
146 }
147
148 /** @return an estimate of the current number of frames we are encoding per second,
149  *  or 0 if not known.
150  */
151 float
152 Encoder::current_encoding_rate () const
153 {
154         boost::mutex::scoped_lock lock (_state_mutex);
155         if (int (_time_history.size()) < _history_size) {
156                 return 0;
157         }
158
159         struct timeval now;
160         gettimeofday (&now, 0);
161
162         return _history_size / (seconds (now) - seconds (_time_history.back ()));
163 }
164
165 /** @return Number of video frames that have been queued for encoding */
166 int
167 Encoder::video_frames_enqueued () const
168 {
169         return _last_player_video->time().frames_floor (_film->video_frame_rate ());
170 }
171
172 /** Should be called when a frame has been encoded successfully.
173  *  @param n Source frame index.
174  */
175 void
176 Encoder::frame_done ()
177 {
178         boost::mutex::scoped_lock lock (_state_mutex);
179
180         struct timeval tv;
181         gettimeofday (&tv, 0);
182         _time_history.push_front (tv);
183         if (int (_time_history.size()) > _history_size) {
184                 _time_history.pop_back ();
185         }
186 }
187
188 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
189  *  so each time the supplied frame is the one after the previous one.
190  *  pv represents one video frame, and could be empty if there is nothing to encode
191  *  for this DCP frame.
192  */
193 void
194 Encoder::encode (shared_ptr<PlayerVideo> pv)
195 {
196         _waker.nudge ();
197
198         size_t threads = 0;
199         {
200                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
201                 threads = _threads.size ();
202         }
203
204         boost::mutex::scoped_lock queue_lock (_queue_mutex);
205
206         /* Wait until the queue has gone down a bit */
207         while (_queue.size() >= threads * 2) {
208                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
209                 _full_condition.wait (queue_lock);
210                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         Frame const position = pv->time().frames_floor(_film->video_frame_rate());
221
222         if (_writer->can_fake_write (position)) {
223                 /* We can fake-write this frame */
224                 _writer->fake_write (position, pv->eyes ());
225                 frame_done ();
226         } else if (pv->has_j2k ()) {
227                 /* This frame already has JPEG2000 data, so just write it */
228                 _writer->write (pv->j2k(), position, pv->eyes ());
229         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
230                 _writer->repeat (position, pv->eyes ());
231         } else {
232                 /* Queue this new frame for encoding */
233                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
234                 _queue.push_back (shared_ptr<DCPVideo> (
235                                           new DCPVideo (
236                                                   pv,
237                                                   position,
238                                                   _film->video_frame_rate(),
239                                                   _film->j2k_bandwidth(),
240                                                   _film->resolution(),
241                                                   _film->log()
242                                                   )
243                                           ));
244
245                 /* The queue might not be empty any more, so notify anything which is
246                    waiting on that.
247                 */
248                 _empty_condition.notify_all ();
249         }
250
251         _last_player_video = pv;
252 }
253
254 void
255 Encoder::terminate_threads ()
256 {
257         boost::mutex::scoped_lock threads_lock (_threads_mutex);
258
259         int n = 0;
260         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
261                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
262                 (*i)->interrupt ();
263                 DCPOMATIC_ASSERT ((*i)->joinable ());
264                 try {
265                         (*i)->join ();
266                 } catch (boost::thread_interrupted& e) {
267                         /* This is to be expected */
268                 }
269                 delete *i;
270                 LOG_GENERAL_NC ("Thread terminated");
271                 ++n;
272         }
273
274         _threads.clear ();
275 }
276
277 void
278 Encoder::encoder_thread (optional<EncodeServerDescription> server)
279 try
280 {
281         if (server) {
282                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
283         } else {
284                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
285         }
286
287         /* Number of seconds that we currently wait between attempts
288            to connect to the server; not relevant for localhost
289            encodings.
290         */
291         int remote_backoff = 0;
292
293         while (true) {
294
295                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
296                 boost::mutex::scoped_lock lock (_queue_mutex);
297                 while (_queue.empty ()) {
298                         _empty_condition.wait (lock);
299                 }
300
301                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
302                 shared_ptr<DCPVideo> vf = _queue.front ();
303                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
304                 _queue.pop_front ();
305
306                 lock.unlock ();
307
308                 optional<Data> encoded;
309
310                 /* We need to encode this input */
311                 if (server) {
312                         try {
313                                 encoded = vf->encode_remotely (server.get ());
314
315                                 if (remote_backoff > 0) {
316                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
317                                 }
318
319                                 /* This job succeeded, so remove any backoff */
320                                 remote_backoff = 0;
321
322                         } catch (std::exception& e) {
323                                 if (remote_backoff < 60) {
324                                         /* back off more */
325                                         remote_backoff += 10;
326                                 }
327                                 LOG_ERROR (
328                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
329                                         vf->index(), server->host_name(), e.what(), remote_backoff
330                                         );
331                         }
332
333                 } else {
334                         try {
335                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
336                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
337                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
338                         } catch (std::exception& e) {
339                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
340                                 throw;
341                         }
342                 }
343
344                 if (encoded) {
345                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
346                         frame_done ();
347                 } else {
348                         lock.lock ();
349                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
350                         _queue.push_front (vf);
351                         lock.unlock ();
352                 }
353
354                 if (remote_backoff > 0) {
355                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
356                 }
357
358                 /* The queue might not be full any more, so notify anything that is waiting on that */
359                 lock.lock ();
360                 _full_condition.notify_all ();
361         }
362 }
363 catch (boost::thread_interrupted& e) {
364         /* Ignore these and just stop the thread */
365         _full_condition.notify_all ();
366 }
367 catch (...)
368 {
369         store_current ();
370         /* Wake anything waiting on _full_condition so it can see the exception */
371         _full_condition.notify_all ();
372 }
373
374 void
375 Encoder::servers_list_changed ()
376 {
377         terminate_threads ();
378
379         /* XXX: could re-use threads */
380
381         boost::mutex::scoped_lock lm (_threads_mutex);
382
383 #ifdef BOOST_THREAD_PLATFORM_WIN32
384         OSVERSIONINFO info;
385         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
386         GetVersionEx (&info);
387         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
388         if (windows_xp) {
389                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
390         }
391 #endif
392
393         if (!Config::instance()->only_servers_encode ()) {
394                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
395                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
396                         _threads.push_back (t);
397 #ifdef BOOST_THREAD_PLATFORM_WIN32
398                         if (windows_xp) {
399                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
400                         }
401 #endif
402                 }
403         }
404
405         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
406                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
407                 for (int j = 0; j < i.threads(); ++j) {
408                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
409                 }
410         }
411
412         _writer->set_encoder_threads (_threads.size ());
413 }