2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
36 #include "server_description.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
45 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
46 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
47 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
51 using boost::shared_ptr;
52 using boost::weak_ptr;
53 using boost::optional;
56 int const Encoder::_history_size = 25;
58 /** @param f Film that we are encoding */
59 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
63 , _terminate_enqueue (false)
64 , _terminate_encoding (false)
67 servers_list_changed ();
74 boost::mutex::scoped_lock lm (_queue_mutex);
75 _terminate_enqueue = true;
76 _full_condition.notify_all ();
77 _empty_condition.notify_all ();
83 if (!ServerFinder::instance()->disabled ()) {
84 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
91 boost::mutex::scoped_lock lock (_queue_mutex);
93 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
95 /* Keep waking workers until the queue is empty */
96 while (!_queue.empty ()) {
98 _empty_condition.notify_all ();
99 _full_condition.wait (lock);
104 LOG_GENERAL_NC (N_("Terminating encoder threads"));
106 terminate_threads ();
108 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
110 /* The following sequence of events can occur in the above code:
111 1. a remote worker takes the last image off the queue
112 2. the loop above terminates
113 3. the remote worker fails to encode the image and puts it back on the queue
114 4. the remote worker is then terminated by terminate_threads
116 So just mop up anything left in the queue here.
119 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
120 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
123 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
128 } catch (std::exception& e) {
129 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
134 /** @return an estimate of the current number of frames we are encoding per second,
138 Encoder::current_encoding_rate () const
140 boost::mutex::scoped_lock lock (_state_mutex);
141 if (int (_time_history.size()) < _history_size) {
146 gettimeofday (&now, 0);
148 return _history_size / (seconds (now) - seconds (_time_history.back ()));
151 /** @return Number of video frames that have been sent out */
153 Encoder::video_frames_out () const
155 boost::mutex::scoped_lock (_state_mutex);
159 /** Should be called when a frame has been encoded successfully.
160 * @param n Source frame index.
163 Encoder::frame_done ()
165 boost::mutex::scoped_lock lock (_state_mutex);
168 gettimeofday (&tv, 0);
169 _time_history.push_front (tv);
170 if (int (_time_history.size()) > _history_size) {
171 _time_history.pop_back ();
175 /** Called to start encoding of the next video frame in the DCP. This is called in order,
176 * so each time the supplied frame is the one after the previous one.
177 * pv represents one video frame, and could be empty if there is nothing to encode
178 * for this DCP frame.
181 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
183 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
190 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
196 boost::mutex::scoped_lock threads_lock (_threads_mutex);
197 threads = _threads.size ();
200 boost::mutex::scoped_lock queue_lock (_queue_mutex);
202 /* XXX: discard 3D here if required */
204 /* Wait until the queue has gone down a bit */
205 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
206 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
207 _full_condition.wait (queue_lock);
208 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
211 if (_terminate_enqueue) {
216 /* Re-throw any exception raised by one of our threads. If more
217 than one has thrown an exception, only one will be rethrown, I think;
218 but then, if that happens something has gone badly wrong.
222 if (_writer->can_fake_write (_position)) {
223 /* We can fake-write this frame */
224 _writer->fake_write (_position, pv->eyes ());
226 } else if (pv->has_j2k ()) {
227 /* This frame already has JPEG2000 data, so just write it */
228 _writer->write (pv->j2k(), _position, pv->eyes ());
229 } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
230 _writer->repeat (_position, pv->eyes ());
232 /* Queue this new frame for encoding */
233 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
234 _queue.push_back (shared_ptr<DCPVideo> (
238 _film->video_frame_rate(),
239 _film->j2k_bandwidth(),
245 /* The queue might not be empty any more, so notify anything which is
248 _empty_condition.notify_all ();
251 _last_player_video = pv;
255 Encoder::terminate_threads ()
258 boost::mutex::scoped_lock queue_lock (_queue_mutex);
259 _terminate_encoding = true;
262 boost::mutex::scoped_lock threads_lock (_threads_mutex);
264 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
266 if ((*i)->joinable ()) {
273 _terminate_encoding = false;
277 Encoder::encoder_thread (optional<ServerDescription> server)
281 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
283 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
286 /* Number of seconds that we currently wait between attempts
287 to connect to the server; not relevant for localhost
290 int remote_backoff = 0;
294 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
295 boost::mutex::scoped_lock lock (_queue_mutex);
296 while (_queue.empty () && !_terminate_encoding) {
297 _empty_condition.wait (lock);
300 if (_terminate_encoding) {
304 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
305 shared_ptr<DCPVideo> vf = _queue.front ();
306 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
311 optional<Data> encoded;
313 /* We need to encode this input */
316 encoded = vf->encode_remotely (server.get ());
318 if (remote_backoff > 0) {
319 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
322 /* This job succeeded, so remove any backoff */
325 } catch (std::exception& e) {
326 if (remote_backoff < 60) {
328 remote_backoff += 10;
331 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
332 vf->index(), server->host_name(), e.what(), remote_backoff
338 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
339 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
340 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
341 } catch (std::exception& e) {
342 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
348 _writer->write (encoded.get(), vf->index (), vf->eyes ());
352 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
353 _queue.push_front (vf);
357 if (remote_backoff > 0) {
358 boost::this_thread::sleep_for (boost::chrono::seconds (remote_backoff));
361 /* The queue might not be full any more, so notify anything that is waiting on that */
363 _full_condition.notify_all ();
369 /* Wake anything waiting on _full_condition so it can see the exception */
370 _full_condition.notify_all ();
374 Encoder::servers_list_changed ()
376 terminate_threads ();
378 /* XXX: could re-use threads */
380 boost::mutex::scoped_lock lm (_threads_mutex);
382 if (!Config::instance()->only_servers_encode ()) {
383 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
384 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
388 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
389 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
390 for (int j = 0; j < i.threads(); ++j) {
391 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
395 _writer->set_encoder_threads (_threads.size ());