2 * Copyright (C) 2010-2012 Carl Hetherington <carl@carlh.net>
3 * Copyright (C) 2010-2017 Paul Davis <paul@linuxaudiosystems.com>
4 * Copyright (C) 2013 John Emmas <john@creativepost.co.uk>
5 * Copyright (C) 2013 Tim Mayberry <mojofunk@gmail.com>
6 * Copyright (C) 2015-2019 Robin Gareus <robin@gareus.org>
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License as published by
10 * the Free Software Foundation; either version 2 of the License, or
11 * (at your option) any later version.
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License along
19 * with this program; if not, write to the Free Software Foundation, Inc.,
20 * 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
26 #include "pbd/compose.h"
27 #include "pbd/debug_rt_alloc.h"
28 #include "pbd/pthread_utils.h"
30 #include "ardour/audioengine.h"
31 #include "ardour/debug.h"
32 #include "ardour/graph.h"
33 #include "ardour/process_thread.h"
34 #include "ardour/route.h"
35 #include "ardour/session.h"
36 #include "ardour/types.h"
40 using namespace ARDOUR;
// File-scope pointer to the Graph instance; dereferenced by the
// allocation-check predicate below. Its assignment is not visible in this
// excerpt.
45 static Graph* graph = 0;
// Body of the allocation-check predicate: allocation is reported as allowed
// only when the calling thread is not a graph process thread.
// NOTE(review): the enclosing function header is not visible here; the
// constructor installs &::alloc_allowed as pbd_alloc_allowed, so this is
// presumably that function. It also assumes graph is non-null; confirm.
52 return !graph->in_process_thread ();
// Reads a glib atomic int and returns it as an unsigned guint.
57 #define g_atomic_uint_get(x) static_cast<guint> (g_atomic_int_get (x))
// Constructs the process graph for a session: creates the three named
// semaphores that drive each process cycle, zeroes the atomic counters and
// the terminate flag, pre-reserves the trigger queue, and subscribes to the
// AudioEngine Running, Stopped and Halted signals.
59 Graph::Graph (Session& session)
60 : SessionHandleRef (session)
61 , _execution_sem ("graph_execution", 0)
62 , _callback_start_sem ("graph_start", 0)
63 , _callback_done_sem ("graph_done", 0)
69 g_atomic_int_set (&_terminal_refcnt, 0);
70 g_atomic_int_set (&_terminate, 0);
71 g_atomic_int_set (&_n_workers, 0);
72 g_atomic_int_set (&_idle_thread_cnt, 0);
73 g_atomic_int_set (&_trigger_queue_size, 0);
75 /* pre-allocate memory */
76 _trigger_queue.reserve (1024);
// The engine starting (re)creates the process threads; the engine stopping
// or halting is handled by engine_stopped.
78 ARDOUR::AudioEngine::instance ()->Running.connect_same_thread (engine_connections, boost::bind (&Graph::reset_thread_list, this));
79 ARDOUR::AudioEngine::instance ()->Stopped.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
80 ARDOUR::AudioEngine::instance ()->Halted.connect_same_thread (engine_connections, boost::bind (&Graph::engine_stopped, this));
// Install the predicate consulted by PBD's realtime-allocation checks
// (presumably only in builds where those checks are enabled; confirm).
86 pbd_alloc_allowed = &::alloc_allowed;
// Handler for the AudioEngine Stopped and Halted signals. Logs the current
// number of process threads. The action taken when threads remain is not
// visible in this excerpt (presumably they are dropped; confirm).
91 Graph::engine_stopped ()
94 cerr << "Graph::engine_stopped. n_thread: " << AudioEngine::instance ()->process_thread_count () << endl;
96 if (AudioEngine::instance ()->process_thread_count () != 0) {
101 /** Set up threads for running the graph */
// Handler for AudioEngine::Running. Does nothing if the engine already has
// num_threads process threads. Otherwise it creates one main graph thread
// plus (num_threads - 1) helper threads through
// AudioEngine::create_process_thread, then waits until every helper has
// registered in _n_workers. Throws failed_constructor if a thread cannot be
// created.
103 Graph::reset_thread_list ()
105 uint32_t num_threads = how_many_dsp_threads ();
106 guint n_workers = g_atomic_uint_get (&_n_workers);
108 /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
109 assert (num_threads > 1);
110 assert (AudioEngine::instance ()->process_thread_count () == n_workers);
112 /* don't bother doing anything here if we already have the right
116 if (AudioEngine::instance ()->process_thread_count () == num_threads) {
120 Glib::Threads::Mutex::Lock lm (_session.engine ().process_lock ());
126 /* Allow threads to run */
127 g_atomic_int_set (&_terminate, 0);
129 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
130 throw failed_constructor ();
133 for (uint32_t i = 1; i < num_threads; ++i) {
134 if (AudioEngine::instance ()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
135 throw failed_constructor ();
// Wait for all helpers. In the visible code only helper_thread increments
// _n_workers, so the main graph thread is accounted for by the + 1.
139 while (g_atomic_uint_get (&_n_workers) + 1 != num_threads) {
// Called when the session is going away. Releases the node references held
// by both chain buffers and their initial trigger lists, then empties the
// trigger queue after resetting its counter. Statements that precede the
// clears are not visible in this excerpt.
145 Graph::session_going_away ()
149 // now drop all references on the nodes.
150 _nodes_rt[0].clear ();
151 _nodes_rt[1].clear ();
152 _init_trigger_list[0].clear ();
153 _init_trigger_list[1].clear ();
154 g_atomic_int_set (&_trigger_queue_size, 0);
155 _trigger_queue.clear ();
// Shuts down all graph process threads while holding _swap_mutex. It:
//   - sets _terminate;
//   - wakes every idle helper and the main graph thread so they can observe it;
//   - joins the threads via AudioEngine::join_process_threads;
//   - zeroes the worker counters;
//   - resets the three semaphores so stale signals do not carry over to the
//     next set of threads.
159 Graph::drop_threads ()
161 Glib::Threads::Mutex::Lock ls (_swap_mutex);
163 /* Flag threads to terminate */
164 g_atomic_int_set (&_terminate, 1);
166 /* Wake-up sleeping threads */
// All helpers are expected to be parked on _execution_sem here (see the
// assert), so one signal per idle thread wakes them all.
167 guint tc = g_atomic_uint_get (&_idle_thread_cnt);
168 assert (tc == g_atomic_uint_get (&_n_workers));
169 for (guint i = 0; i < tc; ++i) {
170 _execution_sem.signal ();
173 /* and the main thread */
174 _callback_start_sem.signal ();
176 /* join process threads */
177 AudioEngine::instance ()->join_process_threads ();
179 g_atomic_int_set (&_n_workers, 0);
180 g_atomic_int_set (&_idle_thread_cnt, 0);
182 /* signal main process thread if it's waiting for an already terminated thread */
183 _callback_done_sem.signal ();
186 * This is somewhat ugly, yet if a thread is killed (e.g jackd terminates
187 * abnormally), some semaphores are still unlocked.
// Two variants of the reset follow. One reports the values returned by
// reset() for diagnostics; the other discards them.
// NOTE(review): the conditional that selects between them is not visible in
// this excerpt (presumably a debug-build switch; confirm).
190 int d1 = _execution_sem.reset ();
191 int d2 = _callback_start_sem.reset ();
192 int d3 = _callback_done_sem.reset ();
193 cerr << "Graph::drop_threads() sema-counts: " << d1 << ", " << d2 << ", " << d3 << endl;
195 _execution_sem.reset ();
196 _callback_start_sem.reset ();
197 _callback_done_sem.reset ();
201 /* special case route removal -- called from Session::remove_routes */
// Clears the setup (inactive) chain so that references to removed routes are
// dropped. It empties every node's activation set for that chain, then the
// chain's node list and initial trigger list.
// If the setup chain is still the pending chain, a rechain has not yet been
// picked up by the process thread. In that case it waits on _cleanup_cond,
// which is signalled when the chains are swapped.
// NOTE(review): the loop that retries after the wait is not visible in this
// excerpt.
203 Graph::clear_other_chain ()
205 Glib::Threads::Mutex::Lock ls (_swap_mutex);
208 if (_setup_chain != _pending_chain) {
209 for (node_list_t::iterator ni = _nodes_rt[_setup_chain].begin (); ni != _nodes_rt[_setup_chain].end (); ++ni) {
210 (*ni)->_activation_set[_setup_chain].clear ();
213 _nodes_rt[_setup_chain].clear ();
214 _init_trigger_list[_setup_chain].clear ();
217 /* setup chain == pending chain - we have
218 * to wait till this is no longer true.
220 _cleanup_cond.wait (_swap_mutex);
// Per-cycle preparation; the function header is not visible in this excerpt.
// If _swap_mutex can be taken without blocking and a new chain is pending,
// it makes that chain current. The previous current chain becomes the next
// setup chain. It then reserves trigger-queue capacity for all of the new
// chain's nodes and signals _cleanup_cond.
// Because trylock is used, this path never waits for _swap_mutex; if the
// mutex is busy, the swap is presumably retried on a later cycle.
// Then, for the current chain, it clears _graph_empty if the chain has nodes,
// arms _terminal_refcnt with the chain's terminal-node count, and queues the
// chain's initial trigger list.
227 if (_swap_mutex.trylock ()) {
228 /* swap mutex acquired */
229 if (_current_chain != _pending_chain) {
231 _setup_chain = _current_chain;
232 _current_chain = _pending_chain;
233 /* ensure that all nodes can be queued */
234 _trigger_queue.reserve (_nodes_rt[_current_chain].size ());
235 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
236 _cleanup_cond.signal ();
238 _swap_mutex.unlock ();
243 int chain = _current_chain;
245 node_list_t::iterator i;
246 for (i = _nodes_rt[chain].begin (); i != _nodes_rt[chain].end (); ++i) {
248 _graph_empty = false;
251 assert (_graph_empty != (_n_terminal_nodes[chain] > 0));
// Every terminal node decrements this when it finishes; the thread that
// takes it to zero completes the cycle (see reached_terminal_node).
253 g_atomic_int_set (&_terminal_refcnt, _n_terminal_nodes[chain]);
255 /* Trigger the initial nodes for processing, which are the ones at the `input' end */
256 for (i = _init_trigger_list[chain].begin (); i != _init_trigger_list[chain].end (); i++) {
257 g_atomic_int_inc (&_trigger_queue_size);
258 _trigger_queue.push_back (i->get ());
// Queues node n for execution. The counter is incremented before the push.
// Worker threads read _trigger_queue_size to decide how many idle threads to
// wake.
263 Graph::trigger (GraphNode* n)
265 g_atomic_int_inc (&_trigger_queue_size);
266 _trigger_queue.push_back (n);
// Invoked when a terminal node (one that feeds no other node) finishes.
// The thread that brings _terminal_refcnt to zero completes the cycle. It:
//   - signals _callback_done_sem;
//   - waits until all helpers are idle;
//   - blocks on _callback_start_sem until the next process callback;
//   - unless _terminate is set, prepares the next cycle (reset terminal
//     refcount, queue initial nodes).
// An empty graph is handled specially; that branch body is not visible here.
// Otherwise the thread returns and continues as a normal worker.
269 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
273 Graph::reached_terminal_node ()
275 if (g_atomic_int_dec_and_test (&_terminal_refcnt)) {
278 /* We have run all the nodes that are at the `output' end of
279 * the graph, so there is nothing more to do this time around.
281 assert (g_atomic_uint_get (&_trigger_queue_size) == 0);
284 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 cycle done.\n", pthread_name ()));
286 _callback_done_sem.signal ();
288 /* Ensure that all background threads are idle.
289 * When freewheeling there may be an immediate restart:
290 * If there are more threads than CPU cores, some worker-
291 * threads may only be "on the way" to become idle.
293 guint n_workers = g_atomic_uint_get (&_n_workers);
294 while (g_atomic_uint_get (&_idle_thread_cnt) != n_workers) {
298 /* Block until the a process callback */
299 _callback_start_sem.wait ();
301 if (g_atomic_int_get (&_terminate)) {
305 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 prepare new cycle.\n", pthread_name ()));
307 /* Prepare next cycle:
308 * - Reset terminal reference count
309 * - queue initial nodes
313 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
316 /* .. continue in worker-thread */
// Builds the setup chain from routelist and the interconnection graph edges,
// holding _swap_mutex. It:
//   - copies the routes into _nodes_rt[chain];
//   - resets each route's init refcount and activation set;
//   - fills each route's activation set with the routes it directly feeds;
//   - adds one to the init refcount of each fed route;
//   - lists routes without inputs in _init_trigger_list;
//   - counts routes without outputs in _n_terminal_nodes.
// Finally it publishes the chain as _pending_chain. The process thread swaps
// it in when preparing a cycle.
320 /** Rechain our stuff using a list of routes (which can be in any order) and
321 * a directed graph of their interconnections, which is guaranteed to be
325 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const& edges)
327 Glib::Threads::Mutex::Lock ls (_swap_mutex);
329 int chain = _setup_chain;
330 DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
332 /* This will become the number of nodes that do not feed any other node;
333 * once we have processed this number of those nodes, we have finished.
335 _n_terminal_nodes[chain] = 0;
337 /* This will become a list of nodes that are not fed by another node, ie
338 * those at the `input' end.
340 _init_trigger_list[chain].clear ();
342 _nodes_rt[chain].clear ();
344 /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
345 for (RouteList::iterator ri = routelist->begin (); ri != routelist->end (); ri++) {
346 (*ri)->_init_refcount[chain] = 0;
347 (*ri)->_activation_set[chain].clear ();
348 _nodes_rt[chain].push_back (*ri);
351 // now add refs for the connections.
353 for (node_list_t::iterator ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
// This cast cannot fail: every node in this chain was copied from routelist
// just above.
354 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
356 /* The routes that are directly fed by r */
357 set<GraphVertex> fed_from_r = edges.from (r);
359 /* Hence whether r has an output */
360 bool const has_output = !fed_from_r.empty ();
362 /* Set up r's activation set */
363 for (set<GraphVertex>::iterator i = fed_from_r.begin (); i != fed_from_r.end (); ++i) {
364 r->_activation_set[chain].insert (*i);
367 /* r has an input if there are some incoming edges to r in the graph */
368 bool const has_input = !edges.has_none_to (r);
370 /* Increment the refcount of any route that we directly feed */
371 for (node_set_t::iterator ai = r->_activation_set[chain].begin (); ai != r->_activation_set[chain].end (); ai++) {
372 (*ai)->_init_refcount[chain] += 1;
376 /* no input, so this node needs to be triggered initially to get things going */
377 _init_trigger_list[chain].push_back (*ni);
381 /* no output, so this is one of the nodes that we can count off to decide
384 _n_terminal_nodes[chain] += 1;
388 _pending_chain = chain;
392 /** Called by both the main thread and all helpers. */
// Runs one graph node. The function header is not visible in this excerpt;
// the trace message at the end calls it run_one(). It returns without
// running anything once _terminate is set.
// If a node can be popped immediately, up to min(idle + 1, queued) - 1 idle
// threads are woken. This thread has not yet decremented _trigger_queue_size,
// so that is at most one thread per other queued node.
// Otherwise the thread counts itself idle, sleeps on _execution_sem and
// tries to pop again after waking; the surrounding wait loop is only partly
// visible. The popped node is run on the current chain.
396 GraphNode* to_run = NULL;
398 if (g_atomic_int_get (&_terminate)) {
402 if (_trigger_queue.pop_front (to_run)) {
403 /* Wake up idle threads, but at most as many as there's
404 * work in the trigger queue that can be processed by
406 * This thread as not yet decreased _trigger_queue_size.
408 guint idle_cnt = g_atomic_uint_get (&_idle_thread_cnt);
409 guint work_avail = g_atomic_uint_get (&_trigger_queue_size);
410 guint wakeup = std::min (idle_cnt + 1, work_avail);
412 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 signals %2 threads\n", pthread_name (), wakeup));
413 for (guint i = 1; i < wakeup; ++i) {
414 _execution_sem.signal ();
419 /* Wait for work, fall asleep */
420 g_atomic_int_inc (&_idle_thread_cnt);
// NOTE(review): _n_workers is read here without g_atomic_uint_get, unlike
// the other accesses to it; only this debug assertion is affected.
421 assert (g_atomic_uint_get (&_idle_thread_cnt) <= _n_workers);
423 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_name ()));
424 _execution_sem.wait ();
426 if (g_atomic_int_get (&_terminate)) {
430 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_name ()));
// g_atomic_int_dec_and_test is used here, and again below, only as an atomic
// decrement; its result is ignored.
432 g_atomic_int_dec_and_test (&_idle_thread_cnt);
434 /* Try to find some work to do */
435 _trigger_queue.pop_front (to_run);
438 /* Process the graph-node */
439 g_atomic_int_dec_and_test (&_trigger_queue_size);
440 to_run->run (_current_chain);
442 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name ()));
// Entry point of each helper process thread. It:
//   - registers itself in _n_workers;
//   - if the thread has no pool yet, creates a per-thread SessionEvent pool
//     named "RT-<id>-<thread>" and announces the thread to the event loops;
//   - allocates a ProcessThread with realtime-malloc checks suspended;
//   - keeps working until _terminate is set (the loop body is not visible in
//     this excerpt).
// NOTE(review): the increment of _n_workers and the following read are two
// separate atomic operations, so two helpers starting together can read the
// same id. Taking the id from the value returned by an atomic add would make
// it unique. In the visible code the id only feeds the thread name.
446 Graph::helper_thread ()
448 g_atomic_int_inc (&_n_workers);
449 guint id = g_atomic_uint_get (&_n_workers);
451 /* This is needed for ARDOUR::Session requests called from rt-processors
452 * in particular Lua scripts may do cross-thread calls */
453 if (!SessionEvent::has_per_thread_pool ()) {
455 snprintf (name, 64, "RT-%u-%p", id, (void*)DEBUG_THREAD_SELF);
456 pthread_set_name (name);
457 SessionEvent::create_per_thread_pool (name, 64);
458 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
461 suspend_rt_malloc_checks ();
462 ProcessThread* pt = new ProcessThread ();
463 resume_rt_malloc_checks ();
467 while (!g_atomic_int_get (&_terminate)) {
475 /** Here's the main graph thread */
// One-time setup allocates a ProcessThread and, if the thread has no pool
// yet, a per-thread SessionEvent pool named "RT-main-<thread>". Both are
// done with realtime-malloc checks suspended.
// The thread then waits on _callback_start_sem for the first process
// callback, checks _terminate (the handling is not visible here), and
// bootstraps the first cycle. If the graph is empty, it signals
// _callback_done_sem right away and, per its trace message, goes back to
// sleep. Otherwise it works as a normal worker until _terminate is set.
477 Graph::main_thread ()
479 /* first time setup */
481 suspend_rt_malloc_checks ();
482 ProcessThread* pt = new ProcessThread ();
484 /* This is needed for ARDOUR::Session requests called from rt-processors
485 * in particular Lua scripts may do cross-thread calls */
486 if (!SessionEvent::has_per_thread_pool ()) {
488 snprintf (name, 64, "RT-main-%p", (void*)DEBUG_THREAD_SELF);
489 pthread_set_name (name);
490 SessionEvent::create_per_thread_pool (name, 64);
491 PBD::notify_event_loops_about_thread_creation (pthread_self (), name, 64);
493 resume_rt_malloc_checks ();
497 /* Wait for initial process callback */
499 _callback_start_sem.wait ();
501 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread is awake\n");
503 if (g_atomic_int_get (&_terminate)) {
509 /* Bootstrap the trigger-list
510 * (later this is done by Graph_reached_terminal_node) */
513 if (_graph_empty && !g_atomic_int_get (&_terminate)) {
514 _callback_done_sem.signal ();
515 DEBUG_TRACE (DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
519 /* After setup, the main-thread just becomes a normal worker */
520 while (!g_atomic_int_get (&_terminate)) {
// Debug helper. Writes a chain's nodes with their init refcounts and
// activation sets, the initial trigger list, and the terminal-node count to
// the DEBUG::Graph trace channel.
// NOTE(review): the chain argument is overwritten with _pending_chain before
// use, so the value passed by the caller is ignored.
529 Graph::dump (int chain) const
532 node_list_t::const_iterator ni;
533 node_set_t::const_iterator ai;
535 chain = _pending_chain;
537 DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
538 for (ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
539 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route> (*ni);
540 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", rp->name ().c_str (), (*ni)->_init_refcount[chain]));
541 for (ai = (*ni)->_activation_set[chain].begin (); ai != (*ni)->_activation_set[chain].end (); ai++) {
542 DEBUG_TRACE (DEBUG::Graph, string_compose (" triggers: %1\n", boost::dynamic_pointer_cast<Route> (*ai)->name ().c_str ()));
546 DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
547 for (ni = _init_trigger_list[chain].begin (); ni != _init_trigger_list[chain].end (); ni++) {
548 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1 refcount: %2\n", boost::dynamic_pointer_cast<Route> (*ni)->name ().c_str (), (*ni)->_init_refcount[chain]));
551 DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _n_terminal_nodes[chain]));
// Writes the current chain to file_name as a graph description in DOT
// syntax, holding _swap_mutex. The opening and closing lines of the
// description are not visible here.
// Each node is labelled "<route name> (<init refcount>)" and filled:
//   - gold1 when it has neither incoming nor outgoing edges;
//   - lightskyblue1 when it has no incoming edges;
//   - aquamarine2 when it has no outgoing edges.
// An edge is drawn dashed, presumably when Route::feeds reports a
// sends-only connection. Reports an error if the file cannot be written.
// NOTE(review): the message "Could not graph to file" seems to be missing a
// verb (e.g. "write").
556 Graph::plot (std::string const& file_name) const
558 Glib::Threads::Mutex::Lock ls (_swap_mutex);
559 int chain = _current_chain;
561 node_list_t::const_iterator ni;
562 node_set_t::const_iterator ai;
566 ss << " node [shape = ellipse];\n";
568 for (ni = _nodes_rt[chain].begin (); ni != _nodes_rt[chain].end (); ni++) {
569 boost::shared_ptr<Route> sr = boost::dynamic_pointer_cast<Route> (*ni);
570 std::string sn = string_compose ("%1 (%2)", sr->name (), (*ni)->_init_refcount[chain]);
571 if ((*ni)->_init_refcount[chain] == 0 && (*ni)->_activation_set[chain].size() == 0) {
572 ss << " \"" << sn << "\"[style=filled,fillcolor=gold1];\n";
573 } else if ((*ni)->_init_refcount[chain] == 0) {
574 ss << " \"" << sn << "\"[style=filled,fillcolor=lightskyblue1];\n";
575 } else if ((*ni)->_activation_set[chain].size() == 0) {
576 ss << " \"" << sn << "\"[style=filled,fillcolor=aquamarine2];\n";
578 for (ai = (*ni)->_activation_set[chain].begin (); ai != (*ni)->_activation_set[chain].end (); ai++) {
579 boost::shared_ptr<Route> dr = boost::dynamic_pointer_cast<Route> (*ai);
580 std::string dn = string_compose ("%1 (%2)", dr->name (), (*ai)->_init_refcount[chain]);
581 bool sends_only = false;
582 sr->feeds (dr, &sends_only);
584 ss << " edge [style=dashed];\n";
586 ss << " \"" << sn << "\" -> \"" << dn << "\"\n";
588 ss << " edge [style=solid];\n";
595 if (!g_file_set_contents (file_name.c_str(), ss.str().c_str(), -1, &err)) {
597 error << string_compose (_("Could not graph to file (%1)"), err->message) << endmsg;
// Runs one rolling process cycle on the graph threads. It stores the cycle
// parameters in members read by the workers and selects roll (not no-roll)
// mode. It then wakes the main graph thread through _callback_start_sem and
// blocks on _callback_done_sem until the cycle is complete.
// need_butler receives the workers' butler request, and _process_retval is
// returned. Returns early if _terminate is set; the early-return value is
// not visible here.
606 Graph::process_routes (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool& need_butler)
608 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
610 if (g_atomic_int_get (&_terminate)) {
614 _process_nframes = nframes;
615 _process_start_sample = start_sample;
616 _process_end_sample = end_sample;
618 _process_noroll = false;
620 _process_need_butler = false;
622 DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for non-silent process\n");
623 _callback_start_sem.signal ();
624 _callback_done_sem.wait ();
625 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
627 need_butler = _process_need_butler;
629 return _process_retval;
// Runs one no-roll process cycle on the graph threads. It stores the cycle
// parameters, including non_rt_pending, in members read by the workers and
// selects no-roll mode. It then wakes the main graph thread through
// _callback_start_sem and blocks on _callback_done_sem until the cycle is
// complete. Returns _process_retval. Returns early if _terminate is set; the
// early-return value is not visible here.
633 Graph::routes_no_roll (pframes_t nframes, samplepos_t start_sample, samplepos_t end_sample, bool non_rt_pending)
635 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_sample, end_sample, nframes));
637 if (g_atomic_int_get (&_terminate)) {
641 _process_nframes = nframes;
642 _process_start_sample = start_sample;
643 _process_end_sample = end_sample;
644 _process_non_rt_pending = non_rt_pending;
646 _process_noroll = true;
648 _process_need_butler = false;
650 DEBUG_TRACE (DEBUG::ProcessThreads, "wake graph for no-roll process\n");
651 _callback_start_sem.signal ();
652 _callback_done_sem.wait ();
653 DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
655 return _process_retval;
// Runs a single route for the current cycle, using the parameters stored by
// process_routes or routes_no_roll: no_roll() in no-roll mode, roll()
// otherwise. The result may be stored in _process_retval, and
// _process_need_butler is set, presumably when roll() reported need_butler.
// The conditions guarding both assignments are not visible in this excerpt.
658 Graph::process_one_route (Route* route)
660 bool need_butler = false;
665 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_name (), route->name ()));
667 if (_process_noroll) {
668 retval = route->no_roll (_process_nframes, _process_start_sample, _process_end_sample, _process_non_rt_pending);
670 retval = route->roll (_process_nframes, _process_start_sample, _process_end_sample, need_butler);
674 _process_retval = retval;
678 _process_need_butler = true;
// Returns whether the calling thread is an AudioEngine process thread, by
// delegating to AudioEngine::in_process_thread().
683 Graph::in_process_thread () const
685 return AudioEngine::instance ()->in_process_thread ();