No-op; rename a variable and add some comments.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2014 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37 #include "player.h"
38 #include "player_video.h"
39
40 #include "i18n.h"
41
42 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
43 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
44 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
45
46 using std::pair;
47 using std::string;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::min;
52 using std::make_pair;
53 using boost::shared_ptr;
54 using boost::weak_ptr;
55 using boost::optional;
56 using boost::scoped_array;
57
58 int const Encoder::_history_size = 25;
59
60 /** @param f Film that we are encoding */
61 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j, shared_ptr<Writer> writer)
62         : _film (f)
63         , _job (j)
64         , _video_frames_out (0)
65         , _terminate (false)
66         , _writer (writer)
67 {
68
69 }
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74 }
75
76 /** Add a worker thread for a each thread on a remote server.  Caller must hold
77  *  a lock on _mutex, or know that one is not currently required to
78  *  safely modify _threads.
79  */
80 void
81 Encoder::add_worker_threads (ServerDescription d)
82 {
83         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.host_name ());
84         for (int i = 0; i < d.threads(); ++i) {
85                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
86         }
87 }
88
89 void
90 Encoder::begin ()
91 {
92         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
93                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
94         }
95
96         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
97 }
98
99 void
100 Encoder::end ()
101 {
102         boost::mutex::scoped_lock lock (_mutex);
103
104         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
105
106         /* Keep waking workers until the queue is empty */
107         while (!_queue.empty ()) {
108                 _empty_condition.notify_all ();
109                 _full_condition.wait (lock);
110         }
111
112         lock.unlock ();
113         
114         terminate_threads ();
115
116         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
117
118         /* The following sequence of events can occur in the above code:
119              1. a remote worker takes the last image off the queue
120              2. the loop above terminates
121              3. the remote worker fails to encode the image and puts it back on the queue
122              4. the remote worker is then terminated by terminate_threads
123
124              So just mop up anything left in the queue here.
125         */
126
127         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
128                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
129                 try {
130                         _writer->write ((*i)->encode_locally(), (*i)->index (), (*i)->eyes ());
131                         frame_done ();
132                 } catch (std::exception& e) {
133                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
134                 }
135         }
136 }       
137
138 /** @return an estimate of the current number of frames we are encoding per second,
139  *  or 0 if not known.
140  */
141 float
142 Encoder::current_encoding_rate () const
143 {
144         boost::mutex::scoped_lock lock (_state_mutex);
145         if (int (_time_history.size()) < _history_size) {
146                 return 0;
147         }
148
149         struct timeval now;
150         gettimeofday (&now, 0);
151
152         return _history_size / (seconds (now) - seconds (_time_history.back ()));
153 }
154
155 /** @return Number of video frames that have been sent out */
156 int
157 Encoder::video_frames_out () const
158 {
159         boost::mutex::scoped_lock (_state_mutex);
160         return _video_frames_out;
161 }
162
163 /** Should be called when a frame has been encoded successfully.
164  *  @param n Source frame index.
165  */
166 void
167 Encoder::frame_done ()
168 {
169         boost::mutex::scoped_lock lock (_state_mutex);
170         
171         struct timeval tv;
172         gettimeofday (&tv, 0);
173         _time_history.push_front (tv);
174         if (int (_time_history.size()) > _history_size) {
175                 _time_history.pop_back ();
176         }
177 }
178
179 void
180 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
181 {
182         _waker.nudge ();
183         
184         boost::mutex::scoped_lock lock (_mutex);
185
186         /* XXX: discard 3D here if required */
187
188         /* Wait until the queue has gone down a bit */
189         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
190                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
191                 _full_condition.wait (lock);
192                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
193         }
194
195         if (_terminate) {
196                 return;
197         }
198
199         _writer->rethrow ();
200         /* Re-throw any exception raised by one of our threads.  If more
201            than one has thrown an exception, only one will be rethrown, I think;
202            but then, if that happens something has gone badly wrong.
203         */
204         rethrow ();
205
206         if (_writer->can_fake_write (_video_frames_out)) {
207                 /* We can fake-write this frame */
208                 _writer->fake_write (_video_frames_out, pv->eyes ());
209                 frame_done ();
210         } else if (pv->has_j2k ()) {
211                 /* This frame already has JPEG2000 data, so just write it */
212                 _writer->write (pv->j2k(), _video_frames_out, pv->eyes ());
213         } else {
214                 /* Queue this new frame for encoding */
215                 LOG_TIMING ("adding to queue of %1", _queue.size ());
216                 _queue.push_back (shared_ptr<DCPVideo> (
217                                           new DCPVideo (
218                                                   pv,
219                                                   _video_frames_out,
220                                                   _film->video_frame_rate(),
221                                                   _film->j2k_bandwidth(),
222                                                   _film->resolution(),
223                                                   _film->burn_subtitles(),
224                                                   _film->log()
225                                                   )
226                                           ));
227
228                 /* The queue might not be empty any more, so notify anything which is
229                    waiting on that.
230                 */
231                 _empty_condition.notify_all ();
232         }
233
234         if (pv->eyes() != EYES_LEFT) {
235                 ++_video_frames_out;
236         }
237 }
238
239 void
240 Encoder::terminate_threads ()
241 {
242         {
243                 boost::mutex::scoped_lock lock (_mutex);
244                 _terminate = true;
245                 _full_condition.notify_all ();
246                 _empty_condition.notify_all ();
247         }
248
249         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
250                 if ((*i)->joinable ()) {
251                         (*i)->join ();
252                 }
253                 delete *i;
254         }
255
256         _threads.clear ();
257 }
258
259 void
260 Encoder::encoder_thread (optional<ServerDescription> server)
261 try
262 {
263         /* Number of seconds that we currently wait between attempts
264            to connect to the server; not relevant for localhost
265            encodings.
266         */
267         int remote_backoff = 0;
268         
269         while (true) {
270
271                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
272                 boost::mutex::scoped_lock lock (_mutex);
273                 while (_queue.empty () && !_terminate) {
274                         _empty_condition.wait (lock);
275                 }
276
277                 if (_terminate) {
278                         return;
279                 }
280
281                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
282                 shared_ptr<DCPVideo> vf = _queue.front ();
283                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
284                 _queue.pop_front ();
285                 
286                 lock.unlock ();
287
288                 shared_ptr<EncodedData> encoded;
289
290                 if (server) {
291                         try {
292                                 encoded = vf->encode_remotely (server.get ());
293
294                                 if (remote_backoff > 0) {
295                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
296                                 }
297                                 
298                                 /* This job succeeded, so remove any backoff */
299                                 remote_backoff = 0;
300                                 
301                         } catch (std::exception& e) {
302                                 if (remote_backoff < 60) {
303                                         /* back off more */
304                                         remote_backoff += 10;
305                                 }
306                                 LOG_ERROR (
307                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
308                                         vf->index(), server->host_name(), e.what(), remote_backoff
309                                         );
310                         }
311                                 
312                 } else {
313                         try {
314                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
315                                 encoded = vf->encode_locally ();
316                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
317                         } catch (std::exception& e) {
318                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
319                         }
320                 }
321
322                 if (encoded) {
323                         _writer->write (encoded, vf->index (), vf->eyes ());
324                         frame_done ();
325                 } else {
326                         lock.lock ();
327                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
328                         _queue.push_front (vf);
329                         lock.unlock ();
330                 }
331
332                 if (remote_backoff > 0) {
333                         dcpomatic_sleep (remote_backoff);
334                 }
335
336                 /* The queue might not be full any more, so notify anything that is waiting on that */
337                 lock.lock ();
338                 _full_condition.notify_all ();
339         }
340 }
341 catch (...)
342 {
343         store_current ();
344 }
345
346 void
347 Encoder::server_found (ServerDescription s)
348 {
349         add_worker_threads (s);
350 }