2 Copyright (C) 2012 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
30 #include "dcp_video_frame.h"
39 using std::stringstream;
45 using boost::shared_ptr;
46 using boost::optional;
48 int const Encoder::_history_size = 25;
50 /** @param f Film that we are encoding */
51 Encoder::Encoder (shared_ptr<const Film> f, shared_ptr<Job> j)
54 , _video_frames_out (0)
57 _have_a_real_frame[EYES_BOTH] = false;
58 _have_a_real_frame[EYES_LEFT] = false;
59 _have_a_real_frame[EYES_RIGHT] = false;
71 Encoder::process_begin ()
73 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
74 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, (ServerDescription *) 0)));
77 vector<ServerDescription*> servers = Config::instance()->servers ();
79 for (vector<ServerDescription*>::iterator i = servers.begin(); i != servers.end(); ++i) {
80 for (int j = 0; j < (*i)->threads (); ++j) {
81 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, *i)));
85 _writer.reset (new Writer (_film, _job));
90 Encoder::process_end ()
92 boost::mutex::scoped_lock lock (_mutex);
94 _film->log()->log (String::compose (N_("Clearing queue of %1"), _queue.size ()));
96 /* Keep waking workers until the queue is empty */
97 while (!_queue.empty ()) {
98 _film->log()->log (String::compose (N_("Waking with %1"), _queue.size ()), Log::VERBOSE);
99 _condition.notify_all ();
100 _condition.wait (lock);
105 terminate_threads ();
107 _film->log()->log (String::compose (N_("Mopping up %1"), _queue.size()));
109 /* The following sequence of events can occur in the above code:
110 1. a remote worker takes the last image off the queue
111 2. the loop above terminates
112 3. the remote worker fails to encode the image and puts it back on the queue
113 4. the remote worker is then terminated by terminate_threads
115 So just mop up anything left in the queue here.
118 for (list<shared_ptr<DCPVideoFrame> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
119 _film->log()->log (String::compose (N_("Encode left-over frame %1"), (*i)->frame ()));
121 _writer->write ((*i)->encode_locally(), (*i)->frame (), (*i)->eyes ());
123 } catch (std::exception& e) {
124 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
132 /** @return an estimate of the current number of frames we are encoding per second,
136 Encoder::current_encoding_rate () const
138 boost::mutex::scoped_lock lock (_history_mutex);
139 if (int (_time_history.size()) < _history_size) {
144 gettimeofday (&now, 0);
146 return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 /** @return Number of video frames that have been sent out */
151 Encoder::video_frames_out () const
153 boost::mutex::scoped_lock (_history_mutex);
154 return _video_frames_out;
157 /** Should be called when a frame has been encoded successfully.
158 * @param n Source frame index.
161 Encoder::frame_done ()
163 boost::mutex::scoped_lock lock (_history_mutex);
166 gettimeofday (&tv, 0);
167 _time_history.push_front (tv);
168 if (int (_time_history.size()) > _history_size) {
169 _time_history.pop_back ();
174 Encoder::process_video (shared_ptr<const Image> image, Eyes eyes, bool same)
176 boost::mutex::scoped_lock lock (_mutex);
178 /* Wait until the queue has gone down a bit */
179 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
180 TIMING ("decoder sleeps with queue of %1", _queue.size());
181 _condition.wait (lock);
182 TIMING ("decoder wakes with queue of %1", _queue.size());
189 if (_writer->thrown ()) {
193 if (_writer->can_fake_write (_video_frames_out)) {
194 _writer->fake_write (_video_frames_out, eyes);
195 _have_a_real_frame[eyes] = false;
197 } else if (same && _have_a_real_frame[eyes]) {
198 /* Use the last frame that we encoded. */
199 _writer->repeat (_video_frames_out, eyes);
202 /* Queue this new frame for encoding */
203 TIMING ("adding to queue of %1", _queue.size ());
204 _queue.push_back (shared_ptr<DCPVideoFrame> (
206 image, _video_frames_out, eyes, _film->dcp_video_frame_rate(),
207 _film->j2k_bandwidth(), _film->log()
211 _condition.notify_all ();
212 _have_a_real_frame[eyes] = true;
219 Encoder::process_audio (shared_ptr<const AudioBuffers> data)
221 _writer->write (data);
225 Encoder::terminate_threads ()
227 boost::mutex::scoped_lock lock (_mutex);
229 _condition.notify_all ();
232 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
233 if ((*i)->joinable ()) {
243 Encoder::encoder_thread (ServerDescription* server)
245 /* Number of seconds that we currently wait between attempts
246 to connect to the server; not relevant for localhost
249 int remote_backoff = 0;
253 TIMING ("encoder thread %1 sleeps", boost::this_thread::get_id());
254 boost::mutex::scoped_lock lock (_mutex);
255 while (_queue.empty () && !_terminate) {
256 _condition.wait (lock);
263 TIMING ("encoder thread %1 wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
264 shared_ptr<DCPVideoFrame> vf = _queue.front ();
265 _film->log()->log (String::compose (N_("Encoder thread %1 pops frame %2 from queue"), boost::this_thread::get_id(), vf->frame()), Log::VERBOSE);
270 shared_ptr<EncodedData> encoded;
274 encoded = vf->encode_remotely (server);
276 if (remote_backoff > 0) {
277 _film->log()->log (String::compose (N_("%1 was lost, but now she is found; removing backoff"), server->host_name ()));
280 /* This job succeeded, so remove any backoff */
283 } catch (std::exception& e) {
284 if (remote_backoff < 60) {
286 remote_backoff += 10;
290 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
291 vf->frame(), server->host_name(), e.what(), remote_backoff)
297 TIMING ("encoder thread %1 begins local encode of %2", boost::this_thread::get_id(), vf->frame());
298 encoded = vf->encode_locally ();
299 TIMING ("encoder thread %1 finishes local encode of %2", boost::this_thread::get_id(), vf->frame());
300 } catch (std::exception& e) {
301 _film->log()->log (String::compose (N_("Local encode failed (%1)"), e.what ()));
306 _writer->write (encoded, vf->frame (), vf->eyes ());
311 String::compose (N_("Encoder thread %1 pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->frame())
313 _queue.push_front (vf);
317 if (remote_backoff > 0) {
318 dcpomatic_sleep (remote_backoff);
322 _condition.notify_all ();