Try to not start jobs if a dependant fails.
authorCarl Hetherington <cth@carlh.net>
Wed, 10 Oct 2012 11:53:06 +0000 (12:53 +0100)
committerCarl Hetherington <cth@carlh.net>
Wed, 10 Oct 2012 11:53:06 +0000 (12:53 +0100)
23 files changed:
src/lib/ab_transcode_job.cc
src/lib/ab_transcode_job.h
src/lib/check_hashes_job.cc
src/lib/check_hashes_job.h
src/lib/copy_from_dvd_job.cc
src/lib/copy_from_dvd_job.h
src/lib/examine_content_job.cc
src/lib/examine_content_job.h
src/lib/film.cc
src/lib/job.cc
src/lib/job.h
src/lib/job_manager.cc
src/lib/job_manager.h
src/lib/make_dcp_job.cc
src/lib/make_dcp_job.h
src/lib/scp_dcp_job.cc
src/lib/scp_dcp_job.h
src/lib/thumbs_job.cc
src/lib/thumbs_job.h
src/lib/transcode_job.cc
src/lib/transcode_job.h
src/wx/film_viewer.cc
test/test.cc

index d94f56d0a4c0a46538fc19d3e437273a3ff4b4bf..fd8236bf028ec54a2fb10d565bc6e9be433fb2f3 100644 (file)
@@ -35,8 +35,8 @@ using namespace boost;
  *  @param o Options.
  *  @Param l A log that we can write to.
  */
-ABTranscodeJob::ABTranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
-       : Job (s, o, l)
+ABTranscodeJob::ABTranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+       : Job (s, o, l, req)
 {
        _fs_b.reset (new FilmState (*_fs));
        _fs_b->scaler = Config::instance()->reference_scaler ();
index 478049068400a8935f11348ae82f3f017881b53b..4b80593f4de8c6bbdff4bee6104109226a58a8ca 100644 (file)
@@ -34,7 +34,7 @@
 class ABTranscodeJob : public Job
 {
 public:
-       ABTranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+       ABTranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
 
        std::string name () const;
        void run ();
index f60a2d40d62cff94e24473503dc5a84e733e7357..f07a5ab2accaf9d98172bbb2ba9da7cd213b0516 100644 (file)
@@ -31,8 +31,8 @@
 using namespace std;
 using namespace boost;
 
-CheckHashesJob::CheckHashesJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
-       : Job (s, o, l)
+CheckHashesJob::CheckHashesJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+       : Job (s, o, l, req)
        , _bad (0)
 {
 
@@ -73,13 +73,13 @@ CheckHashesJob::run ()
                shared_ptr<Job> tc;
 
                if (_fs->dcp_ab) {
-                       tc.reset (new ABTranscodeJob (_fs, _opt, _log));
+                       tc.reset (new ABTranscodeJob (_fs, _opt, _log, shared_from_this()));
                } else {
-                       tc.reset (new TranscodeJob (_fs, _opt, _log));
+                       tc.reset (new TranscodeJob (_fs, _opt, _log, shared_from_this()));
                }
                
                JobManager::instance()->add_after (shared_from_this(), tc);
-               JobManager::instance()->add_after (tc, shared_ptr<Job> (new CheckHashesJob (_fs, _opt, _log)));
+               JobManager::instance()->add_after (tc, shared_ptr<Job> (new CheckHashesJob (_fs, _opt, _log, tc)));
        }
                
        set_progress (1);
index b59cf031b8c43588a50c56db7015ae130ab88a6f..6a68e936c5a746cd10699f90d7f74863f2a18804 100644 (file)
@@ -22,7 +22,7 @@
 class CheckHashesJob : public Job
 {
 public:
-       CheckHashesJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+       CheckHashesJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
 
        std::string name () const;
        void run ();
index d1000f54cb815dbfdf3b3b7480b130136db0a967..f7281fc10e303479c2a0bcfdaef6e26121df059f 100644 (file)
@@ -35,8 +35,8 @@ using namespace boost;
 /** @param fs FilmState for the film to write DVD data into.
  *  @param l Log that we can write to.
  */
-CopyFromDVDJob::CopyFromDVDJob (shared_ptr<const FilmState> fs, Log* l)
-       : Job (fs, shared_ptr<Options> (), l)
+CopyFromDVDJob::CopyFromDVDJob (shared_ptr<const FilmState> fs, Log* l, shared_ptr<Job> req)
+       : Job (fs, shared_ptr<Options> (), l, req)
 {
 
 }
index 6b56f6f0a5fa084e519308cca03a1b0cb708e4d4..ce3837100c23e769563ee3b2a6b8bde89e9b442f 100644 (file)
@@ -29,7 +29,7 @@
 class CopyFromDVDJob : public Job
 {
 public:
-       CopyFromDVDJob (boost::shared_ptr<const FilmState>, Log *);
+       CopyFromDVDJob (boost::shared_ptr<const FilmState>, Log *, boost::shared_ptr<Job> req);
 
        std::string name () const;
        void run ();
index d77ede2f91abf1d1c0d0d9348ed19247429a3272..36b4cbabc95f453e52656dc7a62c7071e42d449b 100644 (file)
@@ -30,8 +30,8 @@
 using namespace std;
 using namespace boost;
 
-ExamineContentJob::ExamineContentJob (shared_ptr<const FilmState> fs, Log* l)
-       : Job (fs, shared_ptr<Options> (), l)
+ExamineContentJob::ExamineContentJob (shared_ptr<const FilmState> fs, Log* l, shared_ptr<Job> req)
+       : Job (fs, shared_ptr<Options> (), l, req)
 {
 
 }
index d149341b4456589e9d1fdda31fc72d978e32d2aa..3bbd673a87e66aab5521b0ddb1666d5404da66ab 100644 (file)
@@ -31,7 +31,7 @@ class Decoder;
 class ExamineContentJob : public Job
 {
 public:
-       ExamineContentJob (boost::shared_ptr<const FilmState>, Log *);
+       ExamineContentJob (boost::shared_ptr<const FilmState>, Log *, boost::shared_ptr<Job> req);
        ~ExamineContentJob ();
 
        std::string name () const;
index e2b3d4bc31391cb4a226caa64442102abd6f7cf7..00d37c097aec72e676f2cfc485bdf57c3960900d 100644 (file)
@@ -534,16 +534,18 @@ Film::make_dcp (bool transcode, int freq)
        o->padding = format()->dcp_padding (this);
        o->ratio = format()->ratio_as_float (this);
 
+       shared_ptr<Job> r;
+
        if (transcode) {
                if (_state.dcp_ab) {
-                       JobManager::instance()->add (shared_ptr<Job> (new ABTranscodeJob (fs, o, log ())));
+                       r = JobManager::instance()->add (shared_ptr<Job> (new ABTranscodeJob (fs, o, log(), shared_ptr<Job> ())));
                } else {
-                       JobManager::instance()->add (shared_ptr<Job> (new TranscodeJob (fs, o, log ())));
+                       r = JobManager::instance()->add (shared_ptr<Job> (new TranscodeJob (fs, o, log(), shared_ptr<Job> ())));
                }
        }
 
-       JobManager::instance()->add (shared_ptr<Job> (new CheckHashesJob (fs, o, log ())));
-       JobManager::instance()->add (shared_ptr<Job> (new MakeDCPJob (fs, o, log ())));
+       r = JobManager::instance()->add (shared_ptr<Job> (new CheckHashesJob (fs, o, log(), r)));
+       JobManager::instance()->add (shared_ptr<Job> (new MakeDCPJob (fs, o, log(), r)));
 }
 
 shared_ptr<FilmState>
@@ -582,7 +584,7 @@ Film::examine_content ()
                return;
        }
        
-       _examine_content_job.reset (new ExamineContentJob (state_copy (), log ()));
+       _examine_content_job.reset (new ExamineContentJob (state_copy (), log(), shared_ptr<Job> ()));
        _examine_content_job->Finished.connect (sigc::mem_fun (*this, &Film::examine_content_post_gui));
        JobManager::instance()->add (_examine_content_job);
 }
@@ -631,14 +633,14 @@ Film::set_still_duration (int d)
 void
 Film::send_dcp_to_tms ()
 {
-       shared_ptr<Job> j (new SCPDCPJob (state_copy (), log ()));
+       shared_ptr<Job> j (new SCPDCPJob (state_copy (), log(), shared_ptr<Job> ()));
        JobManager::instance()->add (j);
 }
 
 void
 Film::copy_from_dvd ()
 {
-       shared_ptr<Job> j (new CopyFromDVDJob (state_copy (), log ()));
+       shared_ptr<Job> j (new CopyFromDVDJob (state_copy (), log(), shared_ptr<Job> ()));
        j->Finished.connect (sigc::mem_fun (*this, &Film::copy_from_dvd_post_gui));
        JobManager::instance()->add (j);
 }
index 39ce4173a78a8af19621e5f4f31f95a946ea170c..d3871bf7225da22ed1911416c1b2ee4df4cffd5d 100644 (file)
@@ -34,10 +34,11 @@ using namespace boost;
  *  @param o Options.
  *  @param l A log that we can write to.
  */
-Job::Job (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
+Job::Job (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
        : _fs (s)
        , _opt (o)
        , _log (l)
+       , _required (req)
        , _state (NEW)
        , _start_time (0)
        , _progress_unknown (false)
@@ -80,6 +81,13 @@ Job::run_wrapper ()
        }
 }
 
+bool
+Job::is_new () const
+{
+       boost::mutex::scoped_lock lm (_state_mutex);
+       return _state == NEW;
+}
+
 /** @return true if the job is running */
 bool
 Job::running () const
index 802bf468da24d71a723fc31929d1fcdd0810cca0..f50ed078434efb6cd3661de2802a4c528fb18c7d 100644 (file)
@@ -39,7 +39,7 @@ class Options;
 class Job : public boost::enable_shared_from_this<Job>
 {
 public:
-       Job (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+       Job (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
 
        /** @return user-readable name of this job */
        virtual std::string name () const = 0;
@@ -48,6 +48,7 @@ public:
        
        void start ();
 
+       bool is_new () const;
        bool running () const;
        bool finished () const;
        bool finished_ok () const;
@@ -66,6 +67,10 @@ public:
 
        void emit_finished ();
 
+       boost::shared_ptr<Job> required () const {
+               return _required;
+       }
+
        /** Emitted from the GUI thread */
        sigc::signal0<void> Finished;
 
@@ -95,6 +100,8 @@ private:
 
        void run_wrapper ();
 
+       boost::shared_ptr<Job> _required;
+
        /** mutex for _state and _error */
        mutable boost::mutex _state_mutex;
        /** current state of the job */
index 76fcc6c5d736d72a25b3079584706bb4b6b8963d..562c887de9b57479c75f702928ac2ae6aec5d313 100644 (file)
@@ -37,11 +37,12 @@ JobManager::JobManager ()
        boost::thread (boost::bind (&JobManager::scheduler, this));
 }
 
-void
+shared_ptr<Job>
 JobManager::add (shared_ptr<Job> j)
 {
        boost::mutex::scoped_lock lm (_mutex);
        _jobs.push_back (j);
+       return j;
 }
 
 void
@@ -93,18 +94,15 @@ JobManager::scheduler ()
        while (1) {
                {
                        boost::mutex::scoped_lock lm (_mutex);
-                       int running = 0;
-                       shared_ptr<Job> first_new;
                        for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
-                               if ((*i)->running ()) {
-                                       ++running;
-                               } else if (!(*i)->finished () && first_new == 0) {
-                                       first_new = *i;
-                               }
-
-                               if (running == 0 && first_new) {
-                                       first_new->start ();
-                                       break;
+                               if ((*i)->is_new()) {
+                                       shared_ptr<Job> r = (*i)->required ();
+                                       if (!r || r->finished_ok ()) {
+                                               (*i)->start ();
+
+                                               /* Only start one job at once */
+                                               break;
+                                       }
                                }
                        }
                }
index 8b79fd67d330c6078563da2f5972a8766801095c..4b70738f0bb4a121c7e7b9e5c7c699ccfe3d4aa1 100644 (file)
@@ -28,16 +28,12 @@ class Job;
 
 /** @class JobManager
  *  @brief A simple scheduler for jobs.
- *
- *  JobManager simply keeps a list of pending jobs, and assumes that all the jobs
- *  are sufficiently CPU intensive that there is no point running them in parallel;
- *  so jobs are just run one after the other.
  */
 class JobManager
 {
 public:
 
-       void add (boost::shared_ptr<Job>);
+       boost::shared_ptr<Job> add (boost::shared_ptr<Job>);
        void add_after (boost::shared_ptr<Job> after, boost::shared_ptr<Job> j);
        std::list<boost::shared_ptr<Job> > get () const;
        bool work_to_do () const;
index ae4bb4fbed2ae266f16db966ed002ab31152c506..b42a38429ff2800c1e15a13509be08bd0743e2f4 100644 (file)
@@ -43,8 +43,8 @@ using namespace boost;
  *  @param o Options.
  *  @param l Log.
  */
-MakeDCPJob::MakeDCPJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
-       : Job (s, o, l)
+MakeDCPJob::MakeDCPJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+       : Job (s, o, l, req)
 {
        
 }
index 677bed424d0329ecfc1b9d1fd71ce76ec8723097..c350a819c3288214ad295136debd8c87f384d69c 100644 (file)
@@ -29,7 +29,7 @@
 class MakeDCPJob : public Job
 {
 public:
-       MakeDCPJob (boost::shared_ptr<const FilmState>, boost::shared_ptr<const Options>, Log *);
+       MakeDCPJob (boost::shared_ptr<const FilmState>, boost::shared_ptr<const Options>, Log *, boost::shared_ptr<Job> req);
 
        std::string name () const;
        void run ();
index dac4a602c556b43fc70428743e77342bd4b5245d..90122cea7e0aa09db03c0103264b5e160b0ff42c 100644 (file)
@@ -91,8 +91,8 @@ public:
 };
 
 
-SCPDCPJob::SCPDCPJob (shared_ptr<const FilmState> s, Log* l)
-       : Job (s, shared_ptr<const Options> (), l)
+SCPDCPJob::SCPDCPJob (shared_ptr<const FilmState> s, Log* l, shared_ptr<Job> req)
+       : Job (s, shared_ptr<const Options> (), l, req)
        , _status ("Waiting")
 {
 
index 1c795be4783ba2d5586a6fd39ad59cabcc28fc2c..b457fdf5bf580d2aebfe4b0293c87293191629a3 100644 (file)
@@ -26,7 +26,7 @@
 class SCPDCPJob : public Job
 {
 public:
-       SCPDCPJob (boost::shared_ptr<const FilmState>, Log *);
+       SCPDCPJob (boost::shared_ptr<const FilmState>, Log *, boost::shared_ptr<Job> req);
 
        std::string name () const;
        void run ();
index f6ed75ff7cd73efdecaddb0d1708f2b60d25faff..779a1d5d19972eaef6dcfc2ee9573d0994b7fadd 100644 (file)
@@ -35,8 +35,8 @@ using namespace boost;
  *  @param o Options.
  *  @param l A log that we can write to.
  */
-ThumbsJob::ThumbsJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
-       : Job (s, o, l)
+ThumbsJob::ThumbsJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+       : Job (s, o, l, req)
 {
        
 }
index 1dd69a0f9bc9aa8aa2d9701a7cde7b9e277f0a69..f7e30d576ca8da774297cf952a94686f6a7ba81f 100644 (file)
@@ -31,7 +31,7 @@ class FilmState;
 class ThumbsJob : public Job
 {
 public:
-       ThumbsJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+       ThumbsJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
        std::string name () const;
        void run ();
 };
index e1ba82359357ec203ff7e7ff12a10f9557caa8ad..a53a4b6ad1591acb7227bbc578a37a3e014f613a 100644 (file)
@@ -39,8 +39,8 @@ using namespace boost;
  *  @param o Options.
  *  @param l A log that we can write to.
  */
-TranscodeJob::TranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l)
-       : Job (s, o, l)
+TranscodeJob::TranscodeJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+       : Job (s, o, l, req)
 {
        
 }
index 737f10de968ea4b9ee0500cf51bfe2f3b6bc060c..fe68a49100630dac9b9cad0bfc72cf284f016b44 100644 (file)
@@ -32,7 +32,7 @@ class Encoder;
 class TranscodeJob : public Job
 {
 public:
-       TranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l);
+       TranscodeJob (boost::shared_ptr<const FilmState> s, boost::shared_ptr<const Options> o, Log* l, boost::shared_ptr<Job> req);
        
        std::string name () const;
        void run ();
index 3c7d76bce03edaf9dad1ab53ebd12e27606f2533..6ffe4e66a847f2f92aa4ccd7a9638df2c2d797a0 100644 (file)
@@ -266,7 +266,7 @@ FilmViewer::update_thumbs ()
        o->decode_audio = false;
        o->decode_video_frequency = 128;
        
-       shared_ptr<Job> j (new ThumbsJob (s, o, _film->log ()));
+       shared_ptr<Job> j (new ThumbsJob (s, o, _film->log(), shared_ptr<Job> ()));
        j->Finished.connect (sigc::mem_fun (_film, &Film::update_thumbs_post_gui));
        JobManager::instance()->add (j);
 }
index 26bca33e5ea33e97e0588049c0d4c6ba32f2a785..c801d538e9ab52a631eafca271c49df8fe2fdfe0 100644 (file)
@@ -35,6 +35,7 @@
 #include "config.h"
 #include "server.h"
 #include "cross.h"
+#include "job.h"
 #define BOOST_TEST_DYN_LINK
 #define BOOST_TEST_MODULE dvdomatic_test
 #include <boost/test/unit_test.hpp>
@@ -305,6 +306,9 @@ BOOST_AUTO_TEST_CASE (client_server_test)
 
        new thread (boost::bind (&Server::run, server, 2));
 
+       /* Let the server get itself ready */
+       dvdomatic_sleep (1);
+
        ServerDescription description ("localhost", 2);
 
        list<thread*> threads;
@@ -357,7 +361,7 @@ BOOST_AUTO_TEST_CASE (make_dcp_with_range_test)
        film.set_dcp_frames (42);
        film.make_dcp (true);
 
-       while (JobManager::instance()->work_to_do ()) {
+       while (JobManager::instance()->work_to_do() && !JobManager::instance()->errors()) {
                dvdomatic_sleep (1);
        }
 
@@ -386,3 +390,97 @@ BOOST_AUTO_TEST_CASE (audio_sampling_rate_test)
        fs.audio_sample_rate = 48000;
        BOOST_CHECK_EQUAL (fs.target_sample_rate(), 47952);
 }
+
+class TestJob : public Job
+{
+public:
+       TestJob (shared_ptr<const FilmState> s, shared_ptr<const Options> o, Log* l, shared_ptr<Job> req)
+               : Job (s, o, l, req)
+       {
+
+       }
+
+       void set_finished_ok () {
+               set_state (FINISHED_OK);
+       }
+
+       void set_finished_error () {
+               set_state (FINISHED_ERROR);
+       }
+
+       void run ()
+       {
+               while (1) {
+                       if (finished ()) {
+                               return;
+                       }
+               }
+       }
+
+       string name () const {
+               return "";
+       }
+};
+
+BOOST_AUTO_TEST_CASE (job_manager_test)
+{
+       shared_ptr<const FilmState> s;
+       shared_ptr<const Options> o;
+       FileLog log ("build/test/job_manager_test.log");
+
+       /* Single job, no dependency */
+       shared_ptr<TestJob> a (new TestJob (s, o, &log, shared_ptr<Job> ()));
+
+       JobManager::instance()->add (a);
+       dvdomatic_sleep (1);
+       BOOST_CHECK_EQUAL (a->running (), true);
+       a->set_finished_ok ();
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->finished_ok(), true);
+
+       /* Two jobs, no dependency */
+       a.reset (new TestJob (s, o, &log, shared_ptr<Job> ()));
+       shared_ptr<TestJob> b (new TestJob (s, o, &log, shared_ptr<Job> ()));
+
+       JobManager::instance()->add (a);
+       JobManager::instance()->add (b);
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->running (), true);
+       BOOST_CHECK_EQUAL (b->running (), true);
+       a->set_finished_ok ();
+       b->set_finished_ok ();
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->finished_ok (), true);
+       BOOST_CHECK_EQUAL (b->finished_ok (), true);
+
+       /* Two jobs, dependency */
+       a.reset (new TestJob (s, o, &log, shared_ptr<Job> ()));
+       b.reset (new TestJob (s, o, &log, a));
+
+       JobManager::instance()->add (a);
+       JobManager::instance()->add (b);
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->running(), true);
+       BOOST_CHECK_EQUAL (b->running(), false);
+       a->set_finished_ok ();
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->finished_ok(), true);
+       BOOST_CHECK_EQUAL (b->running(), true);
+       b->set_finished_ok ();
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (b->finished_ok(), true);
+
+       /* Two jobs, dependency, first fails */
+       a.reset (new TestJob (s, o, &log, shared_ptr<Job> ()));
+       b.reset (new TestJob (s, o, &log, a));
+
+       JobManager::instance()->add (a);
+       JobManager::instance()->add (b);
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->running(), true);
+       BOOST_CHECK_EQUAL (b->running(), false);
+       a->set_finished_error ();
+       dvdomatic_sleep (2);
+       BOOST_CHECK_EQUAL (a->finished_in_error(), true);
+       BOOST_CHECK_EQUAL (b->running(), false);
+}