Merge branch 'cairocanvas'
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/debug_rt_alloc.h"
25 #include "pbd/pthread_utils.h"
26
27 #include "ardour/debug.h"
28 #include "ardour/graph.h"
29 #include "ardour/types.h"
30 #include "ardour/session.h"
31 #include "ardour/route.h"
32 #include "ardour/process_thread.h"
33 #include "ardour/audioengine.h"
34
35 #include "i18n.h"
36
37 using namespace ARDOUR;
38 using namespace PBD;
39 using namespace std;
40
41 #ifdef DEBUG_RT_ALLOC
42 static Graph* graph = 0;
43
44 extern "C" {
45
46 int alloc_allowed ()
47 {
48         return !graph->in_process_thread ();
49 }
50
51 }
52 #endif
53
54 Graph::Graph (Session & session)
55         : SessionHandleRef (session)
56         , _quit_threads (false)
57         , _execution_sem ("graph_execution", 0)
58         , _callback_start_sem ("graph_start", 0)
59         , _callback_done_sem ("graph_done", 0)
60         , _cleanup_sem ("graph_cleanup", 0)
61 {
62         pthread_mutex_init( &_trigger_mutex, NULL);
63
64         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
65            memory in the RT thread.
66         */
67         _trigger_queue.reserve (8192);
68
69         _execution_tokens = 0;
70
71         _current_chain = 0;
72         _pending_chain = 0;
73         _setup_chain   = 1;
74         _quit_threads = false;
75         _graph_empty = true;
76
77         reset_thread_list ();
78
79 #ifdef DEBUG_RT_ALLOC
80         graph = this;
81         pbd_alloc_allowed = &::alloc_allowed;
82 #endif
83 }
84
85 /** Set up threads for running the graph */
86 void
87 Graph::reset_thread_list ()
88 {
89         uint32_t num_threads = how_many_dsp_threads ();
90
91         /* For now, we shouldn't be using the graph code if we only have 1 DSP thread */
92         assert (num_threads > 1);
93
94         /* don't bother doing anything here if we already have the right
95            number of threads.
96         */
97
98         if (AudioEngine::instance()->process_thread_count() == num_threads) {
99                 return;
100         }
101
102         Glib::Threads::Mutex::Lock lm (_session.engine().process_lock());
103
104         if (AudioEngine::instance()->process_thread_count() != 0) {
105                 drop_threads ();
106         }
107
108         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this)) != 0) {
109                 throw failed_constructor ();
110         }
111
112         for (uint32_t i = 1; i < num_threads; ++i) {
113                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this))) {
114                         throw failed_constructor ();
115                 }
116         }
117 }
118
119 void
120 Graph::session_going_away()
121 {
122         drop_threads ();
123
124         // now drop all references on the nodes.
125         _nodes_rt[0].clear();
126         _nodes_rt[1].clear();
127         _init_trigger_list[0].clear();
128         _init_trigger_list[1].clear();
129         _trigger_queue.clear();
130 }
131
132 void
133 Graph::drop_threads ()
134 {
135         _quit_threads = true;
136
137         uint32_t thread_count = AudioEngine::instance()->process_thread_count ();
138
139         for (unsigned int i=0; i < thread_count; i++) {
140                 _execution_sem.signal ();
141         }
142
143         _callback_start_sem.signal ();
144
145         AudioEngine::instance()->join_process_threads ();
146
147         _execution_tokens = 0;
148
149         _quit_threads = false;
150 }
151
152 void
153 Graph::clear_other_chain ()
154 {
155         Glib::Threads::Mutex::Lock ls (_swap_mutex);
156
157         while (1) {
158                 if (_setup_chain != _pending_chain) {
159
160                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
161                                 (*ni)->_activation_set[_setup_chain].clear();
162                         }
163
164                         _nodes_rt[_setup_chain].clear ();
165                         _init_trigger_list[_setup_chain].clear ();
166                         break;
167                 }
168                 /* setup chain == pending chain - we have
169                    to wait till this is no longer true.
170                 */
171                 _cleanup_cond.wait (_swap_mutex);
172         }
173 }
174
175 void
176 Graph::prep()
177 {
178         node_list_t::iterator i;
179         int chain;
180
181         if (_swap_mutex.trylock()) {
182                 // we got the swap mutex.
183                 if (_current_chain != _pending_chain)
184                 {
185                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
186                         _setup_chain = _current_chain;
187                         _current_chain = _pending_chain;
188                         _cleanup_cond.signal ();
189                 }
190                 _swap_mutex.unlock ();
191         }
192
193         chain = _current_chain;
194
195         _graph_empty = true;
196         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
197                 (*i)->prep( chain);
198                 _graph_empty = false;
199         }
200         _finished_refcount = _init_finished_refcount[chain];
201
202         /* Trigger the initial nodes for processing, which are the ones at the `input' end */
203         pthread_mutex_lock (&_trigger_mutex);
204         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
205                 /* don't use ::trigger here, as we have already locked the mutex */
206                 _trigger_queue.push_back (i->get ());
207         }
208         pthread_mutex_unlock (&_trigger_mutex);
209 }
210
211 void
212 Graph::trigger (GraphNode* n)
213 {
214         pthread_mutex_lock (&_trigger_mutex);
215         _trigger_queue.push_back (n);
216         pthread_mutex_unlock (&_trigger_mutex);
217 }
218
219 /** Called when a node at the `output' end of the chain (ie one that has no-one to feed)
220  *  is finished.
221  */
222 void
223 Graph::dec_ref()
224 {
225         if (g_atomic_int_dec_and_test (const_cast<gint*> (&_finished_refcount))) {
226
227                 /* We have run all the nodes that are at the `output' end of
228                    the graph, so there is nothing more to do this time around.
229                 */
230
231                 restart_cycle ();
232         }
233 }
234
235 void
236 Graph::restart_cycle()
237 {
238         // we are through. wakeup our caller.
239
240   again:
241         _callback_done_sem.signal ();
242
243         /* Block until the a process callback triggers us */
244         _callback_start_sem.wait();
245
246         if (_quit_threads) {
247                 return;
248         }
249
250         prep ();
251
252         if (_graph_empty) {
253                 goto again;
254         }
255
256         // returning will restart the cycle.
257         // starting with waking up the others.
258 }
259
260 /** Rechain our stuff using a list of routes (which can be in any order) and
261  *  a directed graph of their interconnections, which is guaranteed to be
262  *  acyclic.
263  */
264
265 void
266 Graph::rechain (boost::shared_ptr<RouteList> routelist, GraphEdges const & edges)
267 {
268         Glib::Threads::Mutex::Lock ls (_swap_mutex);
269
270         int chain = _setup_chain;
271         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
272
273         /* This will become the number of nodes that do not feed any other node;
274            once we have processed this number of those nodes, we have finished.
275         */
276         _init_finished_refcount[chain] = 0;
277
278         /* This will become a list of nodes that are not fed by another node, ie
279            those at the `input' end.
280         */
281         _init_trigger_list[chain].clear();
282
283         _nodes_rt[chain].clear();
284
285         /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
286         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
287                 (*ri)->_init_refcount[chain] = 0;
288                 (*ri)->_activation_set[chain].clear();
289                 _nodes_rt[chain].push_back (*ri);
290         }
291
292         // now add refs for the connections.
293
294         for (node_list_t::iterator ni = _nodes_rt[chain].begin(); ni != _nodes_rt[chain].end(); ni++) {
295
296                 boost::shared_ptr<Route> r = boost::dynamic_pointer_cast<Route> (*ni);
297
298                 /* The routes that are directly fed by r */
299                 set<GraphVertex> fed_from_r = edges.from (r);
300
301                 /* Hence whether r has an output */
302                 bool const has_output = !fed_from_r.empty ();
303
304                 /* Set up r's activation set */
305                 for (set<GraphVertex>::iterator i = fed_from_r.begin(); i != fed_from_r.end(); ++i) {
306                         r->_activation_set[chain].insert (*i);
307                 }
308
309                 /* r has an input if there are some incoming edges to r in the graph */
310                 bool const has_input = !edges.has_none_to (r);
311
312                 /* Increment the refcount of any route that we directly feed */
313                 for (node_set_t::iterator ai = r->_activation_set[chain].begin(); ai != r->_activation_set[chain].end(); ai++) {
314                         (*ai)->_init_refcount[chain] += 1;
315                 }
316
317                 if (!has_input) {
318                         /* no input, so this node needs to be triggered initially to get things going */
319                         _init_trigger_list[chain].push_back (*ni);
320                 }
321
322                 if (!has_output) {
323                         /* no output, so this is one of the nodes that we can count off to decide
324                            if we've finished
325                         */
326                         _init_finished_refcount[chain] += 1;
327                 }
328         }
329
330         _pending_chain = chain;
331         dump(chain);
332 }
333
334 /** Called by both the main thread and all helpers.
335  *  @return true to quit, false to carry on.
336  */
337 bool
338 Graph::run_one()
339 {
340         GraphNode* to_run;
341
342         pthread_mutex_lock (&_trigger_mutex);
343         if (_trigger_queue.size()) {
344                 to_run = _trigger_queue.back();
345                 _trigger_queue.pop_back();
346         } else {
347                 to_run = 0;
348         }
349
350         /* the number of threads that are asleep */
351         int et = _execution_tokens;
352         /* the number of nodes that need to be run */
353         int ts = _trigger_queue.size();
354
355         /* hence how many threads to wake up */
356         int wakeup = min (et, ts);
357         /* update the number of threads that will still be sleeping */
358         _execution_tokens -= wakeup;
359
360         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_name(), wakeup));
361
362         for (int i = 0; i < wakeup; i++) {
363                 _execution_sem.signal ();
364         }
365
366         while (to_run == 0) {
367                 _execution_tokens += 1;
368                 pthread_mutex_unlock (&_trigger_mutex);
369                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_name()));
370                 _execution_sem.wait ();
371                 if (_quit_threads) {
372                         return true;
373                 }
374                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_name()));
375                 pthread_mutex_lock (&_trigger_mutex);
376                 if (_trigger_queue.size()) {
377                         to_run = _trigger_queue.back();
378                         _trigger_queue.pop_back();
379                 }
380         }
381         pthread_mutex_unlock (&_trigger_mutex);
382
383         to_run->process();
384         to_run->finish (_current_chain);
385
386         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_name()));
387
388         return false;
389 }
390
391 void
392 Graph::helper_thread()
393 {
394         suspend_rt_malloc_checks ();
395         ProcessThread* pt = new ProcessThread ();
396         resume_rt_malloc_checks ();
397
398         pt->get_buffers();
399
400         while(1) {
401                 if (run_one()) {
402                         break;
403                 }
404         }
405
406         pt->drop_buffers();
407 }
408
409 /** Here's the main graph thread */
410 void
411 Graph::main_thread()
412 {
413         suspend_rt_malloc_checks ();
414         ProcessThread* pt = new ProcessThread ();
415         resume_rt_malloc_checks ();
416
417         pt->get_buffers();
418
419   again:
420         _callback_start_sem.wait ();
421         
422         DEBUG_TRACE(DEBUG::ProcessThreads, "main thread is awake\n");
423
424         if (_quit_threads) {
425                 return;
426         }
427
428         prep ();
429
430         if (_graph_empty && !_quit_threads) {
431                 _callback_done_sem.signal ();
432                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread sees graph done, goes back to sleep\n");
433                 goto again;
434         }
435
436         /* This loop will run forever */
437         while (1) {
438                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread runs one graph node\n");
439                 if (run_one()) {
440                         break;
441                 }
442         }
443
444         pt->drop_buffers();
445 }
446
447 void
448 Graph::dump (int chain)
449 {
450 #ifndef NDEBUG
451         node_list_t::iterator ni;
452         node_set_t::iterator ai;
453
454         chain = _pending_chain;
455
456         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
457         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
458                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
459                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
460                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
461                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
462                 }
463         }
464
465         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
466         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
467                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
468         }
469
470         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
471 #endif
472 }
473
474 int
475 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, bool& need_butler)
476 {
477         _process_nframes = nframes;
478         _process_start_frame = start_frame;
479         _process_end_frame = end_frame;
480
481         _process_silent = true;
482         _process_noroll = false;
483         _process_retval = 0;
484         _process_need_butler = false;
485
486         if (!_graph_empty) {
487                 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for silent process\n");
488                 _callback_start_sem.signal ();
489                 _callback_done_sem.wait ();
490         }
491
492         need_butler = _process_need_butler;
493
494         return _process_retval;
495 }
496
497 int
498 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick, bool& need_butler)
499 {
500         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
501
502         _process_nframes = nframes;
503         _process_start_frame = start_frame;
504         _process_end_frame = end_frame;
505         _process_declick = declick;
506
507         _process_silent = false;
508         _process_noroll = false;
509         _process_retval = 0;
510         _process_need_butler = false;
511
512         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for non-silent process\n");
513         _callback_start_sem.signal ();
514         _callback_done_sem.wait ();
515
516         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
517
518         need_butler = _process_need_butler;
519
520         return _process_retval;
521 }
522
523 int
524 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
525                        bool non_rt_pending, int declick)
526 {
527         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
528
529         _process_nframes = nframes;
530         _process_start_frame = start_frame;
531         _process_end_frame = end_frame;
532         _process_declick = declick;
533         _process_non_rt_pending = non_rt_pending;
534
535         _process_silent = false;
536         _process_noroll = true;
537         _process_retval = 0;
538         _process_need_butler = false;
539
540         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for no-roll process\n");
541         _callback_start_sem.signal ();
542         _callback_done_sem.wait ();
543
544         return _process_retval;
545 }
546 void
547 Graph::process_one_route (Route* route)
548 {
549         bool need_butler = false;
550         int retval;
551
552         assert (route);
553
554         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_name(), route->name()));
555
556         if (_process_silent) {
557                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, need_butler);
558         } else if (_process_noroll) {
559                 route->set_pending_declick (_process_declick);
560                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending);
561         } else {
562                 route->set_pending_declick (_process_declick);
563                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, need_butler);
564         }
565
566         if (retval) {
567                 _process_retval = retval;
568         }
569
570         if (need_butler) {
571                 _process_need_butler = true;
572         }
573 }
574
575 bool
576 Graph::in_process_thread () const
577 {
578         return AudioEngine::instance()->in_process_thread ();
579 }