Remove support for Windows XP.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2019 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/lib/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "dcpomatic_log.h"
30 #include "config.h"
31 #include "dcp_video.h"
32 #include "cross.h"
33 #include "writer.h"
34 #include "encode_server_finder.h"
35 #include "player.h"
36 #include "player_video.h"
37 #include "encode_server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
45 using std::list;
46 using std::cout;
47 using std::exception;
48 using boost::shared_ptr;
49 using boost::weak_ptr;
50 using boost::optional;
51 using dcp::Data;
52 using namespace dcpomatic;
53
54 /** @param film Film that we are encoding.
55  *  @param writer Writer that we are using.
56  */
57 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
58         : _film (film)
59         , _history (200)
60         , _writer (writer)
61 {
62         servers_list_changed ();
63 }
64
65 J2KEncoder::~J2KEncoder ()
66 {
67         terminate_threads ();
68 }
69
70 void
71 J2KEncoder::begin ()
72 {
73         weak_ptr<J2KEncoder> wp = shared_from_this ();
74         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
75                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
76                 );
77 }
78
79 /* We don't want the servers-list-changed callback trying to do things
80    during destruction of J2KEncoder, and I think this is the neatest way
81    to achieve that.
82 */
83 void
84 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
85 {
86         shared_ptr<J2KEncoder> e = encoder.lock ();
87         if (e) {
88                 e->servers_list_changed ();
89         }
90 }
91
92 void
93 J2KEncoder::end ()
94 {
95         boost::mutex::scoped_lock lock (_queue_mutex);
96
97         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
98
99         /* Keep waking workers until the queue is empty */
100         while (!_queue.empty ()) {
101                 rethrow ();
102                 _empty_condition.notify_all ();
103                 _full_condition.wait (lock);
104         }
105
106         lock.unlock ();
107
108         LOG_GENERAL_NC (N_("Terminating encoder threads"));
109
110         terminate_threads ();
111
112         /* Something might have been thrown during terminate_threads */
113         rethrow ();
114
115         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
116
117         /* The following sequence of events can occur in the above code:
118              1. a remote worker takes the last image off the queue
119              2. the loop above terminates
120              3. the remote worker fails to encode the image and puts it back on the queue
121              4. the remote worker is then terminated by terminate_threads
122
123              So just mop up anything left in the queue here.
124         */
125
126         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
127                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
128                 try {
129                         _writer->write (
130                                 (*i)->encode_locally(),
131                                 (*i)->index(),
132                                 (*i)->eyes()
133                                 );
134                         frame_done ();
135                 } catch (std::exception& e) {
136                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
137                 }
138         }
139 }
140
141 /** @return an estimate of the current number of frames we are encoding per second,
142  *  if known.
143  */
144 optional<float>
145 J2KEncoder::current_encoding_rate () const
146 {
147         return _history.rate ();
148 }
149
150 /** @return Number of video frames that have been queued for encoding */
151 int
152 J2KEncoder::video_frames_enqueued () const
153 {
154         if (!_last_player_video_time) {
155                 return 0;
156         }
157
158         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
159 }
160
161 /** Should be called when a frame has been encoded successfully */
162 void
163 J2KEncoder::frame_done ()
164 {
165         _history.event ();
166 }
167
168 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
169  *  so each time the supplied frame is the one after the previous one.
170  *  pv represents one video frame, and could be empty if there is nothing to encode
171  *  for this DCP frame.
172  *
173  *  @param pv PlayerVideo to encode.
174  *  @param time Time of \p pv within the DCP.
175  */
176 void
177 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
178 {
179         _waker.nudge ();
180
181         size_t threads = _threads->size();
182
183         boost::mutex::scoped_lock queue_lock (_queue_mutex);
184
185         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
186            when there are no threads.
187         */
188         while (_queue.size() >= (threads * 2) + 1) {
189                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
190                 _full_condition.wait (queue_lock);
191                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
192         }
193
194         _writer->rethrow ();
195         /* Re-throw any exception raised by one of our threads.  If more
196            than one has thrown an exception, only one will be rethrown, I think;
197            but then, if that happens something has gone badly wrong.
198         */
199         rethrow ();
200
201         Frame const position = time.frames_floor(_film->video_frame_rate());
202
203         if (_writer->can_fake_write (position)) {
204                 /* We can fake-write this frame */
205                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
206                 _writer->fake_write (position, pv->eyes ());
207                 frame_done ();
208         } else if (pv->has_j2k() && !_film->reencode_j2k()) {
209                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
210                 /* This frame already has J2K data, so just write it */
211                 _writer->write (pv->j2k(), position, pv->eyes ());
212         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
213                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
214                 _writer->repeat (position, pv->eyes ());
215         } else {
216                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
217                 /* Queue this new frame for encoding */
218                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
219                 _queue.push_back (shared_ptr<DCPVideo> (
220                                           new DCPVideo (
221                                                   pv,
222                                                   position,
223                                                   _film->video_frame_rate(),
224                                                   _film->j2k_bandwidth(),
225                                                   _film->resolution()
226                                                   )
227                                           ));
228
229                 /* The queue might not be empty any more, so notify anything which is
230                    waiting on that.
231                 */
232                 _empty_condition.notify_all ();
233         }
234
235         _last_player_video[pv->eyes()] = pv;
236         _last_player_video_time = time;
237 }
238
239 void
240 J2KEncoder::terminate_threads ()
241 {
242         boost::this_thread::disable_interruption dis;
243
244         if (!_threads) {
245                 return;
246         }
247
248         _threads->interrupt_all ();
249         try {
250                 _threads->join_all ();
251         } catch (exception& e) {
252                 LOG_ERROR ("join() threw an exception: %1", e.what());
253         } catch (...) {
254                 LOG_ERROR_NC ("join() threw an exception");
255         }
256
257         _threads.reset ();
258 }
259
260 void
261 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
262 try
263 {
264         if (server) {
265                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
266         } else {
267                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
268         }
269
270         /* Number of seconds that we currently wait between attempts
271            to connect to the server; not relevant for localhost
272            encodings.
273         */
274         int remote_backoff = 0;
275
276         while (true) {
277
278                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
279                 boost::mutex::scoped_lock lock (_queue_mutex);
280                 while (_queue.empty ()) {
281                         _empty_condition.wait (lock);
282                 }
283
284                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
285                 shared_ptr<DCPVideo> vf = _queue.front ();
286
287                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
288                    so we must not be interrupted until one or other of these things have happened.  This
289                    block has thread interruption disabled.
290                 */
291                 {
292                         boost::this_thread::disable_interruption dis;
293
294                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
295                         _queue.pop_front ();
296
297                         lock.unlock ();
298
299                         optional<Data> encoded;
300
301                         /* We need to encode this input */
302                         if (server) {
303                                 try {
304                                         encoded = vf->encode_remotely (server.get ());
305
306                                         if (remote_backoff > 0) {
307                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
308                                         }
309
310                                         /* This job succeeded, so remove any backoff */
311                                         remote_backoff = 0;
312
313                                 } catch (std::exception& e) {
314                                         if (remote_backoff < 60) {
315                                                 /* back off more */
316                                                 remote_backoff += 10;
317                                         }
318                                         LOG_ERROR (
319                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
320                                                 vf->index(), server->host_name(), e.what(), remote_backoff
321                                                 );
322                                 }
323
324                         } else {
325                                 try {
326                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
327                                         encoded = vf->encode_locally ();
328                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
329                                 } catch (std::exception& e) {
330                                         /* This is very bad, so don't cope with it, just pass it on */
331                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
332                                         throw;
333                                 }
334                         }
335
336                         if (encoded) {
337                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
338                                 frame_done ();
339                         } else {
340                                 lock.lock ();
341                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
342                                 _queue.push_front (vf);
343                                 lock.unlock ();
344                         }
345                 }
346
347                 if (remote_backoff > 0) {
348                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
349                 }
350
351                 /* The queue might not be full any more, so notify anything that is waiting on that */
352                 lock.lock ();
353                 _full_condition.notify_all ();
354         }
355 }
356 catch (boost::thread_interrupted& e) {
357         /* Ignore these and just stop the thread */
358         _full_condition.notify_all ();
359 }
360 catch (...)
361 {
362         store_current ();
363         /* Wake anything waiting on _full_condition so it can see the exception */
364         _full_condition.notify_all ();
365 }
366
367 void
368 J2KEncoder::servers_list_changed ()
369 {
370         terminate_threads ();
371         _threads.reset (new boost::thread_group());
372
373         /* XXX: could re-use threads */
374
375         if (!Config::instance()->only_servers_encode ()) {
376                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
377 #ifdef DCPOMATIC_LINUX
378                         boost::thread* t = _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
379                         pthread_setname_np (t->native_handle(), "encode-worker");
380 #else
381                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription>()));
382 #endif
383                 }
384         }
385
386         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers()) {
387                 if (!i.current_link_version()) {
388                         continue;
389                 }
390
391                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
392                 for (int j = 0; j < i.threads(); ++j) {
393                         _threads->create_thread(boost::bind(&J2KEncoder::encoder_thread, this, i));
394                 }
395         }
396
397         _writer->set_encoder_threads (_threads->size());
398 }