#include "film.h"
#include "job.h"
#include "job_manager.h"
+#include "util.h"
#include <boost/thread.hpp>
using std::dynamic_pointer_cast;
+using std::function;
using std::list;
using std::make_shared;
using std::shared_ptr;
using std::string;
using std::weak_ptr;
using boost::bind;
-using boost::function;
using boost::optional;
{
boost::mutex::scoped_lock lm (_mutex);
_terminate = true;
- _empty_condition.notify_all ();
+ _schedule_condition.notify_all();
}
try {
{
boost::mutex::scoped_lock lm (_mutex);
_jobs.push_back (j);
- _empty_condition.notify_all ();
+ _schedule_condition.notify_all();
}
emit (boost::bind(boost::ref(JobAdded), weak_ptr<Job>(j)));
auto i = find (_jobs.begin(), _jobs.end(), after);
DCPOMATIC_ASSERT (i != _jobs.end());
_jobs.insert (i, j);
- _empty_condition.notify_all ();
+ _schedule_condition.notify_all();
}
emit (boost::bind(boost::ref(JobAdded), weak_ptr<Job>(j)));
boost::mutex::scoped_lock lm (_mutex);
- while (true) {
- bool have_new = false;
- bool have_running = false;
- for (auto i: _jobs) {
- if (i->running()) {
- have_running = true;
- }
- if (i->is_new()) {
- have_new = true;
- }
- }
-
- if ((!have_running && have_new) || _terminate) {
- break;
- }
-
- _empty_condition.wait (lm);
- }
-
if (_terminate) {
break;
}
+ bool have_running = false;
for (auto i: _jobs) {
- if (i->is_new()) {
- _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this)));
- i->start ();
+ if ((have_running || _paused) && i->running()) {
+ /* We already have a running job, or are totally paused, so this job should not be running */
+ i->pause_by_priority();
+ } else if (!have_running && !_paused && (i->is_new() || i->paused_by_priority())) {
+ /* We don't have a running job, and we should have one, so start/resume this */
+ if (i->is_new()) {
+ _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this)));
+ i->start ();
+ } else {
+ i->resume ();
+ }
emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, i->json_name()));
_last_active_job = i->json_name ();
- /* Only start one job at once */
- break;
+ have_running = true;
+ } else if (!have_running && i->running()) {
+ have_running = true;
}
}
+
+ _schedule_condition.wait(lm);
}
}
_last_active_job = optional<string>();
}
- _empty_condition.notify_all ();
+ _schedule_condition.notify_all();
}
shared_ptr<const Playlist> playlist,
bool from_zero,
boost::signals2::connection& connection,
- function<void()> ready
+ function<void (Job::Result)> ready
)
{
{
job = make_shared<AnalyseAudioJob> (film, playlist, from_zero);
connection = job->Finished.connect (ready);
_jobs.push_back (job);
- _empty_condition.notify_all ();
+ _schedule_condition.notify_all ();
}
emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (job)));
shared_ptr<const Film> film,
shared_ptr<Content> content,
boost::signals2::connection& connection,
- function<void()> ready
+ function<void (Job::Result)> ready
)
{
{
for (auto i: _jobs) {
auto a = dynamic_pointer_cast<AnalyseSubtitlesJob> (i);
- if (a && a->path() == film->subtitle_analysis_path(content)) {
+ if (a && a->path() == film->subtitle_analysis_path(content) && !i->finished_cancelled()) {
i->when_finished (connection, ready);
return;
}
job = make_shared<AnalyseSubtitlesJob>(film, content);
connection = job->Finished.connect (ready);
_jobs.push_back (job);
- _empty_condition.notify_all ();
+ _schedule_condition.notify_all ();
}
emit (boost::bind(boost::ref(JobAdded), weak_ptr<Job>(job)));
void
JobManager::increase_priority (shared_ptr<Job> job)
-{
- bool changed = false;
-
- {
- boost::mutex::scoped_lock lm (_mutex);
- auto last = _jobs.end ();
- for (auto i = _jobs.begin(); i != _jobs.end(); ++i) {
- if (*i == job && last != _jobs.end()) {
- swap (*i, *last);
- changed = true;
- break;
- }
- last = i;
- }
- }
-
- if (changed) {
- priority_changed ();
- }
-}
-
-
-void
-JobManager::priority_changed ()
{
{
boost::mutex::scoped_lock lm (_mutex);
-
- bool first = true;
- for (auto i: _jobs) {
- if (first) {
- if (i->is_new ()) {
- i->start ();
- } else if (i->paused_by_priority ()) {
- i->resume ();
- }
- first = false;
- } else {
- if (i->running ()) {
- i->pause_by_priority ();
- }
- }
+ auto iter = std::find(_jobs.begin(), _jobs.end(), job);
+ if (iter == _jobs.begin() || iter == _jobs.end()) {
+ return;
}
+ swap(*iter, *std::prev(iter));
}
- emit (boost::bind(boost::ref(JobsReordered)));
+ _schedule_condition.notify_all();
+ emit(boost::bind(boost::ref(JobsReordered)));
}
void
JobManager::decrease_priority (shared_ptr<Job> job)
{
- bool changed = false;
-
{
boost::mutex::scoped_lock lm (_mutex);
- for (auto i = _jobs.begin(); i != _jobs.end(); ++i) {
- auto next = i;
- ++next;
- if (*i == job && next != _jobs.end()) {
- swap (*i, *next);
- changed = true;
- break;
- }
+ auto iter = std::find(_jobs.begin(), _jobs.end(), job);
+ if (iter == _jobs.end() || std::next(iter) == _jobs.end()) {
+ return;
}
+ swap(*iter, *std::next(iter));
}
- if (changed) {
- priority_changed ();
- }
+ _schedule_condition.notify_all();
+ emit(boost::bind(boost::ref(JobsReordered)));
}
+/** Pause all job processing */
void
JobManager::pause ()
{
boost::mutex::scoped_lock lm (_mutex);
-
- if (_paused) {
- return;
- }
-
- for (auto i: _jobs) {
- if (i->pause_by_user()) {
- _paused_job = i;
- }
- }
-
_paused = true;
+ _schedule_condition.notify_all();
}
+/** Resume processing jobs after a previous pause() */
void
JobManager::resume ()
{
boost::mutex::scoped_lock lm (_mutex);
- if (!_paused) {
- return;
- }
-
- if (_paused_job) {
- _paused_job->resume ();
- }
-
- _paused_job.reset ();
_paused = false;
+ _schedule_condition.notify_all();
}