Another attempt to fix clang build.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include <boost/lambda/lambda.hpp>
26 #include <libcxml/cxml.h>
27 #include "encoder.h"
28 #include "util.h"
29 #include "film.h"
30 #include "log.h"
31 #include "config.h"
32 #include "dcp_video_frame.h"
33 #include "server.h"
34 #include "cross.h"
35 #include "writer.h"
36 #include "server_finder.h"
37 #include "player.h"
38
39 #include "i18n.h"
40
41 using std::pair;
42 using std::string;
43 using std::stringstream;
44 using std::vector;
45 using std::list;
46 using std::cout;
47 using std::min;
48 using std::make_pair;
49 using boost::shared_ptr;
50 using boost::weak_ptr;
51 using boost::optional;
52 using boost::scoped_array;
53
54 int const Encoder::_history_size = 25;
55
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> f, weak_ptr<Job> j)
58         : _film (f)
59         , _job (j)
60         , _video_frames_out (0)
61         , _terminate (false)
62 {
63         _have_a_real_frame[EYES_BOTH] = false;
64         _have_a_real_frame[EYES_LEFT] = false;
65         _have_a_real_frame[EYES_RIGHT] = false;
66 }
67
68 Encoder::~Encoder ()
69 {
70         terminate_threads ();
71         if (_writer) {
72                 _writer->finish ();
73         }
74 }
75
76 /** Add a worker thread for a each thread on a remote server.  Caller must hold
77  *  a lock on _mutex, or know that one is not currently required to
78  *  safely modify _threads.
79  */
80 void
81 Encoder::add_worker_threads (ServerDescription d)
82 {
83         for (int i = 0; i < d.threads(); ++i) {
84                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
85         }
86 }
87
88 void
89 Encoder::process_begin ()
90 {
91         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
92                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
93         }
94
95         _writer.reset (new Writer (_film, _job));
96         ServerFinder::instance()->connect (boost::bind (&Encoder::server_found, this, _1));
97 }
98
99
100 void
101 Encoder::process_end ()
102 {
103         boost::mutex::scoped_lock lock (_mutex);
104
105         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
106
107         /* Keep waking workers until the queue is empty */
108         while (!_queue.empty ()) {
109                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
110                 _condition.notify_all ();
111                 _condition.wait (lock);
112         }
113
114         lock.unlock ();
115         
116         terminate_threads ();
117
118         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
119
120         /* The following sequence of events can occur in the above code:
121              1. a remote worker takes the last image off the queue
122              2. the loop above terminates
123              3. the remote worker fails to encode the image and puts it back on the queue
124              4. the remote worker is then terminated by terminate_threads
125
126              So just mop up anything left in the queue here.
127         */
128
129         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
130                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
131                 try {
132                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
133                         frame_done ();
134                 } catch (std::exception& e) {
135                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
136                 }
137         }
138                 
139         _writer->finish ();
140         _writer.reset ();
141 }       
142
143 /** @return an estimate of the current number of frames we are encoding per second,
144  *  or 0 if not known.
145  */
146 float
147 Encoder::current_encoding_rate () const
148 {
149         boost::mutex::scoped_lock lock (_state_mutex);
150         if (int (_time_history.size()) < _history_size) {
151                 return 0;
152         }
153
154         struct timeval now;
155         gettimeofday (&now, 0);
156
157         return _history_size / (seconds (now) - seconds (_time_history.back ()));
158 }
159
160 /** @return Number of video frames that have been sent out */
161 int
162 Encoder::video_frames_out () const
163 {
164         boost::mutex::scoped_lock (_state_mutex);
165         return _video_frames_out;
166 }
167
168 /** Should be called when a frame has been encoded successfully.
169  *  @param n Source frame index.
170  */
171 void
172 Encoder::frame_done ()
173 {
174         boost::mutex::scoped_lock lock (_state_mutex);
175         
176         struct timeval tv;
177         gettimeofday (&tv, 0);
178         _time_history.push_front (tv);
179         if (int (_time_history.size()) > _history_size) {
180                 _time_history.pop_back ();
181         }
182 }
183
184 void
185 Encoder::process_video (shared_ptr<PlayerImage> image, Eyes eyes, ColourConversion conversion, bool same)
186 {
187         boost::mutex::scoped_lock lock (_mutex);
188
189         /* XXX: discard 3D here if required */
190
191         /* Wait until the queue has gone down a bit */
192         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
193                 TIMING ("decoder sleeps with queue of %1", _queue.size());
194                 _condition.wait (lock);
195                 TIMING ("decoder wakes with queue of %1", _queue.size());
196         }
197
198         if (_terminate) {
199                 return;
200         }
201
202         if (_writer->thrown ()) {
203                 _writer->rethrow ();
204         }
205
206         if (_writer->can_fake_write (_video_frames_out)) {
207                 _writer->fake_write (_video_frames_out, eyes);
208                 _have_a_real_frame[eyes] = false;
209                 frame_done ();
210         } else if (same && _have_a_real_frame[eyes]) {
211                 /* Use the last frame that we encoded. */
212                 _writer->repeat (_video_frames_out, eyes);
213                 frame_done ();
214         } else {
215                 /* Queue this new frame for encoding */
216                 TIMING ("adding to queue of %1", _queue.size ());
217                 _queue.push_back (shared_ptr<DCPVideoFrame> (
218                                           new DCPVideoFrame (
219                                                   image->image(PIX_FMT_RGB24, false), _video_frames_out, eyes, conversion, _film->video_frame_rate(),
220                                                   _film->j2k_bandwidth(), _film->resolution(), _film->log()
221                                                   )
222                                           ));
223                 
224                 _condition.notify_all ();
225                 _have_a_real_frame[eyes] = true;
226         }
227
228         if (eyes != EYES_LEFT) {
229                 ++_video_frames_out;
230         }
231 }
232
233 void
234 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
235 {
236         _writer->write (data);
237 }
238
239 void
240 Encoder::terminate_threads ()
241 {
242         {
243                 boost::mutex::scoped_lock lock (_mutex);
244                 _terminate = true;
245                 _condition.notify_all ();
246         }
247
248         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
249                 if ((*i)->joinable ()) {
250                         (*i)->join ();
251                 }
252                 delete *i;
253         }
254
255         _threads.clear ();
256 }
257
258 void
259 Encoder::encoder_thread (optional<ServerDescription> server)
260 {
261         /* Number of seconds that we currently wait between attempts
262            to connect to the server; not relevant for localhost
263            encodings.
264         */
265         int remote_backoff = 0;
266         
267         while (1) {
268
269                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
270                 boost::mutex::scoped_lock lock (_mutex);
271                 while (_queue.empty () && !_terminate) {
272                         _condition.wait (lock);
273                 }
274
275                 if (_terminate) {
276                         return;
277                 }
278
279                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
280                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
281                 TIMING ("encoder thread %1 pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->frame(), vf->eyes ());
282                 _queue.pop_front ();
283                 
284                 lock.unlock ();
285
286                 shared_ptr<EncodedData> encoded;
287
288                 if (server) {
289                         try {
290                                 encoded = vf->encode_remotely (server.get ());
291
292                                 if (remote_backoff > 0) {
293                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
294                                 }
295                                 
296                                 /* This job succeeded, so remove any backoff */
297                                 remote_backoff = 0;
298                                 
299                         } catch (std::exception& e) {
300                                 if (remote_backoff < 60) {
301                                         /* back off more */
302                                         remote_backoff += 10;
303                                 }
304                                 _film->log()->log (
305                                         String::compose (
306                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
307                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
308                                         );
309                         }
310                                 
311                 } else {
312                         try {
313                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
314                                 encoded = vf->encode_locally ();
315                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
316                         } catch (std::exception& e) {
317                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
318                         }
319                 }
320
321                 if (encoded) {
322                         _writer->write (encoded, vf->frame (), vf->eyes ());
323                         frame_done ();
324                 } else {
325                         lock.lock ();
326                         _film->log()->log (
327                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
328                                 );
329                         _queue.push_front (vf);
330                         lock.unlock ();
331                 }
332
333                 if (remote_backoff > 0) {
334                         dcpomatic_sleep (remote_backoff);
335                 }
336
337                 lock.lock ();
338                 _condition.notify_all ();
339         }
340 }
341
342 void
343 Encoder::server_found (ServerDescription s)
344 {
345         add_worker_threads (s);
346 }