A few comments.
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/debug_rt_alloc.h"
25
26 #include "ardour/debug.h"
27 #include "ardour/graph.h"
28 #include "ardour/types.h"
29 #include "ardour/session.h"
30 #include "ardour/route.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/audioengine.h"
33
34 #include <jack/thread.h>
35
36 #include "i18n.h"
37
38 using namespace ARDOUR;
39 using namespace PBD;
40 using namespace std;
41
#ifdef DEBUG_RT_ALLOC
/* The Graph that alloc_allowed() consults; assigned in the Graph constructor. */
static Graph* graph = 0;

extern "C" {

/** Allocation-permission hook (the Graph constructor installs it as
 *  pbd_alloc_allowed).
 *
 *  @return 0 when the calling thread is one of the graph's process
 *  threads, 1 otherwise.
 */
int alloc_allowed ()
{
        return graph->in_process_thread () ? 0 : 1;
}

}
#endif
54
55 Graph::Graph (Session & session)
56         : SessionHandleRef (session)
57         , _quit_threads (false)
58         , _execution_sem ("graph_execution", 0)
59         , _callback_start_sem ("graph_start", 0)
60         , _callback_done_sem ("graph_done", 0)
61         , _cleanup_sem ("graph_cleanup", 0)
62 {
63         pthread_mutex_init( &_trigger_mutex, NULL);
64
65         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
66            memory in the RT thread.
67         */
68         _trigger_queue.reserve (8192);
69
70         _execution_tokens = 0;
71
72         _current_chain = 0;
73         _pending_chain = 0;
74         _setup_chain   = 1;
75         _quit_threads = false;
76         _graph_empty = true;
77
78         reset_thread_list ();
79
80         Config->ParameterChanged.connect_same_thread (processor_usage_connection, boost::bind (&Graph::parameter_changed, this, _1));
81
82 #ifdef DEBUG_RT_ALLOC
83         graph = this;
84         pbd_alloc_allowed = &::alloc_allowed;
85 #endif
86 }
87
88 void
89 Graph::parameter_changed (std::string param)
90 {
91         if (param == X_("processor-usage")) {
92                 reset_thread_list ();
93         }
94 }
95
96 void
97 Graph::reset_thread_list ()
98 {
99         uint32_t num_threads = how_many_dsp_threads ();
100
101         /* don't bother doing anything here if we already have the right
102            number of threads.
103         */
104
105         if (_thread_list.size() == num_threads) {
106                 return;
107         }
108
109         Glib::Mutex::Lock lm (_session.engine().process_lock());
110         pthread_t a_thread;
111
112         if (!_thread_list.empty()) {
113                 drop_threads ();
114         }
115
116 #if 0
117         /* XXX this only makes sense when we can use just the AudioEngine thread
118            and still keep the graph current with the route list
119         */
120         if (num_threads <= 1) {
121                 /* no point creating 1 thread - the AudioEngine already gives us one
122                  */
123                 return;
124         }
125 #endif
126         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) == 0) {
127                 _thread_list.push_back (a_thread);
128         }
129
130         for (uint32_t i = 1; i < num_threads; ++i) {
131                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) == 0) {
132                         _thread_list.push_back (a_thread);
133                 }
134         }
135 }
136
137 void
138 Graph::session_going_away()
139 {
140         drop_threads ();
141
142         // now drop all references on the nodes.
143         _nodes_rt[0].clear();
144         _nodes_rt[1].clear();
145         _init_trigger_list[0].clear();
146         _init_trigger_list[1].clear();
147         _trigger_queue.clear();
148 }
149
150 void
151 Graph::drop_threads ()
152 {
153         _quit_threads = true;
154
155         for (unsigned int i=0; i< _thread_list.size(); i++) {
156                 _execution_sem.signal ();
157         }
158
159         _callback_start_sem.signal ();
160
161         for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); ++i) {
162                 void* status;
163                 pthread_join (*i, &status);
164         }
165
166         _thread_list.clear ();
167
168         _execution_tokens = 0;
169
170         _quit_threads = false;
171 }
172
173 void
174 Graph::clear_other_chain ()
175 {
176         Glib::Mutex::Lock ls (_swap_mutex);
177
178         while (1) {
179                 if (_setup_chain != _pending_chain) {
180
181                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
182                                 (*ni)->_activation_set[_setup_chain].clear();
183                         }
184
185                         _nodes_rt[_setup_chain].clear ();
186                         _init_trigger_list[_setup_chain].clear ();
187                         break;
188                 }
189                 /* setup chain == pending chain - we have
190                    to wait till this is no longer true.
191                 */
192                 _cleanup_cond.wait (_swap_mutex);
193         }
194 }
195
196 void
197 Graph::prep()
198 {
199         node_list_t::iterator i;
200         int chain;
201
202         if (_swap_mutex.trylock()) {
203                 // we got the swap mutex.
204                 if (_current_chain != _pending_chain)
205                 {
206                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
207                         _setup_chain = _current_chain;
208                         _current_chain = _pending_chain;
209                         _cleanup_cond.signal ();
210                 }
211                 _swap_mutex.unlock ();
212         }
213
214         chain = _current_chain;
215
216         _graph_empty = true;
217         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
218                 (*i)->prep( chain);
219                 _graph_empty = false;
220         }
221         _finished_refcount = _init_finished_refcount[chain];
222
223         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
224                 this->trigger( i->get() );
225         }
226 }
227
228 void
229 Graph::trigger (GraphNode* n)
230 {
231         pthread_mutex_lock (&_trigger_mutex);
232         _trigger_queue.push_back (n);
233         pthread_mutex_unlock (&_trigger_mutex);
234 }
235
236 void
237 Graph::dec_ref()
238 {
239         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
240
241                 /* We have run all the nodes that are at the `output' end of
242                    the graph, so there is nothing more to do this time around.
243                 */
244
245                 this->restart_cycle();
246         }
247 }
248
249 void
250 Graph::restart_cycle()
251 {
252         // we are through. wakeup our caller.
253
254   again:
255         _callback_done_sem.signal ();
256
257         // block until we are triggered.
258         _callback_start_sem.wait();
259
260         if (_quit_threads) {
261                 return;
262         }
263
264         this->prep();
265
266         if (_graph_empty) {
267                 goto again;
268         }
269
270         // returning will restart the cycle.
271         // starting with waking up the others.
272 }
273
274 static bool
275 is_feedback (boost::shared_ptr<RouteList> routelist, Route* from, boost::shared_ptr<Route> to)
276 {
277         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
278                 if ((*ri).get() == from)
279                         return false;
280                 if ((*ri) == to)
281                         return true;
282         }
283         assert(0);
284         return false;
285 }
286
287 static bool
288 is_feedback (boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route* to)
289 {
290         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
291                 if ((*ri).get() == to)
292                         return true;
293                 if ((*ri) == from)
294                         return false;
295         }
296         assert(0);
297         return false;
298 }
299
300 void
301 Graph::rechain (boost::shared_ptr<RouteList> routelist)
302 {
303         node_list_t::iterator ni;
304         Glib::Mutex::Lock ls (_swap_mutex);
305
306         int chain = _setup_chain;
307         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
308         // set all refcounts to 0;
309
310         /* This will become the number of nodes that do not feed any other node;
311            once we have processed this number of those nodes, we have finished.
312         */
313         _init_finished_refcount[chain] = 0;
314
315         /* This will become a list of nodes that are not fed by another node, ie
316            those at the `input' end.
317         */
318         _init_trigger_list[chain].clear();
319
320         _nodes_rt[chain].clear();
321
322         /* Clear things out, and make _nodes_rt[chain] a copy of routelist */
323         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
324                 node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
325
326                 n->_init_refcount[chain] = 0;
327                 n->_activation_set[chain].clear();
328                 _nodes_rt[chain].push_back(n);
329         }
330
331         // now add refs for the connections.
332
333         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
334
335                 /* We will set this to true if the node *ni is directly or
336                    indirectly fed by anything (without feedback)
337                 */
338                 bool has_input  = false;
339
340                 /* We will set this to true if the node *ni directly feeds
341                    anything (without feedback)
342                 */
343                 bool has_output = false;
344
345                 /* We will also set up *ni's _activation_set to contain any nodes
346                    that it directly feeds.
347                 */
348
349                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
350
351                 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
352                         if (rp->direct_feeds (*ri)) {
353                                 if (is_feedback (routelist, rp.get(), *ri)) {
354                                         continue;
355                                 }
356
357                                 has_output = true;
358                                 (*ni)->_activation_set[chain].insert (boost::dynamic_pointer_cast<GraphNode> (*ri) );
359                         }
360                 }
361
362                 for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++) {
363                         if (boost::shared_ptr<Route> r = fi->r.lock()) {
364                                 if (!is_feedback (routelist, r, rp.get())) {
365                                         has_input = true;
366                                 }
367                         }
368                 }
369
370                 /* Increment the refcount of any route that we directly feed */
371                 for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
372                         (*ai)->_init_refcount[chain] += 1;
373                 }
374
375                 if (!has_input)
376                         _init_trigger_list[chain].push_back (*ni);
377
378                 if (!has_output)
379                         _init_finished_refcount[chain] += 1;
380         }
381
382         _pending_chain = chain;
383         dump(chain);
384 }
385
386 bool
387 Graph::run_one()
388 {
389         GraphNode* to_run;
390
391         pthread_mutex_lock (&_trigger_mutex);
392         if (_trigger_queue.size()) {
393                 to_run = _trigger_queue.back();
394                 _trigger_queue.pop_back();
395         } else {
396                 to_run = 0;
397         }
398
399         int et = _execution_tokens;
400         int ts = _trigger_queue.size();
401
402         int wakeup = min (et, ts);
403         _execution_tokens -= wakeup;
404
405         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 signals %2\n", pthread_self(), wakeup));
406
407         for (int i = 0; i < wakeup; i++) {
408                 _execution_sem.signal ();
409         }
410
411         while (to_run == 0) {
412                 _execution_tokens += 1;
413                 pthread_mutex_unlock (&_trigger_mutex);
414                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
415                 _execution_sem.wait ();
416                 if (_quit_threads) {
417                         return true;
418                 }
419                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
420                 pthread_mutex_lock (&_trigger_mutex);
421                 if (_trigger_queue.size()) {
422                         to_run = _trigger_queue.back();
423                         _trigger_queue.pop_back();
424                 }
425         }
426         pthread_mutex_unlock (&_trigger_mutex);
427
428         to_run->process();
429         to_run->finish (_current_chain);
430
431         DEBUG_TRACE(DEBUG::ProcessThreads, string_compose ("%1 has finished run_one()\n", pthread_self()));
432
433         return false;
434 }
435
436 static void get_rt()
437 {
438         if (!jack_is_realtime (AudioEngine::instance()->jack())) {
439                 return;
440         }
441
442         int priority = jack_client_real_time_priority (AudioEngine::instance()->jack());
443
444         if (priority) {
445                 struct sched_param rtparam;
446
447                 memset (&rtparam, 0, sizeof (rtparam));
448                 rtparam.sched_priority = priority;
449
450                 pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
451         }
452 }
453
454 void
455 Graph::helper_thread()
456 {
457         suspend_rt_malloc_checks ();
458         ProcessThread* pt = new ProcessThread ();
459         resume_rt_malloc_checks ();
460
461         pt->get_buffers();
462         get_rt();
463
464         while(1) {
465                 if (run_one()) {
466                         break;
467                 }
468         }
469
470         pt->drop_buffers();
471 }
472
473 void
474 Graph::main_thread()
475 {
476         suspend_rt_malloc_checks ();
477         ProcessThread* pt = new ProcessThread ();
478         resume_rt_malloc_checks ();
479
480         pt->get_buffers();
481         get_rt();
482
483   again:
484         _callback_start_sem.wait ();
485         DEBUG_TRACE(DEBUG::ProcessThreads, "main thread is awake\n");
486
487         if (_quit_threads) {
488                 return;
489         }
490
491         this->prep();
492
493         if (_graph_empty && !_quit_threads) {
494                 _callback_done_sem.signal ();
495                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread sees graph done, goes back to slee\n");
496                 goto again;
497         }
498
499         while (1) {
500                 DEBUG_TRACE(DEBUG::ProcessThreads, "main thread runs one graph node\n");
501                 if (run_one()) {
502                         break;
503                 }
504         }
505
506         pt->drop_buffers();
507 }
508
509 void
510 Graph::dump (int chain)
511 {
512 #ifndef NDEBUG
513         node_list_t::iterator ni;
514         node_set_t::iterator ai;
515
516         chain = _pending_chain;
517
518         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
519         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
520                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
521                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
522                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
523                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
524                 }
525         }
526
527         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
528         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
529                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
530         }
531
532         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
533 #endif
534 }
535
536 int
537 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, bool& need_butler)
538 {
539         _process_nframes = nframes;
540         _process_start_frame = start_frame;
541         _process_end_frame = end_frame;
542
543         _process_silent = true;
544         _process_noroll = false;
545         _process_retval = 0;
546         _process_need_butler = false;
547
548         if (!_graph_empty) {
549                 DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for silent process\n");
550                 _callback_start_sem.signal ();
551                 _callback_done_sem.wait ();
552         }
553
554         need_butler = _process_need_butler;
555
556         return _process_retval;
557 }
558
559 int
560 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick, bool& need_butler)
561 {
562         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
563
564         _process_nframes = nframes;
565         _process_start_frame = start_frame;
566         _process_end_frame = end_frame;
567         _process_declick = declick;
568
569         _process_silent = false;
570         _process_noroll = false;
571         _process_retval = 0;
572         _process_need_butler = false;
573
574         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for non-silent process\n");
575         _callback_start_sem.signal ();
576         _callback_done_sem.wait ();
577
578         DEBUG_TRACE (DEBUG::ProcessThreads, "graph execution complete\n");
579
580         need_butler = _process_need_butler;
581
582         return _process_retval;
583 }
584
585 int
586 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
587                        bool non_rt_pending, int declick)
588 {
589         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
590
591         _process_nframes = nframes;
592         _process_start_frame = start_frame;
593         _process_end_frame = end_frame;
594         _process_declick = declick;
595         _process_non_rt_pending = non_rt_pending;
596
597         _process_silent = false;
598         _process_noroll = true;
599         _process_retval = 0;
600         _process_need_butler = false;
601
602         DEBUG_TRACE(DEBUG::ProcessThreads, "wake graph for no-roll process\n");
603         _callback_start_sem.signal ();
604         _callback_done_sem.wait ();
605
606         return _process_retval;
607 }
608 void
609 Graph::process_one_route (Route* route)
610 {
611         bool need_butler = false;
612         int retval;
613
614         assert (route);
615
616         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
617
618         if (_process_silent) {
619                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, need_butler);
620         } else if (_process_noroll) {
621                 route->set_pending_declick (_process_declick);
622                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending);
623         } else {
624                 route->set_pending_declick (_process_declick);
625                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, need_butler);
626         }
627
628         if (retval) {
629                 _process_retval = retval;
630         }
631
632         if (need_butler) {
633                 _process_need_butler = true;
634         }
635 }
636
637 bool
638 Graph::in_process_thread () const
639 {
640         list<pthread_t>::const_iterator i = _thread_list.begin ();
641         while (i != _thread_list.end() && *i != pthread_self ()) {
642                 ++i;
643         }
644
645         return i != _thread_list.end ();
646 }