acd27932e226c87ed8a4572fbfa65b999b1f31ec
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 using std::list;
45 using std::cout;
46 using std::exception;
47 using std::shared_ptr;
48 using std::weak_ptr;
49 using boost::optional;
50 using dcp::Data;
51 using namespace dcpomatic;
52
53 /** @param film Film that we are encoding.
54  *  @param writer Writer that we are using.
55  */
56 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
57         : _film (film)
58         , _history (200)
59         , _writer (writer)
60 {
61         servers_list_changed ();
62 }
63
64 J2KEncoder::~J2KEncoder ()
65 {
66         boost::mutex::scoped_lock lm (_threads_mutex);
67         terminate_threads ();
68 }
69
70 void
71 J2KEncoder::begin ()
72 {
73         weak_ptr<J2KEncoder> wp = shared_from_this ();
74         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
75                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
76                 );
77 }
78
79 /* We don't want the servers-list-changed callback trying to do things
80    during destruction of J2KEncoder, and I think this is the neatest way
81    to achieve that.
82 */
83 void
84 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
85 {
86         shared_ptr<J2KEncoder> e = encoder.lock ();
87         if (e) {
88                 e->servers_list_changed ();
89         }
90 }
91
92 void
93 J2KEncoder::end ()
94 {
95         boost::mutex::scoped_lock lock (_queue_mutex);
96
97         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
98
99         /* Keep waking workers until the queue is empty */
100         while (!_queue.empty ()) {
101                 rethrow ();
102                 _empty_condition.notify_all ();
103                 _full_condition.wait (lock);
104         }
105
106         lock.unlock ();
107
108         LOG_GENERAL_NC (N_("Terminating encoder threads"));
109
110         {
111                 boost::mutex::scoped_lock lm (_threads_mutex);
112                 terminate_threads ();
113         }
114
115         /* Something might have been thrown during terminate_threads */
116         rethrow ();
117
118         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
119
120         /* The following sequence of events can occur in the above code:
121              1. a remote worker takes the last image off the queue
122              2. the loop above terminates
123              3. the remote worker fails to encode the image and puts it back on the queue
124              4. the remote worker is then terminated by terminate_threads
125
126              So just mop up anything left in the queue here.
127         */
128
129         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
130                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
131                 try {
132                         _writer->write (
133                                 shared_ptr<dcp::Data>(new dcp::ArrayData((*i)->encode_locally())),
134                                 (*i)->index(),
135                                 (*i)->eyes()
136                                 );
137                         frame_done ();
138                 } catch (std::exception& e) {
139                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
140                 }
141         }
142 }
143
144 /** @return an estimate of the current number of frames we are encoding per second,
145  *  if known.
146  */
147 optional<float>
148 J2KEncoder::current_encoding_rate () const
149 {
150         return _history.rate ();
151 }
152
153 /** @return Number of video frames that have been queued for encoding */
154 int
155 J2KEncoder::video_frames_enqueued () const
156 {
157         if (!_last_player_video_time) {
158                 return 0;
159         }
160
161         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
162 }
163
164 /** Should be called when a frame has been encoded successfully */
165 void
166 J2KEncoder::frame_done ()
167 {
168         _history.event ();
169 }
170
171 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
172  *  so each time the supplied frame is the one after the previous one.
173  *  pv represents one video frame, and could be empty if there is nothing to encode
174  *  for this DCP frame.
175  *
176  *  @param pv PlayerVideo to encode.
177  *  @param time Time of \p pv within the DCP.
178  */
179 void
180 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
181 {
182         _waker.nudge ();
183
184         size_t threads = 0;
185         {
186                 boost::mutex::scoped_lock lm (_threads_mutex);
187                 threads = _threads->size();
188         }
189
190         boost::mutex::scoped_lock queue_lock (_queue_mutex);
191
192         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
193            when there are no threads.
194         */
195         while (_queue.size() >= (threads * 2) + 1) {
196                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
197                 _full_condition.wait (queue_lock);
198                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
199         }
200
201         _writer->rethrow ();
202         /* Re-throw any exception raised by one of our threads.  If more
203            than one has thrown an exception, only one will be rethrown, I think;
204            but then, if that happens something has gone badly wrong.
205         */
206         rethrow ();
207
208         Frame const position = time.frames_floor(_film->video_frame_rate());
209
210         if (_writer->can_fake_write (position)) {
211                 /* We can fake-write this frame */
212                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
213                 _writer->fake_write (position, pv->eyes ());
214                 frame_done ();
215         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
216                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
217                 /* This frame already has J2K data, so just write it */
218                 _writer->write (pv->j2k(), position, pv->eyes ());
219                 frame_done ();
220         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
221                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
222                 _writer->repeat (position, pv->eyes ());
223         } else {
224                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
225                 /* Queue this new frame for encoding */
226                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
227                 _queue.push_back (shared_ptr<DCPVideo> (
228                                           new DCPVideo (
229                                                   pv,
230                                                   position,
231                                                   _film->video_frame_rate(),
232                                                   _film->j2k_bandwidth(),
233                                                   _film->resolution()
234                                                   )
235                                           ));
236
237                 /* The queue might not be empty any more, so notify anything which is
238                    waiting on that.
239                 */
240                 _empty_condition.notify_all ();
241         }
242
243         _last_player_video[pv->eyes()] = pv;
244         _last_player_video_time = time;
245 }
246
247
248 /** Caller must hold a lock on _threads_mutex */
249 void
250 J2KEncoder::terminate_threads ()
251 {
252         boost::this_thread::disable_interruption dis;
253
254         if (!_threads) {
255                 return;
256         }
257
258         _threads->interrupt_all ();
259         try {
260                 _threads->join_all ();
261         } catch (exception& e) {
262                 LOG_ERROR ("join() threw an exception: %1", e.what());
263         } catch (...) {
264                 LOG_ERROR_NC ("join() threw an exception");
265         }
266
267         _threads.reset ();
268 }
269
270 void
271 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
272 try
273 {
274         if (server) {
275                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
276         } else {
277                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
278         }
279
280         /* Number of seconds that we currently wait between attempts
281            to connect to the server; not relevant for localhost
282            encodings.
283         */
284         int remote_backoff = 0;
285
286         while (true) {
287
288                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
289                 boost::mutex::scoped_lock lock (_queue_mutex);
290                 while (_queue.empty ()) {
291                         _empty_condition.wait (lock);
292                 }
293
294                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
295                 shared_ptr<DCPVideo> vf = _queue.front ();
296
297                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
298                    so we must not be interrupted until one or other of these things have happened.  This
299                    block has thread interruption disabled.
300                 */
301                 {
302                         boost::this_thread::disable_interruption dis;
303
304                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
305                         _queue.pop_front ();
306
307                         lock.unlock ();
308
309                         shared_ptr<Data> encoded;
310
311                         /* We need to encode this input */
312                         if (server) {
313                                 try {
314                                         encoded.reset(new dcp::ArrayData(vf->encode_remotely(server.get())));
315
316                                         if (remote_backoff > 0) {
317                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
318                                         }
319
320                                         /* This job succeeded, so remove any backoff */
321                                         remote_backoff = 0;
322
323                                 } catch (std::exception& e) {
324                                         if (remote_backoff < 60) {
325                                                 /* back off more */
326                                                 remote_backoff += 10;
327                                         }
328                                         LOG_ERROR (
329                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
330                                                 vf->index(), server->host_name(), e.what(), remote_backoff
331                                                 );
332                                 }
333
334                         } else {
335                                 try {
336                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
337                                         encoded.reset(new dcp::ArrayData(vf->encode_locally()));
338                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
339                                 } catch (std::exception& e) {
340                                         /* This is very bad, so don't cope with it, just pass it on */
341                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
342                                         throw;
343                                 }
344                         }
345
346                         if (encoded) {
347                                 _writer->write (encoded, vf->index(), vf->eyes());
348                                 frame_done ();
349                         } else {
350                                 lock.lock ();
351                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
352                                 _queue.push_front (vf);
353                                 lock.unlock ();
354                         }
355                 }
356
357                 if (remote_backoff > 0) {
358                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
359                 }
360
361                 /* The queue might not be full any more, so notify anything that is waiting on that */
362                 lock.lock ();
363                 _full_condition.notify_all ();
364         }
365 }
366 catch (boost::thread_interrupted& e) {
367         /* Ignore these and just stop the thread */
368         _full_condition.notify_all ();
369 }
370 catch (...)
371 {
372         store_current ();
373         /* Wake anything waiting on _full_condition so it can see the exception */
374         _full_condition.notify_all ();
375 }
376
377 void
378 J2KEncoder::servers_list_changed ()
379 {
380         boost::mutex::scoped_lock lm (_threads_mutex);
381
382         terminate_threads ();
383         _threads.reset (new boost::thread_group());
384
385         /* XXX: could re-use threads */
386
387         if (!Config::instance()->only_servers_encode ()) {
388                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
389 #ifdef DCPOMATIC_LINUX
390                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
391                         pthread_setname_np (t->native_handle(), "encode-worker");
392 #else
393                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
394 #endif
395                 }
396         }
397
398         for (auto i: EncodeServerFinder::instance()->servers()) {
399                 if (!i.current_link_version()) {
400                         continue;
401                 }
402
403                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
404                 for (int j = 0; j < i.threads(); ++j) {
405                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
406                 }
407         }
408
409         _writer->set_encoder_threads (_threads->size());
410 }