Rename EncodedData -> Data and trim it a bit.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file  src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include <libcxml/cxml.h>
37 #include <boost/lambda/lambda.hpp>
38 #include <iostream>
39
40 #include "i18n.h"
41
/* Logging helpers which write to the log of the `_film' that is in scope.  The
   arguments are passed to String::compose to build the message.

   Each macro expands to a single expression with no trailing semicolon; the call
   site supplies the `;'.  Having the macro supply one as well would leave a stray
   empty statement after every call, which breaks an unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING)
45
46 using std::pair;
47 using std::string;
48 using std::vector;
49 using std::list;
50 using std::cout;
51 using std::min;
52 using std::make_pair;
53 using boost::shared_ptr;
54 using boost::weak_ptr;
55 using boost::optional;
56 using boost::scoped_array;
57
58 int const Encoder::_history_size = 25;
59
60 /** @param f Film that we are encoding */
61 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j, shared_ptr<Writer> writer)
62         : _film (f)
63         , _job (j)
64         , _video_frames_out (0)
65         , _terminate (false)
66         , _writer (writer)
67 {
68
69 }
70
71 Encoder::~Encoder ()
72 {
73         terminate_threads ();
74 }
75
76 /** Add a worker thread for a each thread on a remote server.  Caller must hold
77  *  a lock on _mutex, or know that one is not currently required to
78  *  safely modify _threads.
79  */
80 void
81 Encoder::add_worker_threads (ServerDescription d)
82 {
83         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
84         for (int i = 0; i < d.threads(); ++i) {
85                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
86         }
87
88         _writer->set_encoder_threads (_threads.size ());
89 }
90
91 void
92 Encoder::begin ()
93 {
94         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
95                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
96         }
97
98         _writer->set_encoder_threads (_threads.size ());
99
100         if (!ServerFinder::instance()->disabled ()) {
101                 _server_found_connection = ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
102         }
103 }
104
105 void
106 Encoder::end ()
107 {
108         boost::mutex::scoped_lock lock (_mutex);
109
110         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
111
112         /* Keep waking workers until the queue is empty */
113         while (!_queue.empty ()) {
114                 _empty_condition.notify_all ();
115                 _full_condition.wait (lock);
116         }
117
118         lock.unlock ();
119         
120         terminate_threads ();
121
122         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
123
124         /* The following sequence of events can occur in the above code:
125              1. a remote worker takes the last image off the queue
126              2. the loop above terminates
127              3. the remote worker fails to encode the image and puts it back on the queue
128              4. the remote worker is then terminated by terminate_threads
129
130              So just mop up anything left in the queue here.
131         */
132
133         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
134                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
135                 try {
136                         _writer->write (
137                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
138                                 (*i)->index (),
139                                 (*i)->eyes ()
140                                 );
141                         frame_done ();
142                 } catch (std::exception& e) {
143                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
144                 }
145         }
146 }       
147
148 /** @return an estimate of the current number of frames we are encoding per second,
149  *  or 0 if not known.
150  */
151 float
152 Encoder::current_encoding_rate () const
153 {
154         boost::mutex::scoped_lock lock (_state_mutex);
155         if (int (_time_history.size()) < _history_size) {
156                 return 0;
157         }
158
159         struct timeval now;
160         gettimeofday (&now, 0);
161
162         return _history_size / (seconds (now) - seconds (_time_history.back ()));
163 }
164
165 /** @return Number of video frames that have been sent out */
166 int
167 Encoder::video_frames_out () const
168 {
169         boost::mutex::scoped_lock (_state_mutex);
170         return _video_frames_out;
171 }
172
173 /** Should be called when a frame has been encoded successfully.
174  *  @param n Source frame index.
175  */
176 void
177 Encoder::frame_done ()
178 {
179         boost::mutex::scoped_lock lock (_state_mutex);
180         
181         struct timeval tv;
182         gettimeofday (&tv, 0);
183         _time_history.push_front (tv);
184         if (int (_time_history.size()) > _history_size) {
185                 _time_history.pop_back ();
186         }
187 }
188
189 /** Called in order, so each time this is called the supplied frame is the one
190  *  after the previous one.
191  */
192 void
193 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
194 {
195         _waker.nudge ();
196         
197         boost::mutex::scoped_lock lock (_mutex);
198
199         /* XXX: discard 3D here if required */
200
201         /* Wait until the queue has gone down a bit */
202         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
203                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
204                 _full_condition.wait (lock);
205                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
206         }
207
208         if (_terminate) {
209                 return;
210         }
211
212         _writer->rethrow ();
213         /* Re-throw any exception raised by one of our threads.  If more
214            than one has thrown an exception, only one will be rethrown, I think;
215            but then, if that happens something has gone badly wrong.
216         */
217         rethrow ();
218
219         if (_writer->can_fake_write (_video_frames_out)) {
220                 /* We can fake-write this frame */
221                 _writer->fake_write (_video_frames_out, pv->eyes ());
222                 frame_done ();
223         } else if (pv->has_j2k ()) {
224                 /* This frame already has JPEG2000 data, so just write it */
225                 _writer->write (pv->j2k(), _video_frames_out, pv->eyes ());
226         } else {
227                 /* Queue this new frame for encoding */
228                 LOG_TIMING ("adding to queue of %1", _queue.size ());
229                 _queue.push_back (shared_ptr<DCPVideo> (
230                                           new DCPVideo (
231                                                   pv,
232                                                   _video_frames_out,
233                                                   _film->video_frame_rate(),
234                                                   _film->j2k_bandwidth(),
235                                                   _film->resolution(),
236                                                   _film->burn_subtitles(),
237                                                   _film->log()
238                                                   )
239                                           ));
240
241                 /* The queue might not be empty any more, so notify anything which is
242                    waiting on that.
243                 */
244                 _empty_condition.notify_all ();
245         }
246
247         if (pv->eyes() != EYES_LEFT) {
248                 ++_video_frames_out;
249         }
250 }
251
252 void
253 Encoder::terminate_threads ()
254 {
255         {
256                 boost::mutex::scoped_lock lock (_mutex);
257                 _terminate = true;
258                 _full_condition.notify_all ();
259                 _empty_condition.notify_all ();
260         }
261
262         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
263                 if ((*i)->joinable ()) {
264                         (*i)->join ();
265                 }
266                 delete *i;
267         }
268
269         _threads.clear ();
270 }
271
272 void
273 Encoder::encoder_thread (optional<ServerDescription> server)
274 try
275 {
276         /* Number of seconds that we currently wait between attempts
277            to connect to the server; not relevant for localhost
278            encodings.
279         */
280         int remote_backoff = 0;
281         shared_ptr<DCPVideo> last_dcp_video;
282         shared_ptr<Data> last_encoded;
283         
284         while (true) {
285
286                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
287                 boost::mutex::scoped_lock lock (_mutex);
288                 while (_queue.empty () && !_terminate) {
289                         _empty_condition.wait (lock);
290                 }
291
292                 if (_terminate) {
293                         return;
294                 }
295
296                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
297                 shared_ptr<DCPVideo> vf = _queue.front ();
298                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
299                 _queue.pop_front ();
300                 
301                 lock.unlock ();
302
303                 shared_ptr<Data> encoded;
304
305                 if (last_dcp_video && vf->same (last_dcp_video)) {
306                         /* We already have encoded data for the same input as this one, so take a short-cut */
307                         encoded = last_encoded;
308                 } else {
309                         /* We need to encode this input */
310                         if (server) {
311                                 try {
312                                         encoded = vf->encode_remotely (server.get ());
313                                         
314                                         if (remote_backoff > 0) {
315                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
316                                         }
317                                         
318                                         /* This job succeeded, so remove any backoff */
319                                         remote_backoff = 0;
320                                         
321                                 } catch (std::exception& e) {
322                                         if (remote_backoff < 60) {
323                                                 /* back off more */
324                                                 remote_backoff += 10;
325                                         }
326                                         LOG_ERROR (
327                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
328                                                 vf->index(), server->host_name(), e.what(), remote_backoff
329                                                 );
330                                 }
331                                 
332                         } else {
333                                 try {
334                                         LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
335                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
336                                         LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
337                                 } catch (std::exception& e) {
338                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
339                                 }
340                         }
341                 }
342
343                 last_dcp_video = vf;
344                 last_encoded = encoded;
345
346                 if (encoded) {
347                         _writer->write (encoded, vf->index (), vf->eyes ());
348                         frame_done ();
349                 } else {
350                         lock.lock ();
351                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
352                         _queue.push_front (vf);
353                         lock.unlock ();
354                 }
355
356                 if (remote_backoff > 0) {
357                         dcpomatic_sleep (remote_backoff);
358                 }
359
360                 /* The queue might not be full any more, so notify anything that is waiting on that */
361                 lock.lock ();
362                 _full_condition.notify_all ();
363         }
364 }
365 catch (...)
366 {
367         store_current ();
368 }
369
370 void
371 Encoder::server_found (ServerDescription s)
372 {
373         add_worker_threads (s);
374 }