Tidy up J2KImageProxy a bit.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
/** @file src/lib/encoder.cc
 *  @brief Parent class for classes which can encode video and audio frames.
 */
23
24 #include "encoder.h"
25 #include "util.h"
26 #include "film.h"
27 #include "log.h"
28 #include "config.h"
29 #include "dcp_video.h"
30 #include "server.h"
31 #include "cross.h"
32 #include "writer.h"
33 #include "server_finder.h"
34 #include "player.h"
35 #include "player_video.h"
36 #include "data.h"
37 #include <libcxml/cxml.h>
38 #include <boost/lambda/lambda.hpp>
39 #include <iostream>
40
41 #include "i18n.h"
42
/* Logging helpers.  Each one formats its arguments with String::compose and
   sends the result to the log of the _film that is in scope at the call site.
   They deliberately expand to a single expression with NO trailing semicolon,
   so that `LOG_x (...);' is exactly one statement and stays safe inside an
   unbraced if/else.
*/
#define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL)
#define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR)
#define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING)
46
47 using std::pair;
48 using std::string;
49 using std::vector;
50 using std::list;
51 using std::cout;
52 using std::min;
53 using std::make_pair;
54 using boost::shared_ptr;
55 using boost::weak_ptr;
56 using boost::optional;
57 using boost::scoped_array;
58
59 int const Encoder::_history_size = 25;
60
61 /** @param f Film that we are encoding */
62 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j, shared_ptr<Writer> writer)
63         : _film (f)
64         , _job (j)
65         , _video_frames_out (0)
66         , _terminate (false)
67         , _writer (writer)
68 {
69
70 }
71
72 Encoder::~Encoder ()
73 {
74         terminate_threads ();
75 }
76
77 /** Add a worker thread for a each thread on a remote server.  Caller must hold
78  *  a lock on _mutex, or know that one is not currently required to
79  *  safely modify _threads.
80  */
81 void
82 Encoder::add_worker_threads (ServerDescription d)
83 {
84         LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
85         for (int i = 0; i < d.threads(); ++i) {
86                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
87         }
88
89         _writer->set_encoder_threads (_threads.size ());
90 }
91
92 void
93 Encoder::begin ()
94 {
95         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
96                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
97         }
98
99         _writer->set_encoder_threads (_threads.size ());
100
101         if (!ServerFinder::instance()->disabled ()) {
102                 _server_found_connection = ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
103         }
104 }
105
106 void
107 Encoder::end ()
108 {
109         boost::mutex::scoped_lock lock (_mutex);
110
111         LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
112
113         /* Keep waking workers until the queue is empty */
114         while (!_queue.empty ()) {
115                 _empty_condition.notify_all ();
116                 _full_condition.wait (lock);
117         }
118
119         lock.unlock ();
120         
121         terminate_threads ();
122
123         LOG_GENERAL (N_("Mopping up %1"), _queue.size());
124
125         /* The following sequence of events can occur in the above code:
126              1. a remote worker takes the last image off the queue
127              2. the loop above terminates
128              3. the remote worker fails to encode the image and puts it back on the queue
129              4. the remote worker is then terminated by terminate_threads
130
131              So just mop up anything left in the queue here.
132         */
133
134         for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
135                 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
136                 try {
137                         _writer->write (
138                                 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
139                                 (*i)->index (),
140                                 (*i)->eyes ()
141                                 );
142                         frame_done ();
143                 } catch (std::exception& e) {
144                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
145                 }
146         }
147 }       
148
149 /** @return an estimate of the current number of frames we are encoding per second,
150  *  or 0 if not known.
151  */
152 float
153 Encoder::current_encoding_rate () const
154 {
155         boost::mutex::scoped_lock lock (_state_mutex);
156         if (int (_time_history.size()) < _history_size) {
157                 return 0;
158         }
159
160         struct timeval now;
161         gettimeofday (&now, 0);
162
163         return _history_size / (seconds (now) - seconds (_time_history.back ()));
164 }
165
166 /** @return Number of video frames that have been sent out */
167 int
168 Encoder::video_frames_out () const
169 {
170         boost::mutex::scoped_lock (_state_mutex);
171         return _video_frames_out;
172 }
173
174 /** Should be called when a frame has been encoded successfully.
175  *  @param n Source frame index.
176  */
177 void
178 Encoder::frame_done ()
179 {
180         boost::mutex::scoped_lock lock (_state_mutex);
181         
182         struct timeval tv;
183         gettimeofday (&tv, 0);
184         _time_history.push_front (tv);
185         if (int (_time_history.size()) > _history_size) {
186                 _time_history.pop_back ();
187         }
188 }
189
190 /** Called in order, so each time this is called the supplied frame is the one
191  *  after the previous one.
192  */
193 void
194 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
195 {
196         _waker.nudge ();
197         
198         boost::mutex::scoped_lock lock (_mutex);
199
200         /* XXX: discard 3D here if required */
201
202         /* Wait until the queue has gone down a bit */
203         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
204                 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
205                 _full_condition.wait (lock);
206                 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
207         }
208
209         if (_terminate) {
210                 return;
211         }
212
213         _writer->rethrow ();
214         /* Re-throw any exception raised by one of our threads.  If more
215            than one has thrown an exception, only one will be rethrown, I think;
216            but then, if that happens something has gone badly wrong.
217         */
218         rethrow ();
219
220         if (_writer->can_fake_write (_video_frames_out)) {
221                 /* We can fake-write this frame */
222                 _writer->fake_write (_video_frames_out, pv->eyes ());
223                 frame_done ();
224         } else if (pv->has_j2k ()) {
225                 /* This frame already has JPEG2000 data, so just write it */
226                 _writer->write (pv->j2k(), _video_frames_out, pv->eyes ());
227         } else {
228                 /* Queue this new frame for encoding */
229                 LOG_TIMING ("adding to queue of %1", _queue.size ());
230                 _queue.push_back (shared_ptr<DCPVideo> (
231                                           new DCPVideo (
232                                                   pv,
233                                                   _video_frames_out,
234                                                   _film->video_frame_rate(),
235                                                   _film->j2k_bandwidth(),
236                                                   _film->resolution(),
237                                                   _film->burn_subtitles(),
238                                                   _film->log()
239                                                   )
240                                           ));
241
242                 /* The queue might not be empty any more, so notify anything which is
243                    waiting on that.
244                 */
245                 _empty_condition.notify_all ();
246         }
247
248         if (pv->eyes() != EYES_LEFT) {
249                 ++_video_frames_out;
250         }
251 }
252
253 void
254 Encoder::terminate_threads ()
255 {
256         {
257                 boost::mutex::scoped_lock lock (_mutex);
258                 _terminate = true;
259                 _full_condition.notify_all ();
260                 _empty_condition.notify_all ();
261         }
262
263         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
264                 if ((*i)->joinable ()) {
265                         (*i)->join ();
266                 }
267                 delete *i;
268         }
269
270         _threads.clear ();
271 }
272
273 void
274 Encoder::encoder_thread (optional<ServerDescription> server)
275 try
276 {
277         /* Number of seconds that we currently wait between attempts
278            to connect to the server; not relevant for localhost
279            encodings.
280         */
281         int remote_backoff = 0;
282         shared_ptr<DCPVideo> last_dcp_video;
283         optional<Data> last_encoded;
284         
285         while (true) {
286
287                 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
288                 boost::mutex::scoped_lock lock (_mutex);
289                 while (_queue.empty () && !_terminate) {
290                         _empty_condition.wait (lock);
291                 }
292
293                 if (_terminate) {
294                         return;
295                 }
296
297                 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
298                 shared_ptr<DCPVideo> vf = _queue.front ();
299                 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
300                 _queue.pop_front ();
301                 
302                 lock.unlock ();
303
304                 optional<Data> encoded;
305
306                 if (last_dcp_video && vf->same (last_dcp_video)) {
307                         /* We already have encoded data for the same input as this one, so take a short-cut */
308                         encoded = last_encoded;
309                 } else {
310                         /* We need to encode this input */
311                         if (server) {
312                                 try {
313                                         encoded = vf->encode_remotely (server.get ());
314                                         
315                                         if (remote_backoff > 0) {
316                                                 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
317                                         }
318                                         
319                                         /* This job succeeded, so remove any backoff */
320                                         remote_backoff = 0;
321                                         
322                                 } catch (std::exception& e) {
323                                         if (remote_backoff < 60) {
324                                                 /* back off more */
325                                                 remote_backoff += 10;
326                                         }
327                                         LOG_ERROR (
328                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
329                                                 vf->index(), server->host_name(), e.what(), remote_backoff
330                                                 );
331                                 }
332                                 
333                         } else {
334                                 try {
335                                         LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
336                                         encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
337                                         LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
338                                 } catch (std::exception& e) {
339                                         LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
340                                 }
341                         }
342                 }
343
344                 last_dcp_video = vf;
345                 last_encoded = encoded;
346
347                 if (encoded) {
348                         _writer->write (encoded.get(), vf->index (), vf->eyes ());
349                         frame_done ();
350                 } else {
351                         lock.lock ();
352                         LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
353                         _queue.push_front (vf);
354                         lock.unlock ();
355                 }
356
357                 if (remote_backoff > 0) {
358                         dcpomatic_sleep (remote_backoff);
359                 }
360
361                 /* The queue might not be full any more, so notify anything that is waiting on that */
362                 lock.lock ();
363                 _full_condition.notify_all ();
364         }
365 }
366 catch (...)
367 {
368         store_current ();
369 }
370
371 void
372 Encoder::server_found (ServerDescription s)
373 {
374         add_worker_threads (s);
375 }