Missed update to private test repo version.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 using std::list;
46 using std::cout;
47 using std::exception;
48 using boost::shared_ptr;
49 using boost::weak_ptr;
50 using boost::optional;
51 using dcp::Data;
52
53 /** @param film Film that we are encoding.
54  *  @param writer Writer that we are using.
55  */
56 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
57         : _film (film)
58         , _history (200)
59         , _writer (writer)
60 {
61         servers_list_changed ();
62 }
63
64 J2KEncoder::~J2KEncoder ()
65 {
66         try {
67                 terminate_threads ();
68         } catch (...) {
69                 /* Destructors must not throw exceptions; anything bad
70                    happening now is too late to worry about anyway,
71                    I think.
72                 */
73         }
74 }
75
76 void
77 J2KEncoder::begin ()
78 {
79         weak_ptr<J2KEncoder> wp = shared_from_this ();
80         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
81                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
82                 );
83 }
84
85 /* We don't want the servers-list-changed callback trying to do things
86    during destruction of J2KEncoder, and I think this is the neatest way
87    to achieve that.
88 */
89 void
90 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
91 {
92         shared_ptr<J2KEncoder> e = encoder.lock ();
93         if (e) {
94                 e->servers_list_changed ();
95         }
96 }
97
98 void
99 J2KEncoder::end ()
100 {
101         boost::mutex::scoped_lock lock (_queue_mutex);
102
103         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
104
105         /* Keep waking workers until the queue is empty */
106         while (!_queue.empty ()) {
107                 rethrow ();
108                 _empty_condition.notify_all ();
109                 _full_condition.wait (lock);
110         }
111
112         lock.unlock ();
113
114         LOG_GENERAL_NC (N_("Terminating encoder threads"));
115
116         terminate_threads ();
117
118         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
119
120         /* The following sequence of events can occur in the above code:
121              1. a remote worker takes the last image off the queue
122              2. the loop above terminates
123              3. the remote worker fails to encode the image and puts it back on the queue
124              4. the remote worker is then terminated by terminate_threads
125
126              So just mop up anything left in the queue here.
127         */
128
129         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
130                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
131                 try {
132                         _writer->write (
133                                 (*i)->encode_locally(),
134                                 (*i)->index(),
135                                 (*i)->eyes()
136                                 );
137                         frame_done ();
138                 } catch (std::exception& e) {
139                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
140                 }
141         }
142 }
143
144 /** @return an estimate of the current number of frames we are encoding per second,
145  *  or 0 if not known.
146  */
147 float
148 J2KEncoder::current_encoding_rate () const
149 {
150         return _history.rate ();
151 }
152
153 /** @return Number of video frames that have been queued for encoding */
154 int
155 J2KEncoder::video_frames_enqueued () const
156 {
157         if (!_last_player_video_time) {
158                 return 0;
159         }
160
161         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
162 }
163
164 /** Should be called when a frame has been encoded successfully */
165 void
166 J2KEncoder::frame_done ()
167 {
168         _history.event ();
169 }
170
171 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
172  *  so each time the supplied frame is the one after the previous one.
173  *  pv represents one video frame, and could be empty if there is nothing to encode
174  *  for this DCP frame.
175  *
176  *  @param pv PlayerVideo to encode.
177  *  @param time Time of \p pv within the DCP.
178  */
179 void
180 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
181 {
182         _waker.nudge ();
183
184         size_t threads = 0;
185         {
186                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
187                 threads = _threads.size ();
188         }
189
190         boost::mutex::scoped_lock queue_lock (_queue_mutex);
191
192         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
193            when there are no threads.
194         */
195         while (_queue.size() >= (threads * 2) + 1) {
196                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
197                 _full_condition.wait (queue_lock);
198                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
199         }
200
201         _writer->rethrow ();
202         /* Re-throw any exception raised by one of our threads.  If more
203            than one has thrown an exception, only one will be rethrown, I think;
204            but then, if that happens something has gone badly wrong.
205         */
206         rethrow ();
207
208         Frame const position = time.frames_floor(_film->video_frame_rate());
209
210         if (_writer->can_fake_write (position)) {
211                 /* We can fake-write this frame */
212                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
213                 _writer->fake_write (position, pv->eyes ());
214                 frame_done ();
215         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
216                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
217                 /* This frame already has J2K data, so just write it */
218                 _writer->write (pv->j2k(), position, pv->eyes ());
219         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
220                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
221                 _writer->repeat (position, pv->eyes ());
222         } else {
223                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
224                 /* Queue this new frame for encoding */
225                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
226                 _queue.push_back (shared_ptr<DCPVideo> (
227                                           new DCPVideo (
228                                                   pv,
229                                                   position,
230                                                   _film->video_frame_rate(),
231                                                   _film->j2k_bandwidth(),
232                                                   _film->resolution()
233                                                   )
234                                           ));
235
236                 /* The queue might not be empty any more, so notify anything which is
237                    waiting on that.
238                 */
239                 _empty_condition.notify_all ();
240         }
241
242         _last_player_video[pv->eyes()] = pv;
243         _last_player_video_time = time;
244 }
245
246 void
247 J2KEncoder::terminate_threads ()
248 {
249         boost::mutex::scoped_lock threads_lock (_threads_mutex);
250
251         int n = 0;
252         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
253                 /* Be careful not to throw in here otherwise _threads will not be clear()ed */
254                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
255                 (*i)->interrupt ();
256                 if (!(*i)->joinable()) {
257                         LOG_ERROR_NC ("About to join() a non-joinable thread");
258                 }
259                 try {
260                         (*i)->join ();
261                 } catch (boost::thread_interrupted& e) {
262                         /* This is to be expected (I think?) */
263                 } catch (exception& e) {
264                         LOG_ERROR ("join() threw an exception: %1", e.what());
265                 } catch (...) {
266                         LOG_ERROR_NC ("join() threw an exception");
267                 }
268                 delete *i;
269                 LOG_GENERAL_NC ("Thread terminated");
270                 ++n;
271         }
272
273         _threads.clear ();
274 }
275
276 void
277 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
278 try
279 {
280         if (server) {
281                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
282         } else {
283                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
284         }
285
286         /* Number of seconds that we currently wait between attempts
287            to connect to the server; not relevant for localhost
288            encodings.
289         */
290         int remote_backoff = 0;
291
292         while (true) {
293
294                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
295                 boost::mutex::scoped_lock lock (_queue_mutex);
296                 while (_queue.empty ()) {
297                         _empty_condition.wait (lock);
298                 }
299
300                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
301                 shared_ptr<DCPVideo> vf = _queue.front ();
302
303                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
304                    so we must not be interrupted until one or other of these things have happened.  This
305                    block has thread interruption disabled.
306                 */
307                 {
308                         boost::this_thread::disable_interruption dis;
309
310                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
311                         _queue.pop_front ();
312
313                         lock.unlock ();
314
315                         optional<Data> encoded;
316
317                         /* We need to encode this input */
318                         if (server) {
319                                 try {
320                                         encoded = vf->encode_remotely (server.get ());
321
322                                         if (remote_backoff > 0) {
323                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
324                                         }
325
326                                         /* This job succeeded, so remove any backoff */
327                                         remote_backoff = 0;
328
329                                 } catch (std::exception& e) {
330                                         if (remote_backoff < 60) {
331                                                 /* back off more */
332                                                 remote_backoff += 10;
333                                         }
334                                         LOG_ERROR (
335                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
336                                                 vf->index(), server->host_name(), e.what(), remote_backoff
337                                                 );
338                                 }
339
340                         } else {
341                                 try {
342                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
343                                         encoded = vf->encode_locally ();
344                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
345                                 } catch (std::exception& e) {
346                                         /* This is very bad, so don't cope with it, just pass it on */
347                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
348                                         throw;
349                                 }
350                         }
351
352                         if (encoded) {
353                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
354                                 frame_done ();
355                         } else {
356                                 lock.lock ();
357                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
358                                 _queue.push_front (vf);
359                                 lock.unlock ();
360                         }
361                 }
362
363                 if (remote_backoff > 0) {
364                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
365                 }
366
367                 /* The queue might not be full any more, so notify anything that is waiting on that */
368                 lock.lock ();
369                 _full_condition.notify_all ();
370         }
371 }
372 catch (boost::thread_interrupted& e) {
373         /* Ignore these and just stop the thread */
374         _full_condition.notify_all ();
375 }
376 catch (...)
377 {
378         store_current ();
379         /* Wake anything waiting on _full_condition so it can see the exception */
380         _full_condition.notify_all ();
381 }
382
383 void
384 J2KEncoder::servers_list_changed ()
385 {
386         terminate_threads ();
387
388         /* XXX: could re-use threads */
389
390         boost::mutex::scoped_lock lm (_threads_mutex);
391
392 #ifdef BOOST_THREAD_PLATFORM_WIN32
393         OSVERSIONINFO info;
394         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
395         GetVersionEx (&info);
396         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
397         if (windows_xp) {
398                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
399         }
400 #endif
401
402         if (!Config::instance()->only_servers_encode ()) {
403                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
404                         boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription> ()));
405 #ifdef DCPOMATIC_LINUX
406                         pthread_setname_np (t->native_handle(), "encode-worker");
407 #endif
408                         _threads.push_back (t);
409 #ifdef BOOST_THREAD_PLATFORM_WIN32
410                         if (windows_xp) {
411                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
412                         }
413 #endif
414                 }
415         }
416
417         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
418                 if (!i.current_link_version()) {
419                         continue;
420                 }
421
422                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
423                 for (int j = 0; j < i.threads(); ++j) {
424                         _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i)));
425                 }
426         }
427
428         _writer->set_encoder_threads (_threads.size ());
429 }