Merge branch 'content-burn-subs' into 2.0
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include <libcxml/cxml.h>
38 #include <boost/lambda/lambda.hpp>
39 #include <iostream>
40
41 #include "i18n.h"
42
43 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
44 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
45 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
46
47 using std::pair;
48 using std::string;
49 using std::vector;
50 using std::list;
51 using std::cout;
52 using std::min;
53 using std::make_pair;
54 using boost::shared_ptr;
55 using boost::weak_ptr;
56 using boost::optional;
57 using boost::scoped_array;
58
59 int const Encoder::_history_size = 25;
60
61 /** @param f Film that we are encoding */
62 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
63         : _film (film)
64         , _job (j)
65         , _video_frames_enqueued (0)
66         , _terminate (false)
67         , _writer (writer)
68 {
69
70 }
71
72 Encoder::~Encoder ()
73 {
74         terminate_threads ();
75 }
76
77 /** Add a worker thread for a each thread on a remote server.  Caller must hold
78  *  a lock on _mutex, or know that one is not currently required to
79  *  safely modify _threads.
80  */
81 void
82 Encoder::add_worker_threads (ServerDescription d)
83 {
84         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
85         for (int i = 0; i < d.threads(); ++i) {
86                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
87         }
88
89         _writer->set_encoder_threads (_threads.size ());
90 }
91
92 void
93 Encoder::begin ()
94 {
95         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
96                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
97         }
98
99         _writer->set_encoder_threads (_threads.size ());
100
101         if (!ServerFinder::instance()->disabled ()) {
102                 _server_found_connection = ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
103         }
104 }
105
106 void
107 Encoder::end ()
108 {
109         boost::mutex::scoped_lock lock (_mutex);
110
111         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
112
113         /* Keep waking workers until the queue is empty */
114         while (!_queue.empty ()) {
115                 _empty_condition.notify_all ();
116                 _full_condition.wait (lock);
117         }
118
119         lock.unlock ();
120
121         terminate_threads ();
122
123         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
124
125         /* The following sequence of events can occur in the above code:
126              1. a remote worker takes the last image off the queue
127              2. the loop above terminates
128              3. the remote worker fails to encode the image and puts it back on the queue
129              4. the remote worker is then terminated by terminate_threads
130
131              So just mop up anything left in the queue here.
132         */
133
134         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
135                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
136                 try {
137                         _writer->write (
138                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
139                                 (*i)->index (),
140                                 (*i)->eyes ()
141                                 );
142                         frame_done ();
143                 } catch (std::exception& e) {
144                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
145                 }
146         }
147 }
148
149 /** @return an estimate of the current number of frames we are encoding per second,
150  *  or 0 if not known.
151  */
152 float
153 Encoder::current_encoding_rate () const
154 {
155         boost::mutex::scoped_lock lock (_state_mutex);
156         if (int (_time_history.size()) < _history_size) {
157                 return 0;
158         }
159
160         struct timeval now;
161         gettimeofday (&now, 0);
162
163         return _history_size / (seconds (now) - seconds (_time_history.back ()));
164 }
165
166 /** @return Number of video frames that have been sent out */
167 int
168 Encoder::video_frames_out () const
169 {
170         boost::mutex::scoped_lock (_state_mutex);
171         return _video_frames_enqueued;
172 }
173
174 /** Should be called when a frame has been encoded successfully.
175  *  @param n Source frame index.
176  */
177 void
178 Encoder::frame_done ()
179 {
180         boost::mutex::scoped_lock lock (_state_mutex);
181
182         struct timeval tv;
183         gettimeofday (&tv, 0);
184         _time_history.push_front (tv);
185         if (int (_time_history.size()) > _history_size) {
186                 _time_history.pop_back ();
187         }
188 }
189
190 /** Called in order, so each time this is called the supplied frame is the one
191  *  after the previous one.
192  */
193 void
194 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
195 {
196         _waker.nudge ();
197
198         boost::mutex::scoped_lock lock (_mutex);
199
200         /* XXX: discard 3D here if required */
201
202         /* Wait until the queue has gone down a bit */
203         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
204                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
205                 _full_condition.wait (lock);
206                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
207         }
208
209         if (_terminate) {
210                 return;
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         if (_writer->can_fake_write (_video_frames_enqueued)) {
221                 /* We can fake-write this frame */
222                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
223                 frame_done ();
224         } else if (pv->has_j2k ()) {
225                 /* This frame already has JPEG2000 data, so just write it */
226                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
227         } else if (_last_player_video && pv->same (_last_player_video)) {
228                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
229         } else {
230                 /* Queue this new frame for encoding */
231                 LOG_TIMING ("adding to queue of %1", _queue.size ());
232                 _queue.push_back (shared_ptr<DCPVideo> (
233                                           new DCPVideo (
234                                                   pv,
235                                                   _video_frames_enqueued,
236                                                   _film->video_frame_rate(),
237                                                   _film->j2k_bandwidth(),
238                                                   _film->resolution(),
239                                                   _film->log()
240                                                   )
241                                           ));
242
243                 /* The queue might not be empty any more, so notify anything which is
244                    waiting on that.
245                 */
246                 _empty_condition.notify_all ();
247         }
248
249         if (pv->eyes() != EYES_LEFT) {
250                 ++_video_frames_enqueued;
251         }
252
253         _last_player_video = pv;
254 }
255
256 void
257 Encoder::terminate_threads ()
258 {
259         {
260                 boost::mutex::scoped_lock lock (_mutex);
261                 _terminate = true;
262                 _full_condition.notify_all ();
263                 _empty_condition.notify_all ();
264         }
265
266         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
267                 if ((*i)->joinable ()) {
268                         (*i)->join ();
269                 }
270                 delete *i;
271         }
272
273         _threads.clear ();
274 }
275
276 void
277 Encoder::encoder_thread (optional<ServerDescription> server)
278 try
279 {
280         /* Number of seconds that we currently wait between attempts
281            to connect to the server; not relevant for localhost
282            encodings.
283         */
284         int remote_backoff = 0;
285
286         while (true) {
287
288                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
289                 boost::mutex::scoped_lock lock (_mutex);
290                 while (_queue.empty () && !_terminate) {
291                         _empty_condition.wait (lock);
292                 }
293
294                 if (_terminate) {
295                         return;
296                 }
297
298                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
299                 shared_ptr<DCPVideo> vf = _queue.front ();
300                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
301                 _queue.pop_front ();
302
303                 lock.unlock ();
304
305                 optional<Data> encoded;
306
307                 /* We need to encode this input */
308                 if (server) {
309                         try {
310                                 encoded = vf->encode_remotely (server.get ());
311
312                                 if (remote_backoff > 0) {
313                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
314                                 }
315
316                                 /* This job succeeded, so remove any backoff */
317                                 remote_backoff = 0;
318
319                         } catch (std::exception& e) {
320                                 if (remote_backoff < 60) {
321                                         /* back off more */
322                                         remote_backoff += 10;
323                                 }
324                                 LOG_ERROR (
325                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
326                                         vf->index(), server->host_name(), e.what(), remote_backoff
327                                         );
328                         }
329
330                 } else {
331                         try {
332                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
333                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
334                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
335                         } catch (std::exception& e) {
336                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
337                         }
338                 }
339
340                 if (encoded) {
341                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
342                         frame_done ();
343                 } else {
344                         lock.lock ();
345                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
346                         _queue.push_front (vf);
347                         lock.unlock ();
348                 }
349
350                 if (remote_backoff > 0) {
351                         dcpomatic_sleep (remote_backoff);
352                 }
353
354                 /* The queue might not be full any more, so notify anything that is waiting on that */
355                 lock.lock ();
356                 _full_condition.notify_all ();
357         }
358 }
359 catch (...)
360 {
361         store_current ();
362 }
363
364 void
365 Encoder::server_found (ServerDescription s)
366 {
367         add_worker_threads (s);
368 }