2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
33 #include "server_finder.h"
35 #include "player_video.h"
37 #include "compose.hpp"
38 #include <libcxml/cxml.h>
39 #include <boost/foreach.hpp>
44 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), Log::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->microsecond_log (String::compose (__VA_ARGS__), Log::TYPE_TIMING);
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
54 int const Encoder::_history_size = 25;
56 /** @param f Film that we are encoding */
57 Encoder::Encoder (shared_ptr<const Film> film, weak_ptr<Job> j, shared_ptr<Writer> writer)
60 , _video_frames_enqueued (0)
66 servers_list_changed ();
77 if (!ServerFinder::instance()->disabled ()) {
78 _server_found_connection = ServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
85 boost::mutex::scoped_lock lock (_mutex);
87 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
89 /* Keep waking workers until the queue is empty */
90 while (!_queue.empty ()) {
91 _empty_condition.notify_all ();
92 _full_condition.wait (lock);
99 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
101 /* The following sequence of events can occur in the above code:
102 1. a remote worker takes the last image off the queue
103 2. the loop above terminates
104 3. the remote worker fails to encode the image and puts it back on the queue
105 4. the remote worker is then terminated by terminate_threads
107 So just mop up anything left in the queue here.
110 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
111 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
114 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
119 } catch (std::exception& e) {
120 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
125 /** @return an estimate of the current number of frames we are encoding per second,
129 Encoder::current_encoding_rate () const
131 boost::mutex::scoped_lock lock (_state_mutex);
132 if (int (_time_history.size()) < _history_size) {
137 gettimeofday (&now, 0);
139 return _history_size / (seconds (now) - seconds (_time_history.back ()));
142 /** @return Number of video frames that have been sent out */
144 Encoder::video_frames_out () const
146 boost::mutex::scoped_lock (_state_mutex);
147 return _video_frames_enqueued;
150 /** Should be called when a frame has been encoded successfully.
151 * @param n Source frame index.
154 Encoder::frame_done ()
156 boost::mutex::scoped_lock lock (_state_mutex);
159 gettimeofday (&tv, 0);
160 _time_history.push_front (tv);
161 if (int (_time_history.size()) > _history_size) {
162 _time_history.pop_back ();
166 /** Called in order, so each time this is called the supplied frame is the one
167 * after the previous one.
170 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
174 boost::mutex::scoped_lock lock (_mutex);
176 /* XXX: discard 3D here if required */
178 /* Wait until the queue has gone down a bit */
179 while (_queue.size() >= _threads.size() * 2 && !_terminate) {
180 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
181 _full_condition.wait (lock);
182 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
190 /* Re-throw any exception raised by one of our threads. If more
191 than one has thrown an exception, only one will be rethrown, I think;
192 but then, if that happens something has gone badly wrong.
196 if (_writer->can_fake_write (_video_frames_enqueued)) {
197 /* We can fake-write this frame */
198 _writer->fake_write (_video_frames_enqueued, pv->eyes ());
200 } else if (pv->has_j2k ()) {
201 /* This frame already has JPEG2000 data, so just write it */
202 _writer->write (pv->j2k(), _video_frames_enqueued, pv->eyes ());
203 } else if (_last_player_video && pv->same (_last_player_video)) {
204 _writer->repeat (_video_frames_enqueued, pv->eyes ());
206 /* Queue this new frame for encoding */
207 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
208 _queue.push_back (shared_ptr<DCPVideo> (
211 _video_frames_enqueued,
212 _film->video_frame_rate(),
213 _film->j2k_bandwidth(),
219 /* The queue might not be empty any more, so notify anything which is
222 _empty_condition.notify_all ();
225 switch (pv->eyes ()) {
227 ++_video_frames_enqueued;
239 if (_left_done && _right_done) {
240 ++_video_frames_enqueued;
241 _left_done = _right_done = false;
244 _last_player_video = pv;
248 Encoder::terminate_threads ()
251 boost::mutex::scoped_lock lock (_mutex);
253 _full_condition.notify_all ();
254 _empty_condition.notify_all ();
257 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
258 if ((*i)->joinable ()) {
269 Encoder::encoder_thread (optional<ServerDescription> server)
273 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
275 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
278 /* Number of seconds that we currently wait between attempts
279 to connect to the server; not relevant for localhost
282 int remote_backoff = 0;
286 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
287 boost::mutex::scoped_lock lock (_mutex);
288 while (_queue.empty () && !_terminate) {
289 _empty_condition.wait (lock);
296 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
297 shared_ptr<DCPVideo> vf = _queue.front ();
298 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
303 optional<Data> encoded;
305 /* We need to encode this input */
308 encoded = vf->encode_remotely (server.get ());
310 if (remote_backoff > 0) {
311 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
314 /* This job succeeded, so remove any backoff */
317 } catch (std::exception& e) {
318 if (remote_backoff < 60) {
320 remote_backoff += 10;
323 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
324 vf->index(), server->host_name(), e.what(), remote_backoff
330 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
331 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
332 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
333 } catch (std::exception& e) {
334 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
339 _writer->write (encoded.get(), vf->index (), vf->eyes ());
343 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
344 _queue.push_front (vf);
348 if (remote_backoff > 0) {
349 dcpomatic_sleep (remote_backoff);
352 /* The queue might not be full any more, so notify anything that is waiting on that */
354 _full_condition.notify_all ();
363 Encoder::servers_list_changed ()
365 terminate_threads ();
367 /* XXX: could re-use threads */
369 if (!Config::instance()->only_servers_encode ()) {
370 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
371 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<ServerDescription> ())));
375 BOOST_FOREACH (ServerDescription i, ServerFinder::instance()->servers ()) {
376 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
377 for (int j = 0; j < i.threads(); ++j) {
378 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
382 _writer->set_encoder_threads (_threads.size ());