Hand-apply 33b76b675d747fd828aba91d9d857227cb8a8244 from master; make sure signals...
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2014 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include <libcxml/cxml.h>
37 #include <boost/lambda/lambda.hpp>
38 #include <iostream>
39
40 #include "i18n.h"
41
42 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
43 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
44 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
45
46 using std::pair;
47 using std::string;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::min;
52 using std::make_pair;
53 using boost::shared_ptr;
54 using boost::weak_ptr;
55 using boost::optional;
56 using boost::scoped_array;
57
58 int const Encoder::_history_size = 25;
59
60 /** @param f Film that we are encoding */
61 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j, shared_ptr<Writer> writer)
62         : _film (f)
63         , _job (j)
64         , _video_frames_out (0)
65         , _terminate (false)
66         , _writer (writer)
67 {
68
69 }
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74 }
75
76 /** Add a worker thread for a each thread on a remote server.  Caller must hold
77  *  a lock on _mutex, or know that one is not currently required to
78  *  safely modify _threads.
79  */
80 void
81 Encoder::add_worker_threads (ServerDescription d)
82 {
83         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
84         for (int i = 0; i < d.threads(); ++i) {
85                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
86         }
87 }
88
89 void
90 Encoder::begin ()
91 {
92         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
93                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
94         }
95
96         if (!ServerFinder::instance()->disabled ()) {
97                 _server_found_connection = ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
98         }
99 }
100
101 void
102 Encoder::end ()
103 {
104         boost::mutex::scoped_lock lock (_mutex);
105
106         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
107
108         /* Keep waking workers until the queue is empty */
109         while (!_queue.empty ()) {
110                 _empty_condition.notify_all ();
111                 _full_condition.wait (lock);
112         }
113
114         lock.unlock ();
115         
116         terminate_threads ();
117
118         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
119
120         /* The following sequence of events can occur in the above code:
121              1. a remote worker takes the last image off the queue
122              2. the loop above terminates
123              3. the remote worker fails to encode the image and puts it back on the queue
124              4. the remote worker is then terminated by terminate_threads
125
126              So just mop up anything left in the queue here.
127         */
128
129         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
130                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
131                 try {
132                         _writer->write (
133                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
134                                 (*i)->index (),
135                                 (*i)->eyes ()
136                                 );
137                         frame_done ();
138                 } catch (std::exception& e) {
139                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
140                 }
141         }
142 }       
143
144 /** @return an estimate of the current number of frames we are encoding per second,
145  *  or 0 if not known.
146  */
147 float
148 Encoder::current_encoding_rate () const
149 {
150         boost::mutex::scoped_lock lock (_state_mutex);
151         if (int (_time_history.size()) < _history_size) {
152                 return 0;
153         }
154
155         struct timeval now;
156         gettimeofday (&now, 0);
157
158         return _history_size / (seconds (now) - seconds (_time_history.back ()));
159 }
160
161 /** @return Number of video frames that have been sent out */
162 int
163 Encoder::video_frames_out () const
164 {
165         boost::mutex::scoped_lock (_state_mutex);
166         return _video_frames_out;
167 }
168
169 /** Should be called when a frame has been encoded successfully.
170  *  @param n Source frame index.
171  */
172 void
173 Encoder::frame_done ()
174 {
175         boost::mutex::scoped_lock lock (_state_mutex);
176         
177         struct timeval tv;
178         gettimeofday (&tv, 0);
179         _time_history.push_front (tv);
180         if (int (_time_history.size()) > _history_size) {
181                 _time_history.pop_back ();
182         }
183 }
184
185 /** Called in order, so each time this is called the supplied frame is the one
186  *  after the previous one.
187  */
188 void
189 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
190 {
191         _waker.nudge ();
192         
193         boost::mutex::scoped_lock lock (_mutex);
194
195         /* XXX: discard 3D here if required */
196
197         /* Wait until the queue has gone down a bit */
198         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
199                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
200                 _full_condition.wait (lock);
201                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
202         }
203
204         if (_terminate) {
205                 return;
206         }
207
208         _writer->rethrow ();
209         /* Re-throw any exception raised by one of our threads.  If more
210            than one has thrown an exception, only one will be rethrown, I think;
211            but then, if that happens something has gone badly wrong.
212         */
213         rethrow ();
214
215         if (_writer->can_fake_write (_video_frames_out)) {
216                 /* We can fake-write this frame */
217                 _writer->fake_write (_video_frames_out, pv->eyes ());
218                 frame_done ();
219         } else if (pv->has_j2k ()) {
220                 /* This frame already has JPEG2000 data, so just write it */
221                 _writer->write (pv->j2k(), _video_frames_out, pv->eyes ());
222         } else {
223                 /* Queue this new frame for encoding */
224                 LOG_TIMING ("adding to queue of %1", _queue.size ());
225                 _queue.push_back (shared_ptr<DCPVideo> (
226                                           new DCPVideo (
227                                                   pv,
228                                                   _video_frames_out,
229                                                   _film->video_frame_rate(),
230                                                   _film->j2k_bandwidth(),
231                                                   _film->resolution(),
232                                                   _film->burn_subtitles(),
233                                                   _film->log()
234                                                   )
235                                           ));
236
237                 /* The queue might not be empty any more, so notify anything which is
238                    waiting on that.
239                 */
240                 _empty_condition.notify_all ();
241         }
242
243         if (pv->eyes() != EYES_LEFT) {
244                 ++_video_frames_out;
245         }
246 }
247
248 void
249 Encoder::terminate_threads ()
250 {
251         {
252                 boost::mutex::scoped_lock lock (_mutex);
253                 _terminate = true;
254                 _full_condition.notify_all ();
255                 _empty_condition.notify_all ();
256         }
257
258         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
259                 if ((*i)->joinable ()) {
260                         (*i)->join ();
261                 }
262                 delete *i;
263         }
264
265         _threads.clear ();
266 }
267
268 void
269 Encoder::encoder_thread (optional<ServerDescription> server)
270 try
271 {
272         /* Number of seconds that we currently wait between attempts
273            to connect to the server; not relevant for localhost
274            encodings.
275         */
276         int remote_backoff = 0;
277         shared_ptr<DCPVideo> last_dcp_video;
278         shared_ptr<EncodedData> last_encoded;
279         
280         while (true) {
281
282                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
283                 boost::mutex::scoped_lock lock (_mutex);
284                 while (_queue.empty () && !_terminate) {
285                         _empty_condition.wait (lock);
286                 }
287
288                 if (_terminate) {
289                         return;
290                 }
291
292                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
293                 shared_ptr<DCPVideo> vf = _queue.front ();
294                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
295                 _queue.pop_front ();
296                 
297                 lock.unlock ();
298
299                 shared_ptr<EncodedData> encoded;
300
301                 if (last_dcp_video && vf->same (last_dcp_video)) {
302                         /* We already have encoded data for the same input as this one, so take a short-cut */
303                         encoded = last_encoded;
304                 } else {
305                         /* We need to encode this input */
306                         if (server) {
307                                 try {
308                                         encoded = vf->encode_remotely (server.get ());
309                                         
310                                         if (remote_backoff > 0) {
311                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
312                                         }
313                                         
314                                         /* This job succeeded, so remove any backoff */
315                                         remote_backoff = 0;
316                                         
317                                 } catch (std::exception& e) {
318                                         if (remote_backoff < 60) {
319                                                 /* back off more */
320                                                 remote_backoff += 10;
321                                         }
322                                         LOG_ERROR (
323                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
324                                                 vf->index(), server->host_name(), e.what(), remote_backoff
325                                                 );
326                                 }
327                                 
328                         } else {
329                                 try {
330                                         LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
331                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
332                                         LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
333                                 } catch (std::exception& e) {
334                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
335                                 }
336                         }
337                 }
338
339                 last_dcp_video = vf;
340                 last_encoded = encoded;
341
342                 if (encoded) {
343                         _writer->write (encoded, vf->index (), vf->eyes ());
344                         frame_done ();
345                 } else {
346                         lock.lock ();
347                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
348                         _queue.push_front (vf);
349                         lock.unlock ();
350                 }
351
352                 if (remote_backoff > 0) {
353                         dcpomatic_sleep (remote_backoff);
354                 }
355
356                 /* The queue might not be full any more, so notify anything that is waiting on that */
357                 lock.lock ();
358                 _full_condition.notify_all ();
359         }
360 }
361 catch (...)
362 {
363         store_current ();
364 }
365
366 void
367 Encoder::server_found (ServerDescription s)
368 {
369         add_worker_threads (s);
370 }