Extract common code out into kdm_for_screen()
[dcpomatic.git] / src / lib / job_manager.cc
1 /*
2     Copyright (C) 2012-2020 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file  src/job_manager.cc
22  *  @brief A simple scheduler for jobs.
23  */
24
25 #include "job_manager.h"
26 #include "job.h"
27 #include "cross.h"
28 #include "analyse_audio_job.h"
29 #include "analyse_subtitles_job.h"
30 #include "film.h"
31 #include <boost/thread.hpp>
32 #include <boost/foreach.hpp>
33 #include <iostream>
34
35 using std::string;
36 using std::list;
37 using std::cout;
38 using boost::shared_ptr;
39 using boost::weak_ptr;
40 using boost::function;
41 using boost::dynamic_pointer_cast;
42 using boost::optional;
43 using boost::bind;
44
45 JobManager* JobManager::_instance = 0;
46
47 JobManager::JobManager ()
48         : _terminate (false)
49         , _paused (false)
50 {
51
52 }
53
54 void
55 JobManager::start ()
56 {
57         _scheduler = boost::thread (boost::bind(&JobManager::scheduler, this));
58 #ifdef DCPOMATIC_LINUX
59         pthread_setname_np (_scheduler.native_handle(), "job-scheduler");
60 #endif
61 }
62
63 JobManager::~JobManager ()
64 {
65         BOOST_FOREACH (boost::signals2::connection& i, _connections) {
66                 i.disconnect ();
67         }
68
69         {
70                 boost::mutex::scoped_lock lm (_mutex);
71                 _terminate = true;
72                 _empty_condition.notify_all ();
73         }
74
75         if (_scheduler.joinable()) {
76                 try {
77                         _scheduler.join();
78                 } catch (...) {
79
80                 }
81         }
82 }
83
84 shared_ptr<Job>
85 JobManager::add (shared_ptr<Job> j)
86 {
87         {
88                 boost::mutex::scoped_lock lm (_mutex);
89                 _jobs.push_back (j);
90                 _empty_condition.notify_all ();
91         }
92
93         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (j)));
94
95         return j;
96 }
97
98 shared_ptr<Job>
99 JobManager::add_after (shared_ptr<Job> after, shared_ptr<Job> j)
100 {
101         {
102                 boost::mutex::scoped_lock lm (_mutex);
103                 list<shared_ptr<Job> >::iterator i = find (_jobs.begin(), _jobs.end(), after);
104                 DCPOMATIC_ASSERT (i != _jobs.end());
105                 _jobs.insert (i, j);
106                 _empty_condition.notify_all ();
107         }
108
109         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (j)));
110
111         return j;
112 }
113
114 list<shared_ptr<Job> >
115 JobManager::get () const
116 {
117         boost::mutex::scoped_lock lm (_mutex);
118         return _jobs;
119 }
120
121 bool
122 JobManager::work_to_do () const
123 {
124         boost::mutex::scoped_lock lm (_mutex);
125         list<shared_ptr<Job> >::const_iterator i = _jobs.begin();
126         while (i != _jobs.end() && (*i)->finished()) {
127                 ++i;
128         }
129
130         return i != _jobs.end ();
131 }
132
133 bool
134 JobManager::errors () const
135 {
136         boost::mutex::scoped_lock lm (_mutex);
137         BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
138                 if (i->finished_in_error ()) {
139                         return true;
140                 }
141         }
142
143         return false;
144 }
145
146 void
147 JobManager::scheduler ()
148 {
149         while (true) {
150
151                 boost::mutex::scoped_lock lm (_mutex);
152
153                 while (true) {
154                         bool have_new = false;
155                         bool have_running = false;
156                         BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
157                                 if (i->running()) {
158                                         have_running = true;
159                                 }
160                                 if (i->is_new()) {
161                                         have_new = true;
162                                 }
163                         }
164
165                         if ((!have_running && have_new) || _terminate) {
166                                 break;
167                         }
168
169                         _empty_condition.wait (lm);
170                 }
171
172                 if (_terminate) {
173                         break;
174                 }
175
176                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
177                         if (i->is_new()) {
178                                 _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this)));
179                                 i->start ();
180                                 emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, i->json_name()));
181                                 _last_active_job = i->json_name ();
182                                 /* Only start one job at once */
183                                 break;
184                         }
185                 }
186         }
187 }
188
189 void
190 JobManager::job_finished ()
191 {
192         {
193                 boost::mutex::scoped_lock lm (_mutex);
194                 emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, optional<string>()));
195                 _last_active_job = optional<string>();
196         }
197
198         _empty_condition.notify_all ();
199 }
200
201 JobManager *
202 JobManager::instance ()
203 {
204         if (_instance == 0) {
205                 _instance = new JobManager ();
206                 _instance->start ();
207         }
208
209         return _instance;
210 }
211
212 void
213 JobManager::drop ()
214 {
215         delete _instance;
216         _instance = 0;
217 }
218
219 void
220 JobManager::analyse_audio (
221         shared_ptr<const Film> film,
222         shared_ptr<const Playlist> playlist,
223         bool from_zero,
224         boost::signals2::connection& connection,
225         function<void()> ready
226         )
227 {
228         {
229                 boost::mutex::scoped_lock lm (_mutex);
230
231                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
232                         shared_ptr<AnalyseAudioJob> a = dynamic_pointer_cast<AnalyseAudioJob> (i);
233                         if (a && a->path() == film->audio_analysis_path(playlist)) {
234                                 i->when_finished (connection, ready);
235                                 return;
236                         }
237                 }
238         }
239
240         shared_ptr<AnalyseAudioJob> job;
241
242         {
243                 boost::mutex::scoped_lock lm (_mutex);
244
245                 job.reset (new AnalyseAudioJob (film, playlist, from_zero));
246                 connection = job->Finished.connect (ready);
247                 _jobs.push_back (job);
248                 _empty_condition.notify_all ();
249         }
250
251         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (job)));
252 }
253
254
255 void
256 JobManager::analyse_subtitles (
257         shared_ptr<const Film> film,
258         shared_ptr<Content> content,
259         boost::signals2::connection& connection,
260         function<void()> ready
261         )
262 {
263         {
264                 boost::mutex::scoped_lock lm (_mutex);
265
266                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
267                         shared_ptr<AnalyseSubtitlesJob> a = dynamic_pointer_cast<AnalyseSubtitlesJob> (i);
268                         if (a && a->path() == film->subtitle_analysis_path(content)) {
269                                 i->when_finished (connection, ready);
270                                 return;
271                         }
272                 }
273         }
274
275         shared_ptr<AnalyseSubtitlesJob> job;
276
277         {
278                 boost::mutex::scoped_lock lm (_mutex);
279
280                 job.reset (new AnalyseSubtitlesJob(film, content));
281                 connection = job->Finished.connect (ready);
282                 _jobs.push_back (job);
283                 _empty_condition.notify_all ();
284         }
285
286         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (job)));
287 }
288
289
290 void
291 JobManager::increase_priority (shared_ptr<Job> job)
292 {
293         bool changed = false;
294
295         {
296                 boost::mutex::scoped_lock lm (_mutex);
297                 list<shared_ptr<Job> >::iterator last = _jobs.end ();
298                 for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
299                         if (*i == job && last != _jobs.end()) {
300                                 swap (*i, *last);
301                                 changed = true;
302                                 break;
303                         }
304                         last = i;
305                 }
306         }
307
308         if (changed) {
309                 priority_changed ();
310         }
311 }
312
313 void
314 JobManager::priority_changed ()
315 {
316         {
317                 boost::mutex::scoped_lock lm (_mutex);
318
319                 bool first = true;
320                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
321                         if (first) {
322                                 if (i->is_new ()) {
323                                         i->start ();
324                                 } else if (i->paused_by_priority ()) {
325                                         i->resume ();
326                                 }
327                                 first = false;
328                         } else {
329                                 if (i->running ()) {
330                                         i->pause_by_priority ();
331                                 }
332                         }
333                 }
334         }
335
336         emit (boost::bind (boost::ref (JobsReordered)));
337 }
338
339 void
340 JobManager::decrease_priority (shared_ptr<Job> job)
341 {
342         bool changed = false;
343
344         {
345                 boost::mutex::scoped_lock lm (_mutex);
346                 for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
347                         list<shared_ptr<Job> >::iterator next = i;
348                         ++next;
349                         if (*i == job && next != _jobs.end()) {
350                                 swap (*i, *next);
351                                 changed = true;
352                                 break;
353                         }
354                 }
355         }
356
357         if (changed) {
358                 priority_changed ();
359         }
360 }
361
362 void
363 JobManager::pause ()
364 {
365         boost::mutex::scoped_lock lm (_mutex);
366
367         if (_paused) {
368                 return;
369         }
370
371         BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
372                 if (i->pause_by_user()) {
373                         _paused_job = i;
374                 }
375         }
376
377         _paused = true;
378 }
379
380 void
381 JobManager::resume ()
382 {
383         boost::mutex::scoped_lock lm (_mutex);
384         if (!_paused) {
385                 return;
386         }
387
388         if (_paused_job) {
389                 _paused_job->resume ();
390         }
391
392         _paused_job.reset ();
393         _paused = false;
394 }