273bce8fb6331acc76101e2b1b1a0f42ce609c44
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 using std::list;
46 using std::cout;
47 using std::exception;
48 using std::shared_ptr;
49 using std::weak_ptr;
50 using boost::optional;
51 using dcp::Data;
52 using namespace dcpomatic;
53
54 /** @param film Film that we are encoding.
55  *  @param writer Writer that we are using.
56  */
57 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
58         : _film (film)
59         , _history (200)
60         , _writer (writer)
61 {
62         servers_list_changed ();
63 }
64
65 J2KEncoder::~J2KEncoder ()
66 {
67         boost::mutex::scoped_lock lm (_threads_mutex);
68         terminate_threads ();
69 }
70
71 void
72 J2KEncoder::begin ()
73 {
74         weak_ptr<J2KEncoder> wp = shared_from_this ();
75         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
76                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
77                 );
78 }
79
80 /* We don't want the servers-list-changed callback trying to do things
81    during destruction of J2KEncoder, and I think this is the neatest way
82    to achieve that.
83 */
84 void
85 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
86 {
87         shared_ptr<J2KEncoder> e = encoder.lock ();
88         if (e) {
89                 e->servers_list_changed ();
90         }
91 }
92
93 void
94 J2KEncoder::end ()
95 {
96         boost::mutex::scoped_lock lock (_queue_mutex);
97
98         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
99
100         /* Keep waking workers until the queue is empty */
101         while (!_queue.empty ()) {
102                 rethrow ();
103                 _empty_condition.notify_all ();
104                 _full_condition.wait (lock);
105         }
106
107         lock.unlock ();
108
109         LOG_GENERAL_NC (N_("Terminating encoder threads"));
110
111         {
112                 boost::mutex::scoped_lock lm (_threads_mutex);
113                 terminate_threads ();
114         }
115
116         /* Something might have been thrown during terminate_threads */
117         rethrow ();
118
119         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
120
121         /* The following sequence of events can occur in the above code:
122              1. a remote worker takes the last image off the queue
123              2. the loop above terminates
124              3. the remote worker fails to encode the image and puts it back on the queue
125              4. the remote worker is then terminated by terminate_threads
126
127              So just mop up anything left in the queue here.
128         */
129
130         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
131                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
132                 try {
133                         _writer->write (
134                                 shared_ptr<dcp::Data>(new dcp::ArrayData((*i)->encode_locally())),
135                                 (*i)->index(),
136                                 (*i)->eyes()
137                                 );
138                         frame_done ();
139                 } catch (std::exception& e) {
140                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
141                 }
142         }
143 }
144
145 /** @return an estimate of the current number of frames we are encoding per second,
146  *  if known.
147  */
148 optional<float>
149 J2KEncoder::current_encoding_rate () const
150 {
151         return _history.rate ();
152 }
153
154 /** @return Number of video frames that have been queued for encoding */
155 int
156 J2KEncoder::video_frames_enqueued () const
157 {
158         if (!_last_player_video_time) {
159                 return 0;
160         }
161
162         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
163 }
164
165 /** Should be called when a frame has been encoded successfully */
166 void
167 J2KEncoder::frame_done ()
168 {
169         _history.event ();
170 }
171
172 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
173  *  so each time the supplied frame is the one after the previous one.
174  *  pv represents one video frame, and could be empty if there is nothing to encode
175  *  for this DCP frame.
176  *
177  *  @param pv PlayerVideo to encode.
178  *  @param time Time of \p pv within the DCP.
179  */
180 void
181 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
182 {
183         _waker.nudge ();
184
185         size_t threads = 0;
186         {
187                 boost::mutex::scoped_lock lm (_threads_mutex);
188                 threads = _threads->size();
189         }
190
191         boost::mutex::scoped_lock queue_lock (_queue_mutex);
192
193         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
194            when there are no threads.
195         */
196         while (_queue.size() >= (threads * 2) + 1) {
197                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
198                 _full_condition.wait (queue_lock);
199                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
200         }
201
202         _writer->rethrow ();
203         /* Re-throw any exception raised by one of our threads.  If more
204            than one has thrown an exception, only one will be rethrown, I think;
205            but then, if that happens something has gone badly wrong.
206         */
207         rethrow ();
208
209         Frame const position = time.frames_floor(_film->video_frame_rate());
210
211         if (_writer->can_fake_write (position)) {
212                 /* We can fake-write this frame */
213                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
214                 _writer->fake_write (position, pv->eyes ());
215                 frame_done ();
216         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
217                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
218                 /* This frame already has J2K data, so just write it */
219                 _writer->write (pv->j2k(), position, pv->eyes ());
220                 frame_done ();
221         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
222                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
223                 _writer->repeat (position, pv->eyes ());
224         } else {
225                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
226                 /* Queue this new frame for encoding */
227                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
228                 _queue.push_back (shared_ptr<DCPVideo> (
229                                           new DCPVideo (
230                                                   pv,
231                                                   position,
232                                                   _film->video_frame_rate(),
233                                                   _film->j2k_bandwidth(),
234                                                   _film->resolution()
235                                                   )
236                                           ));
237
238                 /* The queue might not be empty any more, so notify anything which is
239                    waiting on that.
240                 */
241                 _empty_condition.notify_all ();
242         }
243
244         _last_player_video[pv->eyes()] = pv;
245         _last_player_video_time = time;
246 }
247
248
249 /** Caller must hold a lock on _threads_mutex */
250 void
251 J2KEncoder::terminate_threads ()
252 {
253         boost::this_thread::disable_interruption dis;
254
255         if (!_threads) {
256                 return;
257         }
258
259         _threads->interrupt_all ();
260         try {
261                 _threads->join_all ();
262         } catch (exception& e) {
263                 LOG_ERROR ("join() threw an exception: %1", e.what());
264         } catch (...) {
265                 LOG_ERROR_NC ("join() threw an exception");
266         }
267
268         _threads.reset ();
269 }
270
271 void
272 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
273 try
274 {
275         if (server) {
276                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
277         } else {
278                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
279         }
280
281         /* Number of seconds that we currently wait between attempts
282            to connect to the server; not relevant for localhost
283            encodings.
284         */
285         int remote_backoff = 0;
286
287         while (true) {
288
289                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
290                 boost::mutex::scoped_lock lock (_queue_mutex);
291                 while (_queue.empty ()) {
292                         _empty_condition.wait (lock);
293                 }
294
295                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
296                 shared_ptr<DCPVideo> vf = _queue.front ();
297
298                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
299                    so we must not be interrupted until one or other of these things have happened.  This
300                    block has thread interruption disabled.
301                 */
302                 {
303                         boost::this_thread::disable_interruption dis;
304
305                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
306                         _queue.pop_front ();
307
308                         lock.unlock ();
309
310                         shared_ptr<Data> encoded;
311
312                         /* We need to encode this input */
313                         if (server) {
314                                 try {
315                                         encoded.reset(new dcp::ArrayData(vf->encode_remotely(server.get())));
316
317                                         if (remote_backoff > 0) {
318                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
319                                         }
320
321                                         /* This job succeeded, so remove any backoff */
322                                         remote_backoff = 0;
323
324                                 } catch (std::exception& e) {
325                                         if (remote_backoff < 60) {
326                                                 /* back off more */
327                                                 remote_backoff += 10;
328                                         }
329                                         LOG_ERROR (
330                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
331                                                 vf->index(), server->host_name(), e.what(), remote_backoff
332                                                 );
333                                 }
334
335                         } else {
336                                 try {
337                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
338                                         encoded.reset(new dcp::ArrayData(vf->encode_locally()));
339                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
340                                 } catch (std::exception& e) {
341                                         /* This is very bad, so don't cope with it, just pass it on */
342                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
343                                         throw;
344                                 }
345                         }
346
347                         if (encoded) {
348                                 _writer->write (encoded, vf->index(), vf->eyes());
349                                 frame_done ();
350                         } else {
351                                 lock.lock ();
352                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
353                                 _queue.push_front (vf);
354                                 lock.unlock ();
355                         }
356                 }
357
358                 if (remote_backoff > 0) {
359                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
360                 }
361
362                 /* The queue might not be full any more, so notify anything that is waiting on that */
363                 lock.lock ();
364                 _full_condition.notify_all ();
365         }
366 }
367 catch (boost::thread_interrupted& e) {
368         /* Ignore these and just stop the thread */
369         _full_condition.notify_all ();
370 }
371 catch (...)
372 {
373         store_current ();
374         /* Wake anything waiting on _full_condition so it can see the exception */
375         _full_condition.notify_all ();
376 }
377
378 void
379 J2KEncoder::servers_list_changed ()
380 {
381         boost::mutex::scoped_lock lm (_threads_mutex);
382
383         terminate_threads ();
384         _threads.reset (new boost::thread_group());
385
386         /* XXX: could re-use threads */
387
388         if (!Config::instance()->only_servers_encode ()) {
389                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
390 #ifdef DCPOMATIC_LINUX
391                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
392                         pthread_setname_np (t->native_handle(), "encode-worker");
393 #else
394                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
395 #endif
396                 }
397         }
398
399         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
400                 if (!i.current_link_version()) {
401                         continue;
402                 }
403
404                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
405                 for (int j = 0; j < i.threads(); ++j) {
406                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
407                 }
408         }
409
410         _writer->set_encoder_threads (_threads->size());
411 }