No-op; variable renaming.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include <libcxml/cxml.h>
38 #include <boost/lambda/lambda.hpp>
39 #include <iostream>
40
41 #include "i18n.h"
42
43 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
44 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
45 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
46
47 using std::pair;
48 using std::string;
49 using std::vector;
50 using std::list;
51 using std::cout;
52 using std::min;
53 using std::make_pair;
54 using boost::shared_ptr;
55 using boost::weak_ptr;
56 using boost::optional;
57 using boost::scoped_array;
58
59 int const Encoder::_history_size = 25;
60
61 /** @param f Film that we are encoding */
62 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
63         : _film (film)
64         , _job (j)
65         , _video_frames_enqueued (0)
66         , _terminate (false)
67         , _writer (writer)
68 {
69
70 }
71
72 Encoder::~Encoder ()
73 {
74         terminate_threads ();
75 }
76
77 /** Add a worker thread for a each thread on a remote server.  Caller must hold
78  *  a lock on _mutex, or know that one is not currently required to
79  *  safely modify _threads.
80  */
81 void
82 Encoder::add_worker_threads (ServerDescription d)
83 {
84         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
85         for (int i = 0; i < d.threads(); ++i) {
86                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
87         }
88
89         _writer->set_encoder_threads (_threads.size ());
90 }
91
92 void
93 Encoder::begin ()
94 {
95         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
96                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
97         }
98
99         _writer->set_encoder_threads (_threads.size ());
100
101         if (!ServerFinder::instance()->disabled ()) {
102                 _server_found_connection = ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
103         }
104 }
105
106 void
107 Encoder::end ()
108 {
109         boost::mutex::scoped_lock lock (_mutex);
110
111         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
112
113         /* Keep waking workers until the queue is empty */
114         while (!_queue.empty ()) {
115                 _empty_condition.notify_all ();
116                 _full_condition.wait (lock);
117         }
118
119         lock.unlock ();
120         
121         terminate_threads ();
122
123         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
124
125         /* The following sequence of events can occur in the above code:
126              1. a remote worker takes the last image off the queue
127              2. the loop above terminates
128              3. the remote worker fails to encode the image and puts it back on the queue
129              4. the remote worker is then terminated by terminate_threads
130
131              So just mop up anything left in the queue here.
132         */
133
134         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
135                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
136                 try {
137                         _writer->write (
138                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
139                                 (*i)->index (),
140                                 (*i)->eyes ()
141                                 );
142                         frame_done ();
143                 } catch (std::exception& e) {
144                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
145                 }
146         }
147 }       
148
149 /** @return an estimate of the current number of frames we are encoding per second,
150  *  or 0 if not known.
151  */
152 float
153 Encoder::current_encoding_rate () const
154 {
155         boost::mutex::scoped_lock lock (_state_mutex);
156         if (int (_time_history.size()) < _history_size) {
157                 return 0;
158         }
159
160         struct timeval now;
161         gettimeofday (&now, 0);
162
163         return _history_size / (seconds (now) - seconds (_time_history.back ()));
164 }
165
166 /** @return Number of video frames that have been sent out */
167 int
168 Encoder::video_frames_out () const
169 {
170         boost::mutex::scoped_lock (_state_mutex);
171         return _video_frames_enqueued;
172 }
173
174 /** Should be called when a frame has been encoded successfully.
175  *  @param n Source frame index.
176  */
177 void
178 Encoder::frame_done ()
179 {
180         boost::mutex::scoped_lock lock (_state_mutex);
181         
182         struct timeval tv;
183         gettimeofday (&tv, 0);
184         _time_history.push_front (tv);
185         if (int (_time_history.size()) > _history_size) {
186                 _time_history.pop_back ();
187         }
188 }
189
190 /** Called in order, so each time this is called the supplied frame is the one
191  *  after the previous one.
192  */
193 void
194 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
195 {
196         _waker.nudge ();
197         
198         boost::mutex::scoped_lock lock (_mutex);
199
200         /* XXX: discard 3D here if required */
201
202         /* Wait until the queue has gone down a bit */
203         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
204                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
205                 _full_condition.wait (lock);
206                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
207         }
208
209         if (_terminate) {
210                 return;
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         if (_writer->can_fake_write (_video_frames_enqueued)) {
221                 /* We can fake-write this frame */
222                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
223                 frame_done ();
224         } else if (pv->has_j2k ()) {
225                 /* This frame already has JPEG2000 data, so just write it */
226                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
227         } else if (_last_player_video && pv->same (_last_player_video)) {
228                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
229         } else {
230                 /* Queue this new frame for encoding */
231                 LOG_TIMING ("adding to queue of %1", _queue.size ());
232                 _queue.push_back (shared_ptr<DCPVideo> (
233                                           new DCPVideo (
234                                                   pv,
235                                                   _video_frames_enqueued,
236                                                   _film->video_frame_rate(),
237                                                   _film->j2k_bandwidth(),
238                                                   _film->resolution(),
239                                                   _film->burn_subtitles(),
240                                                   _film->log()
241                                                   )
242                                           ));
243
244                 /* The queue might not be empty any more, so notify anything which is
245                    waiting on that.
246                 */
247                 _empty_condition.notify_all ();
248         }
249
250         if (pv->eyes() != EYES_LEFT) {
251                 ++_video_frames_enqueued;
252         }
253
254         _last_player_video = pv;
255 }
256
257 void
258 Encoder::terminate_threads ()
259 {
260         {
261                 boost::mutex::scoped_lock lock (_mutex);
262                 _terminate = true;
263                 _full_condition.notify_all ();
264                 _empty_condition.notify_all ();
265         }
266
267         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
268                 if ((*i)->joinable ()) {
269                         (*i)->join ();
270                 }
271                 delete *i;
272         }
273
274         _threads.clear ();
275 }
276
277 void
278 Encoder::encoder_thread (optional<ServerDescription> server)
279 try
280 {
281         /* Number of seconds that we currently wait between attempts
282            to connect to the server; not relevant for localhost
283            encodings.
284         */
285         int remote_backoff = 0;
286         
287         while (true) {
288
289                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
290                 boost::mutex::scoped_lock lock (_mutex);
291                 while (_queue.empty () && !_terminate) {
292                         _empty_condition.wait (lock);
293                 }
294
295                 if (_terminate) {
296                         return;
297                 }
298
299                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
300                 shared_ptr<DCPVideo> vf = _queue.front ();
301                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
302                 _queue.pop_front ();
303                 
304                 lock.unlock ();
305
306                 optional<Data> encoded;
307
308                 /* We need to encode this input */
309                 if (server) {
310                         try {
311                                 encoded = vf->encode_remotely (server.get ());
312                                 
313                                 if (remote_backoff > 0) {
314                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
315                                 }
316                                 
317                                 /* This job succeeded, so remove any backoff */
318                                 remote_backoff = 0;
319                                 
320                         } catch (std::exception& e) {
321                                 if (remote_backoff < 60) {
322                                         /* back off more */
323                                         remote_backoff += 10;
324                                 }
325                                 LOG_ERROR (
326                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
327                                         vf->index(), server->host_name(), e.what(), remote_backoff
328                                         );
329                         }
330                         
331                 } else {
332                         try {
333                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
334                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
335                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
336                         } catch (std::exception& e) {
337                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
338                         }
339                 }
340
341                 if (encoded) {
342                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
343                         frame_done ();
344                 } else {
345                         lock.lock ();
346                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
347                         _queue.push_front (vf);
348                         lock.unlock ();
349                 }
350
351                 if (remote_backoff > 0) {
352                         dcpomatic_sleep (remote_backoff);
353                 }
354
355                 /* The queue might not be full any more, so notify anything that is waiting on that */
356                 lock.lock ();
357                 _full_condition.notify_all ();
358         }
359 }
360 catch (...)
361 {
362         store_current ();
363 }
364
365 void
366 Encoder::server_found (ServerDescription s)
367 {
368         add_worker_threads (s);
369 }