More player debugging for butler video-full states.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/lib/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 using std::list;
46 using std::cout;
47 using boost::shared_ptr;
48 using boost::weak_ptr;
49 using boost::optional;
50 using dcp::Data;
51
52 /** @param film Film that we are encoding.
53  *  @param writer Writer that we are using.
54  */
55 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
56         : _film (film)
57         , _history (200)
58         , _writer (writer)
59 {
60         servers_list_changed ();
61 }
62
63 J2KEncoder::~J2KEncoder ()
64 {
65         try {
66                 terminate_threads ();
67         } catch (...) {
68                 /* Destructors must not throw exceptions; anything bad
69                    happening now is too late to worry about anyway,
70                    I think.
71                 */
72         }
73 }
74
75 void
76 J2KEncoder::begin ()
77 {
78         weak_ptr<J2KEncoder> wp = shared_from_this ();
79         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
80                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
81                 );
82 }
83
84 /* We don't want the servers-list-changed callback trying to do things
85    during destruction of J2KEncoder, and I think this is the neatest way
86    to achieve that.
87 */
88 void
89 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
90 {
91         shared_ptr<J2KEncoder> e = encoder.lock ();
92         if (e) {
93                 e->servers_list_changed ();
94         }
95 }
96
97 void
98 J2KEncoder::end ()
99 {
100         boost::mutex::scoped_lock lock (_queue_mutex);
101
102         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
103
104         /* Keep waking workers until the queue is empty */
105         while (!_queue.empty ()) {
106                 rethrow ();
107                 _empty_condition.notify_all ();
108                 _full_condition.wait (lock);
109         }
110
111         lock.unlock ();
112
113         LOG_GENERAL_NC (N_("Terminating encoder threads"));
114
115         terminate_threads ();
116
117         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
118
119         /* The following sequence of events can occur in the above code:
120              1. a remote worker takes the last image off the queue
121              2. the loop above terminates
122              3. the remote worker fails to encode the image and puts it back on the queue
123              4. the remote worker is then terminated by terminate_threads
124
125              So just mop up anything left in the queue here.
126         */
127
128         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
129                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
130                 try {
131                         _writer->write (
132                                 (*i)->encode_locally(),
133                                 (*i)->index(),
134                                 (*i)->eyes()
135                                 );
136                         frame_done ();
137                 } catch (std::exception& e) {
138                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
139                 }
140         }
141 }
142
143 /** @return an estimate of the current number of frames we are encoding per second,
144  *  or 0 if not known.
145  */
146 float
147 J2KEncoder::current_encoding_rate () const
148 {
149         return _history.rate ();
150 }
151
152 /** @return Number of video frames that have been queued for encoding */
153 int
154 J2KEncoder::video_frames_enqueued () const
155 {
156         if (!_last_player_video_time) {
157                 return 0;
158         }
159
160         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
161 }
162
163 /** Should be called when a frame has been encoded successfully */
164 void
165 J2KEncoder::frame_done ()
166 {
167         _history.event ();
168 }
169
170 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
171  *  so each time the supplied frame is the one after the previous one.
172  *  pv represents one video frame, and could be empty if there is nothing to encode
173  *  for this DCP frame.
174  *
175  *  @param pv PlayerVideo to encode.
176  *  @param time Time of \p pv within the DCP.
177  */
178 void
179 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
180 {
181         _waker.nudge ();
182
183         size_t threads = 0;
184         {
185                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
186                 threads = _threads.size ();
187         }
188
189         boost::mutex::scoped_lock queue_lock (_queue_mutex);
190
191         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
192            when there are no threads.
193         */
194         while (_queue.size() >= (threads * 2) + 1) {
195                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
196                 _full_condition.wait (queue_lock);
197                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
198         }
199
200         _writer->rethrow ();
201         /* Re-throw any exception raised by one of our threads.  If more
202            than one has thrown an exception, only one will be rethrown, I think;
203            but then, if that happens something has gone badly wrong.
204         */
205         rethrow ();
206
207         Frame const position = time.frames_floor(_film->video_frame_rate());
208
209         if (_writer->can_fake_write (position)) {
210                 /* We can fake-write this frame */
211                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
212                 _writer->fake_write (position, pv->eyes ());
213                 frame_done ();
214         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
215                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
216                 /* This frame already has J2K data, so just write it */
217                 _writer->write (pv->j2k(), position, pv->eyes ());
218         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
219                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
220                 _writer->repeat (position, pv->eyes ());
221         } else {
222                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
223                 /* Queue this new frame for encoding */
224                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
225                 _queue.push_back (shared_ptr<DCPVideo> (
226                                           new DCPVideo (
227                                                   pv,
228                                                   position,
229                                                   _film->video_frame_rate(),
230                                                   _film->j2k_bandwidth(),
231                                                   _film->resolution()
232                                                   )
233                                           ));
234
235                 /* The queue might not be empty any more, so notify anything which is
236                    waiting on that.
237                 */
238                 _empty_condition.notify_all ();
239         }
240
241         _last_player_video[pv->eyes()] = pv;
242         _last_player_video_time = time;
243 }
244
245 void
246 J2KEncoder::terminate_threads ()
247 {
248         boost::mutex::scoped_lock threads_lock (_threads_mutex);
249
250         int n = 0;
251         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
252                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
253                 (*i)->interrupt ();
254                 DCPOMATIC_ASSERT ((*i)->joinable ());
255                 try {
256                         (*i)->join ();
257                 } catch (boost::thread_interrupted& e) {
258                         /* This is to be expected */
259                 }
260                 delete *i;
261                 LOG_GENERAL_NC ("Thread terminated");
262                 ++n;
263         }
264
265         _threads.clear ();
266 }
267
268 void
269 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
270 try
271 {
272         if (server) {
273                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
274         } else {
275                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
276         }
277
278         /* Number of seconds that we currently wait between attempts
279            to connect to the server; not relevant for localhost
280            encodings.
281         */
282         int remote_backoff = 0;
283
284         while (true) {
285
286                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
287                 boost::mutex::scoped_lock lock (_queue_mutex);
288                 while (_queue.empty ()) {
289                         _empty_condition.wait (lock);
290                 }
291
292                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
293                 shared_ptr<DCPVideo> vf = _queue.front ();
294
295                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
296                    so we must not be interrupted until one or other of these things have happened.  This
297                    block has thread interruption disabled.
298                 */
299                 {
300                         boost::this_thread::disable_interruption dis;
301
302                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
303                         _queue.pop_front ();
304
305                         lock.unlock ();
306
307                         optional<Data> encoded;
308
309                         /* We need to encode this input */
310                         if (server) {
311                                 try {
312                                         encoded = vf->encode_remotely (server.get ());
313
314                                         if (remote_backoff > 0) {
315                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
316                                         }
317
318                                         /* This job succeeded, so remove any backoff */
319                                         remote_backoff = 0;
320
321                                 } catch (std::exception& e) {
322                                         if (remote_backoff < 60) {
323                                                 /* back off more */
324                                                 remote_backoff += 10;
325                                         }
326                                         LOG_ERROR (
327                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
328                                                 vf->index(), server->host_name(), e.what(), remote_backoff
329                                                 );
330                                 }
331
332                         } else {
333                                 try {
334                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
335                                         encoded = vf->encode_locally ();
336                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
337                                 } catch (std::exception& e) {
338                                         /* This is very bad, so don't cope with it, just pass it on */
339                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
340                                         throw;
341                                 }
342                         }
343
344                         if (encoded) {
345                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
346                                 frame_done ();
347                         } else {
348                                 lock.lock ();
349                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
350                                 _queue.push_front (vf);
351                                 lock.unlock ();
352                         }
353                 }
354
355                 if (remote_backoff > 0) {
356                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
357                 }
358
359                 /* The queue might not be full any more, so notify anything that is waiting on that */
360                 lock.lock ();
361                 _full_condition.notify_all ();
362         }
363 }
364 catch (boost::thread_interrupted& e) {
365         /* Ignore these and just stop the thread */
366         _full_condition.notify_all ();
367 }
368 catch (...)
369 {
370         store_current ();
371         /* Wake anything waiting on _full_condition so it can see the exception */
372         _full_condition.notify_all ();
373 }
374
375 void
376 J2KEncoder::servers_list_changed ()
377 {
378         terminate_threads ();
379
380         /* XXX: could re-use threads */
381
382         boost::mutex::scoped_lock lm (_threads_mutex);
383
384 #ifdef BOOST_THREAD_PLATFORM_WIN32
385         OSVERSIONINFO info;
386         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
387         GetVersionEx (&info);
388         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
389         if (windows_xp) {
390                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
391         }
392 #endif
393
394         if (!Config::instance()->only_servers_encode ()) {
395                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
396                         boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription> ()));
397 #ifdef DCPOMATIC_LINUX
398                         pthread_setname_np (t->native_handle(), "encode-worker");
399 #endif
400                         _threads.push_back (t);
401 #ifdef BOOST_THREAD_PLATFORM_WIN32
402                         if (windows_xp) {
403                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
404                         }
405 #endif
406                 }
407         }
408
409         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
410                 if (!i.current_link_version()) {
411                         continue;
412                 }
413
414                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
415                 for (int j = 0; j < i.threads(); ++j) {
416                         _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i)));
417                 }
418         }
419
420         _writer->set_encoder_threads (_threads.size ());
421 }