Fix hang on session going away when there is nothing to process. Fixes #3284.
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/cpus.h"
25
26 #include "ardour/debug.h"
27 #include "ardour/graph.h"
28 #include "ardour/types.h"
29 #include "ardour/session.h"
30 #include "ardour/route.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/audioengine.h"
33
34 #include <jack/thread.h>
35
36 #include "i18n.h"
37
38 using namespace ARDOUR;
39 using namespace PBD;
40 using namespace std;
41
42
43 Graph::Graph (Session & session) 
44         : SessionHandleRef (session) 
45 {
46         pthread_mutex_init( &_trigger_mutex, NULL);
47         sem_init( &_execution_sem, 0, 0 );
48
49         sem_init( &_callback_start_sem, 0, 0 );
50         sem_init( &_callback_done_sem,  0, 0 );
51         sem_init( &_cleanup_sem,  0, 0 );
52
53         _execution_tokens = 0;
54
55         _current_chain = 0;
56         _pending_chain = 0;
57         _setup_chain   = 1;
58         _quit_threads = false;
59         _graph_empty = true;
60
61         int num_cpu = hardware_concurrency();
62         int num_threads = num_cpu;
63         int pu = Config->get_processor_usage ();
64
65         if (pu < 0) {
66                 /* pu is negative: use "pu" less cores for DSP than appear to be available
67                  */
68
69                 if (-pu < num_threads) {
70                         num_threads += pu; 
71                 } else {
72                         num_threads = 1;
73                 }
74         } else {
75                 /* use "pu" cores, if available
76                  */
77
78                 if (pu <= num_threads) {
79                         num_threads = pu;
80                 } 
81         }
82
83         info << string_compose (_("Using %2 threads on %1 CPUs"), num_cpu, num_threads) << endmsg;
84
85         _thread_list.push_back (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), 100000));
86
87         for (int i = 1; i < num_threads; ++i) {
88                 _thread_list.push_back (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), 100000));
89         }
90 }
91
92 void
93 Graph::session_going_away()
94 {
95         _quit_threads = true;
96
97         for (unsigned int i=0; i<_thread_list.size(); i++) {
98                 sem_post( &_execution_sem);
99         }
100
101         sem_post( &_callback_start_sem);
102
103         for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); i++) {
104                 void* status;
105                 pthread_join (*i, &status);
106         }
107
108         // now drop all references on the nodes.
109         _nodes_rt[0].clear();
110         _nodes_rt[1].clear();
111         _init_trigger_list[0].clear();
112         _init_trigger_list[1].clear();
113         _trigger_queue.clear();
114 }
115
116 void
117 Graph::clear_other_chain ()
118 {
119         Glib::Mutex::Lock ls (_swap_mutex);
120
121         while (1) {
122                 if (_setup_chain != _pending_chain) {
123
124                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
125                                 (*ni)->_activation_set[_setup_chain].clear();
126                         }
127
128                         _nodes_rt[_setup_chain].clear ();
129                         _init_trigger_list[_setup_chain].clear ();
130                         break;
131                 }
132                 /* setup chain == pending chain - we have
133                    to wait till this is no longer true.
134                 */
135                 _cleanup_cond.wait (_swap_mutex);                
136         }
137 }
138
139 void
140 Graph::prep()
141 {
142         node_list_t::iterator i;
143         int chain;
144
145         if (_swap_mutex.trylock()) {
146                 // we got the swap mutex.
147                 if (_current_chain != _pending_chain)
148                 {
149                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
150                         _setup_chain = _current_chain;
151                         _current_chain = _pending_chain;
152                         _cleanup_cond.signal ();
153                 }
154                 _swap_mutex.unlock ();
155         }
156
157         chain = _current_chain;
158
159         _graph_empty = true;
160         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
161                 (*i)->prep( chain);
162                 _graph_empty = false;
163         }
164         _finished_refcount = _init_finished_refcount[chain];
165
166         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
167                 this->trigger( i->get() );
168         }
169 }
170
171 void
172 Graph::trigger (GraphNode* n)
173 {
174         pthread_mutex_lock (&_trigger_mutex);
175         _trigger_queue.push_back( n);
176         pthread_mutex_unlock (&_trigger_mutex);
177 }
178
179 void
180 Graph::dec_ref()
181 {
182         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
183
184                 // ok... this cycle is finished now.
185                 // we are the only thread alive.
186         
187                 this->restart_cycle();
188         }
189 }
190
191 void
192 Graph::restart_cycle()
193 {
194         //printf( "cycle_done chain: %d\n", _current_chain);
195
196         // we are through. wakeup our caller.
197   again:
198         sem_post( &_callback_done_sem);
199
200         // block until we are triggered.
201         sem_wait( &_callback_start_sem);
202         if (_quit_threads)
203                 return;
204
205         //printf( "cycle_start\n" );
206
207         this->prep();
208         if (_graph_empty)
209                 goto again;
210         //printf( "cycle_start chain: %d\n", _current_chain);
211
212         // returning will restart the cycle.
213         //  starting with waking up the others.
214 }
215
216 static bool
217 is_feedback (boost::shared_ptr<RouteList> routelist, Route* from, boost::shared_ptr<Route> to)
218 {
219         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
220                 if ((*ri).get() == from)
221                         return false;
222                 if ((*ri) == to)
223                         return true;
224         }
225         assert(0);
226         return false;
227 }
228
229 static bool
230 is_feedback (boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route* to)
231 {
232         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
233                 if ((*ri).get() == to)
234                         return true;
235                 if ((*ri) == from)
236                         return false;
237         }
238         assert(0);
239         return false;
240 }
241
242 void
243 Graph::rechain (boost::shared_ptr<RouteList> routelist)
244 {
245         node_list_t::iterator ni;
246         Glib::Mutex::Lock ls (_swap_mutex);
247
248         int chain = _setup_chain;
249         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
250         // set all refcounts to 0;
251
252         _init_finished_refcount[chain] = 0;
253         _init_trigger_list[chain].clear();
254
255         _nodes_rt[chain].clear();
256
257         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
258                 node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
259
260                 n->_init_refcount[chain] = 0;
261                 n->_activation_set[chain].clear();
262                 _nodes_rt[chain].push_back(n);
263         }
264
265         // now add refs for the connections.
266
267         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
268                 bool has_input  = false;
269                 bool has_output = false;
270
271                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
272
273                 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
274                         if (rp->direct_feeds (*ri)) {
275                                 if (is_feedback (routelist, rp.get(), *ri)) {
276                                         continue; 
277                                 }
278
279                                 has_output = true;
280                                 (*ni)->_activation_set[chain].insert (boost::dynamic_pointer_cast<GraphNode> (*ri) );
281                         }
282                 }
283
284                 for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++) {
285                         if (boost::shared_ptr<Route> r = fi->r.lock()) {
286                                 if (!is_feedback (routelist, r, rp.get())) {
287                                         has_input = true;
288                                 }
289                         }
290                 }
291
292                 for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
293                         (*ai)->_init_refcount[chain] += 1;
294                 }
295
296                 if (!has_input)
297                         _init_trigger_list[chain].push_back (*ni);
298
299                 if (!has_output)
300                         _init_finished_refcount[chain] += 1;
301         } 
302
303         _pending_chain = chain;
304         dump(chain);
305 }
306
307 bool
308 Graph::run_one()
309 {
310         GraphNode* to_run;
311
312         pthread_mutex_lock (&_trigger_mutex);
313         if (_trigger_queue.size()) {
314                 to_run = _trigger_queue.back();
315                 _trigger_queue.pop_back();
316         } else {
317                 to_run = 0;
318         }
319
320         int wakeup = min ((int) _execution_tokens, (int) _trigger_queue.size());
321         _execution_tokens -= wakeup;
322
323         for (int i=0; i<wakeup; i++ ) {
324                 sem_post (&_execution_sem);
325         }
326
327         while (to_run == 0) {
328                 _execution_tokens += 1;
329                 pthread_mutex_unlock (&_trigger_mutex);
330                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
331                 sem_wait (&_execution_sem);
332                 if (_quit_threads)
333                         return true;
334                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
335                 pthread_mutex_lock (&_trigger_mutex);
336                 if (_trigger_queue.size()) {
337                         to_run = _trigger_queue.back();
338                         _trigger_queue.pop_back();
339                 }
340         }
341         pthread_mutex_unlock (&_trigger_mutex);
342
343         to_run->process();
344         to_run->finish (_current_chain);
345
346         return false;
347 }
348
349 static void get_rt()
350 {
351         if (!jack_is_realtime (AudioEngine::instance()->jack())) {
352                 return;
353         }
354
355         int priority = jack_client_real_time_priority (AudioEngine::instance()->jack());
356
357         if (priority) {
358                 struct sched_param rtparam;
359         
360                 memset (&rtparam, 0, sizeof (rtparam));
361                 rtparam.sched_priority = priority;
362         
363                 pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
364         }
365 }
366
367 void
368 Graph::helper_thread()
369 {
370         ProcessThread *pt = new ProcessThread;
371
372         pt->get_buffers();
373         get_rt();
374
375         while(1) {
376                 if (run_one()) {
377                         break;
378                 }
379         }
380
381         pt->drop_buffers();
382 }
383
384 void
385 Graph::main_thread()
386 {
387         ProcessThread *pt = new ProcessThread;
388
389         pt->get_buffers();
390         get_rt();
391
392   again:
393         sem_wait (&_callback_start_sem);
394
395         this->prep();
396
397         if (_graph_empty && !_quit_threads) {
398                 sem_post (&_callback_done_sem);
399                 goto again;
400         }
401
402         while (1) {
403                 if (run_one()) {
404                         break;
405                 }
406         }
407
408         pt->drop_buffers();
409 }
410
411 void
412 Graph::dump (int chain)
413 {
414 #ifndef NDEBUG
415         node_list_t::iterator ni;
416         node_set_t::iterator ai;
417
418         chain = _pending_chain;
419
420         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
421         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
422                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
423                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
424                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
425                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
426                 }
427         }
428
429         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
430         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
431                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
432         }
433
434         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
435 #endif
436 }
437
438 int
439 Graph::silent_process_routes (nframes_t nframes, framepos_t start_frame, framepos_t end_frame,
440                               bool can_record, bool rec_monitors_input, bool& need_butler)
441 {
442         _process_nframes = nframes;
443         _process_start_frame = start_frame;
444         _process_end_frame = end_frame;
445         _process_can_record = can_record;
446         _process_rec_monitors_input = rec_monitors_input;
447
448         _process_silent = true;
449         _process_noroll = false;
450         _process_retval = 0;
451         _process_need_butler = false;
452
453         if (!_graph_empty) {
454                 sem_post (&_callback_start_sem);
455                 sem_wait (&_callback_done_sem);
456         }
457
458         need_butler = _process_need_butler;
459
460         return _process_retval;
461 }
462
463 int
464 Graph::process_routes (nframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick,
465                        bool can_record, bool rec_monitors_input, bool& need_butler)
466 {
467         _process_nframes = nframes;
468         _process_start_frame = start_frame;
469         _process_end_frame = end_frame;
470         _process_can_record = can_record;
471         _process_rec_monitors_input = rec_monitors_input;
472         _process_declick = declick;
473
474         _process_silent = false;
475         _process_noroll = false;
476         _process_retval = 0;
477         _process_need_butler = false;
478
479         sem_post (&_callback_start_sem);
480         sem_wait (&_callback_done_sem);
481
482         need_butler = _process_need_butler;
483
484         return _process_retval;
485 }
486
487 int
488 Graph::routes_no_roll (nframes_t nframes, framepos_t start_frame, framepos_t end_frame, 
489                        bool non_rt_pending, bool can_record, int declick)
490 {
491         _process_nframes = nframes;
492         _process_start_frame = start_frame;
493         _process_end_frame = end_frame;
494         _process_can_record = can_record;
495         _process_declick = declick;
496         _process_non_rt_pending = non_rt_pending;
497
498         _process_silent = false;
499         _process_noroll = true;
500         _process_retval = 0;
501         _process_need_butler = false;
502
503         sem_post (&_callback_start_sem);
504         sem_wait (&_callback_done_sem);
505
506         return _process_retval;
507 }
508 void
509 Graph::process_one_route (Route* route)
510 {
511         bool need_butler = false;
512         int retval;
513
514         assert (route);
515
516         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
517
518         if (_process_silent) {
519                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_can_record, _process_rec_monitors_input, need_butler);
520         } else if (_process_noroll) {
521                 route->set_pending_declick (_process_declick);
522                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending, _process_can_record, _process_declick);
523         } else {
524                 route->set_pending_declick (_process_declick);
525                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, _process_can_record, _process_rec_monitors_input, need_butler);
526         }
527
528         if (retval) {
529                 _process_retval = retval;
530         }
531     
532         if (need_butler) {
533                 _process_need_butler = true;
534         }
535 }
536
537
538