Various bits mostly related to colour conversions.
[dcpomatic.git] / src / lib / encoder.cc
1 /*
2     Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
3
4     This program is free software; you can redistribute it and/or modify
5     it under the terms of the GNU General Public License as published by
6     the Free Software Foundation; either version 2 of the License, or
7     (at your option) any later version.
8
9     This program is distributed in the hope that it will be useful,
10     but WITHOUT ANY WARRANTY; without even the implied warranty of
11     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12     GNU General Public License for more details.
13
14     You should have received a copy of the GNU General Public License
15     along with this program; if not, write to the Free Software
16     Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
17
18 */
19
20 /** @file src/encoder.h
21  *  @brief Parent class for classes which can encode video and audio frames.
22  */
23
24 #include <iostream>
25 #include "encoder.h"
26 #include "util.h"
27 #include "film.h"
28 #include "log.h"
29 #include "config.h"
30 #include "dcp_video_frame.h"
31 #include "server.h"
32 #include "cross.h"
33 #include "writer.h"
34
35 #include "i18n.h"
36
37 using std::pair;
38 using std::string;
39 using std::stringstream;
40 using std::vector;
41 using std::list;
42 using std::cout;
43 using std::min;
44 using std::make_pair;
45 using boost::shared_ptr;
46 using boost::optional;
47
48 int const Encoder::_history_size = 25;
49
50 /** @param f Film that we are encoding */
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
52         : _film (f)
53         , _job (j)
54         , _video_frames_out (0)
55         , _terminate (false)
56 {
57         _have_a_real_frame[EYES_BOTH] = false;
58         _have_a_real_frame[EYES_LEFT] = false;
59         _have_a_real_frame[EYES_RIGHT] = false;
60 }
61
62 Encoder::~Encoder ()
63 {
64         terminate_threads ();
65         if (_writer) {
66                 _writer->finish ();
67         }
68 }
69
70 void
71 Encoder::process_begin ()
72 {
73         for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
74                 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
75         }
76
77         vector<ServerDescription> servers = Config::instance()->servers ();
78
79         for (vector<ServerDescription>::iterator i = servers.begin(); i != servers.end(); ++i) {
80                 for (int j = 0; j < i->threads (); ++j) {
81                         _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
82                 }
83         }
84
85         _writer.reset (new Writer (_film, _job));
86 }
87
88
89 void
90 Encoder::process_end ()
91 {
92         boost::mutex::scoped_lock lock (_mutex);
93
94         _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
95
96         /* Keep waking workers until the queue is empty */
97         while (!_queue.empty ()) {
98                 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
99                 _condition.notify_all ();
100                 _condition.wait (lock);
101         }
102
103         lock.unlock ();
104         
105         terminate_threads ();
106
107         _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
108
109         /* The following sequence of events can occur in the above code:
110              1. a remote worker takes the last image off the queue
111              2. the loop above terminates
112              3. the remote worker fails to encode the image and puts it back on the queue
113              4. the remote worker is then terminated by terminate_threads
114
115              So just mop up anything left in the queue here.
116         */
117
118         for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
119                 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
120                 try {
121                         _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
122                         frame_done ();
123                 } catch (std::exception& e) {
124                         _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
125                 }
126         }
127
128         _writer->finish ();
129         _writer.reset ();
130 }       
131
132 /** @return an estimate of the current number of frames we are encoding per second,
133  *  or 0 if not known.
134  */
135 float
136 Encoder::current_encoding_rate () const
137 {
138         boost::mutex::scoped_lock lock (_history_mutex);
139         if (int (_time_history.size()) < _history_size) {
140                 return 0;
141         }
142
143         struct timeval now;
144         gettimeofday (&now, 0);
145
146         return _history_size / (seconds (now) - seconds (_time_history.back ()));
147 }
148
149 /** @return Number of video frames that have been sent out */
150 int
151 Encoder::video_frames_out () const
152 {
153         boost::mutex::scoped_lock (_history_mutex);
154         return _video_frames_out;
155 }
156
157 /** Should be called when a frame has been encoded successfully.
158  *  @param n Source frame index.
159  */
160 void
161 Encoder::frame_done ()
162 {
163         boost::mutex::scoped_lock lock (_history_mutex);
164         
165         struct timeval tv;
166         gettimeofday (&tv, 0);
167         _time_history.push_front (tv);
168         if (int (_time_history.size()) > _history_size) {
169                 _time_history.pop_back ();
170         }
171 }
172
173 void
174 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, bool same)
175 {
176         boost::mutex::scoped_lock lock (_mutex);
177
178         /* XXX: discard 3D here if required */
179
180         /* Wait until the queue has gone down a bit */
181         while (_queue.size() >= _threads.size() * 2 && !_terminate) {
182                 TIMING ("decoder sleeps with queue of %1", _queue.size());
183                 _condition.wait (lock);
184                 TIMING ("decoder wakes with queue of %1", _queue.size());
185         }
186
187         if (_terminate) {
188                 return;
189         }
190
191         if (_writer->thrown ()) {
192                 _writer->rethrow ();
193         }
194
195         if (_writer->can_fake_write (_video_frames_out)) {
196                 _writer->fake_write (_video_frames_out, eyes);
197                 _have_a_real_frame[eyes] = false;
198                 frame_done ();
199         } else if (same && _have_a_real_frame[eyes]) {
200                 /* Use the last frame that we encoded. */
201                 _writer->repeat (_video_frames_out, eyes);
202                 frame_done ();
203         } else {
204                 /* Queue this new frame for encoding */
205                 TIMING ("adding to queue of %1", _queue.size ());
206                 _queue.push_back (shared_ptr<DCPVideoFrame> (
207                                           new DCPVideoFrame (
208                                                   image, _video_frames_out, eyes, _film->video_frame_rate(),
209                                                   _film->j2k_bandwidth(), _film->log()
210                                                   )
211                                           ));
212                 
213                 _condition.notify_all ();
214                 _have_a_real_frame[eyes] = true;
215         }
216
217         if (eyes != EYES_LEFT) {
218                 ++_video_frames_out;
219         }
220 }
221
222 void
223 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
224 {
225         _writer->write (data);
226 }
227
228 void
229 Encoder::terminate_threads ()
230 {
231         boost::mutex::scoped_lock lock (_mutex);
232         _terminate = true;
233         _condition.notify_all ();
234         lock.unlock ();
235
236         for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
237                 if ((*i)->joinable ()) {
238                         (*i)->join ();
239                 }
240                 delete *i;
241         }
242
243         _threads.clear ();
244 }
245
246 void
247 Encoder::encoder_thread (optional<ServerDescription> server)
248 {
249         /* Number of seconds that we currently wait between attempts
250            to connect to the server; not relevant for localhost
251            encodings.
252         */
253         int remote_backoff = 0;
254         
255         while (1) {
256
257                 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
258                 boost::mutex::scoped_lock lock (_mutex);
259                 while (_queue.empty () && !_terminate) {
260                         _condition.wait (lock);
261                 }
262
263                 if (_terminate) {
264                         return;
265                 }
266
267                 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
268                 shared_ptr<DCPVideoFrame> vf = _queue.front ();
269                 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 (%3) from queue"), boost::this_thread::get_id(), vf->frame(), vf->eyes ()));
270                 _queue.pop_front ();
271                 
272                 lock.unlock ();
273
274                 shared_ptr<EncodedData> encoded;
275
276                 if (server) {
277                         try {
278                                 encoded = vf->encode_remotely (server.get ());
279
280                                 if (remote_backoff > 0) {
281                                         _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
282                                 }
283                                 
284                                 /* This job succeeded, so remove any backoff */
285                                 remote_backoff = 0;
286                                 
287                         } catch (std::exception& e) {
288                                 if (remote_backoff < 60) {
289                                         /* back off more */
290                                         remote_backoff += 10;
291                                 }
292                                 _film->log()->log (
293                                         String::compose (
294                                                 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
295                                                 vf->frame(), server->host_name(), e.what(), remote_backoff)
296                                         );
297                         }
298                                 
299                 } else {
300                         try {
301                                 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
302                                 encoded = vf->encode_locally ();
303                                 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
304                         } catch (std::exception& e) {
305                                 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
306                         }
307                 }
308
309                 if (encoded) {
310                         _writer->write (encoded, vf->frame (), vf->eyes ());
311                         frame_done ();
312                 } else {
313                         lock.lock ();
314                         _film->log()->log (
315                                 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
316                                 );
317                         _queue.push_front (vf);
318                         lock.unlock ();
319                 }
320
321                 if (remote_backoff > 0) {
322                         dcpomatic_sleep (remote_backoff);
323                 }
324
325                 lock.lock ();
326                 _condition.notify_all ();
327         }
328 }