Port 65514eea7705fb12985cef448f08ceb47db6acab from 1.x; failure to handle separate...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include <libcxml/cxml.h>
38 #include <boost/lambda/lambda.hpp>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
47
48 using std::pair;
49 using std::string;
50 using std::vector;
51 using std::list;
52 using std::cout;
53 using std::min;
54 using std::make_pair;
55 using boost::shared_ptr;
56 using boost::weak_ptr;
57 using boost::optional;
58 using boost::scoped_array;
59
60 int const Encoder::_history_size = 25;
61
62 /** @param f Film that we are encoding */
63 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
64         : _film (film)
65         , _job (j)
66         , _video_frames_enqueued (0)
67         , _left_done (false)
68         , _right_done (false)
69         , _terminate (false)
70         , _writer (writer)
71 {
72         servers_list_changed ();
73 }
74
75 Encoder::~Encoder ()
76 {
77         terminate_threads ();
78 }
79
80 /** Add a worker thread for a each thread on a remote server.  Caller must hold
81  *  a lock on _mutex, or know that one is not currently required to
82  *  safely modify _threads.
83  */
84 void
85 Encoder::add_worker_threads (ServerDescription d)
86 {
87         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
88         for (int i = 0; i < d.threads(); ++i) {
89                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
90         }
91
92         _writer->set_encoder_threads (_threads.size ());
93 }
94
95 void
96 Encoder::begin ()
97 {
98         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
99                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
100         }
101
102         _writer->set_encoder_threads (_threads.size ());
103
104         if (!ServerFinder::instance()->disabled ()) {
105                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
106         }
107 }
108
109 void
110 Encoder::end ()
111 {
112         boost::mutex::scoped_lock lock (_mutex);
113
114         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
115
116         /* Keep waking workers until the queue is empty */
117         while (!_queue.empty ()) {
118                 _empty_condition.notify_all ();
119                 _full_condition.wait (lock);
120         }
121
122         lock.unlock ();
123
124         terminate_threads ();
125
126         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
127
128         /* The following sequence of events can occur in the above code:
129              1. a remote worker takes the last image off the queue
130              2. the loop above terminates
131              3. the remote worker fails to encode the image and puts it back on the queue
132              4. the remote worker is then terminated by terminate_threads
133
134              So just mop up anything left in the queue here.
135         */
136
137         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
138                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
139                 try {
140                         _writer->write (
141                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
142                                 (*i)->index (),
143                                 (*i)->eyes ()
144                                 );
145                         frame_done ();
146                 } catch (std::exception& e) {
147                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
148                 }
149         }
150 }
151
152 /** @return an estimate of the current number of frames we are encoding per second,
153  *  or 0 if not known.
154  */
155 float
156 Encoder::current_encoding_rate () const
157 {
158         boost::mutex::scoped_lock lock (_state_mutex);
159         if (int (_time_history.size()) < _history_size) {
160                 return 0;
161         }
162
163         struct timeval now;
164         gettimeofday (&now, 0);
165
166         return _history_size / (seconds (now) - seconds (_time_history.back ()));
167 }
168
169 /** @return Number of video frames that have been sent out */
170 int
171 Encoder::video_frames_out () const
172 {
173         boost::mutex::scoped_lock (_state_mutex);
174         return _video_frames_enqueued;
175 }
176
177 /** Should be called when a frame has been encoded successfully.
178  *  @param n Source frame index.
179  */
180 void
181 Encoder::frame_done ()
182 {
183         boost::mutex::scoped_lock lock (_state_mutex);
184
185         struct timeval tv;
186         gettimeofday (&tv, 0);
187         _time_history.push_front (tv);
188         if (int (_time_history.size()) > _history_size) {
189                 _time_history.pop_back ();
190         }
191 }
192
193 /** Called in order, so each time this is called the supplied frame is the one
194  *  after the previous one.
195  */
196 void
197 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
198 {
199         _waker.nudge ();
200
201         boost::mutex::scoped_lock lock (_mutex);
202
203         /* XXX: discard 3D here if required */
204
205         /* Wait until the queue has gone down a bit */
206         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
207                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
208                 _full_condition.wait (lock);
209                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
210         }
211
212         if (_terminate) {
213                 return;
214         }
215
216         _writer->rethrow ();
217         /* Re-throw any exception raised by one of our threads.  If more
218            than one has thrown an exception, only one will be rethrown, I think;
219            but then, if that happens something has gone badly wrong.
220         */
221         rethrow ();
222
223         if (_writer->can_fake_write (_video_frames_enqueued)) {
224                 /* We can fake-write this frame */
225                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
226                 frame_done ();
227         } else if (pv->has_j2k ()) {
228                 /* This frame already has JPEG2000 data, so just write it */
229                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
230         } else if (_last_player_video && pv->same (_last_player_video)) {
231                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
232         } else {
233                 /* Queue this new frame for encoding */
234                 LOG_TIMING ("adding to queue of %1", _queue.size ());
235                 _queue.push_back (shared_ptr<DCPVideo> (
236                                           new DCPVideo (
237                                                   pv,
238                                                   _video_frames_enqueued,
239                                                   _film->video_frame_rate(),
240                                                   _film->j2k_bandwidth(),
241                                                   _film->resolution(),
242                                                   _film->log()
243                                                   )
244                                           ));
245
246                 /* The queue might not be empty any more, so notify anything which is
247                    waiting on that.
248                 */
249                 _empty_condition.notify_all ();
250         }
251
252         switch (pv->eyes ()) {
253         case EYES_BOTH:
254                 ++_video_frames_enqueued;
255                 break;
256         case EYES_LEFT:
257                 _left_done = true;
258                 break;
259         case EYES_RIGHT:
260                 _right_done = true;
261                 break;
262         default:
263                 break;
264         }
265
266         if (_left_done && _right_done) {
267                 ++_video_frames_enqueued;
268                 _left_done = _right_done = false;
269         }
270
271         _last_player_video = pv;
272 }
273
274 void
275 Encoder::terminate_threads ()
276 {
277         {
278                 boost::mutex::scoped_lock lock (_mutex);
279                 _terminate = true;
280                 _full_condition.notify_all ();
281                 _empty_condition.notify_all ();
282         }
283
284         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
285                 if ((*i)->joinable ()) {
286                         (*i)->join ();
287                 }
288                 delete *i;
289         }
290
291         _threads.clear ();
292         _terminate = false;
293 }
294
295 void
296 Encoder::encoder_thread (optional<ServerDescription> server)
297 try
298 {
299         /* Number of seconds that we currently wait between attempts
300            to connect to the server; not relevant for localhost
301            encodings.
302         */
303         int remote_backoff = 0;
304
305         while (true) {
306
307                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
308                 boost::mutex::scoped_lock lock (_mutex);
309                 while (_queue.empty () && !_terminate) {
310                         _empty_condition.wait (lock);
311                 }
312
313                 if (_terminate) {
314                         return;
315                 }
316
317                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
318                 shared_ptr<DCPVideo> vf = _queue.front ();
319                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
320                 _queue.pop_front ();
321
322                 lock.unlock ();
323
324                 optional<Data> encoded;
325
326                 /* We need to encode this input */
327                 if (server) {
328                         try {
329                                 encoded = vf->encode_remotely (server.get ());
330
331                                 if (remote_backoff > 0) {
332                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
333                                 }
334
335                                 /* This job succeeded, so remove any backoff */
336                                 remote_backoff = 0;
337
338                         } catch (std::exception& e) {
339                                 if (remote_backoff < 60) {
340                                         /* back off more */
341                                         remote_backoff += 10;
342                                 }
343                                 LOG_ERROR (
344                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
345                                         vf->index(), server->host_name(), e.what(), remote_backoff
346                                         );
347                         }
348
349                 } else {
350                         try {
351                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
352                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
353                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
354                         } catch (std::exception& e) {
355                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
356                         }
357                 }
358
359                 if (encoded) {
360                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
361                         frame_done ();
362                 } else {
363                         lock.lock ();
364                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
365                         _queue.push_front (vf);
366                         lock.unlock ();
367                 }
368
369                 if (remote_backoff > 0) {
370                         dcpomatic_sleep (remote_backoff);
371                 }
372
373                 /* The queue might not be full any more, so notify anything that is waiting on that */
374                 lock.lock ();
375                 _full_condition.notify_all ();
376         }
377 }
378 catch (...)
379 {
380         store_current ();
381 }
382
383 void
384 Encoder::servers_list_changed ()
385 {
386         terminate_threads ();
387         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
388                 add_worker_threads (i);
389         }
390 }