2 Copyright (C) 2012-2015 Carl Hetherington <cth@carlh.net>
4 This program is free software; you can redistribute it and/or modify
5 it under the terms of the GNU General Public License as published by
6 the Free Software Foundation; either version 2 of the License, or
7 (at your option) any later version.
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
14 You should have received a copy of the GNU General Public License
15 along with this program; if not, write to the Free Software
16 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20 /** @file src/encoder.h
21 * @brief Parent class for classes which can encode video and audio frames.
29 #include "dcp_video.h"
32 #include "encode_server_finder.h"
34 #include "player_video.h"
35 #include "encode_server_description.h"
36 #include "compose.hpp"
37 #include <libcxml/cxml.h>
38 #include <boost/foreach.hpp>
43 #define LOG_GENERAL(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_GENERAL);
44 #define LOG_GENERAL_NC(...) _film->log()->log (__VA_ARGS__, LogEntry::TYPE_GENERAL);
45 #define LOG_ERROR(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_ERROR);
46 #define LOG_TIMING(...) _film->log()->log (String::compose (__VA_ARGS__), LogEntry::TYPE_TIMING);
50 using boost::shared_ptr;
51 using boost::weak_ptr;
52 using boost::optional;
55 int const Encoder::_history_size = 25;
57 /** @param f Film that we are encoding */
58 Encoder::Encoder (shared_ptr<const Film> film, shared_ptr<Writer> writer)
61 , _terminate_enqueue (false)
62 , _terminate_encoding (false)
65 servers_list_changed ();
72 boost::mutex::scoped_lock lm (_queue_mutex);
73 _terminate_enqueue = true;
74 _full_condition.notify_all ();
75 _empty_condition.notify_all ();
81 if (!EncodeServerFinder::instance()->disabled ()) {
82 _server_found_connection = EncodeServerFinder::instance()->ServersListChanged.connect (boost::bind (&Encoder::servers_list_changed, this));
89 boost::mutex::scoped_lock lock (_queue_mutex);
91 LOG_GENERAL (N_("Clearing queue of %1"), _queue.size ());
93 /* Keep waking workers until the queue is empty */
94 while (!_queue.empty ()) {
96 _empty_condition.notify_all ();
97 _full_condition.wait (lock);
102 LOG_GENERAL_NC (N_("Terminating encoder threads"));
104 terminate_threads ();
106 LOG_GENERAL (N_("Mopping up %1"), _queue.size());
108 /* The following sequence of events can occur in the above code:
109 1. a remote worker takes the last image off the queue
110 2. the loop above terminates
111 3. the remote worker fails to encode the image and puts it back on the queue
112 4. the remote worker is then terminated by terminate_threads
114 So just mop up anything left in the queue here.
117 for (list<shared_ptr<DCPVideo> >::iterator i = _queue.begin(); i != _queue.end(); ++i) {
118 LOG_GENERAL (N_("Encode left-over frame %1"), (*i)->index ());
121 (*i)->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2)),
126 } catch (std::exception& e) {
127 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
132 /** @return an estimate of the current number of frames we are encoding per second,
136 Encoder::current_encoding_rate () const
138 boost::mutex::scoped_lock lock (_state_mutex);
139 if (int (_time_history.size()) < _history_size) {
144 gettimeofday (&now, 0);
146 return _history_size / (seconds (now) - seconds (_time_history.back ()));
149 /** @return Number of video frames that have been sent out */
151 Encoder::video_frames_out () const
153 boost::mutex::scoped_lock (_state_mutex);
157 /** Should be called when a frame has been encoded successfully.
158 * @param n Source frame index.
161 Encoder::frame_done ()
163 boost::mutex::scoped_lock lock (_state_mutex);
166 gettimeofday (&tv, 0);
167 _time_history.push_front (tv);
168 if (int (_time_history.size()) > _history_size) {
169 _time_history.pop_back ();
173 /** Called to start encoding of the next video frame in the DCP. This is called in order,
174 * so each time the supplied frame is the one after the previous one.
175 * pv represents one video frame, and could be empty if there is nothing to encode
176 * for this DCP frame.
179 Encoder::encode (list<shared_ptr<PlayerVideo> > pv)
181 BOOST_FOREACH (shared_ptr<PlayerVideo> i, pv) {
188 Encoder::enqueue (shared_ptr<PlayerVideo> pv)
194 boost::mutex::scoped_lock threads_lock (_threads_mutex);
195 threads = _threads.size ();
198 boost::mutex::scoped_lock queue_lock (_queue_mutex);
200 /* XXX: discard 3D here if required */
202 /* Wait until the queue has gone down a bit */
203 while (_queue.size() >= threads * 2 && !_terminate_enqueue) {
204 LOG_TIMING ("decoder-sleep queue=%1", _queue.size());
205 _full_condition.wait (queue_lock);
206 LOG_TIMING ("decoder-wake queue=%1", _queue.size());
209 if (_terminate_enqueue) {
214 /* Re-throw any exception raised by one of our threads. If more
215 than one has thrown an exception, only one will be rethrown, I think;
216 but then, if that happens something has gone badly wrong.
220 if (_writer->can_fake_write (_position)) {
221 /* We can fake-write this frame */
222 _writer->fake_write (_position, pv->eyes ());
224 } else if (pv->has_j2k ()) {
225 /* This frame already has JPEG2000 data, so just write it */
226 _writer->write (pv->j2k(), _position, pv->eyes ());
227 } else if (_last_player_video && _writer->can_repeat(_position) && pv->same (_last_player_video)) {
228 _writer->repeat (_position, pv->eyes ());
230 /* Queue this new frame for encoding */
231 LOG_TIMING ("add-frame-to-queue queue=%1", _queue.size ());
232 _queue.push_back (shared_ptr<DCPVideo> (
236 _film->video_frame_rate(),
237 _film->j2k_bandwidth(),
243 /* The queue might not be empty any more, so notify anything which is
246 _empty_condition.notify_all ();
249 _last_player_video = pv;
253 Encoder::terminate_threads ()
256 boost::mutex::scoped_lock queue_lock (_queue_mutex);
257 _terminate_encoding = true;
260 boost::mutex::scoped_lock threads_lock (_threads_mutex);
263 for (list<boost::thread *>::iterator i = _threads.begin(); i != _threads.end(); ++i) {
264 LOG_GENERAL ("Terminating thread %1 of %2", n + 1, _threads.size ());
266 if ((*i)->joinable ()) {
270 LOG_GENERAL_NC ("Thread terminated");
275 _terminate_encoding = false;
279 Encoder::encoder_thread (optional<EncodeServerDescription> server)
283 LOG_TIMING ("start-encoder-thread thread=%1 server=%2", boost::this_thread::get_id (), server->host_name ());
285 LOG_TIMING ("start-encoder-thread thread=%1 server=localhost", boost::this_thread::get_id ());
288 /* Number of seconds that we currently wait between attempts
289 to connect to the server; not relevant for localhost
292 int remote_backoff = 0;
296 LOG_TIMING ("encoder-sleep thread=%1", boost::this_thread::get_id());
297 boost::mutex::scoped_lock lock (_queue_mutex);
298 while (_queue.empty () && !_terminate_encoding) {
299 _empty_condition.wait (lock);
302 if (_terminate_encoding) {
306 LOG_TIMING ("encoder-wake thread=%1 queue=%2", boost::this_thread::get_id(), _queue.size());
307 shared_ptr<DCPVideo> vf = _queue.front ();
308 LOG_TIMING ("encoder-pop thread=%1 frame=%2 eyes=%3", boost::this_thread::get_id(), vf->index(), vf->eyes ());
313 optional<Data> encoded;
315 /* We need to encode this input */
318 encoded = vf->encode_remotely (server.get ());
320 if (remote_backoff > 0) {
321 LOG_GENERAL ("%1 was lost, but now she is found; removing backoff", server->host_name ());
324 /* This job succeeded, so remove any backoff */
327 } catch (std::exception& e) {
328 if (remote_backoff < 60) {
330 remote_backoff += 10;
333 N_("Remote encode of %1 on %2 failed (%3); thread sleeping for %4s"),
334 vf->index(), server->host_name(), e.what(), remote_backoff
340 LOG_TIMING ("start-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
341 encoded = vf->encode_locally (boost::bind (&Log::dcp_log, _film->log().get(), _1, _2));
342 LOG_TIMING ("finish-local-encode thread=%1 frame=%2", boost::this_thread::get_id(), vf->index());
343 } catch (std::exception& e) {
344 LOG_ERROR (N_("Local encode failed (%1)"), e.what ());
350 _writer->write (encoded.get(), vf->index (), vf->eyes ());
354 LOG_GENERAL (N_("[%1] Encoder thread pushes frame %2 back onto queue after failure"), boost::this_thread::get_id(), vf->index());
355 _queue.push_front (vf);
359 if (remote_backoff > 0) {
360 boost::this_thread::sleep (boost::posix_time::seconds (remote_backoff));
363 /* The queue might not be full any more, so notify anything that is waiting on that */
365 _full_condition.notify_all ();
371 /* Wake anything waiting on _full_condition so it can see the exception */
372 _full_condition.notify_all ();
376 Encoder::servers_list_changed ()
378 terminate_threads ();
380 /* XXX: could re-use threads */
382 boost::mutex::scoped_lock lm (_threads_mutex);
384 #ifdef BOOST_THREAD_PLATFORM_WIN32
386 info.dwOSVersionInfoSize = sizeof (OSVERSIONINFO);
387 GetVersionEx (&info);
388 bool const windows_xp = (info.dwMajorVersion == 5 && info.dwMinorVersion == 1);
390 LOG_GENERAL_NC (N_("Setting thread affinity for Windows XP"));
394 if (!Config::instance()->only_servers_encode ()) {
395 for (int i = 0; i < Config::instance()->num_local_encoding_threads (); ++i) {
396 boost::thread* t = new boost::thread (boost::bind (&Encoder::encoder_thread, this, optional<EncodeServerDescription> ()));
397 _threads.push_back (t);
398 #ifdef BOOST_THREAD_PLATFORM_WIN32
400 SetThreadAffinityMask (t->native_handle(), 1 << i);
406 BOOST_FOREACH (EncodeServerDescription i, EncodeServerFinder::instance()->servers ()) {
407 LOG_GENERAL (N_("Adding %1 worker threads for remote %2"), i.threads(), i.host_name ());
408 for (int j = 0; j < i.threads(); ++j) {
409 _threads.push_back (new boost::thread (boost::bind (&Encoder::encoder_thread, this, i)));
413 _writer->set_encoder_threads (_threads.size ());