From 4ee083dc0862b30325c709e913772a6898378d0e Mon Sep 17 00:00:00 2001 From: Carl Hetherington Date: Wed, 24 Nov 2021 00:16:15 +0100 Subject: [PATCH] Simplify and fix job scheduler, especially with respect to the priority system. --- src/lib/job_manager.cc | 72 ++++++----------------- src/lib/job_manager.h | 1 - test/{job_test.cc => job_manager_test.cc} | 47 +++++++++++++-- test/wscript | 2 +- 4 files changed, 62 insertions(+), 60 deletions(-) rename test/{job_test.cc => job_manager_test.cc} (62%) diff --git a/src/lib/job_manager.cc b/src/lib/job_manager.cc index d8c0b02f2..608df7ef0 100644 --- a/src/lib/job_manager.cc +++ b/src/lib/job_manager.cc @@ -159,39 +159,30 @@ JobManager::scheduler () boost::mutex::scoped_lock lm (_mutex); - while (true) { - bool have_new = false; - bool have_running = false; - for (auto i: _jobs) { - if (i->running()) { - have_running = true; - } - if (i->is_new()) { - have_new = true; - } - } - - if ((!have_running && have_new) || _terminate) { - break; - } - - _empty_condition.wait (lm); - } - if (_terminate) { break; } + bool have_running = false; for (auto i: _jobs) { - if (i->is_new()) { - _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this))); - i->start (); + if (have_running && i->running()) { + i->pause_by_priority(); + } else if (!have_running && (i->is_new() || i->paused_by_priority())) { + if (i->is_new()) { + _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this))); + i->start (); + } else { + i->resume (); + } emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, i->json_name())); _last_active_job = i->json_name (); - /* Only start one job at once */ - break; + have_running = true; + } else if (!have_running && i->running()) { + have_running = true; } } + + _empty_condition.wait (lm); } } @@ -319,35 +310,9 @@ JobManager::increase_priority (shared_ptr job) } if (changed) { - priority_changed (); - } -} - - -void -JobManager::priority_changed () -{ - { - boost::mutex::scoped_lock lm (_mutex); - - bool first = true; - for (auto i: _jobs) { - if (first) { - if (i->is_new ()) { - i->start (); - } else if (i->paused_by_priority ()) { - i->resume (); - } - first = false; - } else { - if (i->running ()) { - i->pause_by_priority (); - } - } - } + _empty_condition.notify_all (); + emit (boost::bind(boost::ref(JobsReordered))); } - - emit (boost::bind(boost::ref(JobsReordered))); } @@ -370,7 +335,8 @@ JobManager::decrease_priority (shared_ptr job) } if (changed) { - priority_changed (); + _empty_condition.notify_all (); + emit (boost::bind(boost::ref(JobsReordered))); } } diff --git a/src/lib/job_manager.h b/src/lib/job_manager.h index ff5800aa8..71db33fd6 100644 --- a/src/lib/job_manager.h +++ b/src/lib/job_manager.h @@ -96,7 +96,6 @@ private: ~JobManager (); void scheduler (); void start (); - void priority_changed (); void job_finished (); mutable boost::mutex _mutex; diff --git a/test/job_test.cc b/test/job_manager_test.cc similarity index 62% rename from test/job_test.cc rename to test/job_manager_test.cc index 7fb240843..1baa5767e 100644 --- a/test/job_test.cc +++ b/test/job_manager_test.cc @@ -19,8 +19,8 @@ */ -/** @file test/job_test.cc - * @brief Test Job and JobManager. +/** @file test/job_manager_test.cc + * @brief Test JobManager. * @ingroup selfcontained */ @@ -34,6 +34,7 @@ using std::make_shared; using std::shared_ptr; using std::string; +using std::vector; class TestJob : public Job @@ -77,7 +78,7 @@ public: }; -BOOST_AUTO_TEST_CASE (job_manager_test) +BOOST_AUTO_TEST_CASE (job_manager_test1) { shared_ptr film; @@ -86,8 +87,44 @@ BOOST_AUTO_TEST_CASE (job_manager_test) JobManager::instance()->add (a); dcpomatic_sleep_seconds (1); - BOOST_CHECK_EQUAL (a->running (), true); + BOOST_CHECK (a->running()); a->set_finished_ok (); dcpomatic_sleep_seconds (2); - BOOST_CHECK_EQUAL (a->finished_ok(), true); + BOOST_CHECK (a->finished_ok()); } + + +BOOST_AUTO_TEST_CASE (job_manager_test2) +{ + shared_ptr film; + + vector> jobs; + for (int i = 0; i < 16; ++i) { + auto job = make_shared(film); + jobs.push_back (job); + JobManager::instance()->add (job); + } + + dcpomatic_sleep_seconds (1); + BOOST_CHECK (jobs[0]->running()); + jobs[0]->set_finished_ok(); + + dcpomatic_sleep_seconds (1); + BOOST_CHECK (!jobs[0]->running()); + BOOST_CHECK (jobs[1]->running()); + + /* Push our jobs[5] to the top of the list */ + for (int i = 0; i < 5; ++i) { + JobManager::instance()->increase_priority(jobs[5]); + } + + dcpomatic_sleep_seconds (1); + for (int i = 0; i < 16; ++i) { + BOOST_CHECK (i == 5 ? jobs[i]->running() : !jobs[i]->running()); + } + + for (auto job: jobs) { + job->set_finished_ok(); + } +} + diff --git a/test/wscript b/test/wscript index ff6895d9a..a5366b3ec 100644 --- a/test/wscript +++ b/test/wscript @@ -96,7 +96,7 @@ def build(bld): interrupt_encoder_test.cc isdcf_name_test.cc j2k_bandwidth_test.cc - job_test.cc + job_manager_test.cc kdm_naming_test.cc low_bitrate_test.cc markers_test.cc -- 2.30.2