remove unused semaphore
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/debug_rt_alloc.h"
25 #include "pbd/pthread_utils.h"
26
27 #include "ardour/debug.h"
28 #include "ardour/graph.h"
29 #include "ardour/types.h"
30 #include "ardour/session.h"
31 #include "ardour/route.h"
32 #include "ardour/process_thread.h"
33 #include "ardour/audioengine.h"
34
35 #include "pbd/i18n.h"
36
37 using namespace ARDOUR;
38 using namespace PBD;
39 using namespace std;
40
41 #ifdef DEBUG_RT_ALLOC
42 static Graph* graph = 0;
43
44 extern "C" {
45
46 int alloc_allowed ()
47 {
48         return !graph->in_process_thread ();
49 }
50
51 }
52 #endif
53
54 Graph::Graph (Session & session)
55         : SessionHandleRef (session)
56         , _threads_active (false)
57         , _execution_sem ("graph_execution", 0)
58         , _callback_start_sem ("graph_start", 0)
59         , _callback_done_sem ("graph_done", 0)
60 {
61         pthread_mutex_init( &_trigger_mutex, NULL);
62
63         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
64          * memory in the RT thread.
65          */
66         _trigger_queue.reserve (8192);
67
68         _execution_tokens = 0;
69
70         _current_chain = 0;
71         _pending_chain = 0;
72         _setup_chain   = 1;
73         _graph_empty = true;
74
75
76         ARDOUR::AudioEngine::instance()->Running.connect_same_thread (engine_connections, boost::bind (&Graph::reset_thread_list, this));
77         ARDOUR::AudioEngine::instance()->Stopped.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
78         ARDOUR::AudioEngine::instance()->Halted.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
79
80         reset_thread_list ();
81
82 #ifdef DEBUG_RT_ALLOC
83         graph = this;
84         pbd_alloc_allowed = &::alloc_allowed;
85 #endif
86 }
87
88 void
89 Graph::engine_stopped ()
90 {
91         if (AudioEngine::instance()->process_thread_count() != 0) {
92                 drop_threads ();
93         }
94 }
95
96 /** Set up threads for running the graph */
97 void
98 Graph::reset_thread_list ()
99 {
100         uint32_t num_threads = how_many_dsp_threads ();
101
102         /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
103         assert (num_threads > 1);
104
105         /* don't bother doing anything here if we already have the right
106          * number of threads.
107          */
108
109         if (AudioEngine::instance()->process_thread_count() == num_threads) {
110                 return;
111         }
112
113         Glib::Threads::Mutex::Lock lm (_session.engine().process_lock());
114
115         if (AudioEngine::instance()->process_thread_count() != 0) {
116                 drop_threads ();
117         }
118
119         _threads_active = true;
120
121         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
122                 throw failed_constructor ();
123         }
124
125         for (uint32_t i = 1; i < num_threads; ++i) {
126                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
127                         throw failed_constructor ();
128                 }
129         }
130 }
131
132 void
133 Graph::session_going_away()
134 {
135         drop_threads ();
136
137         // now drop all references on the nodes.
138         _nodes_rt[0].clear();
139         _nodes_rt[1].clear();
140         _init_trigger_list[0].clear();
141         _init_trigger_list[1].clear();
142         _trigger_queue.clear();
143 }
144
145 void
146 Graph::drop_threads ()
147 {
148         Glib::Threads::Mutex::Lock ls (_swap_mutex);
149         _threads_active = false;
150
151         uint32_t thread_count = AudioEngine::instance()->process_thread_count ();
152
153         for (unsigned int i=0; i < thread_count; i++) {
154                 pthread_mutex_lock (&_trigger_mutex);
155                 _execution_sem.signal ();
156                 pthread_mutex_unlock (&_trigger_mutex);
157         }
158
159         pthread_mutex_lock (&_trigger_mutex);
160         _callback_start_sem.signal ();
161         pthread_mutex_unlock (&_trigger_mutex);
162
163         AudioEngine::instance()->join_process_threads ();
164
165         _execution_tokens = 0;
166 }
167
168 void
169 Graph::clear_other_chain ()
170 {
171         Glib::Threads::Mutex::Lock ls (_swap_mutex);
172
173         while (1) {
174                 if (_setup_chain != _pending_chain) {
175
176                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
177                                 (*ni)->_activation_set[_setup_chain].clear();
178                         }
179
180                         _nodes_rt[_setup_chain].clear ();
181                         _init_trigger_list[_setup_chain].clear ();
182                         break;
183                 }
184                 /* setup chain == pending chain - we have
185                  * to wait till this is no longer true.
186                  */
187                 _cleanup_cond.wait (_swap_mutex);
188         }
189 }
190
191 void
192 Graph::prep()
193 {
194         node_list_t::iterator i;
195         int chain;
196
197         if (_swap_mutex.trylock()) {
198                 // we got the swap mutex.
199                 if (_current_chain != _pending_chain)
200                 {
201                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
202                         _setup_chain = _current_chain;
203                         _current_chain = _pending_chain;
204                         _cleanup_cond.signal ();
205                 }
206                 _swap_mutex.unlock ();
207         }
208
209         chain = _current_chain;
210
211         _graph_empty = true;
212         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
213                 (*i)->prep( chain);
214                 _graph_empty = false;
215         }
216         _finished_refcount = _init_finished_refcount[chain];
217
218         /* Trigger the initial nodes for processing, which are the ones at the `input' end */
219         pthread_mutex_lock (&_trigger_mutex);
220         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
221                 /* don't use ::trigger here, as we have already locked the mutex */
222                 _trigger_queue.push_back (i->get ());
223         }
224         pthread_mutex_unlock (&_trigger_mutex);
225 }
226
227 void
228 Graph::trigger (GraphNode* n)
229 {
230         pthread_mutex_lock (&_trigger_mutex);
231         _trigger_queue.push_back (n);
232         pthread_mutex_unlock (&_trigger_mutex);
233 }
234
235 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
236  *  is finished.
237  */
238 void
239 Graph::dec_ref()
240 {
241         if (g_atomic_int_dec_and_test (const_cast<gint*> (&_finished_refcount))) {
242
243                 /* We have run all the nodes that are at the `output' end of
244                  * the graph, so there is nothing more to do this time around.
245                  */
246
247                 restart_cycle ();
248         }
249 }
250
251 void
252 Graph::restart_cycle()
253 {
254         // we are through. wakeup our caller.
255
256 again:
257         _callback_done_sem.signal ();
258
259         /* Block until the a process callback triggers us */
260         _callback_start_sem.wait();
261
262         if (!_threads_active) {
263                 return;
264         }
265
266         prep ();
267
268         if (_graph_empty && _threads_active) {
269                 goto again;
270         }
271
272         // returning will restart the cycle.
273         // starting with waking up the others.
274 }
275
276 /** Rechain our stuff using a list of routes (which can be in any order) and
277  *  a directed graph of their interconnections, which is guaranteed to be
278  *  acyclic.
279  */
280 void
281 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const & edges)
282 {
283         Glib::Threads::Mutex::Lock ls (_swap_mutex);
284
285         int chain = _setup_chain;
286         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
287
288         /* This will become the number of nodes that do not feed any other node;
289          * once we have processed this number of those nodes, we have finished.
290          */
291         _init_finished_refcount[chain] = 0;
292
293         /* This will become a list of nodes that are not fed by another node, ie
294          * those at the `input' end.
295          */
296         _init_trigger_list[chain].clear();
297
298         _nodes_rt[chain].clear();
299
300         /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
301         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
302                 (*ri)->_init_refcount[chain] = 0;
303                 (*ri)->_activation_set[chain].clear();
304                 _nodes_rt[chain].push_back (*ri);
305         }
306
307         // now add refs for the connections.
308
309         for (node_list_t::iterator ni = _nodes_rt[chain].begin(); ni != _nodes_rt[chain].end(); ni++) {
310
311                 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
312
313                 /* The routes that are directly fed by r */
314                 set<GraphVertex> fed_from_r = edges.from (r);
315
316                 /* Hence whether r has an output */
317                 bool const has_output = !fed_from_r.empty ();
318
319                 /* Set up r's activation set */
320                 for (set<GraphVertex>::iterator i = fed_from_r.begin(); i != fed_from_r.end(); ++i) {
321                         r->_activation_set[chain].insert (*i);
322                 }
323
324                 /* r has an input if there are some incoming edges to r in the graph */
325                 bool const has_input = !edges.has_none_to (r);
326
327                 /* Increment the refcount of any route that we directly feed */
328                 for (node_set_t::iterator ai = r->_activation_set[chain].begin(); ai != r->_activation_set[chain].end(); ai++) {
329                         (*ai)->_init_refcount[chain] += 1;
330                 }
331
332                 if (!has_input) {
333                         /* no input, so this node needs to be triggered initially to get things going */
334                         _init_trigger_list[chain].push_back (*ni);
335                 }
336
337                 if (!has_output) {
338                         /* no output, so this is one of the nodes that we can count off to decide
339                          * if we've finished
340                          */
341                         _init_finished_refcount[chain] += 1;
342                 }
343         }
344
345         _pending_chain = chain;
346         dump(chain);
347 }
348
349 /** Called by both the main thread and all helpers.
350  *  @return true to quit, false to carry on.
351  */
352 bool
353 Graph::run_one()
354 {
355         GraphNode* to_run;
356
357         pthread_mutex_lock (&_trigger_mutex);
358         if (_trigger_queue.size()) {
359                 to_run = _trigger_queue.back();
360                 _trigger_queue.pop_back();
361         } else {
362                 to_run = 0;
363         }
364
365         /* the number of threads that are asleep */
366         int et = _execution_tokens;
367         /* the number of nodes that need to be run */
368         int ts = _trigger_queue.size();
369
370         /* hence how many threads to wake up */
371         int wakeup = min (et, ts);
372         /* update the number of threads that will still be sleeping */
373         _execution_tokens -= wakeup;
374
375         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_name(), wakeup));
376
377         for (int i = 0; i < wakeup; i++) {
378                 _execution_sem.signal ();
379         }
380
381         while (to_run == 0) {
382                 _execution_tokens += 1;
383                 pthread_mutex_unlock (&_trigger_mutex);
384                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_name()));
385                 _execution_sem.wait ();
386                 if (!_threads_active) {
387                         return true;
388                 }
389                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_name()));
390                 pthread_mutex_lock (&_trigger_mutex);
391                 if (_trigger_queue.size()) {
392                         to_run = _trigger_queue.back();
393                         _trigger_queue.pop_back();
394                 }
395         }
396         pthread_mutex_unlock (&_trigger_mutex);
397
398         to_run->process();
399         to_run->finish (_current_chain);
400
401         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name()));
402
403         return !_threads_active;
404 }
405
406 void
407 Graph::helper_thread()
408 {
409         suspend_rt_malloc_checks ();
410         ProcessThread* pt = new ProcessThread ();
411         resume_rt_malloc_checks ();
412
413         pt->get_buffers();
414
415         while(1) {
416                 if (run_one()) {
417                         break;
418                 }
419         }
420
421         pt->drop_buffers();
422         delete pt;
423 }
424
425 /** Here's the main graph thread */
426 void
427 Graph::main_thread()
428 {
429         suspend_rt_malloc_checks ();
430         ProcessThread* pt = new ProcessThread ();
431         resume_rt_malloc_checks ();
432
433         pt->get_buffers();
434
435 again:
436         _callback_start_sem.wait ();
437
438         DEBUG_TRACE(DEBUG::ProcessThreads, "main thread is awake\n");
439
440         if (!_threads_active) {
441                 return;
442         }
443
444         prep ();
445
446         if (_graph_empty && _threads_active) {
447                 _callback_done_sem.signal ();
448                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
449                 goto again;
450         }
451
452         /* This loop will run forever */
453         while (1) {
454                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread runs one graph node\n");
455                 if (run_one()) {
456                         break;
457                 }
458         }
459
460         pt->drop_buffers();
461         delete (pt);
462 }
463
464 void
465 Graph::dump (int chain)
466 {
467 #ifndef NDEBUG
468         node_list_t::iterator ni;
469         node_set_t::iterator ai;
470
471         chain = _pending_chain;
472
473         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
474         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
475                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
476                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
477                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
478                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
479                 }
480         }
481
482         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
483         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
484                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
485         }
486
487         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
488 #endif
489 }
490
491 int
492 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, bool& need_butler)
493 {
494         if (!_threads_active) return 0;
495
496         _process_nframes = nframes;
497         _process_start_frame = start_frame;
498         _process_end_frame = end_frame;
499
500         _process_silent = true;
501         _process_noroll = false;
502         _process_retval = 0;
503         _process_need_butler = false;
504
505         if (!_graph_empty) {
506                 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for silent process\n");
507                 _callback_start_sem.signal ();
508                 _callback_done_sem.wait ();
509         }
510
511         need_butler = _process_need_butler;
512
513         return _process_retval;
514 }
515
516 int
517 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick, bool& need_butler)
518 {
519         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
520
521         if (!_threads_active) return 0;
522
523         _process_nframes = nframes;
524         _process_start_frame = start_frame;
525         _process_end_frame = end_frame;
526         _process_declick = declick;
527
528         _process_silent = false;
529         _process_noroll = false;
530         _process_retval = 0;
531         _process_need_butler = false;
532
533         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for non-silent process\n");
534         _callback_start_sem.signal ();
535         _callback_done_sem.wait ();
536
537         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
538
539         need_butler = _process_need_butler;
540
541         return _process_retval;
542 }
543
544 int
545 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
546                        bool non_rt_pending, int declick)
547 {
548         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
549
550         if (!_threads_active) return 0;
551
552         _process_nframes = nframes;
553         _process_start_frame = start_frame;
554         _process_end_frame = end_frame;
555         _process_declick = declick;
556         _process_non_rt_pending = non_rt_pending;
557
558         _process_silent = false;
559         _process_noroll = true;
560         _process_retval = 0;
561         _process_need_butler = false;
562
563         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for no-roll process\n");
564         _callback_start_sem.signal ();
565         _callback_done_sem.wait ();
566
567         return _process_retval;
568 }
569 void
570 Graph::process_one_route (Route* route)
571 {
572         bool need_butler = false;
573         int retval;
574
575         assert (route);
576
577         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_name(), route->name()));
578
579         if (_process_silent) {
580                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, need_butler);
581         } else if (_process_noroll) {
582                 route->set_pending_declick (_process_declick);
583                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending);
584         } else {
585                 route->set_pending_declick (_process_declick);
586                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, need_butler);
587         }
588
589         if (retval) {
590                 _process_retval = retval;
591         }
592
593         if (need_butler) {
594                 _process_need_butler = true;
595         }
596 }
597
598 bool
599 Graph::in_process_thread () const
600 {
601         return AudioEngine::instance()->in_process_thread ();
602 }