1724879d0656ffe38b0a6962374a1489744af377
[dcpomatic.git] / src / lib / job_manager.cc
1 /*
2     Copyright (C) 2012-2020 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
/** @file  src/lib/job_manager.cc
 *  @brief A simple scheduler for jobs.
 */
24
25 #include "job_manager.h"
26 #include "job.h"
27 #include "cross.h"
28 #include "analyse_audio_job.h"
29 #include "analyse_subtitles_job.h"
30 #include "film.h"
31 #include <boost/thread.hpp>
32 #include <iostream>
33
34 using std::string;
35 using std::list;
36 using std::cout;
37 using std::shared_ptr;
38 using std::weak_ptr;
39 using boost::function;
40 using std::dynamic_pointer_cast;
41 using boost::optional;
42 using boost::bind;
43
44 JobManager* JobManager::_instance = 0;
45
46 JobManager::JobManager ()
47         : _terminate (false)
48         , _paused (false)
49 {
50
51 }
52
53 void
54 JobManager::start ()
55 {
56         _scheduler = boost::thread (boost::bind(&JobManager::scheduler, this));
57 #ifdef DCPOMATIC_LINUX
58         pthread_setname_np (_scheduler.native_handle(), "job-scheduler");
59 #endif
60 }
61
62 JobManager::~JobManager ()
63 {
64         boost::this_thread::disable_interruption dis;
65
66         for (auto& i: _connections) {
67                 i.disconnect ();
68         }
69
70         {
71                 boost::mutex::scoped_lock lm (_mutex);
72                 _terminate = true;
73                 _empty_condition.notify_all ();
74         }
75
76         try {
77                 _scheduler.join();
78         } catch (...) {}
79 }
80
81 shared_ptr<Job>
82 JobManager::add (shared_ptr<Job> j)
83 {
84         {
85                 boost::mutex::scoped_lock lm (_mutex);
86                 _jobs.push_back (j);
87                 _empty_condition.notify_all ();
88         }
89
90         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (j)));
91
92         return j;
93 }
94
95 shared_ptr<Job>
96 JobManager::add_after (shared_ptr<Job> after, shared_ptr<Job> j)
97 {
98         {
99                 boost::mutex::scoped_lock lm (_mutex);
100                 list<shared_ptr<Job> >::iterator i = find (_jobs.begin(), _jobs.end(), after);
101                 DCPOMATIC_ASSERT (i != _jobs.end());
102                 _jobs.insert (i, j);
103                 _empty_condition.notify_all ();
104         }
105
106         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (j)));
107
108         return j;
109 }
110
111 list<shared_ptr<Job> >
112 JobManager::get () const
113 {
114         boost::mutex::scoped_lock lm (_mutex);
115         return _jobs;
116 }
117
118 bool
119 JobManager::work_to_do () const
120 {
121         boost::mutex::scoped_lock lm (_mutex);
122         list<shared_ptr<Job> >::const_iterator i = _jobs.begin();
123         while (i != _jobs.end() && (*i)->finished()) {
124                 ++i;
125         }
126
127         return i != _jobs.end ();
128 }
129
130 bool
131 JobManager::errors () const
132 {
133         boost::mutex::scoped_lock lm (_mutex);
134         for (auto i: _jobs) {
135                 if (i->finished_in_error ()) {
136                         return true;
137                 }
138         }
139
140         return false;
141 }
142
143 void
144 JobManager::scheduler ()
145 {
146         while (true) {
147
148                 boost::mutex::scoped_lock lm (_mutex);
149
150                 while (true) {
151                         bool have_new = false;
152                         bool have_running = false;
153                         for (auto i: _jobs) {
154                                 if (i->running()) {
155                                         have_running = true;
156                                 }
157                                 if (i->is_new()) {
158                                         have_new = true;
159                                 }
160                         }
161
162                         if ((!have_running && have_new) || _terminate) {
163                                 break;
164                         }
165
166                         _empty_condition.wait (lm);
167                 }
168
169                 if (_terminate) {
170                         break;
171                 }
172
173                 for (auto i: _jobs) {
174                         if (i->is_new()) {
175                                 _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this)));
176                                 i->start ();
177                                 emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, i->json_name()));
178                                 _last_active_job = i->json_name ();
179                                 /* Only start one job at once */
180                                 break;
181                         }
182                 }
183         }
184 }
185
186 void
187 JobManager::job_finished ()
188 {
189         {
190                 boost::mutex::scoped_lock lm (_mutex);
191                 emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, optional<string>()));
192                 _last_active_job = optional<string>();
193         }
194
195         _empty_condition.notify_all ();
196 }
197
198 JobManager *
199 JobManager::instance ()
200 {
201         if (_instance == 0) {
202                 _instance = new JobManager ();
203                 _instance->start ();
204         }
205
206         return _instance;
207 }
208
209 void
210 JobManager::drop ()
211 {
212         delete _instance;
213         _instance = 0;
214 }
215
216 void
217 JobManager::analyse_audio (
218         shared_ptr<const Film> film,
219         shared_ptr<const Playlist> playlist,
220         bool from_zero,
221         boost::signals2::connection& connection,
222         function<void()> ready
223         )
224 {
225         {
226                 boost::mutex::scoped_lock lm (_mutex);
227
228                 for (auto i: _jobs) {
229                         shared_ptr<AnalyseAudioJob> a = dynamic_pointer_cast<AnalyseAudioJob> (i);
230                         if (a && a->path() == film->audio_analysis_path(playlist) && !i->finished_cancelled()) {
231                                 i->when_finished (connection, ready);
232                                 return;
233                         }
234                 }
235         }
236
237         shared_ptr<AnalyseAudioJob> job;
238
239         {
240                 boost::mutex::scoped_lock lm (_mutex);
241
242                 job.reset (new AnalyseAudioJob (film, playlist, from_zero));
243                 connection = job->Finished.connect (ready);
244                 _jobs.push_back (job);
245                 _empty_condition.notify_all ();
246         }
247
248         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (job)));
249 }
250
251
252 void
253 JobManager::analyse_subtitles (
254         shared_ptr<const Film> film,
255         shared_ptr<Content> content,
256         boost::signals2::connection& connection,
257         function<void()> ready
258         )
259 {
260         {
261                 boost::mutex::scoped_lock lm (_mutex);
262
263                 for (auto i: _jobs) {
264                         shared_ptr<AnalyseSubtitlesJob> a = dynamic_pointer_cast<AnalyseSubtitlesJob> (i);
265                         if (a && a->path() == film->subtitle_analysis_path(content)) {
266                                 i->when_finished (connection, ready);
267                                 return;
268                         }
269                 }
270         }
271
272         shared_ptr<AnalyseSubtitlesJob> job;
273
274         {
275                 boost::mutex::scoped_lock lm (_mutex);
276
277                 job.reset (new AnalyseSubtitlesJob(film, content));
278                 connection = job->Finished.connect (ready);
279                 _jobs.push_back (job);
280                 _empty_condition.notify_all ();
281         }
282
283         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (job)));
284 }
285
286
287 void
288 JobManager::increase_priority (shared_ptr<Job> job)
289 {
290         bool changed = false;
291
292         {
293                 boost::mutex::scoped_lock lm (_mutex);
294                 list<shared_ptr<Job> >::iterator last = _jobs.end ();
295                 for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
296                         if (*i == job && last != _jobs.end()) {
297                                 swap (*i, *last);
298                                 changed = true;
299                                 break;
300                         }
301                         last = i;
302                 }
303         }
304
305         if (changed) {
306                 priority_changed ();
307         }
308 }
309
310 void
311 JobManager::priority_changed ()
312 {
313         {
314                 boost::mutex::scoped_lock lm (_mutex);
315
316                 bool first = true;
317                 for (auto i: _jobs) {
318                         if (first) {
319                                 if (i->is_new ()) {
320                                         i->start ();
321                                 } else if (i->paused_by_priority ()) {
322                                         i->resume ();
323                                 }
324                                 first = false;
325                         } else {
326                                 if (i->running ()) {
327                                         i->pause_by_priority ();
328                                 }
329                         }
330                 }
331         }
332
333         emit (boost::bind (boost::ref (JobsReordered)));
334 }
335
336 void
337 JobManager::decrease_priority (shared_ptr<Job> job)
338 {
339         bool changed = false;
340
341         {
342                 boost::mutex::scoped_lock lm (_mutex);
343                 for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
344                         list<shared_ptr<Job> >::iterator next = i;
345                         ++next;
346                         if (*i == job && next != _jobs.end()) {
347                                 swap (*i, *next);
348                                 changed = true;
349                                 break;
350                         }
351                 }
352         }
353
354         if (changed) {
355                 priority_changed ();
356         }
357 }
358
359 void
360 JobManager::pause ()
361 {
362         boost::mutex::scoped_lock lm (_mutex);
363
364         if (_paused) {
365                 return;
366         }
367
368         for (auto i: _jobs) {
369                 if (i->pause_by_user()) {
370                         _paused_job = i;
371                 }
372         }
373
374         _paused = true;
375 }
376
377 void
378 JobManager::resume ()
379 {
380         boost::mutex::scoped_lock lm (_mutex);
381         if (!_paused) {
382                 return;
383         }
384
385         if (_paused_job) {
386                 _paused_job->resume ();
387         }
388
389         _paused_job.reset ();
390         _paused = false;
391 }