Use dcp::file_to_string().
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2021 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21
22 /** @file src/j2k_encoder.cc
23  *  @brief J2K encoder class.
24  */
25
26
27 #include "j2k_encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "dcpomatic_log.h"
32 #include "config.h"
33 #include "dcp_video.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "encode_server_finder.h"
37 #include "player.h"
38 #include "player_video.h"
39 #include "encode_server_description.h"
40 #include "compose.hpp"
41 #include <libcxml/cxml.h>
42 #include <iostream>
43
44 #include "i18n.h"
45
46
47 using std::list;
48 using std::cout;
49 using std::exception;
50 using std::shared_ptr;
51 using std::weak_ptr;
52 using std::make_shared;
53 using boost::optional;
54 using dcp::Data;
55 using namespace dcpomatic;
56
57
58 /** @param film Film that we are encoding.
59  *  @param writer Writer that we are using.
60  */
61 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
62         : _film (film)
63         , _history (200)
64         , _writer (writer)
65 {
66         servers_list_changed ();
67 }
68
69
70 J2KEncoder::~J2KEncoder ()
71 {
72         boost::mutex::scoped_lock lm (_threads_mutex);
73         terminate_threads ();
74 }
75
76
77 void
78 J2KEncoder::begin ()
79 {
80         auto wp = shared_from_this ();
81         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
82                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
83                 );
84 }
85
86
87 /* We don't want the servers-list-changed callback trying to do things
88    during destruction of J2KEncoder, and I think this is the neatest way
89    to achieve that.
90 */
91 void
92 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
93 {
94         auto e = encoder.lock ();
95         if (e) {
96                 e->servers_list_changed ();
97         }
98 }
99
100
101 void
102 J2KEncoder::end ()
103 {
104         boost::mutex::scoped_lock lock (_queue_mutex);
105
106         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
107
108         /* Keep waking workers until the queue is empty */
109         while (!_queue.empty ()) {
110                 rethrow ();
111                 _empty_condition.notify_all ();
112                 _full_condition.wait (lock);
113         }
114
115         lock.unlock ();
116
117         LOG_GENERAL_NC (N_("Terminating encoder threads"));
118
119         {
120                 boost::mutex::scoped_lock lm (_threads_mutex);
121                 terminate_threads ();
122         }
123
124         /* Something might have been thrown during terminate_threads */
125         rethrow ();
126
127         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
128
129         /* The following sequence of events can occur in the above code:
130              1. a remote worker takes the last image off the queue
131              2. the loop above terminates
132              3. the remote worker fails to encode the image and puts it back on the queue
133              4. the remote worker is then terminated by terminate_threads
134
135              So just mop up anything left in the queue here.
136         */
137
138         for (auto i: _queue) {
139                 LOG_GENERAL(N_("Encode left-over frame %1"), i->index());
140                 try {
141                         _writer->write (
142                                 make_shared<dcp::ArrayData>(i->encode_locally()),
143                                 i->index(),
144                                 i->eyes()
145                                 );
146                         frame_done ();
147                 } catch (std::exception& e) {
148                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
149                 }
150         }
151 }
152
153
154 /** @return an estimate of the current number of frames we are encoding per second,
155  *  if known.
156  */
157 optional<float>
158 J2KEncoder::current_encoding_rate () const
159 {
160         return _history.rate ();
161 }
162
163
164 /** @return Number of video frames that have been queued for encoding */
165 int
166 J2KEncoder::video_frames_enqueued () const
167 {
168         if (!_last_player_video_time) {
169                 return 0;
170         }
171
172         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
173 }
174
175
176 /** Should be called when a frame has been encoded successfully */
177 void
178 J2KEncoder::frame_done ()
179 {
180         _history.event ();
181 }
182
183
184 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
185  *  so each time the supplied frame is the one after the previous one.
186  *  pv represents one video frame, and could be empty if there is nothing to encode
187  *  for this DCP frame.
188  *
189  *  @param pv PlayerVideo to encode.
190  *  @param time Time of \p pv within the DCP.
191  */
192 void
193 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
194 {
195         _waker.nudge ();
196
197         size_t threads = 0;
198         {
199                 boost::mutex::scoped_lock lm (_threads_mutex);
200                 threads = _threads->size();
201         }
202
203         boost::mutex::scoped_lock queue_lock (_queue_mutex);
204
205         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
206            when there are no threads.
207         */
208         while (_queue.size() >= (threads * 2) + 1) {
209                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
210                 _full_condition.wait (queue_lock);
211                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
212         }
213
214         _writer->rethrow ();
215         /* Re-throw any exception raised by one of our threads.  If more
216            than one has thrown an exception, only one will be rethrown, I think;
217            but then, if that happens something has gone badly wrong.
218         */
219         rethrow ();
220
221         auto const position = time.frames_floor(_film->video_frame_rate());
222
223         if (_writer->can_fake_write (position)) {
224                 /* We can fake-write this frame */
225                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
226                 _writer->fake_write (position, pv->eyes ());
227                 frame_done ();
228         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
229                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
230                 /* This frame already has J2K data, so just write it */
231                 _writer->write (pv->j2k(), position, pv->eyes ());
232                 frame_done ();
233         } else if (_last_player_video[static_cast<int>(pv->eyes())] && _writer->can_repeat(position) && pv->same (_last_player_video[static_cast<int>(pv->eyes())])) {
234                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
235                 _writer->repeat (position, pv->eyes ());
236         } else {
237                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
238                 /* Queue this new frame for encoding */
239                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
240                 _queue.push_back (make_shared<DCPVideo>(
241                                 pv,
242                                 position,
243                                 _film->video_frame_rate(),
244                                 _film->j2k_bandwidth(),
245                                 _film->resolution()
246                                 ));
247
248                 /* The queue might not be empty any more, so notify anything which is
249                    waiting on that.
250                 */
251                 _empty_condition.notify_all ();
252         }
253
254         _last_player_video[static_cast<int>(pv->eyes())] = pv;
255         _last_player_video_time = time;
256 }
257
258
259 /** Caller must hold a lock on _threads_mutex */
260 void
261 J2KEncoder::terminate_threads ()
262 {
263         boost::this_thread::disable_interruption dis;
264
265         if (!_threads) {
266                 return;
267         }
268
269         _threads->interrupt_all ();
270         try {
271                 _threads->join_all ();
272         } catch (exception& e) {
273                 LOG_ERROR ("join() threw an exception: %1", e.what());
274         } catch (...) {
275                 LOG_ERROR_NC ("join() threw an exception");
276         }
277
278         _threads.reset ();
279 }
280
281
282 void
283 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
284 try
285 {
286         start_of_thread ("J2KEncoder");
287
288         if (server) {
289                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
290         } else {
291                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
292         }
293
294         /* Number of seconds that we currently wait between attempts
295            to connect to the server; not relevant for localhost
296            encodings.
297         */
298         int remote_backoff = 0;
299
300         while (true) {
301
302                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
303                 boost::mutex::scoped_lock lock (_queue_mutex);
304                 while (_queue.empty ()) {
305                         _empty_condition.wait (lock);
306                 }
307
308                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
309                 auto vf = _queue.front ();
310
311                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
312                    so we must not be interrupted until one or other of these things have happened.  This
313                    block has thread interruption disabled.
314                 */
315                 {
316                         boost::this_thread::disable_interruption dis;
317
318                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
319                         _queue.pop_front ();
320
321                         lock.unlock ();
322
323                         shared_ptr<Data> encoded;
324
325                         /* We need to encode this input */
326                         if (server) {
327                                 try {
328                                         encoded = make_shared<dcp::ArrayData>(vf->encode_remotely(server.get()));
329
330                                         if (remote_backoff > 0) {
331                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
332                                         }
333
334                                         /* This job succeeded, so remove any backoff */
335                                         remote_backoff = 0;
336
337                                 } catch (std::exception& e) {
338                                         if (remote_backoff < 60) {
339                                                 /* back off more */
340                                                 remote_backoff += 10;
341                                         }
342                                         LOG_ERROR (
343                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
344                                                 vf->index(), server->host_name(), e.what(), remote_backoff
345                                                 );
346                                 }
347
348                         } else {
349                                 try {
350                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
351                                         encoded = make_shared<dcp::ArrayData>(vf->encode_locally());
352                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
353                                 } catch (std::exception& e) {
354                                         /* This is very bad, so don't cope with it, just pass it on */
355                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
356                                         throw;
357                                 }
358                         }
359
360                         if (encoded) {
361                                 _writer->write (encoded, vf->index(), vf->eyes());
362                                 frame_done ();
363                         } else {
364                                 lock.lock ();
365                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
366                                 _queue.push_front (vf);
367                                 lock.unlock ();
368                         }
369                 }
370
371                 if (remote_backoff > 0) {
372                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
373                 }
374
375                 /* The queue might not be full any more, so notify anything that is waiting on that */
376                 lock.lock ();
377                 _full_condition.notify_all ();
378         }
379 }
380 catch (boost::thread_interrupted& e) {
381         /* Ignore these and just stop the thread */
382         _full_condition.notify_all ();
383 }
384 catch (...)
385 {
386         store_current ();
387         /* Wake anything waiting on _full_condition so it can see the exception */
388         _full_condition.notify_all ();
389 }
390
391
392 void
393 J2KEncoder::servers_list_changed ()
394 {
395         boost::mutex::scoped_lock lm (_threads_mutex);
396
397         terminate_threads ();
398         _threads = make_shared<boost::thread_group>();
399
400         /* XXX: could re-use threads */
401
402         if (!Config::instance()->only_servers_encode ()) {
403                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
404 #ifdef DCPOMATIC_LINUX
405                         auto t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
406                         pthread_setname_np (t->native_handle(), "encode-worker");
407 #else
408                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
409 #endif
410                 }
411         }
412
413         for (auto i: EncodeServerFinder::instance()->servers()) {
414                 if (!i.current_link_version()) {
415                         continue;
416                 }
417
418                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
419                 for (int j = 0; j < i.threads(); ++j) {
420                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
421                 }
422         }
423
424         _writer->set_encoder_threads (_threads->size());
425 }