Include tidying.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
47
48 using std::list;
49 using std::cout;
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
58         : _film (film)
59         , _job (j)
60         , _video_frames_enqueued (0)
61         , _left_done (false)
62         , _right_done (false)
63         , _terminate (false)
64         , _writer (writer)
65 {
66         servers_list_changed ();
67 }
68
69 Encoder::~Encoder ()
70 {
71         terminate_threads ();
72 }
73
74 /** Add a worker thread for a each thread on a remote server.  Caller must hold
75  *  a lock on _mutex, or know that one is not currently required to
76  *  safely modify _threads.
77  */
78 void
79 Encoder::add_worker_threads (ServerDescription d)
80 {
81         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
82         for (int i = 0; i < d.threads(); ++i) {
83                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
84         }
85
86         _writer->set_encoder_threads (_threads.size ());
87 }
88
89 void
90 Encoder::begin ()
91 {
92         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
93                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
94         }
95
96         _writer->set_encoder_threads (_threads.size ());
97
98         if (!ServerFinder::instance()->disabled ()) {
99                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
100         }
101 }
102
103 void
104 Encoder::end ()
105 {
106         boost::mutex::scoped_lock lock (_mutex);
107
108         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
109
110         /* Keep waking workers until the queue is empty */
111         while (!_queue.empty ()) {
112                 _empty_condition.notify_all ();
113                 _full_condition.wait (lock);
114         }
115
116         lock.unlock ();
117
118         terminate_threads ();
119
120         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
121
122         /* The following sequence of events can occur in the above code:
123              1. a remote worker takes the last image off the queue
124              2. the loop above terminates
125              3. the remote worker fails to encode the image and puts it back on the queue
126              4. the remote worker is then terminated by terminate_threads
127
128              So just mop up anything left in the queue here.
129         */
130
131         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
132                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
133                 try {
134                         _writer->write (
135                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
136                                 (*i)->index (),
137                                 (*i)->eyes ()
138                                 );
139                         frame_done ();
140                 } catch (std::exception& e) {
141                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
142                 }
143         }
144 }
145
146 /** @return an estimate of the current number of frames we are encoding per second,
147  *  or 0 if not known.
148  */
149 float
150 Encoder::current_encoding_rate () const
151 {
152         boost::mutex::scoped_lock lock (_state_mutex);
153         if (int (_time_history.size()) < _history_size) {
154                 return 0;
155         }
156
157         struct timeval now;
158         gettimeofday (&now, 0);
159
160         return _history_size / (seconds (now) - seconds (_time_history.back ()));
161 }
162
163 /** @return Number of video frames that have been sent out */
164 int
165 Encoder::video_frames_out () const
166 {
167         boost::mutex::scoped_lock (_state_mutex);
168         return _video_frames_enqueued;
169 }
170
171 /** Should be called when a frame has been encoded successfully.
172  *  @param n Source frame index.
173  */
174 void
175 Encoder::frame_done ()
176 {
177         boost::mutex::scoped_lock lock (_state_mutex);
178
179         struct timeval tv;
180         gettimeofday (&tv, 0);
181         _time_history.push_front (tv);
182         if (int (_time_history.size()) > _history_size) {
183                 _time_history.pop_back ();
184         }
185 }
186
187 /** Called in order, so each time this is called the supplied frame is the one
188  *  after the previous one.
189  */
190 void
191 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
192 {
193         _waker.nudge ();
194
195         boost::mutex::scoped_lock lock (_mutex);
196
197         /* XXX: discard 3D here if required */
198
199         /* Wait until the queue has gone down a bit */
200         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
201                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
202                 _full_condition.wait (lock);
203                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
204         }
205
206         if (_terminate) {
207                 return;
208         }
209
210         _writer->rethrow ();
211         /* Re-throw any exception raised by one of our threads.  If more
212            than one has thrown an exception, only one will be rethrown, I think;
213            but then, if that happens something has gone badly wrong.
214         */
215         rethrow ();
216
217         if (_writer->can_fake_write (_video_frames_enqueued)) {
218                 /* We can fake-write this frame */
219                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
220                 frame_done ();
221         } else if (pv->has_j2k ()) {
222                 /* This frame already has JPEG2000 data, so just write it */
223                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
224         } else if (_last_player_video && pv->same (_last_player_video)) {
225                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
226         } else {
227                 /* Queue this new frame for encoding */
228                 LOG_TIMING ("adding to queue of %1", _queue.size ());
229                 _queue.push_back (shared_ptr<DCPVideo> (
230                                           new DCPVideo (
231                                                   pv,
232                                                   _video_frames_enqueued,
233                                                   _film->video_frame_rate(),
234                                                   _film->j2k_bandwidth(),
235                                                   _film->resolution(),
236                                                   _film->log()
237                                                   )
238                                           ));
239
240                 /* The queue might not be empty any more, so notify anything which is
241                    waiting on that.
242                 */
243                 _empty_condition.notify_all ();
244         }
245
246         switch (pv->eyes ()) {
247         case EYES_BOTH:
248                 ++_video_frames_enqueued;
249                 break;
250         case EYES_LEFT:
251                 _left_done = true;
252                 break;
253         case EYES_RIGHT:
254                 _right_done = true;
255                 break;
256         default:
257                 break;
258         }
259
260         if (_left_done && _right_done) {
261                 ++_video_frames_enqueued;
262                 _left_done = _right_done = false;
263         }
264
265         _last_player_video = pv;
266 }
267
268 void
269 Encoder::terminate_threads ()
270 {
271         {
272                 boost::mutex::scoped_lock lock (_mutex);
273                 _terminate = true;
274                 _full_condition.notify_all ();
275                 _empty_condition.notify_all ();
276         }
277
278         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
279                 if ((*i)->joinable ()) {
280                         (*i)->join ();
281                 }
282                 delete *i;
283         }
284
285         _threads.clear ();
286         _terminate = false;
287 }
288
289 void
290 Encoder::encoder_thread (optional<ServerDescription> server)
291 try
292 {
293         /* Number of seconds that we currently wait between attempts
294            to connect to the server; not relevant for localhost
295            encodings.
296         */
297         int remote_backoff = 0;
298
299         while (true) {
300
301                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
302                 boost::mutex::scoped_lock lock (_mutex);
303                 while (_queue.empty () && !_terminate) {
304                         _empty_condition.wait (lock);
305                 }
306
307                 if (_terminate) {
308                         return;
309                 }
310
311                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
312                 shared_ptr<DCPVideo> vf = _queue.front ();
313                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
314                 _queue.pop_front ();
315
316                 lock.unlock ();
317
318                 optional<Data> encoded;
319
320                 /* We need to encode this input */
321                 if (server) {
322                         try {
323                                 encoded = vf->encode_remotely (server.get ());
324
325                                 if (remote_backoff > 0) {
326                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
327                                 }
328
329                                 /* This job succeeded, so remove any backoff */
330                                 remote_backoff = 0;
331
332                         } catch (std::exception& e) {
333                                 if (remote_backoff < 60) {
334                                         /* back off more */
335                                         remote_backoff += 10;
336                                 }
337                                 LOG_ERROR (
338                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
339                                         vf->index(), server->host_name(), e.what(), remote_backoff
340                                         );
341                         }
342
343                 } else {
344                         try {
345                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
346                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
347                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
348                         } catch (std::exception& e) {
349                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
350                         }
351                 }
352
353                 if (encoded) {
354                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
355                         frame_done ();
356                 } else {
357                         lock.lock ();
358                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
359                         _queue.push_front (vf);
360                         lock.unlock ();
361                 }
362
363                 if (remote_backoff > 0) {
364                         dcpomatic_sleep (remote_backoff);
365                 }
366
367                 /* The queue might not be full any more, so notify anything that is waiting on that */
368                 lock.lock ();
369                 _full_condition.notify_all ();
370         }
371 }
372 catch (...)
373 {
374         store_current ();
375 }
376
377 void
378 Encoder::servers_list_changed ()
379 {
380         terminate_threads ();
381         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
382                 add_worker_threads (i);
383         }
384 }