Merge branch '2.0' of ssh://git.carlh.net/home/carl/git/dcpomatic2 into 2.0
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include <libcxml/cxml.h>
38 #include <boost/lambda/lambda.hpp>
39 #include <boost/foreach.hpp>
40 #include <iostream>
41
42 #include "i18n.h"
43
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
47
48 using std::pair;
49 using std::string;
50 using std::vector;
51 using std::list;
52 using std::cout;
53 using std::min;
54 using std::make_pair;
55 using boost::shared_ptr;
56 using boost::weak_ptr;
57 using boost::optional;
58 using boost::scoped_array;
59
60 int const Encoder::_history_size = 25;
61
62 /** @param f Film that we are encoding */
63 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
64         : _film (film)
65         , _job (j)
66         , _video_frames_enqueued (0)
67         , _terminate (false)
68         , _writer (writer)
69 {
70         servers_list_changed ();
71 }
72
73 Encoder::~Encoder ()
74 {
75         terminate_threads ();
76 }
77
78 /** Add a worker thread for a each thread on a remote server.  Caller must hold
79  *  a lock on _mutex, or know that one is not currently required to
80  *  safely modify _threads.
81  */
82 void
83 Encoder::add_worker_threads (ServerDescription d)
84 {
85         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
86         for (int i = 0; i < d.threads(); ++i) {
87                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
88         }
89
90         _writer->set_encoder_threads (_threads.size ());
91 }
92
93 void
94 Encoder::begin ()
95 {
96         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
97                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
98         }
99
100         _writer->set_encoder_threads (_threads.size ());
101
102         if (!ServerFinder::instance()->disabled ()) {
103                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
104         }
105 }
106
107 void
108 Encoder::end ()
109 {
110         boost::mutex::scoped_lock lock (_mutex);
111
112         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
113
114         /* Keep waking workers until the queue is empty */
115         while (!_queue.empty ()) {
116                 _empty_condition.notify_all ();
117                 _full_condition.wait (lock);
118         }
119
120         lock.unlock ();
121
122         terminate_threads ();
123
124         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
125
126         /* The following sequence of events can occur in the above code:
127              1. a remote worker takes the last image off the queue
128              2. the loop above terminates
129              3. the remote worker fails to encode the image and puts it back on the queue
130              4. the remote worker is then terminated by terminate_threads
131
132              So just mop up anything left in the queue here.
133         */
134
135         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
136                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
137                 try {
138                         _writer->write (
139                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
140                                 (*i)->index (),
141                                 (*i)->eyes ()
142                                 );
143                         frame_done ();
144                 } catch (std::exception& e) {
145                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
146                 }
147         }
148 }
149
150 /** @return an estimate of the current number of frames we are encoding per second,
151  *  or 0 if not known.
152  */
153 float
154 Encoder::current_encoding_rate () const
155 {
156         boost::mutex::scoped_lock lock (_state_mutex);
157         if (int (_time_history.size()) < _history_size) {
158                 return 0;
159         }
160
161         struct timeval now;
162         gettimeofday (&now, 0);
163
164         return _history_size / (seconds (now) - seconds (_time_history.back ()));
165 }
166
167 /** @return Number of video frames that have been sent out */
168 int
169 Encoder::video_frames_out () const
170 {
171         boost::mutex::scoped_lock (_state_mutex);
172         return _video_frames_enqueued;
173 }
174
175 /** Should be called when a frame has been encoded successfully.
176  *  @param n Source frame index.
177  */
178 void
179 Encoder::frame_done ()
180 {
181         boost::mutex::scoped_lock lock (_state_mutex);
182
183         struct timeval tv;
184         gettimeofday (&tv, 0);
185         _time_history.push_front (tv);
186         if (int (_time_history.size()) > _history_size) {
187                 _time_history.pop_back ();
188         }
189 }
190
191 /** Called in order, so each time this is called the supplied frame is the one
192  *  after the previous one.
193  */
194 void
195 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
196 {
197         _waker.nudge ();
198
199         boost::mutex::scoped_lock lock (_mutex);
200
201         /* XXX: discard 3D here if required */
202
203         /* Wait until the queue has gone down a bit */
204         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
205                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
206                 _full_condition.wait (lock);
207                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
208         }
209
210         if (_terminate) {
211                 return;
212         }
213
214         _writer->rethrow ();
215         /* Re-throw any exception raised by one of our threads.  If more
216            than one has thrown an exception, only one will be rethrown, I think;
217            but then, if that happens something has gone badly wrong.
218         */
219         rethrow ();
220
221         if (_writer->can_fake_write (_video_frames_enqueued)) {
222                 /* We can fake-write this frame */
223                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
224                 frame_done ();
225         } else if (pv->has_j2k ()) {
226                 /* This frame already has JPEG2000 data, so just write it */
227                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
228         } else if (_last_player_video && pv->same (_last_player_video)) {
229                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
230         } else {
231                 /* Queue this new frame for encoding */
232                 LOG_TIMING ("adding to queue of %1", _queue.size ());
233                 _queue.push_back (shared_ptr<DCPVideo> (
234                                           new DCPVideo (
235                                                   pv,
236                                                   _video_frames_enqueued,
237                                                   _film->video_frame_rate(),
238                                                   _film->j2k_bandwidth(),
239                                                   _film->resolution(),
240                                                   _film->log()
241                                                   )
242                                           ));
243
244                 /* The queue might not be empty any more, so notify anything which is
245                    waiting on that.
246                 */
247                 _empty_condition.notify_all ();
248         }
249
250         if (pv->eyes() != EYES_LEFT) {
251                 ++_video_frames_enqueued;
252         }
253
254         _last_player_video = pv;
255 }
256
257 void
258 Encoder::terminate_threads ()
259 {
260         {
261                 boost::mutex::scoped_lock lock (_mutex);
262                 _terminate = true;
263                 _full_condition.notify_all ();
264                 _empty_condition.notify_all ();
265         }
266
267         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
268                 if ((*i)->joinable ()) {
269                         (*i)->join ();
270                 }
271                 delete *i;
272         }
273
274         _threads.clear ();
275         _terminate = false;
276 }
277
278 void
279 Encoder::encoder_thread (optional<ServerDescription> server)
280 try
281 {
282         /* Number of seconds that we currently wait between attempts
283            to connect to the server; not relevant for localhost
284            encodings.
285         */
286         int remote_backoff = 0;
287
288         while (true) {
289
290                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
291                 boost::mutex::scoped_lock lock (_mutex);
292                 while (_queue.empty () && !_terminate) {
293                         _empty_condition.wait (lock);
294                 }
295
296                 if (_terminate) {
297                         return;
298                 }
299
300                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
301                 shared_ptr<DCPVideo> vf = _queue.front ();
302                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
303                 _queue.pop_front ();
304
305                 lock.unlock ();
306
307                 optional<Data> encoded;
308
309                 /* We need to encode this input */
310                 if (server) {
311                         try {
312                                 encoded = vf->encode_remotely (server.get ());
313
314                                 if (remote_backoff > 0) {
315                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
316                                 }
317
318                                 /* This job succeeded, so remove any backoff */
319                                 remote_backoff = 0;
320
321                         } catch (std::exception& e) {
322                                 if (remote_backoff < 60) {
323                                         /* back off more */
324                                         remote_backoff += 10;
325                                 }
326                                 LOG_ERROR (
327                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
328                                         vf->index(), server->host_name(), e.what(), remote_backoff
329                                         );
330                         }
331
332                 } else {
333                         try {
334                                 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
335                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
336                                 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
337                         } catch (std::exception& e) {
338                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
339                         }
340                 }
341
342                 if (encoded) {
343                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
344                         frame_done ();
345                 } else {
346                         lock.lock ();
347                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
348                         _queue.push_front (vf);
349                         lock.unlock ();
350                 }
351
352                 if (remote_backoff > 0) {
353                         dcpomatic_sleep (remote_backoff);
354                 }
355
356                 /* The queue might not be full any more, so notify anything that is waiting on that */
357                 lock.lock ();
358                 _full_condition.notify_all ();
359         }
360 }
361 catch (...)
362 {
363         store_current ();
364 }
365
366 void
367 Encoder::servers_list_changed ()
368 {
369         terminate_threads ();
370         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
371                 add_worker_threads (i);
372         }
373 }