Another macOS std::list boost::thread SNAFU.
[dcpomatic.git] / src / lib / job_manager.cc
1 /*
2     Copyright (C) 2012-2018 Carl Hetherington <cth@carlh.net>
3
4     This file is part of DCP-o-matic.
5
6     DCP-o-matic is free software; you can redistribute it and/or modify
7     it under the terms of the GNU General Public License as published by
8     the Free Software Foundation; either version 2 of the License, or
9     (at your option) any later version.
10
11     DCP-o-matic is distributed in the hope that it will be useful,
12     but WITHOUT ANY WARRANTY; without even the implied warranty of
13     MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14     GNU General Public License for more details.
15
16     You should have received a copy of the GNU General Public License
17     along with DCP-o-matic.  If not, see <http://www.gnu.org/licenses/>.
18
19 */
20
21 /** @file  src/job_manager.cc
22  *  @brief A simple scheduler for jobs.
23  */
24
25 #include "job_manager.h"
26 #include "job.h"
27 #include "cross.h"
28 #include "analyse_audio_job.h"
29 #include "film.h"
30 #include <boost/thread.hpp>
31 #include <boost/foreach.hpp>
32 #include <iostream>
33
34 using std::string;
35 using std::list;
36 using std::cout;
37 using boost::shared_ptr;
38 using boost::weak_ptr;
39 using boost::function;
40 using boost::dynamic_pointer_cast;
41 using boost::optional;
42 using boost::bind;
43
44 JobManager* JobManager::_instance = 0;
45
46 JobManager::JobManager ()
47         : _terminate (false)
48         , _paused (false)
49 {
50
51 }
52
53 void
54 JobManager::start ()
55 {
56         _scheduler = boost::thread (boost::bind(&JobManager::scheduler, this));
57 #ifdef DCPOMATIC_LINUX
58         pthread_setname_np (_scheduler.native_handle(), "job-scheduler");
59 #endif
60 }
61
62 JobManager::~JobManager ()
63 {
64         BOOST_FOREACH (boost::signals2::connection& i, _connections) {
65                 i.disconnect ();
66         }
67
68         {
69                 boost::mutex::scoped_lock lm (_mutex);
70                 _terminate = true;
71                 _empty_condition.notify_all ();
72         }
73
74         if (_scheduler.joinable()) {
75                 try {
76                         _scheduler.join();
77                 } catch (...) {
78
79                 }
80         }
81 }
82
83 shared_ptr<Job>
84 JobManager::add (shared_ptr<Job> j)
85 {
86         {
87                 boost::mutex::scoped_lock lm (_mutex);
88                 _jobs.push_back (j);
89                 _empty_condition.notify_all ();
90         }
91
92         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (j)));
93
94         return j;
95 }
96
97 shared_ptr<Job>
98 JobManager::add_after (shared_ptr<Job> after, shared_ptr<Job> j)
99 {
100         {
101                 boost::mutex::scoped_lock lm (_mutex);
102                 list<shared_ptr<Job> >::iterator i = find (_jobs.begin(), _jobs.end(), after);
103                 DCPOMATIC_ASSERT (i != _jobs.end());
104                 _jobs.insert (i, j);
105                 _empty_condition.notify_all ();
106         }
107
108         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (j)));
109
110         return j;
111 }
112
113 list<shared_ptr<Job> >
114 JobManager::get () const
115 {
116         boost::mutex::scoped_lock lm (_mutex);
117         return _jobs;
118 }
119
120 bool
121 JobManager::work_to_do () const
122 {
123         boost::mutex::scoped_lock lm (_mutex);
124         list<shared_ptr<Job> >::const_iterator i = _jobs.begin();
125         while (i != _jobs.end() && (*i)->finished()) {
126                 ++i;
127         }
128
129         return i != _jobs.end ();
130 }
131
132 bool
133 JobManager::errors () const
134 {
135         boost::mutex::scoped_lock lm (_mutex);
136         BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
137                 if (i->finished_in_error ()) {
138                         return true;
139                 }
140         }
141
142         return false;
143 }
144
145 void
146 JobManager::scheduler ()
147 {
148         while (true) {
149
150                 boost::mutex::scoped_lock lm (_mutex);
151
152                 while (true) {
153                         bool have_new = false;
154                         bool have_running = false;
155                         BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
156                                 if (i->running()) {
157                                         have_running = true;
158                                 }
159                                 if (i->is_new()) {
160                                         have_new = true;
161                                 }
162                         }
163
164                         if ((!have_running && have_new) || _terminate) {
165                                 break;
166                         }
167
168                         _empty_condition.wait (lm);
169                 }
170
171                 if (_terminate) {
172                         break;
173                 }
174
175                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
176                         if (i->is_new()) {
177                                 _connections.push_back (i->FinishedImmediate.connect(bind(&JobManager::job_finished, this)));
178                                 i->start ();
179                                 emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, i->json_name()));
180                                 _last_active_job = i->json_name ();
181                                 /* Only start one job at once */
182                                 break;
183                         }
184                 }
185         }
186 }
187
188 void
189 JobManager::job_finished ()
190 {
191         {
192                 boost::mutex::scoped_lock lm (_mutex);
193                 emit (boost::bind (boost::ref (ActiveJobsChanged), _last_active_job, optional<string>()));
194                 _last_active_job = optional<string>();
195         }
196
197         _empty_condition.notify_all ();
198 }
199
200 JobManager *
201 JobManager::instance ()
202 {
203         if (_instance == 0) {
204                 _instance = new JobManager ();
205                 _instance->start ();
206         }
207
208         return _instance;
209 }
210
211 void
212 JobManager::drop ()
213 {
214         delete _instance;
215         _instance = 0;
216 }
217
218 void
219 JobManager::analyse_audio (
220         shared_ptr<const Film> film,
221         shared_ptr<const Playlist> playlist,
222         bool from_zero,
223         boost::signals2::connection& connection,
224         function<void()> ready
225         )
226 {
227         {
228                 boost::mutex::scoped_lock lm (_mutex);
229
230                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
231                         shared_ptr<AnalyseAudioJob> a = dynamic_pointer_cast<AnalyseAudioJob> (i);
232                         if (a && a->path() == film->audio_analysis_path(playlist)) {
233                                 i->when_finished (connection, ready);
234                                 return;
235                         }
236                 }
237         }
238
239         shared_ptr<AnalyseAudioJob> job;
240
241         {
242                 boost::mutex::scoped_lock lm (_mutex);
243
244                 job.reset (new AnalyseAudioJob (film, playlist, from_zero));
245                 connection = job->Finished.connect (ready);
246                 _jobs.push_back (job);
247                 _empty_condition.notify_all ();
248         }
249
250         emit (boost::bind (boost::ref (JobAdded), weak_ptr<Job> (job)));
251 }
252
253 void
254 JobManager::increase_priority (shared_ptr<Job> job)
255 {
256         bool changed = false;
257
258         {
259                 boost::mutex::scoped_lock lm (_mutex);
260                 list<shared_ptr<Job> >::iterator last = _jobs.end ();
261                 for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
262                         if (*i == job && last != _jobs.end()) {
263                                 swap (*i, *last);
264                                 changed = true;
265                                 break;
266                         }
267                         last = i;
268                 }
269         }
270
271         if (changed) {
272                 priority_changed ();
273         }
274 }
275
276 void
277 JobManager::priority_changed ()
278 {
279         {
280                 boost::mutex::scoped_lock lm (_mutex);
281
282                 bool first = true;
283                 BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
284                         if (first) {
285                                 if (i->is_new ()) {
286                                         i->start ();
287                                 } else if (i->paused_by_priority ()) {
288                                         i->resume ();
289                                 }
290                                 first = false;
291                         } else {
292                                 if (i->running ()) {
293                                         i->pause_by_priority ();
294                                 }
295                         }
296                 }
297         }
298
299         emit (boost::bind (boost::ref (JobsReordered)));
300 }
301
302 void
303 JobManager::decrease_priority (shared_ptr<Job> job)
304 {
305         bool changed = false;
306
307         {
308                 boost::mutex::scoped_lock lm (_mutex);
309                 for (list<shared_ptr<Job> >::iterator i = _jobs.begin(); i != _jobs.end(); ++i) {
310                         list<shared_ptr<Job> >::iterator next = i;
311                         ++next;
312                         if (*i == job && next != _jobs.end()) {
313                                 swap (*i, *next);
314                                 changed = true;
315                                 break;
316                         }
317                 }
318         }
319
320         if (changed) {
321                 priority_changed ();
322         }
323 }
324
325 void
326 JobManager::pause ()
327 {
328         boost::mutex::scoped_lock lm (_mutex);
329
330         if (_paused) {
331                 return;
332         }
333
334         BOOST_FOREACH (shared_ptr<Job> i, _jobs) {
335                 if (i->pause_by_user()) {
336                         _paused_job = i;
337                 }
338         }
339
340         _paused = true;
341 }
342
343 void
344 JobManager::resume ()
345 {
346         boost::mutex::scoped_lock lm (_mutex);
347         if (!_paused) {
348                 return;
349         }
350
351         if (_paused_job) {
352                 _paused_job->resume ();
353         }
354
355         _paused_job.reset ();
356         _paused = false;
357 }