Remove some flawed condition manipulation.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers which write an entry of the given type to the film's log.
   They expect a member called _film to be in scope.  The expansions do not
   end with a semicolon, so that each use is a single statement (and is safe
   in an un-braced if / else); the caller supplies the semicolon.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 25;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
60         : _film (film)
61         , _position (0)
62         , _writer (writer)
63 {
64         servers_list_changed ();
65 }
66
67 Encoder::~Encoder ()
68 {
69         terminate_threads ();
70 }
71
72 void
73 Encoder::begin ()
74 {
75         if (!EncodeServerFinder::instance()->disabled ()) {
76                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
77         }
78 }
79
80 void
81 Encoder::end ()
82 {
83         boost::mutex::scoped_lock lock (_queue_mutex);
84
85         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
86
87         /* Keep waking workers until the queue is empty */
88         while (!_queue.empty ()) {
89                 rethrow ();
90                 _empty_condition.notify_all ();
91                 _full_condition.wait (lock);
92         }
93
94         lock.unlock ();
95
96         LOG_GENERAL_NC (N_("Terminating encoder threads"));
97
98         terminate_threads ();
99
100         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
101
102         /* The following sequence of events can occur in the above code:
103              1. a remote worker takes the last image off the queue
104              2. the loop above terminates
105              3. the remote worker fails to encode the image and puts it back on the queue
106              4. the remote worker is then terminated by terminate_threads
107
108              So just mop up anything left in the queue here.
109         */
110
111         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
113                 try {
114                         _writer->write (
115                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
116                                 (*i)->index (),
117                                 (*i)->eyes ()
118                                 );
119                         frame_done ();
120                 } catch (std::exception& e) {
121                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
122                 }
123         }
124 }
125
126 /** @return an estimate of the current number of frames we are encoding per second,
127  *  or 0 if not known.
128  */
129 float
130 Encoder::current_encoding_rate () const
131 {
132         boost::mutex::scoped_lock lock (_state_mutex);
133         if (int (_time_history.size()) < _history_size) {
134                 return 0;
135         }
136
137         struct timeval now;
138         gettimeofday (&now, 0);
139
140         return _history_size / (seconds (now) - seconds (_time_history.back ()));
141 }
142
143 /** @return Number of video frames that have been sent out */
144 int
145 Encoder::video_frames_out () const
146 {
147         boost::mutex::scoped_lock (_state_mutex);
148         return _position;
149 }
150
151 /** Should be called when a frame has been encoded successfully.
152  *  @param n Source frame index.
153  */
154 void
155 Encoder::frame_done ()
156 {
157         boost::mutex::scoped_lock lock (_state_mutex);
158
159         struct timeval tv;
160         gettimeofday (&tv, 0);
161         _time_history.push_front (tv);
162         if (int (_time_history.size()) > _history_size) {
163                 _time_history.pop_back ();
164         }
165 }
166
167 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
168  *  so each time the supplied frame is the one after the previous one.
169  *  pv represents one video frame, and could be empty if there is nothing to encode
170  *  for this DCP frame.
171  */
172 void
173 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
174 {
175         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
176                 enqueue (i);
177         }
178         ++_position;
179 }
180
181 void
182 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
183 {
184         _waker.nudge ();
185
186         size_t threads = 0;
187         {
188                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
189                 threads = _threads.size ();
190         }
191
192         boost::mutex::scoped_lock queue_lock (_queue_mutex);
193
194         /* XXX: discard 3D here if required */
195
196         /* Wait until the queue has gone down a bit */
197         while (_queue.size() >= threads * 2) {
198                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
199                 _full_condition.wait (queue_lock);
200                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
201         }
202
203         _writer->rethrow ();
204         /* Re-throw any exception raised by one of our threads.  If more
205            than one has thrown an exception, only one will be rethrown, I think;
206            but then, if that happens something has gone badly wrong.
207         */
208         rethrow ();
209
210         if (_writer->can_fake_write (_position)) {
211                 /* We can fake-write this frame */
212                 _writer->fake_write (_position, pv->eyes ());
213                 frame_done ();
214         } else if (pv->has_j2k ()) {
215                 /* This frame already has JPEG2000 data, so just write it */
216                 _writer->write (pv->j2k(), _position, pv->eyes ());
217         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
218                 _writer->repeat (_position, pv->eyes ());
219         } else {
220                 /* Queue this new frame for encoding */
221                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
222                 _queue.push_back (shared_ptr<DCPVideo> (
223                                           new DCPVideo (
224                                                   pv,
225                                                   _position,
226                                                   _film->video_frame_rate(),
227                                                   _film->j2k_bandwidth(),
228                                                   _film->resolution(),
229                                                   _film->log()
230                                                   )
231                                           ));
232
233                 /* The queue might not be empty any more, so notify anything which is
234                    waiting on that.
235                 */
236                 _empty_condition.notify_all ();
237         }
238
239         _last_player_video = pv;
240 }
241
242 void
243 Encoder::terminate_threads ()
244 {
245         boost::mutex::scoped_lock threads_lock (_threads_mutex);
246
247         int n = 0;
248         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
249                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
250                 (*i)->interrupt ();
251                 DCPOMATIC_ASSERT ((*i)->joinable ());
252                 (*i)->join ();
253                 delete *i;
254                 LOG_GENERAL_NC ("Thread terminated");
255                 ++n;
256         }
257
258         _threads.clear ();
259 }
260
261 void
262 Encoder::encoder_thread (optional<EncodeServerDescription> server)
263 try
264 {
265         if (server) {
266                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
267         } else {
268                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
269         }
270
271         /* Number of seconds that we currently wait between attempts
272            to connect to the server; not relevant for localhost
273            encodings.
274         */
275         int remote_backoff = 0;
276
277         while (true) {
278
279                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
280                 boost::mutex::scoped_lock lock (_queue_mutex);
281                 while (_queue.empty ()) {
282                         _empty_condition.wait (lock);
283                 }
284
285                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
286                 shared_ptr<DCPVideo> vf = _queue.front ();
287                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
288                 _queue.pop_front ();
289
290                 lock.unlock ();
291
292                 optional<Data> encoded;
293
294                 /* We need to encode this input */
295                 if (server) {
296                         try {
297                                 encoded = vf->encode_remotely (server.get ());
298
299                                 if (remote_backoff > 0) {
300                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
301                                 }
302
303                                 /* This job succeeded, so remove any backoff */
304                                 remote_backoff = 0;
305
306                         } catch (std::exception& e) {
307                                 if (remote_backoff < 60) {
308                                         /* back off more */
309                                         remote_backoff += 10;
310                                 }
311                                 LOG_ERROR (
312                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
313                                         vf->index(), server->host_name(), e.what(), remote_backoff
314                                         );
315                         }
316
317                 } else {
318                         try {
319                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
320                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
321                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
322                         } catch (std::exception& e) {
323                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
324                                 throw;
325                         }
326                 }
327
328                 if (encoded) {
329                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
330                         frame_done ();
331                 } else {
332                         lock.lock ();
333                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
334                         _queue.push_front (vf);
335                         lock.unlock ();
336                 }
337
338                 if (remote_backoff > 0) {
339                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
340                 }
341
342                 /* The queue might not be full any more, so notify anything that is waiting on that */
343                 lock.lock ();
344                 _full_condition.notify_all ();
345         }
346 }
347 catch (...)
348 {
349         store_current ();
350         /* Wake anything waiting on _full_condition so it can see the exception */
351         _full_condition.notify_all ();
352 }
353
354 void
355 Encoder::servers_list_changed ()
356 {
357         terminate_threads ();
358
359         /* XXX: could re-use threads */
360
361         boost::mutex::scoped_lock lm (_threads_mutex);
362
363 #ifdef BOOST_THREAD_PLATFORM_WIN32
364         OSVERSIONINFO info;
365         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
366         GetVersionEx (&info);
367         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
368         if (windows_xp) {
369                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
370         }
371 #endif
372
373         if (!Config::instance()->only_servers_encode ()) {
374                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
375                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
376                         _threads.push_back (t);
377 #ifdef BOOST_THREAD_PLATFORM_WIN32
378                         if (windows_xp) {
379                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
380                         }
381 #endif
382                 }
383         }
384
385         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
386                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
387                 for (int j = 0; j < i.threads(); ++j) {
388                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
389                 }
390         }
391
392         _writer->set_encoder_threads (_threads.size ());
393 }