2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "server_description.h"
38 #include "compose.hpp"
39 #include <libcxml/cxml.h>
40 #include <boost/foreach.hpp>
// Logging shorthands. Each one formats its arguments with String::compose and
// passes the result to the log obtained from _film->log(), tagged with the
// matching Log::TYPE_* value; LOG_TIMING goes through microsecond_log()
// rather than log(). They reference `_film`, so they can only be used where
// a `_film` is in scope.
// NOTE(review): every definition ends in `;`, so a call written as
// `LOG_GENERAL (...);` expands to two statements; take care when using one
// as the unbraced body of an if/else.
45 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
// Window size for the encoding-rate estimate: frame_done() keeps at most this
// many completion times in _time_history, and current_encoding_rate() checks
// whether fewer than this many have been recorded before computing a rate.
55 int const Encoder::_history_size = 25;
57 /** @param film Film that we are encoding.
 *  @param j Job, passed in as a weak_ptr.
 *  @param writer Writer; presumably stored as _writer, the object that frames are handed to elsewhere in this class (TODO confirm).
 */
58 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
61 , _video_frames_enqueued (0)
// servers_list_changed() is what creates the encoder threads, so calling it
// here sets up the initial pool.
67 servers_list_changed ();
// Unless server discovery is disabled, subscribe to ServersListChanged so that
// servers_list_changed() is re-run whenever the ServerFinder emits that signal.
// The resulting connection is kept in _server_found_connection.
78 if (!ServerFinder::instance()->disabled ()) {
79 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
// Drains _queue: first lets the worker threads empty it, then encodes on the
// calling thread any frame that was pushed back onto the queue afterwards.
86 boost::mutex::scoped_lock lock (_mutex);
88 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
90 /* Keep waking workers until the queue is empty */
91 while (!_queue.empty ()) {
// _empty_condition is what idle workers block on (see encoder_thread).
92 _empty_condition.notify_all ();
// Workers notify _full_condition after handling a frame; wait() releases
// the lock while this thread is blocked.
93 _full_condition.wait (lock);
100 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
102 /* The following sequence of events can occur in the above code:
103 1. a remote worker takes the last image off the queue
104 2. the loop above terminates
105 3. the remote worker fails to encode the image and puts it back on the queue
106 4. the remote worker is then terminated by terminate_threads
108 So just mop up anything left in the queue here.
111 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
112 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
// Encode the left-over frame on this thread rather than via a worker.
115 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
// Failures of the local encode are reported through LOG_ERROR.
120 } catch (std::exception& e) {
121 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
126 /** @return an estimate of the current number of frames we are encoding per second,
130 Encoder::current_encoding_rate () const
// _state_mutex is the mutex that frame_done() holds while it updates
// _time_history, so take it before reading the history.
132 boost::mutex::scoped_lock lock (_state_mutex);
// Fewer than _history_size completion times have been recorded so far.
133 if (int (_time_history.size()) < _history_size) {
138 gettimeofday (&now, 0);
// frame_done() pushes new times at the front and trims from the back, so
// back() is the oldest retained completion time. The estimate is
// _history_size frames divided by the time elapsed since then
// (seconds() presumably converts a timeval to seconds; TODO confirm).
140 return _history_size / (seconds (now) - seconds (_time_history.back ()));
143 /** @return Number of video frames that have been sent out */
145 Encoder::video_frames_out () const
147 boost::mutex::scoped_lock (_state_mutex);
148 return _video_frames_enqueued;
151 /** Should be called when a frame has been encoded successfully.
152 * Takes no arguments: while holding _state_mutex it pushes the current time
 * onto the front of _time_history and, once the history holds more than
 * _history_size entries, drops the oldest entry from the back.
155 Encoder::frame_done ()
157 boost::mutex::scoped_lock lock (_state_mutex);
160 gettimeofday (&tv, 0);
161 _time_history.push_front (tv);
162 if (int (_time_history.size()) > _history_size) {
163 _time_history.pop_back ();
167 /** Called in order, so each time this is called the supplied frame is the one
168 * after the previous one.
 * @param pv Frame to handle.  Depending on the checks below it is fake-written,
 * written straight from its existing JPEG2000 data, recorded as a repeat of the
 * previous frame, or queued as a DCPVideo for the encoder threads.
171 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
175 boost::mutex::scoped_lock lock (_mutex);
177 /* XXX: discard 3D here if required */
179 /* Wait until the queue has gone down a bit */
// "A bit" means fewer than two queued frames per worker thread; the wait is
// also abandoned once _terminate is set.
180 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
181 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
182 _full_condition.wait (lock);
183 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
191 /* Re-throw any exception raised by one of our threads. If more
192 than one has thrown an exception, only one will be rethrown, I think;
193 but then, if that happens something has gone badly wrong.
197 if (_writer->can_fake_write (_video_frames_enqueued)) {
198 /* We can fake-write this frame */
199 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
201 } else if (pv->has_j2k ()) {
202 /* This frame already has JPEG2000 data, so just write it */
203 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
// Same content as the previous frame (_last_player_video): have the writer
// repeat it instead of encoding it again.
204 } else if (_last_player_video && pv->same (_last_player_video)) {
205 _writer->repeat (_video_frames_enqueued, pv->eyes ());
207 /* Queue this new frame for encoding */
208 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
209 _queue.push_back (shared_ptr<DCPVideo> (
212 _video_frames_enqueued,
213 _film->video_frame_rate(),
214 _film->j2k_bandwidth(),
220 /* The queue might not be empty any more, so notify anything which is
223 _empty_condition.notify_all ();
// Advance _video_frames_enqueued according to which eye(s) this frame carries.
226 switch (pv->eyes ()) {
228 ++_video_frames_enqueued;
// Only move on to the next frame index once both the left and the right eye
// have been seen, then reset the flags for the next pair.
240 if (_left_done && _right_done) {
241 ++_video_frames_enqueued;
242 _left_done = _right_done = false;
// Remembered so that the next call can detect a repeated frame.
245 _last_player_video = pv;
// Wakes every thread blocked on either condition variable so that it can
// re-check its wait condition (both wait loops in this file test _terminate),
// then visits each thread in _threads that is still joinable, presumably to
// join it (TODO confirm).
249 Encoder::terminate_threads ()
252 boost::mutex::scoped_lock lock (_mutex);
254 _full_condition.notify_all ();
255 _empty_condition.notify_all ();
258 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
259 if ((*i)->joinable ()) {
// Body of each worker thread started by servers_list_changed().
// @param server Description of the remote server this thread encodes on; an
// empty optional is passed for threads that encode locally.
270 Encoder::encoder_thread (optional<ServerDescription> server)
274 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
276 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
279 /* Number of seconds that we currently wait between attempts
280 to connect to the server; not relevant for localhost
283 int remote_backoff = 0;
287 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
288 boost::mutex::scoped_lock lock (_mutex);
// Sleep until there is a frame to take or termination has been requested.
289 while (_queue.empty () && !_terminate) {
290 _empty_condition.wait (lock);
297 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
// Work on the frame at the head of the queue.
298 shared_ptr<DCPVideo> vf = _queue.front ();
299 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
// Remains empty if the encode attempt below throws.
304 optional<Data> encoded;
306 /* We need to encode this input */
309 encoded = vf->encode_remotely (server.get ());
311 if (remote_backoff > 0) {
312 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
315 /* This job succeeded, so remove any backoff */
318 } catch (std::exception& e) {
// Lengthen the wait by 10 seconds per failure until it reaches 60.
319 if (remote_backoff < 60) {
321 remote_backoff += 10;
324 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
325 vf->index(), server->host_name(), e.what(), remote_backoff
331 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
332 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
333 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
334 } catch (std::exception& e) {
335 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
// Hand the encoded data to the writer, labelled with the frame's index and eyes.
340 _writer->write (encoded.get(), vf->index (), vf->eyes ());
344 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
// Back at the head of the queue, so it is the next frame to be taken.
345 _queue.push_front (vf);
// After a remote failure, pause for the current backoff period.
349 if (remote_backoff > 0) {
350 dcpomatic_sleep (remote_backoff);
353 /* The queue might not be full any more, so notify anything that is waiting on that */
355 _full_condition.notify_all ();
// Rebuilds the pool of encoder threads.  Calls terminate_threads() first, then
// starts the configured number of local threads (unless the configuration says
// that only servers should encode) plus threads() threads for each server the
// ServerFinder currently lists, and finally tells the writer how many threads
// there are.
// NOTE(review): the threads are allocated with raw `new`; the matching delete
// is not visible in this function, so confirm where ownership is released.
364 Encoder::servers_list_changed ()
366 terminate_threads ();
368 /* XXX: could re-use threads */
370 if (!Config::instance()->only_servers_encode ()) {
371 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
// An empty optional makes this a local encoding thread.
372 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
376 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
377 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
378 for (int j = 0; j < i.threads(); ++j) {
// Each of these threads is bound to the server description i.
379 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
383 _writer->set_encoder_threads (_threads.size ());