Rename Server -> EncodeServer, ServerFinder -> EncodeServerFinder, ServerDescription...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/lib/encoder.cc
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "cross.h"
31 #include "writer.h"
32 #include "encode_server_finder.h"
33 #include "player.h"
34 #include "player_video.h"
35 #include "encode_server_description.h"
36 #include "compose.hpp"
37 #include <libcxml/cxml.h>
38 #include <boost/foreach.hpp>
39 #include <iostream>
40
41 #include "i18n.h"
42
/* Logging helpers which write to the film's log.  LOG_GENERAL, LOG_ERROR and LOG_TIMING
   format their arguments with String::compose; LOG_GENERAL_NC passes its argument through
   as-is.

   The definitions deliberately do not end in a semicolon: each use supplies its own, so
   every use expands to exactly one statement and is safe inside an unbraced if / else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
47
48 using std::list;
49 using std::cout;
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
53 using dcp::Data;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
59         : _film (film)
60         , _position (0)
61         , _terminate_enqueue (false)
62         , _terminate_encoding (false)
63         , _writer (writer)
64 {
65         servers_list_changed ();
66 }
67
68 Encoder::~Encoder ()
69 {
70         terminate_threads ();
71
72         boost::mutex::scoped_lock lm (_queue_mutex);
73         _terminate_enqueue = true;
74         _full_condition.notify_all ();
75         _empty_condition.notify_all ();
76 }
77
78 void
79 Encoder::begin ()
80 {
81         if (!EncodeServerFinder::instance()->disabled ()) {
82                 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
83         }
84 }
85
86 void
87 Encoder::end ()
88 {
89         boost::mutex::scoped_lock lock (_queue_mutex);
90
91         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
92
93         /* Keep waking workers until the queue is empty */
94         while (!_queue.empty ()) {
95                 rethrow ();
96                 _empty_condition.notify_all ();
97                 _full_condition.wait (lock);
98         }
99
100         lock.unlock ();
101
102         LOG_GENERAL_NC (N_("Terminating encoder threads"));
103
104         terminate_threads ();
105
106         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
107
108         /* The following sequence of events can occur in the above code:
109              1. a remote worker takes the last image off the queue
110              2. the loop above terminates
111              3. the remote worker fails to encode the image and puts it back on the queue
112              4. the remote worker is then terminated by terminate_threads
113
114              So just mop up anything left in the queue here.
115         */
116
117         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
118                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
119                 try {
120                         _writer->write (
121                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
122                                 (*i)->index (),
123                                 (*i)->eyes ()
124                                 );
125                         frame_done ();
126                 } catch (std::exception& e) {
127                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
128                 }
129         }
130 }
131
132 /** @return an estimate of the current number of frames we are encoding per second,
133  *  or 0 if not known.
134  */
135 float
136 Encoder::current_encoding_rate () const
137 {
138         boost::mutex::scoped_lock lock (_state_mutex);
139         if (int (_time_history.size()) < _history_size) {
140                 return 0;
141         }
142
143         struct timeval now;
144         gettimeofday (&now, 0);
145
146         return _history_size / (seconds (now) - seconds (_time_history.back ()));
147 }
148
149 /** @return Number of video frames that have been sent out */
150 int
151 Encoder::video_frames_out () const
152 {
153         boost::mutex::scoped_lock (_state_mutex);
154         return _position;
155 }
156
157 /** Should be called when a frame has been encoded successfully.
158  *  @param n Source frame index.
159  */
160 void
161 Encoder::frame_done ()
162 {
163         boost::mutex::scoped_lock lock (_state_mutex);
164
165         struct timeval tv;
166         gettimeofday (&tv, 0);
167         _time_history.push_front (tv);
168         if (int (_time_history.size()) > _history_size) {
169                 _time_history.pop_back ();
170         }
171 }
172
173 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
174  *  so each time the supplied frame is the one after the previous one.
175  *  pv represents one video frame, and could be empty if there is nothing to encode
176  *  for this DCP frame.
177  */
178 void
179 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
180 {
181         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
182                 enqueue (i);
183         }
184         ++_position;
185 }
186
187 void
188 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
189 {
190         _waker.nudge ();
191
192         size_t threads = 0;
193         {
194                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
195                 threads = _threads.size ();
196         }
197
198         boost::mutex::scoped_lock queue_lock (_queue_mutex);
199
200         /* XXX: discard 3D here if required */
201
202         /* Wait until the queue has gone down a bit */
203         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
204                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
205                 _full_condition.wait (queue_lock);
206                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
207         }
208
209         if (_terminate_enqueue) {
210                 return;
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         if (_writer->can_fake_write (_position)) {
221                 /* We can fake-write this frame */
222                 _writer->fake_write (_position, pv->eyes ());
223                 frame_done ();
224         } else if (pv->has_j2k ()) {
225                 /* This frame already has JPEG2000 data, so just write it */
226                 _writer->write (pv->j2k(), _position, pv->eyes ());
227         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
228                 _writer->repeat (_position, pv->eyes ());
229         } else {
230                 /* Queue this new frame for encoding */
231                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
232                 _queue.push_back (shared_ptr<DCPVideo> (
233                                           new DCPVideo (
234                                                   pv,
235                                                   _position,
236                                                   _film->video_frame_rate(),
237                                                   _film->j2k_bandwidth(),
238                                                   _film->resolution(),
239                                                   _film->log()
240                                                   )
241                                           ));
242
243                 /* The queue might not be empty any more, so notify anything which is
244                    waiting on that.
245                 */
246                 _empty_condition.notify_all ();
247         }
248
249         _last_player_video = pv;
250 }
251
252 void
253 Encoder::terminate_threads ()
254 {
255         {
256                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
257                 _terminate_encoding = true;
258         }
259
260         boost::mutex::scoped_lock threads_lock (_threads_mutex);
261
262         int n = 0;
263         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
264                 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
265                 (*i)->interrupt ();
266                 if ((*i)->joinable ()) {
267                         (*i)->join ();
268                 }
269                 delete *i;
270                 LOG_GENERAL_NC ("Thread terminated");
271                 ++n;
272         }
273
274         _threads.clear ();
275         _terminate_encoding = false;
276 }
277
278 void
279 Encoder::encoder_thread (optional<EncodeServerDescription> server)
280 try
281 {
282         if (server) {
283                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
284         } else {
285                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
286         }
287
288         /* Number of seconds that we currently wait between attempts
289            to connect to the server; not relevant for localhost
290            encodings.
291         */
292         int remote_backoff = 0;
293
294         while (true) {
295
296                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
297                 boost::mutex::scoped_lock lock (_queue_mutex);
298                 while (_queue.empty () && !_terminate_encoding) {
299                         _empty_condition.wait (lock);
300                 }
301
302                 if (_terminate_encoding) {
303                         return;
304                 }
305
306                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
307                 shared_ptr<DCPVideo> vf = _queue.front ();
308                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
309                 _queue.pop_front ();
310
311                 lock.unlock ();
312
313                 optional<Data> encoded;
314
315                 /* We need to encode this input */
316                 if (server) {
317                         try {
318                                 encoded = vf->encode_remotely (server.get ());
319
320                                 if (remote_backoff > 0) {
321                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
322                                 }
323
324                                 /* This job succeeded, so remove any backoff */
325                                 remote_backoff = 0;
326
327                         } catch (std::exception& e) {
328                                 if (remote_backoff < 60) {
329                                         /* back off more */
330                                         remote_backoff += 10;
331                                 }
332                                 LOG_ERROR (
333                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
334                                         vf->index(), server->host_name(), e.what(), remote_backoff
335                                         );
336                         }
337
338                 } else {
339                         try {
340                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
341                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
342                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
343                         } catch (std::exception& e) {
344                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
345                                 throw;
346                         }
347                 }
348
349                 if (encoded) {
350                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
351                         frame_done ();
352                 } else {
353                         lock.lock ();
354                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
355                         _queue.push_front (vf);
356                         lock.unlock ();
357                 }
358
359                 if (remote_backoff > 0) {
360                         boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
361                 }
362
363                 /* The queue might not be full any more, so notify anything that is waiting on that */
364                 lock.lock ();
365                 _full_condition.notify_all ();
366         }
367 }
368 catch (...)
369 {
370         store_current ();
371         /* Wake anything waiting on _full_condition so it can see the exception */
372         _full_condition.notify_all ();
373 }
374
375 void
376 Encoder::servers_list_changed ()
377 {
378         terminate_threads ();
379
380         /* XXX: could re-use threads */
381
382         boost::mutex::scoped_lock lm (_threads_mutex);
383
384         if (!Config::instance()->only_servers_encode ()) {
385                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
386                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ())));
387                 }
388         }
389
390         BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
391                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
392                 for (int j = 0; j < i.threads(); ++j) {
393                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
394                 }
395         }
396
397         _writer->set_encoder_threads (_threads.size ());
398 }