Do EncodeServerFinder 'disable' in a more sensible way.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file src/encoder.h
22  *  @brief Parent class for classes which can encode video and audio frames.
23  */
24
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "encode_server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "encode_server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
56 int const Encoder::_history_size = 200;
57
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
60         : _film (film)
61         , _writer (writer)
62 {
63         servers_list_changed ();
64 }
65
66 Encoder::~Encoder ()
67 {
68         try {
69                 terminate_threads ();
70         } catch (...) {
71                 /* Destructors must not throw exceptions; anything bad
72                    happening now is too late to worry about anyway,
73                    I think.
74                 */
75         }
76 }
77
78 void
79 Encoder::begin ()
80 {
81         weak_ptr<Encoder> wp = shared_from_this ();
82         _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (
83                 boost::bind (&Encoder::call_servers_list_changed, wp)
84                 );
85 }
86
87 /* We don't want the servers-list-changed callback trying to do things
88    during destruction of Encoder, and I think this is the neatest way
89    to achieve that.
90 */
91 void
92 Encoder::call_servers_list_changed (weak_ptr<Encoder> encoder)
93 {
94         shared_ptr<Encoder> e = encoder.lock ();
95         if (e) {
96                 e->servers_list_changed ();
97         }
98 }
99
100 void
101 Encoder::end ()
102 {
103         boost::mutex::scoped_lock lock (_queue_mutex);
104
105         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
106
107         /* Keep waking workers until the queue is empty */
108         while (!_queue.empty ()) {
109                 rethrow ();
110                 _empty_condition.notify_all ();
111                 _full_condition.wait (lock);
112         }
113
114         lock.unlock ();
115
116         LOG_GENERAL_NC (N_("Terminating encoder threads"));
117
118         terminate_threads ();
119
120         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
121
122         /* The following sequence of events can occur in the above code:
123              1. a remote worker takes the last image off the queue
124              2. the loop above terminates
125              3. the remote worker fails to encode the image and puts it back on the queue
126              4. the remote worker is then terminated by terminate_threads
127
128              So just mop up anything left in the queue here.
129         */
130
131         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
132                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
133                 try {
134                         _writer->write (
135                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
136                                 (*i)->index (),
137                                 (*i)->eyes ()
138                                 );
139                         frame_done ();
140                 } catch (std::exception& e) {
141                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
142                 }
143         }
144 }
145
146 /** @return an estimate of the current number of frames we are encoding per second,
147  *  or 0 if not known.
148  */
149 float
150 Encoder::current_encoding_rate () const
151 {
152         boost::mutex::scoped_lock lock (_state_mutex);
153         if (int (_time_history.size()) < _history_size) {
154                 return 0;
155         }
156
157         struct timeval now;
158         gettimeofday (&now, 0);
159
160         return _history_size / (seconds (now) - seconds (_time_history.back ()));
161 }
162
163 /** @return Number of video frames that have been queued for encoding */
164 int
165 Encoder::video_frames_enqueued () const
166 {
167         if (!_last_player_video) {
168                 return 0;
169         }
170
171         return _last_player_video->time().frames_floor (_film->video_frame_rate ());
172 }
173
174 /** Should be called when a frame has been encoded successfully.
175  *  @param n Source frame index.
176  */
177 void
178 Encoder::frame_done ()
179 {
180         boost::mutex::scoped_lock lock (_state_mutex);
181
182         struct timeval tv;
183         gettimeofday (&tv, 0);
184         _time_history.push_front (tv);
185         if (int (_time_history.size()) > _history_size) {
186                 _time_history.pop_back ();
187         }
188 }
189
190 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
191  *  so each time the supplied frame is the one after the previous one.
192  *  pv represents one video frame, and could be empty if there is nothing to encode
193  *  for this DCP frame.
194  */
195 void
196 Encoder::encode (shared_ptr<PlayerVideo> pv)
197 {
198         _waker.nudge ();
199
200         size_t threads = 0;
201         {
202                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
203                 threads = _threads.size ();
204         }
205
206         boost::mutex::scoped_lock queue_lock (_queue_mutex);
207
208         /* Wait until the queue has gone down a bit.  Allow one thing in the queue even
209            when there are no threads.
210         */
211         while (_queue.size() >= (threads * 2) + 1) {
212                 LOG_TIMING ("decoder-sleep queue=%1 threads=%2", _queue.size(), threads);
213                 _full_condition.wait (queue_lock);
214                 LOG_TIMING ("decoder-wake queue=%1 threads=%2", _queue.size(), threads);
215         }
216
217         _writer->rethrow ();
218         /* Re-throw any exception raised by one of our threads.  If more
219            than one has thrown an exception, only one will be rethrown, I think;
220            but then, if that happens something has gone badly wrong.
221         */
222         rethrow ();
223
224         Frame const position = pv->time().frames_floor(_film->video_frame_rate());
225
226         if (_writer->can_fake_write (position)) {
227                 /* We can fake-write this frame */
228                 _writer->fake_write (position, pv->eyes ());
229                 frame_done ();
230         } else if (pv->has_j2k ()) {
231                 /* This frame already has JPEG2000 data, so just write it */
232                 _writer->write (pv->j2k(), position, pv->eyes ());
233         } else if (_last_player_video && _writer->can_repeat(position) && pv->same (_last_player_video)) {
234                 _writer->repeat (position, pv->eyes ());
235         } else {
236                 /* Queue this new frame for encoding */
237                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
238                 _queue.push_back (shared_ptr<DCPVideo> (
239                                           new DCPVideo (
240                                                   pv,
241                                                   position,
242                                                   _film->video_frame_rate(),
243                                                   _film->j2k_bandwidth(),
244                                                   _film->resolution(),
245                                                   _film->log()
246                                                   )
247                                           ));
248
249                 /* The queue might not be empty any more, so notify anything which is
250                    waiting on that.
251                 */
252                 _empty_condition.notify_all ();
253         }
254
255         _last_player_video = pv;
256 }
257
258 void
259 Encoder::terminate_threads ()
260 {
261         boost::mutex::scoped_lock threads_lock (_threads_mutex);
262
263         int n = 0;
264         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
265                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
266                 (*i)->interrupt ();
267                 DCPOMATIC_ASSERT ((*i)->joinable ());
268                 try {
269                         (*i)->join ();
270                 } catch (boost::thread_interrupted& e) {
271                         /* This is to be expected */
272                 }
273                 delete *i;
274                 LOG_GENERAL_NC ("Thread terminated");
275                 ++n;
276         }
277
278         _threads.clear ();
279 }
280
281 void
282 Encoder::encoder_thread (optional<EncodeServerDescription> server)
283 try
284 {
285         if (server) {
286                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
287         } else {
288                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
289         }
290
291         /* Number of seconds that we currently wait between attempts
292            to connect to the server; not relevant for localhost
293            encodings.
294         */
295         int remote_backoff = 0;
296
297         while (true) {
298
299                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
300                 boost::mutex::scoped_lock lock (_queue_mutex);
301                 while (_queue.empty ()) {
302                         _empty_condition.wait (lock);
303                 }
304
305                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
306                 shared_ptr<DCPVideo> vf = _queue.front ();
307                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
308                 _queue.pop_front ();
309
310                 lock.unlock ();
311
312                 optional<Data> encoded;
313
314                 /* We need to encode this input */
315                 if (server) {
316                         try {
317                                 encoded = vf->encode_remotely (server.get ());
318
319                                 if (remote_backoff > 0) {
320                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
321                                 }
322
323                                 /* This job succeeded, so remove any backoff */
324                                 remote_backoff = 0;
325
326                         } catch (std::exception& e) {
327                                 if (remote_backoff < 60) {
328                                         /* back off more */
329                                         remote_backoff += 10;
330                                 }
331                                 LOG_ERROR (
332                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
333                                         vf->index(), server->host_name(), e.what(), remote_backoff
334                                         );
335                         }
336
337                 } else {
338                         try {
339                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
340                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
341                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
342                         } catch (std::exception& e) {
343                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
344                                 throw;
345                         }
346                 }
347
348                 if (encoded) {
349                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
350                         frame_done ();
351                 } else {
352                         lock.lock ();
353                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
354                         _queue.push_front (vf);
355                         lock.unlock ();
356                 }
357
358                 if (remote_backoff > 0) {
359                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
360                 }
361
362                 /* The queue might not be full any more, so notify anything that is waiting on that */
363                 lock.lock ();
364                 _full_condition.notify_all ();
365         }
366 }
367 catch (boost::thread_interrupted& e) {
368         /* Ignore these and just stop the thread */
369         _full_condition.notify_all ();
370 }
371 catch (...)
372 {
373         store_current ();
374         /* Wake anything waiting on _full_condition so it can see the exception */
375         _full_condition.notify_all ();
376 }
377
378 void
379 Encoder::servers_list_changed ()
380 {
381         terminate_threads ();
382
383         /* XXX: could re-use threads */
384
385         boost::mutex::scoped_lock lm (_threads_mutex);
386
387 #ifdef BOOST_THREAD_PLATFORM_WIN32
388         OSVERSIONINFO info;
389         info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
390         GetVersionEx (&info);
391         bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
392         if (windows_xp) {
393                 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
394         }
395 #endif
396
397         if (!Config::instance()->only_servers_encode ()) {
398                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
399                         boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
400                         _threads.push_back (t);
401 #ifdef BOOST_THREAD_PLATFORM_WIN32
402                         if (windows_xp) {
403                                 SetThreadAffinityMask (t->native_handle(), 1 << i);
404                         }
405 #endif
406                 }
407         }
408
409         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
410                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
411                 for (int j = 0; j < i.threads(); ++j) {
412                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
413                 }
414         }
415
416         _writer->set_encoder_threads (_threads.size ());
417 }