Name threads on Linux.
[dcpomatic.git] / src / lib / j2k_encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/lib/j2k_encoder.cc
22  *  @brief J2K encoder class.
23  */
24
25 #include "j2k_encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers for this file; each uses the _film member that is in scope at the
   point of use.  LOG_GENERAL_NC takes a ready-made message rather than a
   String::compose format.  The macros deliberately have no trailing semicolon, so that
   `LOG_GENERAL (...);` is exactly one statement (safe inside an unbraced if/else).
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
#define LOG_DEBUG_ENCODE(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_DEBUG_ENCODE)
49
50 using std::list;
51 using std::cout;
52 using boost::shared_ptr;
53 using boost::weak_ptr;
54 using boost::optional;
55 using dcp::Data;
56
57 /** @param film Film that we are encoding.
58  *  @param writer Writer that we are using.
59  */
60 J2KEncoder::J2KEncoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
61         : _film (film)
62         , _history (200)
63         , _writer (writer)
64 {
65         servers_list_changed ();
66 }
67
68 J2KEncoder::~J2KEncoder ()
69 {
70         try {
71                 terminate_threads ();
72         } catch (...) {
73                 /* Destructors must not throw exceptions; anything bad
74                    happening now is too late to worry about anyway,
75                    I think.
76                 */
77         }
78 }
79
80 void
81 J2KEncoder::begin ()
82 {
83         weak_ptr<J2KEncoder> wp = shared_from_this ();
84         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
85                 boost::bind (&J2KEncoder::call_servers_list_changed, wp)
86                 );
87 }
88
89 /* We don't want the servers-list-changed callback trying to do things
90    during destruction of J2KEncoder, and I think this is the neatest way
91    to achieve that.
92 */
93 void
94 J2KEncoder::call_servers_list_changed (weak_ptr<J2KEncoder> encoder)
95 {
96         shared_ptr<J2KEncoder> e = encoder.lock ();
97         if (e) {
98                 e->servers_list_changed ();
99         }
100 }
101
102 void
103 J2KEncoder::end ()
104 {
105         boost::mutex::scoped_lock lock (_queue_mutex);
106
107         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
108
109         /* Keep waking workers until the queue is empty */
110         while (!_queue.empty ()) {
111                 rethrow ();
112                 _empty_condition.notify_all ();
113                 _full_condition.wait (lock);
114         }
115
116         lock.unlock ();
117
118         LOG_GENERAL_NC (N_("Terminating encoder threads"));
119
120         terminate_threads ();
121
122         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
123
124         /* The following sequence of events can occur in the above code:
125              1. a remote worker takes the last image off the queue
126              2. the loop above terminates
127              3. the remote worker fails to encode the image and puts it back on the queue
128              4. the remote worker is then terminated by terminate_threads
129
130              So just mop up anything left in the queue here.
131         */
132
133         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
134                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
135                 try {
136                         _writer->write (
137                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
138                                 (*i)->index (),
139                                 (*i)->eyes ()
140                                 );
141                         frame_done ();
142                 } catch (std::exception& e) {
143                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
144                 }
145         }
146 }
147
148 /** @return an estimate of the current number of frames we are encoding per second,
149  *  or 0 if not known.
150  */
151 float
152 J2KEncoder::current_encoding_rate () const
153 {
154         return _history.rate ();
155 }
156
157 /** @return Number of video frames that have been queued for encoding */
158 int
159 J2KEncoder::video_frames_enqueued () const
160 {
161         if (!_last_player_video_time) {
162                 return 0;
163         }
164
165         return _last_player_video_time->frames_floor (_film->video_frame_rate ());
166 }
167
168 /** Should be called when a frame has been encoded successfully */
169 void
170 J2KEncoder::frame_done ()
171 {
172         _history.event ();
173 }
174
175 /** Called to request encoding of the next video frame in the DCP.  This is called in order,
176  *  so each time the supplied frame is the one after the previous one.
177  *  pv represents one video frame, and could be empty if there is nothing to encode
178  *  for this DCP frame.
179  *
180  *  @param pv PlayerVideo to encode.
181  *  @param time Time of \p pv within the DCP.
182  */
183 void
184 J2KEncoder::encode (shared_ptr<PlayerVideo> pv, DCPTime time)
185 {
186         _waker.nudge ();
187
188         size_t threads = 0;
189         {
190                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
191                 threads = _threads.size ();
192         }
193
194         boost::mutex::scoped_lock queue_lock (_queue_mutex);
195
196         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
197            when there are no threads.
198         */
199         while (_queue.size() >= (threads * 2) + 1) {
200                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
201                 _full_condition.wait (queue_lock);
202                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
203         }
204
205         _writer->rethrow ();
206         /* Re-throw any exception raised by one of our threads.  If more
207            than one has thrown an exception, only one will be rethrown, I think;
208            but then, if that happens something has gone badly wrong.
209         */
210         rethrow ();
211
212         Frame const position = time.frames_floor(_film->video_frame_rate());
213
214         if (_writer->can_fake_write (position)) {
215                 /* We can fake-write this frame */
216                 LOG_DEBUG_ENCODE("Frame @ %1 FAKE", to_string(time));
217                 _writer->fake_write (position, pv->eyes ());
218                 frame_done ();
219         } else if (pv->has_j2k ()) {
220                 LOG_DEBUG_ENCODE("Frame @ %1 J2K", to_string(time));
221                 /* This frame already has J2K data, so just write it */
222                 _writer->write (pv->j2k(), position, pv->eyes ());
223         } else if (_last_player_video[pv->eyes()] && _writer->can_repeat(position) && pv->same (_last_player_video[pv->eyes()])) {
224                 LOG_DEBUG_ENCODE("Frame @ %1 REPEAT", to_string(time));
225                 _writer->repeat (position, pv->eyes ());
226         } else {
227                 LOG_DEBUG_ENCODE("Frame @ %1 ENCODE", to_string(time));
228                 /* Queue this new frame for encoding */
229                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
230                 _queue.push_back (shared_ptr<DCPVideo> (
231                                           new DCPVideo (
232                                                   pv,
233                                                   position,
234                                                   _film->video_frame_rate(),
235                                                   _film->j2k_bandwidth(),
236                                                   _film->resolution(),
237                                                   _film->log()
238                                                   )
239                                           ));
240
241                 /* The queue might not be empty any more, so notify anything which is
242                    waiting on that.
243                 */
244                 _empty_condition.notify_all ();
245         }
246
247         _last_player_video[pv->eyes()] = pv;
248         _last_player_video_time = time;
249 }
250
251 void
252 J2KEncoder::terminate_threads ()
253 {
254         boost::mutex::scoped_lock threads_lock (_threads_mutex);
255
256         int n = 0;
257         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
258                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
259                 (*i)->interrupt ();
260                 DCPOMATIC_ASSERT ((*i)->joinable ());
261                 try {
262                         (*i)->join ();
263                 } catch (boost::thread_interrupted& e) {
264                         /* This is to be expected */
265                 }
266                 delete *i;
267                 LOG_GENERAL_NC ("Thread terminated");
268                 ++n;
269         }
270
271         _threads.clear ();
272 }
273
274 void
275 J2KEncoder::encoder_thread (optional<EncodeServerDescription> server)
276 try
277 {
278         if (server) {
279                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", thread_id (), server->host_name ());
280         } else {
281                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", thread_id ());
282         }
283
284         /* Number of seconds that we currently wait between attempts
285            to connect to the server; not relevant for localhost
286            encodings.
287         */
288         int remote_backoff = 0;
289
290         while (true) {
291
292                 LOG_TIMING ("encoder-sleep thread=%1", thread_id ());
293                 boost::mutex::scoped_lock lock (_queue_mutex);
294                 while (_queue.empty ()) {
295                         _empty_condition.wait (lock);
296                 }
297
298                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", thread_id(), _queue.size());
299                 shared_ptr<DCPVideo> vf = _queue.front ();
300
301                 /* We're about to commit to either encoding this frame or putting it back onto the queue,
302                    so we must not be interrupted until one or other of these things have happened.  This
303                    block has thread interruption disabled.
304                 */
305                 {
306                         boost::this_thread::disable_interruption dis;
307
308                         LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", thread_id(), vf->index(), (int) vf->eyes ());
309                         _queue.pop_front ();
310
311                         lock.unlock ();
312
313                         optional<Data> encoded;
314
315                         /* We need to encode this input */
316                         if (server) {
317                                 try {
318                                         encoded = vf->encode_remotely (server.get ());
319
320                                         if (remote_backoff > 0) {
321                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
322                                         }
323
324                                         /* This job succeeded, so remove any backoff */
325                                         remote_backoff = 0;
326
327                                 } catch (std::exception& e) {
328                                         if (remote_backoff < 60) {
329                                                 /* back off more */
330                                                 remote_backoff += 10;
331                                         }
332                                         LOG_ERROR (
333                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
334                                                 vf->index(), server->host_name(), e.what(), remote_backoff
335                                                 );
336                                 }
337
338                         } else {
339                                 try {
340                                         LOG_TIMING ("start-local-encode thread=%1 frame=%2", thread_id(), vf->index());
341                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
342                                         LOG_TIMING ("finish-local-encode thread=%1 frame=%2", thread_id(), vf->index());
343                                 } catch (std::exception& e) {
344                                         /* This is very bad, so don't cope with it, just pass it on */
345                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
346                                         throw;
347                                 }
348                         }
349
350                         if (encoded) {
351                                 _writer->write (encoded.get(), vf->index (), vf->eyes ());
352                                 frame_done ();
353                         } else {
354                                 lock.lock ();
355                                 LOG_GENERAL (N_("[%1] J2KEncoder thread pushes frame %2 back onto queue after failure"), thread_id(), vf->index());
356                                 _queue.push_front (vf);
357                                 lock.unlock ();
358                         }
359                 }
360
361                 if (remote_backoff > 0) {
362                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
363                 }
364
365                 /* The queue might not be full any more, so notify anything that is waiting on that */
366                 lock.lock ();
367                 _full_condition.notify_all ();
368         }
369 }
370 catch (boost::thread_interrupted& e) {
371         /* Ignore these and just stop the thread */
372         _full_condition.notify_all ();
373 }
374 catch (...)
375 {
376         store_current ();
377         /* Wake anything waiting on _full_condition so it can see the exception */
378         _full_condition.notify_all ();
379 }
380
381 void
382 J2KEncoder::servers_list_changed ()
383 {
384         terminate_threads ();
385
386         /* XXX: could re-use threads */
387
388         boost::mutex::scoped_lock lm (_threads_mutex);
389
390 #ifdef BOOST_THREAD_PLATFORM_WIN32
391         OSVERSIONINFO info;
392         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
393         GetVersionEx (&info);
394         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
395         if (windows_xp) {
396                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
397         }
398 #endif
399
400         if (!Config::instance()->only_servers_encode ()) {
401                 for (int i = 0; i < Config::instance()->master_encoding_threads (); ++i) {
402                         boost::thread* t = new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, optional<EncodeServerDescription> ()));
403 #ifdef DCPOMATIC_LINUX
404                         pthread_setname_np (t->native_handle(), "encode-worker");
405 #endif
406                         _threads.push_back (t);
407 #ifdef BOOST_THREAD_PLATFORM_WIN32
408                         if (windows_xp) {
409                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
410                         }
411 #endif
412                 }
413         }
414
415         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
416                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
417                 for (int j = 0; j < i.threads(); ++j) {
418                         _threads.push_back (new boost::thread (boost::bind (&J2KEncoder::encoder_thread, this, i)));
419                 }
420         }
421
422         _writer->set_encoder_threads (_threads.size ());
423 }