5c3fd477ef16dfeefb63d90bdb13bdbdd156bd0a
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 using std::list;
46 using std::cout;
47 using std::exception;
48 using boost::shared_ptr;
49 using boost::weak_ptr;
50 using boost::optional;
51 using dcp::Data;
52 using namespace dcpomatic;
53
54 /** @param film Film that we are encoding.
55  *  @param writer Writer that we are using.
56  */
57 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
58         : _film (film)
59         , _history (200)
60         , _writer (writer)
61 {
62         servers_list_changed ();
63 }
64
65 J2KEncoder::~J2KEncoder ()
66 {
67         try {
68                 terminate_threads ();
69         } catch (...) {
70                 /* Destructors must not throw exceptions; anything bad
71                    happening now is too late to worry about anyway,
72                    I think.
73                 */
74         }
75 }
76
77 void
78 J2KEncoder::begin ()
79 {
80         weak_ptr<J2KEncoder> wp = shared_from_this ();
81         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
82                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
83                 );
84 }
85
86 /* We don't want the servers-list-changed callback trying to do things
87    during destruction of J2KEncoder, and I think this is the neatest way
88    to achieve that.
89 */
90 void
91 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
92 {
93         shared_ptr<J2KEncoder> e = encoder.lock ();
94         if (e) {
95                 e->servers_list_changed ();
96         }
97 }
98
99 void
100 J2KEncoder::end ()
101 {
102         boost::mutex::scoped_lock lock (_queue_mutex);
103
104         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
105
106         /* Keep waking workers until the queue is empty */
107         while (!_queue.empty ()) {
108                 rethrow ();
109                 _empty_condition.notify_all ();
110                 _full_condition.wait (lock);
111         }
112
113         lock.unlock ();
114
115         LOG_GENERAL_NC (N_("Terminating encoder threads"));
116
117         terminate_threads ();
118
119         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
120
121         /* The following sequence of events can occur in the above code:
122              1. a remote worker takes the last image off the queue
123              2. the loop above terminates
124              3. the remote worker fails to encode the image and puts it back on the queue
125              4. the remote worker is then terminated by terminate_threads
126
127              So just mop up anything left in the queue here.
128         */
129
130         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
131                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
132                 try {
133                         _writer->write (
134                                 (*i)->encode_locally(),
135                                 (*i)->index(),
136                                 (*i)->eyes()
137                                 );
138                         frame_done ();
139                 } catch (std::exception& e) {
140                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
141                 }
142         }
143 }
144
145 /** @return an estimate of the current number of frames we are encoding per second,
146  *  if known.
147  */
148 optional<float>
149 J2KEncoder::current_encoding_rate () const
150 {
151         return _history.rate ();
152 }
153
154 /** @return Number of video frames that have been queued for encoding */
155 int
156 J2KEncoder::video_frames_enqueued () const
157 {
158         if (!_last_player_video_time) {
159                 return 0;
160         }
161
162         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
163 }
164
165 /** Should be called when a frame has been encoded successfully */
166 void
167 J2KEncoder::frame_done ()
168 {
169         _history.event ();
170 }
171
172 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
173  *  so each time the supplied frame is the one after the previous one.
174  *  pv represents one video frame, and could be empty if there is nothing to encode
175  *  for this DCP frame.
176  *
177  *  @param pv PlayerVideo to encode.
178  *  @param time Time of \p pv within the DCP.
179  */
180 void
181 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
182 {
183         _waker.nudge ();
184
185         size_t threads = 0;
186         {
187                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
188                 threads = _threads.size ();
189         }
190
191         boost::mutex::scoped_lock queue_lock (_queue_mutex);
192
193         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
194            when there are no threads.
195         */
196         while (_queue.size() >= (threads * 2) + 1) {
197                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
198                 _full_condition.wait (queue_lock);
199                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
200         }
201
202         _writer->rethrow ();
203         /* Re-throw any exception raised by one of our threads.  If more
204            than one has thrown an exception, only one will be rethrown, I think;
205            but then, if that happens something has gone badly wrong.
206         */
207         rethrow ();
208
209         Frame const position = time.frames_floor(_film->video_frame_rate());
210
211         if (_writer->can_fake_write (position)) {
212                 /* We can fake-write this frame */
213                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
214                 _writer->fake_write (position, pv->eyes ());
215                 frame_done ();
216         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
217                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
218                 /* This frame already has J2K data, so just write it */
219                 _writer->write (pv->j2k(), position, pv->eyes ());
220         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
221                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
222                 _writer->repeat (position, pv->eyes ());
223         } else {
224                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
225                 /* Queue this new frame for encoding */
226                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
227                 _queue.push_back (shared_ptr<DCPVideo> (
228                                           new DCPVideo (
229                                                   pv,
230                                                   position,
231                                                   _film->video_frame_rate(),
232                                                   _film->j2k_bandwidth(),
233                                                   _film->resolution()
234                                                   )
235                                           ));
236
237                 /* The queue might not be empty any more, so notify anything which is
238                    waiting on that.
239                 */
240                 _empty_condition.notify_all ();
241         }
242
243         _last_player_video[pv->eyes()] = pv;
244         _last_player_video_time = time;
245 }
246
247 void
248 J2KEncoder::terminate_threads ()
249 {
250         boost::mutex::scoped_lock threads_lock (_threads_mutex);
251
252         int n = 0;
253         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
254                 /* Be careful not to throw in here otherwise _threads will not be clear()ed */
255                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
256                 (*i)->interrupt ();
257                 if (!(*i)->joinable()) {
258                         LOG_ERROR_NC ("About to join() a non-joinable thread");
259                 }
260                 try {
261                         (*i)->join ();
262                 } catch (boost::thread_interrupted& e) {
263                         /* This is to be expected (I think?) */
264                 } catch (exception& e) {
265                         LOG_ERROR ("join() threw an exception: %1", e.what());
266                 } catch (...) {
267                         LOG_ERROR_NC ("join() threw an exception");
268                 }
269                 delete *i;
270                 LOG_GENERAL_NC ("Thread terminated");
271                 ++n;
272         }
273
274         _threads.clear ();
275 }
276
277 void
278 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
279 try
280 {
281         if (server) {
282                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
283         } else {
284                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
285         }
286
287         /* Number of seconds that we currently wait between attempts
288            to connect to the server; not relevant for localhost
289            encodings.
290         */
291         int remote_backoff = 0;
292
293         while (true) {
294
295                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
296                 boost::mutex::scoped_lock lock (_queue_mutex);
297                 while (_queue.empty ()) {
298                         _empty_condition.wait (lock);
299                 }
300
301                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
302                 shared_ptr<DCPVideo> vf = _queue.front ();
303
304                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
305                    so we must not be interrupted until one or other of these things have happened.  This
306                    block has thread interruption disabled.
307                 */
308                 {
309                         boost::this_thread::disable_interruption dis;
310
311                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
312                         _queue.pop_front ();
313
314                         lock.unlock ();
315
316                         optional<Data> encoded;
317
318                         /* We need to encode this input */
319                         if (server) {
320                                 try {
321                                         encoded = vf->encode_remotely (server.get ());
322
323                                         if (remote_backoff > 0) {
324                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
325                                         }
326
327                                         /* This job succeeded, so remove any backoff */
328                                         remote_backoff = 0;
329
330                                 } catch (std::exception& e) {
331                                         if (remote_backoff < 60) {
332                                                 /* back off more */
333                                                 remote_backoff += 10;
334                                         }
335                                         LOG_ERROR (
336                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
337                                                 vf->index(), server->host_name(), e.what(), remote_backoff
338                                                 );
339                                 }
340
341                         } else {
342                                 try {
343                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
344                                         encoded = vf->encode_locally ();
345                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
346                                 } catch (std::exception& e) {
347                                         /* This is very bad, so don't cope with it, just pass it on */
348                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
349                                         throw;
350                                 }
351                         }
352
353                         if (encoded) {
354                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
355                                 frame_done ();
356                         } else {
357                                 lock.lock ();
358                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
359                                 _queue.push_front (vf);
360                                 lock.unlock ();
361                         }
362                 }
363
364                 if (remote_backoff > 0) {
365                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
366                 }
367
368                 /* The queue might not be full any more, so notify anything that is waiting on that */
369                 lock.lock ();
370                 _full_condition.notify_all ();
371         }
372 }
373 catch (boost::thread_interrupted& e) {
374         /* Ignore these and just stop the thread */
375         _full_condition.notify_all ();
376 }
377 catch (...)
378 {
379         store_current ();
380         /* Wake anything waiting on _full_condition so it can see the exception */
381         _full_condition.notify_all ();
382 }
383
384 void
385 J2KEncoder::servers_list_changed ()
386 {
387         terminate_threads ();
388
389         /* XXX: could re-use threads */
390
391         boost::mutex::scoped_lock lm (_threads_mutex);
392
393 #ifdef BOOST_THREAD_PLATFORM_WIN32
394         OSVERSIONINFO info;
395         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
396         GetVersionEx (&info);
397         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
398         if (windows_xp) {
399                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
400         }
401 #endif
402
403         if (!Config::instance()->only_servers_encode ()) {
404                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
405                         boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription> ()));
406 #ifdef DCPOMATIC_LINUX
407                         pthread_setname_np (t->native_handle(), "encode-worker");
408 #endif
409                         _threads.push_back (t);
410 #ifdef BOOST_THREAD_PLATFORM_WIN32
411                         if (windows_xp) {
412                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
413                         }
414 #endif
415                 }
416         }
417
418         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
419                 if (!i.current_link_version()) {
420                         continue;
421                 }
422
423                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
424                 for (int j = 0; j < i.threads(); ++j) {
425                         _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i)));
426                 }
427         }
428
429         _writer->set_encoder_threads (_threads.size ());
430 }