Re-add mutex that was taken away in
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/j2k_encoder.cc
 *  @brief J2K encoder class.
 */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 using std::list;
46 using std::cout;
47 using std::exception;
48 using boost::shared_ptr;
49 using boost::weak_ptr;
50 using boost::optional;
51 using dcp::Data;
52 using namespace dcpomatic;
53
54 /** @param film Film that we are encoding.
55  *  @param writer Writer that we are using.
56  */
57 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
58         : _film (film)
59         , _history (200)
60         , _writer (writer)
61 {
62         servers_list_changed ();
63 }
64
65 J2KEncoder::~J2KEncoder ()
66 {
67         terminate_threads ();
68 }
69
70 void
71 J2KEncoder::begin ()
72 {
73         weak_ptr<J2KEncoder> wp = shared_from_this ();
74         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
75                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
76                 );
77 }
78
79 /* We don't want the servers-list-changed callback trying to do things
80    during destruction of J2KEncoder, and I think this is the neatest way
81    to achieve that.
82 */
83 void
84 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
85 {
86         shared_ptr<J2KEncoder> e = encoder.lock ();
87         if (e) {
88                 e->servers_list_changed ();
89         }
90 }
91
92 void
93 J2KEncoder::end ()
94 {
95         boost::mutex::scoped_lock lock (_queue_mutex);
96
97         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
98
99         /* Keep waking workers until the queue is empty */
100         while (!_queue.empty ()) {
101                 rethrow ();
102                 _empty_condition.notify_all ();
103                 _full_condition.wait (lock);
104         }
105
106         lock.unlock ();
107
108         LOG_GENERAL_NC (N_("Terminating encoder threads"));
109
110         terminate_threads ();
111
112         /* Something might have been thrown during terminate_threads */
113         rethrow ();
114
115         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
116
117         /* The following sequence of events can occur in the above code:
118              1. a remote worker takes the last image off the queue
119              2. the loop above terminates
120              3. the remote worker fails to encode the image and puts it back on the queue
121              4. the remote worker is then terminated by terminate_threads
122
123              So just mop up anything left in the queue here.
124         */
125
126         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
127                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
128                 try {
129                         _writer->write (
130                                 shared_ptr<dcp::Data>(new dcp::ArrayData((*i)->encode_locally())),
131                                 (*i)->index(),
132                                 (*i)->eyes()
133                                 );
134                         frame_done ();
135                 } catch (std::exception& e) {
136                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
137                 }
138         }
139 }
140
141 /** @return an estimate of the current number of frames we are encoding per second,
142  *  if known.
143  */
144 optional<float>
145 J2KEncoder::current_encoding_rate () const
146 {
147         return _history.rate ();
148 }
149
150 /** @return Number of video frames that have been queued for encoding */
151 int
152 J2KEncoder::video_frames_enqueued () const
153 {
154         if (!_last_player_video_time) {
155                 return 0;
156         }
157
158         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
159 }
160
161 /** Should be called when a frame has been encoded successfully */
162 void
163 J2KEncoder::frame_done ()
164 {
165         _history.event ();
166 }
167
168 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
169  *  so each time the supplied frame is the one after the previous one.
170  *  pv represents one video frame, and could be empty if there is nothing to encode
171  *  for this DCP frame.
172  *
173  *  @param pv PlayerVideo to encode.
174  *  @param time Time of \p pv within the DCP.
175  */
176 void
177 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
178 {
179         _waker.nudge ();
180
181         size_t threads = 0;
182         {
183                 boost::mutex::scoped_lock lm (_threads_mutex);
184                 threads = _threads->size();
185         }
186
187         boost::mutex::scoped_lock queue_lock (_queue_mutex);
188
189         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
190            when there are no threads.
191         */
192         while (_queue.size() >= (threads * 2) + 1) {
193                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
194                 _full_condition.wait (queue_lock);
195                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
196         }
197
198         _writer->rethrow ();
199         /* Re-throw any exception raised by one of our threads.  If more
200            than one has thrown an exception, only one will be rethrown, I think;
201            but then, if that happens something has gone badly wrong.
202         */
203         rethrow ();
204
205         Frame const position = time.frames_floor(_film->video_frame_rate());
206
207         if (_writer->can_fake_write (position)) {
208                 /* We can fake-write this frame */
209                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
210                 _writer->fake_write (position, pv->eyes ());
211                 frame_done ();
212         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
213                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
214                 /* This frame already has J2K data, so just write it */
215                 _writer->write (pv->j2k(), position, pv->eyes ());
216         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
217                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
218                 _writer->repeat (position, pv->eyes ());
219         } else {
220                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
221                 /* Queue this new frame for encoding */
222                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
223                 _queue.push_back (shared_ptr<DCPVideo> (
224                                           new DCPVideo (
225                                                   pv,
226                                                   position,
227                                                   _film->video_frame_rate(),
228                                                   _film->j2k_bandwidth(),
229                                                   _film->resolution()
230                                                   )
231                                           ));
232
233                 /* The queue might not be empty any more, so notify anything which is
234                    waiting on that.
235                 */
236                 _empty_condition.notify_all ();
237         }
238
239         _last_player_video[pv->eyes()] = pv;
240         _last_player_video_time = time;
241 }
242
243 void
244 J2KEncoder::terminate_threads ()
245 {
246         boost::this_thread::disable_interruption dis;
247         boost::mutex::scoped_lock lm (_threads_mutex);
248
249         if (!_threads) {
250                 return;
251         }
252
253         _threads->interrupt_all ();
254         try {
255                 _threads->join_all ();
256         } catch (exception& e) {
257                 LOG_ERROR ("join() threw an exception: %1", e.what());
258         } catch (...) {
259                 LOG_ERROR_NC ("join() threw an exception");
260         }
261
262         _threads.reset ();
263 }
264
265 void
266 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
267 try
268 {
269         if (server) {
270                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
271         } else {
272                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
273         }
274
275         /* Number of seconds that we currently wait between attempts
276            to connect to the server; not relevant for localhost
277            encodings.
278         */
279         int remote_backoff = 0;
280
281         while (true) {
282
283                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
284                 boost::mutex::scoped_lock lock (_queue_mutex);
285                 while (_queue.empty ()) {
286                         _empty_condition.wait (lock);
287                 }
288
289                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
290                 shared_ptr<DCPVideo> vf = _queue.front ();
291
292                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
293                    so we must not be interrupted until one or other of these things have happened.  This
294                    block has thread interruption disabled.
295                 */
296                 {
297                         boost::this_thread::disable_interruption dis;
298
299                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
300                         _queue.pop_front ();
301
302                         lock.unlock ();
303
304                         shared_ptr<Data> encoded;
305
306                         /* We need to encode this input */
307                         if (server) {
308                                 try {
309                                         encoded.reset(new dcp::ArrayData(vf->encode_remotely(server.get())));
310
311                                         if (remote_backoff > 0) {
312                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
313                                         }
314
315                                         /* This job succeeded, so remove any backoff */
316                                         remote_backoff = 0;
317
318                                 } catch (std::exception& e) {
319                                         if (remote_backoff < 60) {
320                                                 /* back off more */
321                                                 remote_backoff += 10;
322                                         }
323                                         LOG_ERROR (
324                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
325                                                 vf->index(), server->host_name(), e.what(), remote_backoff
326                                                 );
327                                 }
328
329                         } else {
330                                 try {
331                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
332                                         encoded.reset(new dcp::ArrayData(vf->encode_locally()));
333                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
334                                 } catch (std::exception& e) {
335                                         /* This is very bad, so don't cope with it, just pass it on */
336                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
337                                         throw;
338                                 }
339                         }
340
341                         if (encoded) {
342                                 _writer->write (encoded, vf->index(), vf->eyes());
343                                 frame_done ();
344                         } else {
345                                 lock.lock ();
346                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
347                                 _queue.push_front (vf);
348                                 lock.unlock ();
349                         }
350                 }
351
352                 if (remote_backoff > 0) {
353                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
354                 }
355
356                 /* The queue might not be full any more, so notify anything that is waiting on that */
357                 lock.lock ();
358                 _full_condition.notify_all ();
359         }
360 }
361 catch (boost::thread_interrupted& e) {
362         /* Ignore these and just stop the thread */
363         _full_condition.notify_all ();
364 }
365 catch (...)
366 {
367         store_current ();
368         /* Wake anything waiting on _full_condition so it can see the exception */
369         _full_condition.notify_all ();
370 }
371
372 void
373 J2KEncoder::servers_list_changed ()
374 {
375         terminate_threads ();
376         _threads.reset (new boost::thread_group());
377
378         boost::mutex::scoped_lock lm (_threads_mutex);
379
380         /* XXX: could re-use threads */
381
382         if (!Config::instance()->only_servers_encode ()) {
383                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
384 #ifdef DCPOMATIC_LINUX
385                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
386                         pthread_setname_np (t->native_handle(), "encode-worker");
387 #else
388                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
389 #endif
390                 }
391         }
392
393         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
394                 if (!i.current_link_version()) {
395                         continue;
396                 }
397
398                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
399                 for (int j = 0; j < i.threads(); ++j) {
400                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
401                 }
402         }
403
404         _writer->set_encoder_threads (_threads->size());
405 }