Move ServerDescription into its own header.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
41 #include <iostream>
42
43 #include "i18n.h"
44
/* Logging helpers.  Each one composes its arguments with String::compose and passes
   the result to the log obtained from _film, so they can only be used where _film
   is in scope.

   The definitions have no trailing semicolon: the caller supplies it, so that an
   invocation is exactly one statement and is safe in an un-braced if / else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING)
48
49 using std::list;
50 using std::cout;
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
54
55 int const Encoder::_history_size = 25;
56
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
59         : _film (film)
60         , _job (j)
61         , _video_frames_enqueued (0)
62         , _left_done (false)
63         , _right_done (false)
64         , _terminate (false)
65         , _writer (writer)
66 {
67         servers_list_changed ();
68 }
69
70 Encoder::~Encoder ()
71 {
72         terminate_threads ();
73 }
74
75 void
76 Encoder::begin ()
77 {
78         if (!ServerFinder::instance()->disabled ()) {
79                 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
80         }
81 }
82
83 void
84 Encoder::end ()
85 {
86         boost::mutex::scoped_lock lock (_mutex);
87
88         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
89
90         /* Keep waking workers until the queue is empty */
91         while (!_queue.empty ()) {
92                 _empty_condition.notify_all ();
93                 _full_condition.wait (lock);
94         }
95
96         lock.unlock ();
97
98         terminate_threads ();
99
100         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
101
102         /* The following sequence of events can occur in the above code:
103              1. a remote worker takes the last image off the queue
104              2. the loop above terminates
105              3. the remote worker fails to encode the image and puts it back on the queue
106              4. the remote worker is then terminated by terminate_threads
107
108              So just mop up anything left in the queue here.
109         */
110
111         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
113                 try {
114                         _writer->write (
115                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
116                                 (*i)->index (),
117                                 (*i)->eyes ()
118                                 );
119                         frame_done ();
120                 } catch (std::exception& e) {
121                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
122                 }
123         }
124 }
125
126 /** @return an estimate of the current number of frames we are encoding per second,
127  *  or 0 if not known.
128  */
129 float
130 Encoder::current_encoding_rate () const
131 {
132         boost::mutex::scoped_lock lock (_state_mutex);
133         if (int (_time_history.size()) < _history_size) {
134                 return 0;
135         }
136
137         struct timeval now;
138         gettimeofday (&now, 0);
139
140         return _history_size / (seconds (now) - seconds (_time_history.back ()));
141 }
142
143 /** @return Number of video frames that have been sent out */
144 int
145 Encoder::video_frames_out () const
146 {
147         boost::mutex::scoped_lock (_state_mutex);
148         return _video_frames_enqueued;
149 }
150
151 /** Should be called when a frame has been encoded successfully.
152  *  @param n Source frame index.
153  */
154 void
155 Encoder::frame_done ()
156 {
157         boost::mutex::scoped_lock lock (_state_mutex);
158
159         struct timeval tv;
160         gettimeofday (&tv, 0);
161         _time_history.push_front (tv);
162         if (int (_time_history.size()) > _history_size) {
163                 _time_history.pop_back ();
164         }
165 }
166
167 /** Called in order, so each time this is called the supplied frame is the one
168  *  after the previous one.
169  */
170 void
171 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
172 {
173         _waker.nudge ();
174
175         boost::mutex::scoped_lock lock (_mutex);
176
177         /* XXX: discard 3D here if required */
178
179         /* Wait until the queue has gone down a bit */
180         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
181                 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
182                 _full_condition.wait (lock);
183                 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
184         }
185
186         if (_terminate) {
187                 return;
188         }
189
190         _writer->rethrow ();
191         /* Re-throw any exception raised by one of our threads.  If more
192            than one has thrown an exception, only one will be rethrown, I think;
193            but then, if that happens something has gone badly wrong.
194         */
195         rethrow ();
196
197         if (_writer->can_fake_write (_video_frames_enqueued)) {
198                 /* We can fake-write this frame */
199                 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
200                 frame_done ();
201         } else if (pv->has_j2k ()) {
202                 /* This frame already has JPEG2000 data, so just write it */
203                 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
204         } else if (_last_player_video && pv->same (_last_player_video)) {
205                 _writer->repeat (_video_frames_enqueued, pv->eyes ());
206         } else {
207                 /* Queue this new frame for encoding */
208                 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
209                 _queue.push_back (shared_ptr<DCPVideo> (
210                                           new DCPVideo (
211                                                   pv,
212                                                   _video_frames_enqueued,
213                                                   _film->video_frame_rate(),
214                                                   _film->j2k_bandwidth(),
215                                                   _film->resolution(),
216                                                   _film->log()
217                                                   )
218                                           ));
219
220                 /* The queue might not be empty any more, so notify anything which is
221                    waiting on that.
222                 */
223                 _empty_condition.notify_all ();
224         }
225
226         switch (pv->eyes ()) {
227         case EYES_BOTH:
228                 ++_video_frames_enqueued;
229                 break;
230         case EYES_LEFT:
231                 _left_done = true;
232                 break;
233         case EYES_RIGHT:
234                 _right_done = true;
235                 break;
236         default:
237                 break;
238         }
239
240         if (_left_done && _right_done) {
241                 ++_video_frames_enqueued;
242                 _left_done = _right_done = false;
243         }
244
245         _last_player_video = pv;
246 }
247
248 void
249 Encoder::terminate_threads ()
250 {
251         {
252                 boost::mutex::scoped_lock lock (_mutex);
253                 _terminate = true;
254                 _full_condition.notify_all ();
255                 _empty_condition.notify_all ();
256         }
257
258         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
259                 if ((*i)->joinable ()) {
260                         (*i)->join ();
261                 }
262                 delete *i;
263         }
264
265         _threads.clear ();
266         _terminate = false;
267 }
268
269 void
270 Encoder::encoder_thread (optional<ServerDescription> server)
271 try
272 {
273         if (server) {
274                 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
275         } else {
276                 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
277         }
278
279         /* Number of seconds that we currently wait between attempts
280            to connect to the server; not relevant for localhost
281            encodings.
282         */
283         int remote_backoff = 0;
284
285         while (true) {
286
287                 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
288                 boost::mutex::scoped_lock lock (_mutex);
289                 while (_queue.empty () && !_terminate) {
290                         _empty_condition.wait (lock);
291                 }
292
293                 if (_terminate) {
294                         return;
295                 }
296
297                 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
298                 shared_ptr<DCPVideo> vf = _queue.front ();
299                 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
300                 _queue.pop_front ();
301
302                 lock.unlock ();
303
304                 optional<Data> encoded;
305
306                 /* We need to encode this input */
307                 if (server) {
308                         try {
309                                 encoded = vf->encode_remotely (server.get ());
310
311                                 if (remote_backoff > 0) {
312                                         LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
313                                 }
314
315                                 /* This job succeeded, so remove any backoff */
316                                 remote_backoff = 0;
317
318                         } catch (std::exception& e) {
319                                 if (remote_backoff < 60) {
320                                         /* back off more */
321                                         remote_backoff += 10;
322                                 }
323                                 LOG_ERROR (
324                                         N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
325                                         vf->index(), server->host_name(), e.what(), remote_backoff
326                                         );
327                         }
328
329                 } else {
330                         try {
331                                 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
332                                 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
333                                 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
334                         } catch (std::exception& e) {
335                                 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
336                         }
337                 }
338
339                 if (encoded) {
340                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
341                         frame_done ();
342                 } else {
343                         lock.lock ();
344                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
345                         _queue.push_front (vf);
346                         lock.unlock ();
347                 }
348
349                 if (remote_backoff > 0) {
350                         dcpomatic_sleep (remote_backoff);
351                 }
352
353                 /* The queue might not be full any more, so notify anything that is waiting on that */
354                 lock.lock ();
355                 _full_condition.notify_all ();
356         }
357 }
358 catch (...)
359 {
360         store_current ();
361 }
362
363 void
364 Encoder::servers_list_changed ()
365 {
366         terminate_threads ();
367
368         /* XXX: could re-use threads */
369
370         if (!Config::instance()->only_servers_encode ()) {
371                 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
372                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
373                 }
374         }
375
376         BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
377                 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
378                 for (int j = 0; j < i.threads(); ++j) {
379                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
380                 }
381         }
382
383         _writer->set_encoder_threads (_threads.size ());
384 }