No-op; fix GPL address and use the explicit-program-name version.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/lib/encoder.cc
22  *  @brief Parent class for classes which can encode video and audio frames.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers.  Each one forwards its arguments to the log of the `_film'
   member (which must be in scope at the point of use) with a fixed entry type.
   All but LOG_GENERAL_NC build the message with String::compose; LOG_GENERAL_NC
   passes its argument through unchanged.

   The macros deliberately expand to a single expression with no trailing
   semicolon: the caller writes the `;', so a use such as

       if (x) LOG_GENERAL ("..."); else ...

   remains one statement rather than two.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
/** Maximum number of frame-completion timestamps kept in _time_history; also the
 *  number of timestamps that must be present before current_encoding_rate()
 *  will report a non-zero rate.
 */
int const Encoder::_history_size = 25;
57
/** Construct an Encoder and start its encoding threads.
 *  @param film Film that we are encoding.
 *  @param writer Writer which is given each frame once it has been dealt with.
 */
Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
        : _film (film)
        , _position (0)
        , _terminate_enqueue (false)
        , _terminate_encoding (false)
        , _writer (writer)
{
        /* This creates the initial set of encoder threads */
        servers_list_changed ();
}
68
69 Encoder::~Encoder ()
70 {
71         terminate_threads ();
72
73         boost::mutex::scoped_lock lm (_queue_mutex);
74         _terminate_enqueue = true;
75         _full_condition.notify_all ();
76         _empty_condition.notify_all ();
77 }
78
79 void
80 Encoder::begin ()
81 {
82         if (!EncodeServerFinder::instance()->disabled ()) {
83                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
84         }
85 }
86
87 void
88 Encoder::end ()
89 {
90         boost::mutex::scoped_lock lock (_queue_mutex);
91
92         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
93
94         /* Keep waking workers until the queue is empty */
95         while (!_queue.empty ()) {
96                 rethrow ();
97                 _empty_condition.notify_all ();
98                 _full_condition.wait (lock);
99         }
100
101         lock.unlock ();
102
103         LOG_GENERAL_NC (N_("Terminating encoder threads"));
104
105         terminate_threads ();
106
107         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
108
109         /* The following sequence of events can occur in the above code:
110              1. a remote worker takes the last image off the queue
111              2. the loop above terminates
112              3. the remote worker fails to encode the image and puts it back on the queue
113              4. the remote worker is then terminated by terminate_threads
114
115              So just mop up anything left in the queue here.
116         */
117
118         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
119                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
120                 try {
121                         _writer->write (
122                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
123                                 (*i)->index (),
124                                 (*i)->eyes ()
125                                 );
126                         frame_done ();
127                 } catch (std::exception& e) {
128                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
129                 }
130         }
131 }
132
133 /** @return an estimate of the current number of frames we are encoding per second,
134  *  or 0 if not known.
135  */
136 float
137 Encoder::current_encoding_rate () const
138 {
139         boost::mutex::scoped_lock lock (_state_mutex);
140         if (int (_time_history.size()) < _history_size) {
141                 return 0;
142         }
143
144         struct timeval now;
145         gettimeofday (&now, 0);
146
147         return _history_size / (seconds (now) - seconds (_time_history.back ()));
148 }
149
150 /** @return Number of video frames that have been sent out */
151 int
152 Encoder::video_frames_out () const
153 {
154         boost::mutex::scoped_lock (_state_mutex);
155         return _position;
156 }
157
158 /** Should be called when a frame has been encoded successfully.
159  *  @param n Source frame index.
160  */
161 void
162 Encoder::frame_done ()
163 {
164         boost::mutex::scoped_lock lock (_state_mutex);
165
166         struct timeval tv;
167         gettimeofday (&tv, 0);
168         _time_history.push_front (tv);
169         if (int (_time_history.size()) > _history_size) {
170                 _time_history.pop_back ();
171         }
172 }
173
174 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
175  *  so each time the supplied frame is the one after the previous one.
176  *  pv represents one video frame, and could be empty if there is nothing to encode
177  *  for this DCP frame.
178  */
179 void
180 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
181 {
182         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
183                 enqueue (i);
184         }
185         ++_position;
186 }
187
188 void
189 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
190 {
191         _waker.nudge ();
192
193         size_t threads = 0;
194         {
195                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
196                 threads = _threads.size ();
197         }
198
199         boost::mutex::scoped_lock queue_lock (_queue_mutex);
200
201         /* XXX: discard 3D here if required */
202
203         /* Wait until the queue has gone down a bit */
204         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
205                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
206                 _full_condition.wait (queue_lock);
207                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
208         }
209
210         if (_terminate_enqueue) {
211                 return;
212         }
213
214         _writer->rethrow ();
215         /* Re-throw any exception raised by one of our threads.  If more
216            than one has thrown an exception, only one will be rethrown, I think;
217            but then, if that happens something has gone badly wrong.
218         */
219         rethrow ();
220
221         if (_writer->can_fake_write (_position)) {
222                 /* We can fake-write this frame */
223                 _writer->fake_write (_position, pv->eyes ());
224                 frame_done ();
225         } else if (pv->has_j2k ()) {
226                 /* This frame already has JPEG2000 data, so just write it */
227                 _writer->write (pv->j2k(), _position, pv->eyes ());
228         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
229                 _writer->repeat (_position, pv->eyes ());
230         } else {
231                 /* Queue this new frame for encoding */
232                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
233                 _queue.push_back (shared_ptr<DCPVideo> (
234                                           new DCPVideo (
235                                                   pv,
236                                                   _position,
237                                                   _film->video_frame_rate(),
238                                                   _film->j2k_bandwidth(),
239                                                   _film->resolution(),
240                                                   _film->log()
241                                                   )
242                                           ));
243
244                 /* The queue might not be empty any more, so notify anything which is
245                    waiting on that.
246                 */
247                 _empty_condition.notify_all ();
248         }
249
250         _last_player_video = pv;
251 }
252
253 void
254 Encoder::terminate_threads ()
255 {
256         {
257                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
258                 _terminate_encoding = true;
259         }
260
261         boost::mutex::scoped_lock threads_lock (_threads_mutex);
262
263         int n = 0;
264         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
265                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
266                 (*i)->interrupt ();
267                 if ((*i)->joinable ()) {
268                         (*i)->join ();
269                 }
270                 delete *i;
271                 LOG_GENERAL_NC ("Thread terminated");
272                 ++n;
273         }
274
275         _threads.clear ();
276         _terminate_encoding = false;
277 }
278
279 void
280 Encoder::encoder_thread (optional<EncodeServerDescription> server)
281 try
282 {
283         if (server) {
284                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
285         } else {
286                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
287         }
288
289         /* Number of seconds that we currently wait between attempts
290            to connect to the server; not relevant for localhost
291            encodings.
292         */
293         int remote_backoff = 0;
294
295         while (true) {
296
297                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
298                 boost::mutex::scoped_lock lock (_queue_mutex);
299                 while (_queue.empty () && !_terminate_encoding) {
300                         _empty_condition.wait (lock);
301                 }
302
303                 if (_terminate_encoding) {
304                         return;
305                 }
306
307                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
308                 shared_ptr<DCPVideo> vf = _queue.front ();
309                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
310                 _queue.pop_front ();
311
312                 lock.unlock ();
313
314                 optional<Data> encoded;
315
316                 /* We need to encode this input */
317                 if (server) {
318                         try {
319                                 encoded = vf->encode_remotely (server.get ());
320
321                                 if (remote_backoff > 0) {
322                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
323                                 }
324
325                                 /* This job succeeded, so remove any backoff */
326                                 remote_backoff = 0;
327
328                         } catch (std::exception& e) {
329                                 if (remote_backoff < 60) {
330                                         /* back off more */
331                                         remote_backoff += 10;
332                                 }
333                                 LOG_ERROR (
334                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
335                                         vf->index(), server->host_name(), e.what(), remote_backoff
336                                         );
337                         }
338
339                 } else {
340                         try {
341                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
343                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
344                         } catch (std::exception& e) {
345                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
346                                 throw;
347                         }
348                 }
349
350                 if (encoded) {
351                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
352                         frame_done ();
353                 } else {
354                         lock.lock ();
355                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
356                         _queue.push_front (vf);
357                         lock.unlock ();
358                 }
359
360                 if (remote_backoff > 0) {
361                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
362                 }
363
364                 /* The queue might not be full any more, so notify anything that is waiting on that */
365                 lock.lock ();
366                 _full_condition.notify_all ();
367         }
368 }
369 catch (...)
370 {
371         store_current ();
372         /* Wake anything waiting on _full_condition so it can see the exception */
373         _full_condition.notify_all ();
374 }
375
376 void
377 Encoder::servers_list_changed ()
378 {
379         terminate_threads ();
380
381         /* XXX: could re-use threads */
382
383         boost::mutex::scoped_lock lm (_threads_mutex);
384
385 #ifdef BOOST_THREAD_PLATFORM_WIN32
386         OSVERSIONINFO info;
387         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
388         GetVersionEx (&info);
389         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
390         if (windows_xp) {
391                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
392         }
393 #endif
394
395         if (!Config::instance()->only_servers_encode ()) {
396                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
397                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
398                         _threads.push_back (t);
399 #ifdef BOOST_THREAD_PLATFORM_WIN32
400                         if (windows_xp) {
401                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
402                         }
403 #endif
404                 }
405         }
406
407         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
408                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
409                 for (int j = 0; j < i.threads(); ++j) {
410                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
411                 }
412         }
413
414         _writer->set_encoder_threads (_threads.size ());
415 }