2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include <libcxml/cxml.h>
38 #include <boost/lambda/lambda.hpp>
39 #include <boost/foreach.hpp>
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
55 using boost::shared_ptr;
56 using boost::weak_ptr;
57 using boost::optional;
58 using boost::scoped_array;
60 int const Encoder::_history_size = 25;
62 /** @param f Film that we are encoding */
63 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
66 , _video_frames_enqueued (0)
72 servers_list_changed ();
80 /** Add a worker thread for a each thread on a remote server. Caller must hold
81 * a lock on _mutex, or know that one is not currently required to
82 * safely modify _threads.
85 Encoder::add_worker_threads (ServerDescription d)
87 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), d.threads(), d.host_name ());
88 for (int i = 0; i < d.threads(); ++i) {
89 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, d)));
92 _writer->set_encoder_threads (_threads.size ());
98 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
99 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
102 _writer->set_encoder_threads (_threads.size ());
104 if (!ServerFinder::instance()->disabled ()) {
105 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
112 boost::mutex::scoped_lock lock (_mutex);
114 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
116 /* Keep waking workers until the queue is empty */
117 while (!_queue.empty ()) {
118 _empty_condition.notify_all ();
119 _full_condition.wait (lock);
124 terminate_threads ();
126 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
128 /* The following sequence of events can occur in the above code:
129 1. a remote worker takes the last image off the queue
130 2. the loop above terminates
131 3. the remote worker fails to encode the image and puts it back on the queue
132 4. the remote worker is then terminated by terminate_threads
134 So just mop up anything left in the queue here.
137 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
138 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
141 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
146 } catch (std::exception& e) {
147 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
152 /** @return an estimate of the current number of frames we are encoding per second,
156 Encoder::current_encoding_rate () const
158 boost::mutex::scoped_lock lock (_state_mutex);
159 if (int (_time_history.size()) < _history_size) {
164 gettimeofday (&now, 0);
166 return _history_size / (seconds (now) - seconds (_time_history.back ()));
169 /** @return Number of video frames that have been sent out */
171 Encoder::video_frames_out () const
173 boost::mutex::scoped_lock (_state_mutex);
174 return _video_frames_enqueued;
177 /** Should be called when a frame has been encoded successfully.
178 * @param n Source frame index.
181 Encoder::frame_done ()
183 boost::mutex::scoped_lock lock (_state_mutex);
186 gettimeofday (&tv, 0);
187 _time_history.push_front (tv);
188 if (int (_time_history.size()) > _history_size) {
189 _time_history.pop_back ();
193 /** Called in order, so each time this is called the supplied frame is the one
194 * after the previous one.
197 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
201 boost::mutex::scoped_lock lock (_mutex);
203 /* XXX: discard 3D here if required */
205 /* Wait until the queue has gone down a bit */
206 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
207 LOG_TIMING ("decoder sleeps with queue of %1", _queue.size());
208 _full_condition.wait (lock);
209 LOG_TIMING ("decoder wakes with queue of %1", _queue.size());
217 /* Re-throw any exception raised by one of our threads. If more
218 than one has thrown an exception, only one will be rethrown, I think;
219 but then, if that happens something has gone badly wrong.
223 if (_writer->can_fake_write (_video_frames_enqueued)) {
224 /* We can fake-write this frame */
225 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
227 } else if (pv->has_j2k ()) {
228 /* This frame already has JPEG2000 data, so just write it */
229 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
230 } else if (_last_player_video && pv->same (_last_player_video)) {
231 _writer->repeat (_video_frames_enqueued, pv->eyes ());
233 /* Queue this new frame for encoding */
234 LOG_TIMING ("adding to queue of %1", _queue.size ());
235 _queue.push_back (shared_ptr<DCPVideo> (
238 _video_frames_enqueued,
239 _film->video_frame_rate(),
240 _film->j2k_bandwidth(),
246 /* The queue might not be empty any more, so notify anything which is
249 _empty_condition.notify_all ();
252 switch (pv->eyes ()) {
254 ++_video_frames_enqueued;
266 if (_left_done && _right_done) {
267 ++_video_frames_enqueued;
268 _left_done = _right_done = false;
271 _last_player_video = pv;
275 Encoder::terminate_threads ()
278 boost::mutex::scoped_lock lock (_mutex);
280 _full_condition.notify_all ();
281 _empty_condition.notify_all ();
284 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
285 if ((*i)->joinable ()) {
296 Encoder::encoder_thread (optional<ServerDescription> server)
299 /* Number of seconds that we currently wait between attempts
300 to connect to the server; not relevant for localhost
303 int remote_backoff = 0;
307 LOG_TIMING ("[%1] encoder thread sleeps", boost::this_thread::get_id());
308 boost::mutex::scoped_lock lock (_mutex);
309 while (_queue.empty () && !_terminate) {
310 _empty_condition.wait (lock);
317 LOG_TIMING ("[%1] encoder thread wakes with queue of %2", boost::this_thread::get_id(), _queue.size());
318 shared_ptr<DCPVideo> vf = _queue.front ();
319 LOG_TIMING ("[%1] encoder thread pops frame %2 (%3) from queue", boost::this_thread::get_id(), vf->index(), vf->eyes ());
324 optional<Data> encoded;
326 /* We need to encode this input */
329 encoded = vf->encode_remotely (server.get ());
331 if (remote_backoff > 0) {
332 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
335 /* This job succeeded, so remove any backoff */
338 } catch (std::exception& e) {
339 if (remote_backoff < 60) {
341 remote_backoff += 10;
344 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
345 vf->index(), server->host_name(), e.what(), remote_backoff
351 LOG_TIMING ("[%1] encoder thread begins local encode of %2", boost::this_thread::get_id(), vf->index());
352 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
353 LOG_TIMING ("[%1] encoder thread finishes local encode of %2", boost::this_thread::get_id(), vf->index());
354 } catch (std::exception& e) {
355 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
360 _writer->write (encoded.get(), vf->index (), vf->eyes ());
364 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
365 _queue.push_front (vf);
369 if (remote_backoff > 0) {
370 dcpomatic_sleep (remote_backoff);
373 /* The queue might not be full any more, so notify anything that is waiting on that */
375 _full_condition.notify_all ();
384 Encoder::servers_list_changed ()
386 terminate_threads ();
387 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
388 add_worker_threads (i);