Remove unused variable.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
/* Logging helpers which write to the film's log.  They deliberately do not end
   with a semicolon: the caller supplies it, so that each use expands to a single
   statement and is safe in an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL)
#define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54 using dcp::Data;
55
/** Maximum number of frame completion times kept in _time_history; also the number
 *  of entries required before current_encoding_rate() will give an estimate.
 */
int const Encoder::_history_size = 25;
57
/** Construct an Encoder and create its initial set of encoding threads.
 *  @param film Film that we are encoding.
 *  @param writer Writer that encoded frames are passed to.
 */
Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
        : _film (film)
        , _position (0)
        , _terminate_enqueue (false)
        , _terminate_encoding (false)
        , _writer (writer)
{
        /* Create encoding threads for the current configuration and known servers */
        servers_list_changed ();
}
68
69 Encoder::~Encoder ()
70 {
71         terminate_threads ();
72
73         boost::mutex::scoped_lock lm (_queue_mutex);
74         _terminate_enqueue = true;
75         _full_condition.notify_all ();
76         _empty_condition.notify_all ();
77 }
78
79 void
80 Encoder::begin ()
81 {
82         if (!ServerFinder::instance()->disabled ()) {
83                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
84         }
85 }
86
87 void
88 Encoder::end ()
89 {
90         boost::mutex::scoped_lock lock (_queue_mutex);
91
92         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
93
94         /* Keep waking workers until the queue is empty */
95         while (!_queue.empty ()) {
96                 rethrow ();
97                 _empty_condition.notify_all ();
98                 _full_condition.wait (lock);
99         }
100
101         lock.unlock ();
102
103         LOG_GENERAL_NC (N_("Terminating encoder threads"));
104
105         terminate_threads ();
106
107         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
108
109         /* The following sequence of events can occur in the above code:
110              1. a remote worker takes the last image off the queue
111              2. the loop above terminates
112              3. the remote worker fails to encode the image and puts it back on the queue
113              4. the remote worker is then terminated by terminate_threads
114
115              So just mop up anything left in the queue here.
116         */
117
118         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
119                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
120                 try {
121                         _writer->write (
122                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
123                                 (*i)->index (),
124                                 (*i)->eyes ()
125                                 );
126                         frame_done ();
127                 } catch (std::exception& e) {
128                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
129                 }
130         }
131 }
132
133 /** @return an estimate of the current number of frames we are encoding per second,
134  *  or 0 if not known.
135  */
136 float
137 Encoder::current_encoding_rate () const
138 {
139         boost::mutex::scoped_lock lock (_state_mutex);
140         if (int (_time_history.size()) < _history_size) {
141                 return 0;
142         }
143
144         struct timeval now;
145         gettimeofday (&now, 0);
146
147         return _history_size / (seconds (now) - seconds (_time_history.back ()));
148 }
149
150 /** @return Number of video frames that have been sent out */
151 int
152 Encoder::video_frames_out () const
153 {
154         boost::mutex::scoped_lock (_state_mutex);
155         return _position;
156 }
157
158 /** Should be called when a frame has been encoded successfully.
159  *  @param n Source frame index.
160  */
161 void
162 Encoder::frame_done ()
163 {
164         boost::mutex::scoped_lock lock (_state_mutex);
165
166         struct timeval tv;
167         gettimeofday (&tv, 0);
168         _time_history.push_front (tv);
169         if (int (_time_history.size()) > _history_size) {
170                 _time_history.pop_back ();
171         }
172 }
173
174 /** Called to start encoding of the next video frame in the DCP.  This is called in order,
175  *  so each time the supplied frame is the one after the previous one.
176  *  pv represents one video frame, and could be empty if there is nothing to encode
177  *  for this DCP frame.
178  */
179 void
180 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
181 {
182         BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
183                 enqueue (i);
184         }
185         ++_position;
186 }
187
188 void
189 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
190 {
191         _waker.nudge ();
192
193         size_t threads = 0;
194         {
195                 boost::mutex::scoped_lock threads_lock (_threads_mutex);
196                 threads = _threads.size ();
197         }
198
199         boost::mutex::scoped_lock queue_lock (_queue_mutex);
200
201         /* XXX: discard 3D here if required */
202
203         /* Wait until the queue has gone down a bit */
204         while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
205                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
206                 _full_condition.wait (queue_lock);
207                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
208         }
209
210         if (_terminate_enqueue) {
211                 return;
212         }
213
214         _writer->rethrow ();
215         /* Re-throw any exception raised by one of our threads.  If more
216            than one has thrown an exception, only one will be rethrown, I think;
217            but then, if that happens something has gone badly wrong.
218         */
219         rethrow ();
220
221         if (_writer->can_fake_write (_position)) {
222                 /* We can fake-write this frame */
223                 _writer->fake_write (_position, pv->eyes ());
224                 frame_done ();
225         } else if (pv->has_j2k ()) {
226                 /* This frame already has JPEG2000 data, so just write it */
227                 _writer->write (pv->j2k(), _position, pv->eyes ());
228         } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
229                 _writer->repeat (_position, pv->eyes ());
230         } else {
231                 /* Queue this new frame for encoding */
232                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
233                 _queue.push_back (shared_ptr<DCPVideo> (
234                                           new DCPVideo (
235                                                   pv,
236                                                   _position,
237                                                   _film->video_frame_rate(),
238                                                   _film->j2k_bandwidth(),
239                                                   _film->resolution(),
240                                                   _film->log()
241                                                   )
242                                           ));
243
244                 /* The queue might not be empty any more, so notify anything which is
245                    waiting on that.
246                 */
247                 _empty_condition.notify_all ();
248         }
249
250         _last_player_video = pv;
251 }
252
253 void
254 Encoder::terminate_threads ()
255 {
256         {
257                 boost::mutex::scoped_lock queue_lock (_queue_mutex);
258                 _terminate_encoding = true;
259         }
260
261         boost::mutex::scoped_lock threads_lock (_threads_mutex);
262
263         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
264                 (*i)->interrupt ();
265                 if ((*i)->joinable ()) {
266                         (*i)->join ();
267                 }
268                 delete *i;
269         }
270
271         _threads.clear ();
272         _terminate_encoding = false;
273 }
274
275 void
276 Encoder::encoder_thread (optional<ServerDescription> server)
277 try
278 {
279         if (server) {
280                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
281         } else {
282                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
283         }
284
285         /* Number of seconds that we currently wait between attempts
286            to connect to the server; not relevant for localhost
287            encodings.
288         */
289         int remote_backoff = 0;
290
291         while (true) {
292
293                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
294                 boost::mutex::scoped_lock lock (_queue_mutex);
295                 while (_queue.empty () && !_terminate_encoding) {
296                         _empty_condition.wait (lock);
297                 }
298
299                 if (_terminate_encoding) {
300                         return;
301                 }
302
303                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
304                 shared_ptr<DCPVideo> vf = _queue.front ();
305                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
306                 _queue.pop_front ();
307
308                 lock.unlock ();
309
310                 optional<Data> encoded;
311
312                 /* We need to encode this input */
313                 if (server) {
314                         try {
315                                 encoded = vf->encode_remotely (server.get ());
316
317                                 if (remote_backoff > 0) {
318                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
319                                 }
320
321                                 /* This job succeeded, so remove any backoff */
322                                 remote_backoff = 0;
323
324                         } catch (std::exception& e) {
325                                 if (remote_backoff < 60) {
326                                         /* back off more */
327                                         remote_backoff += 10;
328                                 }
329                                 LOG_ERROR (
330                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
331                                         vf->index(), server->host_name(), e.what(), remote_backoff
332                                         );
333                         }
334
335                 } else {
336                         try {
337                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
338                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
339                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
340                         } catch (std::exception& e) {
341                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
342                                 throw;
343                         }
344                 }
345
346                 if (encoded) {
347                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
348                         frame_done ();
349                 } else {
350                         lock.lock ();
351                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
352                         _queue.push_front (vf);
353                         lock.unlock ();
354                 }
355
356                 if (remote_backoff > 0) {
357                         boost::this_thread::sleep_for (boost::chrono::seconds (remote_backoff));
358                 }
359
360                 /* The queue might not be full any more, so notify anything that is waiting on that */
361                 lock.lock ();
362                 _full_condition.notify_all ();
363         }
364 }
365 catch (...)
366 {
367         store_current ();
368         /* Wake anything waiting on _full_condition so it can see the exception */
369         _full_condition.notify_all ();
370 }
371
372 void
373 Encoder::servers_list_changed ()
374 {
375         terminate_threads ();
376
377         /* XXX: could re-use threads */
378
379         boost::mutex::scoped_lock lm (_threads_mutex);
380
381         if (!Config::instance()->only_servers_encode ()) {
382                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
383                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
384                 }
385         }
386
387         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
388                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
389                 for (int j = 0; j < i.threads(); ++j) {
390                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
391                 }
392         }
393
394         _writer->set_encoder_threads (_threads.size ());
395 }