0d3749aff0b68fc7c7cab1b5ea37d64187be55a0
[ardour.git] / libs / ardour / graph.cc
1 /*
2   Copyright (C) 2010 Paul Davis
3   Author: Torben Hohn
4
5   This program is free software; you can redistribute it and/or modify
6   it under the terms of the GNU General Public License as published by
7   the Free Software Foundation; either version 2 of the License, or
8   (at your option) any later version.
9
10   This program is distributed in the hope that it will be useful,
11   but WITHOUT ANY WARRANTY; without even the implied warranty of
12   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13   GNU General Public License for more details.
14
15   You should have received a copy of the GNU General Public License
16   along with this program; if not, write to the Free Software
17   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
18
19 */
20 #include <stdio.h>
21 #include <cmath>
22
23 #include "pbd/compose.h"
24 #include "pbd/cpus.h"
25
26 #include "ardour/debug.h"
27 #include "ardour/graph.h"
28 #include "ardour/types.h"
29 #include "ardour/session.h"
30 #include "ardour/route.h"
31 #include "ardour/process_thread.h"
32 #include "ardour/audioengine.h"
33
34 #include <jack/thread.h>
35
36 #include "i18n.h"
37
38 using namespace ARDOUR;
39 using namespace PBD;
40 using namespace std;
41
42
43 Graph::Graph (Session & session) 
44         : SessionHandleRef (session) 
45         , _execution_sem ("graph_execution", 0)
46         , _callback_start_sem ("graph_start", 0)
47         , _callback_done_sem ("graph_done", 0)
48         , _cleanup_sem ("graph_cleanup", 0)
49 {
50         pthread_mutex_init( &_trigger_mutex, NULL);
51
52         /* XXX: rather hacky `fix' to stop _trigger_queue.push_back() allocating
53            memory in the RT thread.
54         */
55         _trigger_queue.reserve (8192);
56
57         _execution_tokens = 0;
58
59         _current_chain = 0;
60         _pending_chain = 0;
61         _setup_chain   = 1;
62         _quit_threads = false;
63         _graph_empty = true;
64
65         int num_cpu = hardware_concurrency();
66         int num_threads = num_cpu;
67         int pu = Config->get_processor_usage ();
68
69         if (pu < 0) {
70                 /* pu is negative: use "pu" less cores for DSP than appear to be available
71                  */
72
73                 if (-pu < num_threads) {
74                         num_threads += pu; 
75                 } else {
76                         num_threads = 1;
77                 }
78         } else {
79                 /* use "pu" cores, if available
80                  */
81
82                 if (pu <= num_threads) {
83                         num_threads = pu;
84                 } 
85         }
86
87         info << string_compose (_("Using %2 threads on %1 CPUs"), num_cpu, num_threads) << endmsg;
88
89         pthread_t a_thread;
90
91         if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::main_thread, this), &a_thread, 100000) == 0) {
92                 _thread_list.push_back (a_thread);
93         }
94
95         for (int i = 1; i < num_threads; ++i) {
96                 if (AudioEngine::instance()->create_process_thread (boost::bind (&Graph::helper_thread, this), &a_thread, 100000) == 0) {
97                         _thread_list.push_back (a_thread);
98                 }
99         }
100 }
101
102 void
103 Graph::session_going_away()
104 {
105         _quit_threads = true;
106
107         for (unsigned int i=0; i<_thread_list.size(); i++) {
108                 _execution_sem.signal ();
109         }
110
111         _callback_start_sem.signal ();
112
113         for (list<pthread_t>::iterator i = _thread_list.begin(); i != _thread_list.end(); i++) {
114                 void* status;
115                 pthread_join (*i, &status);
116         }
117
118         // now drop all references on the nodes.
119         _nodes_rt[0].clear();
120         _nodes_rt[1].clear();
121         _init_trigger_list[0].clear();
122         _init_trigger_list[1].clear();
123         _trigger_queue.clear();
124 }
125
126 void
127 Graph::clear_other_chain ()
128 {
129         Glib::Mutex::Lock ls (_swap_mutex);
130
131         while (1) {
132                 if (_setup_chain != _pending_chain) {
133
134                         for (node_list_t::iterator ni=_nodes_rt[_setup_chain].begin(); ni!=_nodes_rt[_setup_chain].end(); ni++) {
135                                 (*ni)->_activation_set[_setup_chain].clear();
136                         }
137
138                         _nodes_rt[_setup_chain].clear ();
139                         _init_trigger_list[_setup_chain].clear ();
140                         break;
141                 }
142                 /* setup chain == pending chain - we have
143                    to wait till this is no longer true.
144                 */
145                 _cleanup_cond.wait (_swap_mutex);                
146         }
147 }
148
149 void
150 Graph::prep()
151 {
152         node_list_t::iterator i;
153         int chain;
154
155         if (_swap_mutex.trylock()) {
156                 // we got the swap mutex.
157                 if (_current_chain != _pending_chain)
158                 {
159                         // printf ("chain swap ! %d -> %d\n", _current_chain, _pending_chain);
160                         _setup_chain = _current_chain;
161                         _current_chain = _pending_chain;
162                         _cleanup_cond.signal ();
163                 }
164                 _swap_mutex.unlock ();
165         }
166
167         chain = _current_chain;
168
169         _graph_empty = true;
170         for (i=_nodes_rt[chain].begin(); i!=_nodes_rt[chain].end(); i++) {
171                 (*i)->prep( chain);
172                 _graph_empty = false;
173         }
174         _finished_refcount = _init_finished_refcount[chain];
175
176         for (i=_init_trigger_list[chain].begin(); i!=_init_trigger_list[chain].end(); i++) {
177                 this->trigger( i->get() );
178         }
179 }
180
181 void
182 Graph::trigger (GraphNode* n)
183 {
184         pthread_mutex_lock (&_trigger_mutex);
185         _trigger_queue.push_back( n);
186         pthread_mutex_unlock (&_trigger_mutex);
187 }
188
189 void
190 Graph::dec_ref()
191 {
192         if (g_atomic_int_dec_and_test (&_finished_refcount)) {
193
194                 // ok... this cycle is finished now.
195                 // we are the only thread alive.
196         
197                 this->restart_cycle();
198         }
199 }
200
201 void
202 Graph::restart_cycle()
203 {
204         //printf( "cycle_done chain: %d\n", _current_chain);
205
206         // we are through. wakeup our caller.
207   again:
208         _callback_done_sem.signal ();
209
210         // block until we are triggered.
211         _callback_start_sem.wait();
212         if (_quit_threads)
213                 return;
214
215         //printf( "cycle_start\n" );
216
217         this->prep();
218         if (_graph_empty)
219                 goto again;
220         //printf( "cycle_start chain: %d\n", _current_chain);
221
222         // returning will restart the cycle.
223         //  starting with waking up the others.
224 }
225
226 static bool
227 is_feedback (boost::shared_ptr<RouteList> routelist, Route* from, boost::shared_ptr<Route> to)
228 {
229         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
230                 if ((*ri).get() == from)
231                         return false;
232                 if ((*ri) == to)
233                         return true;
234         }
235         assert(0);
236         return false;
237 }
238
239 static bool
240 is_feedback (boost::shared_ptr<RouteList> routelist, boost::shared_ptr<Route> from, Route* to)
241 {
242         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
243                 if ((*ri).get() == to)
244                         return true;
245                 if ((*ri) == from)
246                         return false;
247         }
248         assert(0);
249         return false;
250 }
251
252 void
253 Graph::rechain (boost::shared_ptr<RouteList> routelist)
254 {
255         node_list_t::iterator ni;
256         Glib::Mutex::Lock ls (_swap_mutex);
257
258         int chain = _setup_chain;
259         DEBUG_TRACE (DEBUG::Graph, string_compose ("============== setup %1\n", chain));
260         // set all refcounts to 0;
261
262         _init_finished_refcount[chain] = 0;
263         _init_trigger_list[chain].clear();
264
265         _nodes_rt[chain].clear();
266
267         for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
268                 node_ptr_t n = boost::dynamic_pointer_cast<GraphNode> (*ri);
269
270                 n->_init_refcount[chain] = 0;
271                 n->_activation_set[chain].clear();
272                 _nodes_rt[chain].push_back(n);
273         }
274
275         // now add refs for the connections.
276
277         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
278                 bool has_input  = false;
279                 bool has_output = false;
280
281                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
282
283                 for (RouteList::iterator ri=routelist->begin(); ri!=routelist->end(); ri++) {
284                         if (rp->direct_feeds (*ri)) {
285                                 if (is_feedback (routelist, rp.get(), *ri)) {
286                                         continue; 
287                                 }
288
289                                 has_output = true;
290                                 (*ni)->_activation_set[chain].insert (boost::dynamic_pointer_cast<GraphNode> (*ri) );
291                         }
292                 }
293
294                 for (Route::FedBy::iterator fi=rp->fed_by().begin(); fi!=rp->fed_by().end(); fi++) {
295                         if (boost::shared_ptr<Route> r = fi->r.lock()) {
296                                 if (!is_feedback (routelist, r, rp.get())) {
297                                         has_input = true;
298                                 }
299                         }
300                 }
301
302                 for (node_set_t::iterator ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
303                         (*ai)->_init_refcount[chain] += 1;
304                 }
305
306                 if (!has_input)
307                         _init_trigger_list[chain].push_back (*ni);
308
309                 if (!has_output)
310                         _init_finished_refcount[chain] += 1;
311         } 
312
313         _pending_chain = chain;
314         dump(chain);
315 }
316
317 bool
318 Graph::run_one()
319 {
320         GraphNode* to_run;
321
322         pthread_mutex_lock (&_trigger_mutex);
323         if (_trigger_queue.size()) {
324                 to_run = _trigger_queue.back();
325                 _trigger_queue.pop_back();
326         } else {
327                 to_run = 0;
328         }
329
330         int et = _execution_tokens;
331         int ts = _trigger_queue.size();
332
333         int wakeup = min (et, ts);
334         _execution_tokens -= wakeup;
335
336         for (int i=0; i<wakeup; i++ ) {
337                 _execution_sem.signal ();
338         }
339
340         while (to_run == 0) {
341                 _execution_tokens += 1;
342                 pthread_mutex_unlock (&_trigger_mutex);
343                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 goes to sleep\n", pthread_self()));
344                 _execution_sem.wait ();
345                 if (_quit_threads)
346                         return true;
347                 DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 is awake\n", pthread_self()));
348                 pthread_mutex_lock (&_trigger_mutex);
349                 if (_trigger_queue.size()) {
350                         to_run = _trigger_queue.back();
351                         _trigger_queue.pop_back();
352                 }
353         }
354         pthread_mutex_unlock (&_trigger_mutex);
355
356         to_run->process();
357         to_run->finish (_current_chain);
358
359         return false;
360 }
361
362 static void get_rt()
363 {
364         if (!jack_is_realtime (AudioEngine::instance()->jack())) {
365                 return;
366         }
367
368         int priority = jack_client_real_time_priority (AudioEngine::instance()->jack());
369
370         if (priority) {
371                 struct sched_param rtparam;
372         
373                 memset (&rtparam, 0, sizeof (rtparam));
374                 rtparam.sched_priority = priority;
375         
376                 pthread_setschedparam (pthread_self(), SCHED_FIFO, &rtparam);
377         }
378 }
379
380 void
381 Graph::helper_thread()
382 {
383         ProcessThread *pt = new ProcessThread;
384
385         pt->get_buffers();
386         get_rt();
387
388         while(1) {
389                 if (run_one()) {
390                         break;
391                 }
392         }
393
394         pt->drop_buffers();
395 }
396
397 void
398 Graph::main_thread()
399 {
400         ProcessThread *pt = new ProcessThread;
401
402         pt->get_buffers();
403         get_rt();
404
405   again:
406         _callback_start_sem.wait ();
407         DEBUG_TRACE(DEBUG::Graph, "main thread is awake\n");
408         this->prep();
409
410         if (_graph_empty && !_quit_threads) {
411                 _callback_done_sem.signal ();
412                 goto again;
413         }
414
415         while (1) {
416                 DEBUG_TRACE(DEBUG::Graph, "main thread runs one graph node\n");
417                 if (run_one()) {
418                         break;
419                 }
420         }
421
422         pt->drop_buffers();
423 }
424
425 void
426 Graph::dump (int chain)
427 {
428 #ifndef NDEBUG
429         node_list_t::iterator ni;
430         node_set_t::iterator ai;
431
432         chain = _pending_chain;
433
434         DEBUG_TRACE (DEBUG::Graph, "--------------------------------------------Graph dump:\n");
435         for (ni=_nodes_rt[chain].begin(); ni!=_nodes_rt[chain].end(); ni++) {
436                 boost::shared_ptr<Route> rp = boost::dynamic_pointer_cast<Route>( *ni);
437                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", rp->name().c_str(), (*ni)->_init_refcount[chain]));
438                 for (ai=(*ni)->_activation_set[chain].begin(); ai!=(*ni)->_activation_set[chain].end(); ai++) {
439                         DEBUG_TRACE (DEBUG::Graph, string_compose ("  triggers: %1\n", boost::dynamic_pointer_cast<Route>(*ai)->name().c_str()));
440                 }
441         }
442
443         DEBUG_TRACE (DEBUG::Graph, "------------- trigger list:\n");
444         for (ni=_init_trigger_list[chain].begin(); ni!=_init_trigger_list[chain].end(); ni++) {
445                 DEBUG_TRACE (DEBUG::Graph, string_compose ("GraphNode: %1  refcount: %2\n", boost::dynamic_pointer_cast<Route>(*ni)->name().c_str(), (*ni)->_init_refcount[chain]));
446         }
447
448         DEBUG_TRACE (DEBUG::Graph, string_compose ("final activation refcount: %1\n", _init_finished_refcount[chain]));
449 #endif
450 }
451
452 int
453 Graph::silent_process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame,
454                               bool can_record, bool rec_monitors_input, bool& need_butler)
455 {
456         _process_nframes = nframes;
457         _process_start_frame = start_frame;
458         _process_end_frame = end_frame;
459         _process_can_record = can_record;
460         _process_rec_monitors_input = rec_monitors_input;
461
462         _process_silent = true;
463         _process_noroll = false;
464         _process_retval = 0;
465         _process_need_butler = false;
466
467         if (!_graph_empty) {
468                 DEBUG_TRACE(DEBUG::Graph, "wake graph for silent process\n");
469                 _callback_start_sem.signal ();
470                 _callback_done_sem.wait ();
471         }
472
473         need_butler = _process_need_butler;
474
475         return _process_retval;
476 }
477
478 int
479 Graph::process_routes (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, int declick,
480                        bool can_record, bool rec_monitors_input, bool& need_butler)
481 {
482         DEBUG_TRACE (DEBUG::Graph, string_compose ("graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
483
484         _process_nframes = nframes;
485         _process_start_frame = start_frame;
486         _process_end_frame = end_frame;
487         _process_can_record = can_record;
488         _process_rec_monitors_input = rec_monitors_input;
489         _process_declick = declick;
490
491         _process_silent = false;
492         _process_noroll = false;
493         _process_retval = 0;
494         _process_need_butler = false;
495
496         DEBUG_TRACE(DEBUG::Graph, "wake graph for non-silent process\n");
497         _callback_start_sem.signal ();
498         _callback_done_sem.wait ();
499
500         DEBUG_TRACE (DEBUG::Graph, "graph execution complete\n");
501
502         need_butler = _process_need_butler;
503
504         return _process_retval;
505 }
506
507 int
508 Graph::routes_no_roll (pframes_t nframes, framepos_t start_frame, framepos_t end_frame, 
509                        bool non_rt_pending, bool can_record, int declick)
510 {
511         DEBUG_TRACE (DEBUG::Graph, string_compose ("no-roll graph execution from %1 to %2 = %3\n", start_frame, end_frame, nframes));
512
513         _process_nframes = nframes;
514         _process_start_frame = start_frame;
515         _process_end_frame = end_frame;
516         _process_can_record = can_record;
517         _process_declick = declick;
518         _process_non_rt_pending = non_rt_pending;
519
520         _process_silent = false;
521         _process_noroll = true;
522         _process_retval = 0;
523         _process_need_butler = false;
524
525         DEBUG_TRACE(DEBUG::Graph, "wake graph for no-roll process\n");
526         _callback_start_sem.signal ();
527         _callback_done_sem.wait ();
528
529         return _process_retval;
530 }
531 void
532 Graph::process_one_route (Route* route)
533 {
534         bool need_butler = false;
535         int retval;
536
537         assert (route);
538
539         DEBUG_TRACE (DEBUG::ProcessThreads, string_compose ("%1 runs route %2\n", pthread_self(), route->name()));
540
541         if (_process_silent) {
542                 retval = route->silent_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_can_record, _process_rec_monitors_input, need_butler);
543         } else if (_process_noroll) {
544                 route->set_pending_declick (_process_declick);
545                 retval = route->no_roll (_process_nframes, _process_start_frame, _process_end_frame, _process_non_rt_pending, _process_can_record, _process_declick);
546         } else {
547                 route->set_pending_declick (_process_declick);
548                 retval = route->roll (_process_nframes, _process_start_frame, _process_end_frame, _process_declick, _process_can_record, _process_rec_monitors_input, need_butler);
549         }
550
551         if (retval) {
552                 _process_retval = retval;
553         }
554     
555         if (need_butler) {
556                 _process_need_butler = true;
557         }
558 }
559
560
561