Merge remote-tracking branch 'origin/master' into 2.0
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2014 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37 #include "player.h"
38 #include "player_video.h"
39
40 #include "i18n.h"
41
42 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
43 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
44 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
45
46 using std::pair;
47 using std::string;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::min;
52 using std::make_pair;
53 using boost::shared_ptr;
54 using boost::weak_ptr;
55 using boost::optional;
56 using boost::scoped_array;
57
58 int const Encoder::_history_size = 25;
59
60 /** @param f Film that we are encoding */
61 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j, shared_ptr<Writer> writer)
62         : _film (f)
63         , _job (j)
64         , _video_frames_out (0)
65         , _terminate (false)
66         , _writer (writer)
67 {
68
69 }
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74 }
75
76 /** Add a worker thread for a each thread on a remote server.  Caller must hold
77  *  a lock on _mutex, or know that one is not currently required to
78  *  safely modify _threads.
79  */
80 void
81 Encoder::add_worker_threads (ServerDescription d)
82 {
83         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.host_name ());
84         for (int i = 0; i < d.threads(); ++i) {
85                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
86         }
87 }
88
89 void
90 Encoder::begin ()
91 {
92         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
93                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
94         }
95
96         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
97 }
98
99 void
100 Encoder::end ()
101 {
102         boost::mutex::scoped_lock lock (_mutex);
103
104         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
105
106         /* Keep waking workers until the queue is empty */
107         while (!_queue.empty ()) {
108                 _empty_condition.notify_all ();
109                 _full_condition.wait (lock);
110         }
111
112         lock.unlock ();
113         
114         terminate_threads ();
115
116         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
117
118         /* The following sequence of events can occur in the above code:
119              1. a remote worker takes the last image off the queue
120              2. the loop above terminates
121              3. the remote worker fails to encode the image and puts it back on the queue
122              4. the remote worker is then terminated by terminate_threads
123
124              So just mop up anything left in the queue here.
125         */
126
127         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
128                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
129                 try {
130                         _writer->write ((*i)->encode_locally(), (*i)->index (), (*i)->eyes ());
131                         frame_done ();
132                 } catch (std::exception& e) {
133                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
134                 }
135         }
136 }       
137
138 /** @return an estimate of the current number of frames we are encoding per second,
139  *  or 0 if not known.
140  */
141 float
142 Encoder::current_encoding_rate () const
143 {
144         boost::mutex::scoped_lock lock (_state_mutex);
145         if (int (_time_history.size()) < _history_size) {
146                 return 0;
147         }
148
149         struct timeval now;
150         gettimeofday (&now, 0);
151
152         return _history_size / (seconds (now) - seconds (_time_history.back ()));
153 }
154
155 /** @return Number of video frames that have been sent out */
156 int
157 Encoder::video_frames_out () const
158 {
159         boost::mutex::scoped_lock (_state_mutex);
160         return _video_frames_out;
161 }
162
163 /** Should be called when a frame has been encoded successfully.
164  *  @param n Source frame index.
165  */
166 void
167 Encoder::frame_done ()
168 {
169         boost::mutex::scoped_lock lock (_state_mutex);
170         
171         struct timeval tv;
172         gettimeofday (&tv, 0);
173         _time_history.push_front (tv);
174         if (int (_time_history.size()) > _history_size) {
175                 _time_history.pop_back ();
176         }
177 }
178
179 void
180 Encoder::enqueue (shared_ptr<PlayerVideo> pvf)
181 {
182         _waker.nudge ();
183         
184         boost::mutex::scoped_lock lock (_mutex);
185
186         /* XXX: discard 3D here if required */
187
188         /* Wait until the queue has gone down a bit */
189         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
190                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
191                 _full_condition.wait (lock);
192                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
193         }
194
195         if (_terminate) {
196                 return;
197         }
198
199         _writer->rethrow ();
200         /* Re-throw any exception raised by one of our threads.  If more
201            than one has thrown an exception, only one will be rethrown, I think;
202            but then, if that happens something has gone badly wrong.
203         */
204         rethrow ();
205
206         if (_writer->can_fake_write (_video_frames_out)) {
207                 _writer->fake_write (_video_frames_out, pvf->eyes ());
208                 frame_done ();
209         } else if (pvf->has_j2k ()) {
210                 _writer->write (pvf->j2k(), _video_frames_out, pvf->eyes ());
211         } else {
212                 /* Queue this new frame for encoding */
213                 LOG_TIMING ("adding to queue of %1", _queue.size ());
214                 _queue.push_back (shared_ptr<DCPVideo> (
215                                           new DCPVideo (
216                                                   pvf,
217                                                   _video_frames_out,
218                                                   _film->video_frame_rate(),
219                                                   _film->j2k_bandwidth(),
220                                                   _film->resolution(),
221                                                   _film->burn_subtitles(),
222                                                   _film->log()
223                                                   )
224                                           ));
225
226                 /* The queue might not be empty any more, so notify anything which is
227                    waiting on that.
228                 */
229                 _empty_condition.notify_all ();
230         }
231
232         if (pvf->eyes() != EYES_LEFT) {
233                 ++_video_frames_out;
234         }
235 }
236
237 void
238 Encoder::terminate_threads ()
239 {
240         {
241                 boost::mutex::scoped_lock lock (_mutex);
242                 _terminate = true;
243                 _full_condition.notify_all ();
244                 _empty_condition.notify_all ();
245         }
246
247         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
248                 if ((*i)->joinable ()) {
249                         (*i)->join ();
250                 }
251                 delete *i;
252         }
253
254         _threads.clear ();
255 }
256
257 void
258 Encoder::encoder_thread (optional<ServerDescription> server)
259 try
260 {
261         /* Number of seconds that we currently wait between attempts
262            to connect to the server; not relevant for localhost
263            encodings.
264         */
265         int remote_backoff = 0;
266         
267         while (true) {
268
269                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
270                 boost::mutex::scoped_lock lock (_mutex);
271                 while (_queue.empty () && !_terminate) {
272                         _empty_condition.wait (lock);
273                 }
274
275                 if (_terminate) {
276                         return;
277                 }
278
279                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
280                 shared_ptr<DCPVideo> vf = _queue.front ();
281                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
282                 _queue.pop_front ();
283                 
284                 lock.unlock ();
285
286                 shared_ptr<EncodedData> encoded;
287
288                 if (server) {
289                         try {
290                                 encoded = vf->encode_remotely (server.get ());
291
292                                 if (remote_backoff > 0) {
293                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
294                                 }
295                                 
296                                 /* This job succeeded, so remove any backoff */
297                                 remote_backoff = 0;
298                                 
299                         } catch (std::exception& e) {
300                                 if (remote_backoff < 60) {
301                                         /* back off more */
302                                         remote_backoff += 10;
303                                 }
304                                 LOG_ERROR (
305                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
306                                         vf->index(), server->host_name(), e.what(), remote_backoff
307                                         );
308                         }
309                                 
310                 } else {
311                         try {
312                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
313                                 encoded = vf->encode_locally ();
314                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
315                         } catch (std::exception& e) {
316                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
317                         }
318                 }
319
320                 if (encoded) {
321                         _writer->write (encoded, vf->index (), vf->eyes ());
322                         frame_done ();
323                 } else {
324                         lock.lock ();
325                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
326                         _queue.push_front (vf);
327                         lock.unlock ();
328                 }
329
330                 if (remote_backoff > 0) {
331                         dcpomatic_sleep (remote_backoff);
332                 }
333
334                 /* The queue might not be full any more, so notify anything that is waiting on that */
335                 lock.lock ();
336                 _full_condition.notify_all ();
337         }
338 }
339 catch (...)
340 {
341         store_current ();
342 }
343
344 void
345 Encoder::server_found (ServerDescription s)
346 {
347         add_worker_threads (s);
348 }