2 Copyright (C) 2010 Paul Davis
5 This program is free software; you can redistribute it and/or modify
6 it under the terms of the GNU General Public License as published by
7 the Free Software Foundation; either version 2 of the License, or
8 (at your option) any later version.
10 This program is distributed in the hope that it will be useful,
11 but WITHOUT ANY WARRANTY; without even the implied warranty of
12 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 GNU General Public License for more details.
15 You should have received a copy of the GNU General Public License
16 along with this program; if not, write to the Free Software
17 Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 #include "pbd/compose.h"
24 #include "pbd/debug_rt_alloc.h"
26 #include "ardour/debug.h"
27 #include "ardour/graph.h"
28 #include "ardour/types.h"
29 #include "ardour/session.h"
30 #include "ardour/route.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/audioengine.h"
34 #include <jack/thread.h>
38 using namespace ARDOUR;
// File-scope pointer to a Graph, initialised to null. No assignment to it is
// visible in this extract.
43 static Graph* graph = 0;
// Yields true when the calling thread is NOT one of the graph threads, as
// reported by Graph::in_process_thread().
// NOTE(review): the enclosing function header is not visible here; the Graph
// constructor stores &::alloc_allowed in pbd_alloc_allowed, so this is
// presumably the body of alloc_allowed. Confirm against the full file.
// NOTE(review): `graph` is dereferenced without a null check.
49 return !graph->in_process_thread ();
// Construct the graph for one session.
// Visible work: set up the four named semaphores and the trigger-queue mutex,
// pre-size the trigger queue, zero the sleeping-thread counter and install
// the allocation hook.
// NOTE(review): the embedded line numbering skips inside this constructor, so
// some original statements are absent from this extract.
// @param session forwarded to the SessionHandleRef base.
55 Graph::Graph (Session & session)
56 : SessionHandleRef (session)
57 , _quit_threads (false)
// Each semaphore is given a name string and 0 as its second argument
// (presumably the initial count; confirm against the semaphore class).
58 , _execution_sem ("graph_execution", 0)
59 , _callback_start_sem ("graph_start", 0)
60 , _callback_done_sem ("graph_done", 0)
61 , _cleanup_sem ("graph_cleanup", 0)
// Default attributes (NULL) for the mutex that guards _trigger_queue.
63 pthread_mutex_init( &_trigger_mutex, NULL);
65 /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
66 memory in the RT thread.
68 _trigger_queue.reserve (8192);
// No thread is counted as asleep yet; the worker loop increments this
// counter just before it waits on _execution_sem.
70 _execution_tokens = 0;
// Repeats the value already supplied in the initialiser list above.
75 _quit_threads = false;
// Publish the file-level ::alloc_allowed function through pbd_alloc_allowed.
82 pbd_alloc_allowed = &::alloc_allowed;
// Creates the graph threads: one bound to Graph::main_thread followed by
// (num_threads - 1) bound to Graph::helper_thread, recording every handle in
// _thread_list.
// Throws failed_constructor when create_process_thread returns non-zero.
// NOTE(review): the bodies of the two early checks below are on lines that
// are absent from this extract.
86 /** Set up threads for running the graph */
88 Graph::reset_thread_list ()
90 uint32_t num_threads = how_many_dsp_threads ();
92 /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
93 assert (num_threads > 1);
95 /* don't bother doing anything here if we already have the right
99 if (_thread_list.size() == num_threads) {
// Take the engine process lock before the thread list is modified.
103 Glib::Threads::Mutex::Lock lm (_session.engine().process_lock());
// Receives the handle of each newly created thread in turn.
104 AudioBackendNativeThread a_thread;
// Threads already exist but their count differs from num_threads; the action
// taken in this case is not visible here.
106 if (!_thread_list.empty()) {
// The first thread runs main_thread. The literal 100000 is the third argument
// of create_process_thread (presumably a stack size; TODO confirm).
110 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) != 0) {
111 throw failed_constructor ();
114 _thread_list.push_back (a_thread);
// The index starts at 1 because the main thread created above is the first
// of the num_threads threads.
116 for (uint32_t i = 1; i < num_threads; ++i) {
117 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) != 0) {
118 throw failed_constructor ();
121 _thread_list.push_back (a_thread);
// Empties every container of nodes held by the graph: both entries of
// _nodes_rt and _init_trigger_list, plus the trigger queue.
// NOTE(review): at least one original line between the header and the first
// clear() is absent from this extract.
126 Graph::session_going_away()
130 // now drop all references on the nodes.
131 _nodes_rt[0].clear();
132 _nodes_rt[1].clear();
133 _init_trigger_list[0].clear();
134 _init_trigger_list[1].clear();
// The queue holds raw GraphNode pointers (see trigger), so clear it as well.
135 _trigger_queue.clear();
// Asks every graph thread to exit, waits for each one, then resets the
// bookkeeping so that threads can be created again.
139 Graph::drop_threads ()
// Raise the quit flag before waking anybody.
141 _quit_threads = true;
// One signal on _execution_sem per entry in _thread_list.
143 for (unsigned int i=0; i< _thread_list.size(); i++) {
144 _execution_sem.signal ();
// One extra signal on the semaphore that the main thread and restart_cycle
// wait on.
147 _callback_start_sem.signal ();
// Block until each recorded thread has exited.
149 for (list<AudioBackendNativeThread>::iterator i = _thread_list.begin(); i != _thread_list.end(); ++i) {
150 AudioEngine::instance()->wait_for_process_thread_exit (*i);
153 _thread_list.clear ();
// Back to the state the constructor sets up: no sleepers, quit flag lowered.
155 _execution_tokens = 0;
157 _quit_threads = false;
// Empties the setup chain (activation sets, node list and initial trigger
// list) once it is no longer the pending chain.
// Holds _swap_mutex throughout; _cleanup_cond.wait is handed that same mutex.
// NOTE(review): the loop or branch structure around the wait is on lines
// absent from this extract.
161 Graph::clear_other_chain ()
163 Glib::Threads::Mutex::Lock ls (_swap_mutex);
// Only safe to clear when the setup chain differs from the pending chain.
166 if (_setup_chain != _pending_chain) {
// Clear each node's activation set for this chain before the node list
// itself is cleared.
168 for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
169 (*ni)->_activation_set[_setup_chain].clear();
172 _nodes_rt[_setup_chain].clear ();
173 _init_trigger_list[_setup_chain].clear ();
176 /* setup chain == pending chain - we have
177 to wait till this is no longer true.
// _cleanup_cond is signalled by the chain-swap code after it changes
// _setup_chain and _current_chain.
179 _cleanup_cond.wait (_swap_mutex);
// Start-of-cycle preparation: adopt a pending chain if there is one, reload
// the finished-node countdown and queue the initial nodes.
// NOTE(review): the function header and the declaration of `chain` are on
// lines absent from this extract (presumably Graph::prep; confirm).
186 node_list_t::iterator i;
// Non-blocking attempt on the swap mutex, so this code does not wait on a
// thread that currently holds the lock.
189 if (_swap_mutex.trylock()) {
190 // we got the swap mutex.
191 if (_current_chain != _pending_chain)
193 // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
// The old current chain becomes the setup chain, the pending chain becomes
// current, and the thread waiting in clear_other_chain is woken.
194 _setup_chain = _current_chain;
195 _current_chain = _pending_chain;
196 _cleanup_cond.signal ();
198 _swap_mutex.unlock ();
201 chain = _current_chain;
// Iterates over the nodes of the chain in use; any other per-node work in
// this loop body is not visible here.
204 for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
206 _graph_empty = false;
// Reload the countdown from the value computed by rechain for this chain.
208 _finished_refcount = _init_finished_refcount[chain];
210 /* Trigger the initial nodes for processing, which are the ones at the `input' end */
211 pthread_mutex_lock (&_trigger_mutex);
212 for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
213 /* don't use ::trigger here, as we have already locked the mutex */
// The queue stores raw pointers, hence get() on the list element.
214 _trigger_queue.push_back (i->get ());
216 pthread_mutex_unlock (&_trigger_mutex);
// Appends node n to the trigger queue while holding _trigger_mutex.
// @param n raw pointer pushed onto _trigger_queue; it is not dereferenced here.
220 Graph::trigger (GraphNode* n)
222 pthread_mutex_lock (&_trigger_mutex);
223 _trigger_queue.push_back (n);
224 pthread_mutex_unlock (&_trigger_mutex);
// NOTE(review): the function header, the rest of this doc comment and the
// body of the branch are on lines absent from this extract (presumably
// Graph::dec_ref; confirm).
227 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
// Atomic decrement of _finished_refcount; g_atomic_int_dec_and_test is true
// only for the decrement that brings the counter to zero.
233 if (g_atomic_int_dec_and_test (&_finished_refcount)) {
235 /* We have run all the nodes that are at the `output' end of
236 the graph, so there is nothing more to do this time around.
// Hands control back to the thread waiting on _callback_done_sem, then
// sleeps on _callback_start_sem until the next cycle is requested.
// NOTE(review): the statements between the wait and the closing comments are
// on lines absent from this extract.
244 Graph::restart_cycle()
246 // we are through. wakeup our caller.
249 _callback_done_sem.signal ();
251 /* Block until a process callback triggers us */
252 _callback_start_sem.wait();
264 // returning will restart the cycle.
265 // starting with waking up the others.
// Rebuilds the setup chain from routelist and edges while holding
// _swap_mutex, then publishes it by storing its index in _pending_chain.
// @param routelist routes copied into _nodes_rt[chain].
// @param edges     queried with from(r) and has_none_to(r) for every route.
// NOTE(review): the tail of the doc comment and the two conditions that guard
// the final push_back and increment are on lines absent from this extract.
268 /** Rechain our stuff using a list of routes (which can be in any order) and
269 * a directed graph of their interconnections, which is guaranteed to be
274 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const & edges)
276 Glib::Threads::Mutex::Lock ls (_swap_mutex);
// Everything below is built in the setup chain.
278 int chain = _setup_chain;
279 DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
281 /* This will become the number of nodes that do not feed any other node;
282 once we have processed this number of those nodes, we have finished.
284 _init_finished_refcount[chain] = 0;
286 /* This will become a list of nodes that are not fed by another node, ie
287 those at the `input' end.
289 _init_trigger_list[chain].clear();
291 _nodes_rt[chain].clear();
293 /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
294 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
295 (*ri)->_init_refcount[chain] = 0;
296 (*ri)->_activation_set[chain].clear();
297 _nodes_rt[chain].push_back (*ri);
300 // now add refs for the connections.
302 for (node_list_t::iterator ni = _nodes_rt[chain].begin(); ni != _nodes_rt[chain].end(); ni++) {
// NOTE(review): r is dereferenced below without checking whether the cast
// succeeded.
304 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
306 /* The routes that are directly fed by r */
// Stored by value, so the result of edges.from (r) is held in a local set.
307 set<GraphVertex> fed_from_r = edges.from (r);
309 /* Hence whether r has an output */
310 bool const has_output = !fed_from_r.empty ();
312 /* Set up r's activation set */
313 for (set<GraphVertex>::iterator i = fed_from_r.begin(); i != fed_from_r.end(); ++i) {
314 r->_activation_set[chain].insert (*i);
317 /* r has an input if there are some incoming edges to r in the graph */
318 bool const has_input = !edges.has_none_to (r);
320 /* Increment the refcount of any route that we directly feed */
321 for (node_set_t::iterator ai = r->_activation_set[chain].begin(); ai != r->_activation_set[chain].end(); ai++) {
322 (*ai)->_init_refcount[chain] += 1;
326 /* no input, so this node needs to be triggered initially to get things going */
327 _init_trigger_list[chain].push_back (*ni);
331 /* no output, so this is one of the nodes that we can count off to decide
334 _init_finished_refcount[chain] += 1;
// Publish: the chain-swap code compares _pending_chain with _current_chain.
338 _pending_chain = chain;
// Worker step: pick one node from the trigger queue (sleeping until one is
// available), wake other sleepers if more work is queued, then finish the
// node.
// NOTE(review): the function header, the declaration of to_run, the quit
// handling and the call that actually processes the node are on lines absent
// from this extract. The name run_one comes from the final debug message.
342 /** Called by both the main thread and all helpers.
343 * @return true to quit, false to carry on.
350 pthread_mutex_lock (&_trigger_mutex);
// Take the most recently queued node: the queue is popped from the back.
351 if (_trigger_queue.size()) {
352 to_run = _trigger_queue.back();
353 _trigger_queue.pop_back();
358 /* the number of threads that are asleep */
359 int et = _execution_tokens;
360 /* the number of nodes that need to be run */
// size() is narrowed to int here.
361 int ts = _trigger_queue.size();
363 /* hence how many threads to wake up */
364 int wakeup = min (et, ts);
365 /* update the number of threads that will still be sleeping */
366 _execution_tokens -= wakeup;
368 DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_self(), wakeup));
370 for (int i = 0; i < wakeup; i++) {
371 _execution_sem.signal ();
// Nothing was available: register as a sleeper, release the mutex and wait
// to be signalled, then look at the queue again under the mutex.
374 while (to_run == 0) {
375 _execution_tokens += 1;
376 pthread_mutex_unlock (&_trigger_mutex);
377 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
378 _execution_sem.wait ();
382 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
383 pthread_mutex_lock (&_trigger_mutex);
384 if (_trigger_queue.size()) {
385 to_run = _trigger_queue.back();
386 _trigger_queue.pop_back();
389 pthread_mutex_unlock (&_trigger_mutex);
// Tell the node it is done for the chain currently in use.
392 to_run->finish (_current_chain);
394 DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_self()));
// Entry point bound to the helper threads in reset_thread_list.
// NOTE(review): everything after the ProcessThread allocation is on lines
// absent from this extract, including whatever becomes of `pt`.
400 Graph::helper_thread()
// The heap allocation is bracketed by suspend/resume of the RT malloc checks.
402 suspend_rt_malloc_checks ();
403 ProcessThread* pt = new ProcessThread ();
404 resume_rt_malloc_checks ();
// NOTE(review): the function header, the loop constructs and the per-node
// call are on lines absent from this extract. reset_thread_list binds
// Graph::main_thread to the first thread it creates.
417 /** Here's the main graph thread */
// Same allocation pattern as helper_thread: the heap allocation is bracketed
// by suspend/resume of the RT malloc checks.
421 suspend_rt_malloc_checks ();
422 ProcessThread* pt = new ProcessThread ();
423 resume_rt_malloc_checks ();
// Sleep until one of the process entry points (or drop_threads) signals.
428 _callback_start_sem.wait ();
430 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread is awake\n");
// With an empty graph and no quit request, report completion straight away
// by signalling _callback_done_sem.
438 if (_graph_empty && !_quit_threads) {
439 _callback_done_sem.signal ();
440 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
444 /* This loop will run forever */
446 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread runs one graph node\n");
// Writes a description of a chain through DEBUG_TRACE: every node with its
// initial refcount and the nodes it triggers, then the initial trigger list,
// then the initial finished refcount.
// @param chain overwritten with _pending_chain before first use in the
//              visible lines, so the caller's value has no effect here.
// NOTE(review): the results of dynamic_pointer_cast<Route> are dereferenced
// without a null check.
456 Graph::dump (int chain)
459 node_list_t::iterator ni;
460 node_set_t::iterator ai;
462 chain = _pending_chain;
464 DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
465 for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
466 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
467 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
// One line per member of this node's activation set for the chain.
468 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
469 DEBUG_TRACE (DEBUG::Graph, string_compose (" triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
473 DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
474 for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
475 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
478 DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
// Runs one graph cycle in silent mode and blocks until it has completed.
// @param nframes, start_frame, end_frame stored in the _process_* members
//        that process_one_route passes to Route::silent_roll.
// @param need_butler out-parameter; receives _process_need_butler after the
//        cycle.
// @return _process_retval as left by the cycle.
// NOTE(review): some original lines of this function are absent from this
// extract.
483 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, bool& need_butler)
485 _process_nframes = nframes;
486 _process_start_frame = start_frame;
487 _process_end_frame = end_frame;
// Mode flags read by process_one_route: silent takes the first branch there.
489 _process_silent = true;
490 _process_noroll = false;
492 _process_need_butler = false;
495 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for silent process\n");
// Wake the graph, then wait for it to report completion.
496 _callback_start_sem.signal ();
497 _callback_done_sem.wait ();
500 need_butler = _process_need_butler;
502 return _process_retval;
// Runs one normal (rolling) graph cycle and blocks until it has completed.
// @param nframes, start_frame, end_frame, declick stored in the _process_*
//        members that process_one_route passes to Route::roll.
// @param need_butler out-parameter; receives _process_need_butler after the
//        cycle.
// @return _process_retval as left by the cycle.
// NOTE(review): some original lines of this function are absent from this
// extract.
506 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick, bool& need_butler)
508 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
510 _process_nframes = nframes;
511 _process_start_frame = start_frame;
512 _process_end_frame = end_frame;
513 _process_declick = declick;
// Both flags false selects the roll branch in process_one_route.
515 _process_silent = false;
516 _process_noroll = false;
518 _process_need_butler = false;
520 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for non-silent process\n");
// Wake the graph, then wait for it to report completion.
521 _callback_start_sem.signal ();
522 _callback_done_sem.wait ();
524 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
526 need_butler = _process_need_butler;
528 return _process_retval;
// Runs one no-roll graph cycle and blocks until it has completed.
// @param nframes, start_frame, end_frame, non_rt_pending, declick stored in
//        the _process_* members that process_one_route passes to
//        Route::set_pending_declick and Route::no_roll.
// @return _process_retval as left by the cycle.
// Unlike the other two entry points, this one has no need_butler
// out-parameter; _process_need_butler is reset but not reported.
// NOTE(review): some original lines of this function are absent from this
// extract.
532 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
533 bool non_rt_pending, int declick)
535 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
537 _process_nframes = nframes;
538 _process_start_frame = start_frame;
539 _process_end_frame = end_frame;
540 _process_declick = declick;
541 _process_non_rt_pending = non_rt_pending;
// Not silent plus noroll selects the no_roll branch in process_one_route.
543 _process_silent = false;
544 _process_noroll = true;
546 _process_need_butler = false;
548 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for no-roll process\n");
// Wake the graph, then wait for it to report completion.
549 _callback_start_sem.signal ();
550 _callback_done_sem.wait ();
552 return _process_retval;
// Processes a single route using the parameters and mode flags stored by
// silent_process_routes, process_routes or routes_no_roll.
// @param route route to run; dereferenced without a null check.
// NOTE(review): the declaration of retval, the final else line and the
// conditions guarding the two assignments at the end are on lines absent
// from this extract.
555 Graph::process_one_route (Route* route)
557 bool need_butler = false;
562 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
// Mode dispatch: silent first, then no-roll, otherwise a normal roll.
564 if (_process_silent) {
565 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, need_butler);
566 } else if (_process_noroll) {
567 route->set_pending_declick (_process_declick);
568 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending);
570 route->set_pending_declick (_process_declick);
571 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, need_butler);
// Record the outcome in the members that the entry points read after the
// cycle completes.
575 _process_retval = retval;
579 _process_need_butler = true;
584 Graph::in_process_thread () const
586 for (list<AudioBackendNativeThread>::const_iterator i = _thread_list.begin (); i != _thread_list.end(); ++i) {
587 if (self_thread_equal (*i)) {