ad6a54212627e32091fa7d7cbe95b779afc0ae9c
[ardour.git] / libs / audiographer / audiographer / threader.h
1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
3
4 #include <glibmm/threadpool.h>
5 #include <sigc++/slot.h>
6 #include <boost/format.hpp>
7
8 #include <glib.h>
9 #include <vector>
10 #include <algorithm>
11
12 #include "source.h"
13 #include "sink.h"
14 #include "exception.h"
15
16 namespace AudioGrapher
17 {
18
19 class ThreaderException : public Exception
20 {
21   public:
22         template<typename T>
23         ThreaderException (T const & thrower, std::exception const & e)
24                 : Exception (thrower,
25                         boost::str ( boost::format
26                         ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
27                         % DebugUtils::demangled_name (e) % e.what() ))
28         { }
29 };
30
31 template <typename T>
32 class Threader : public Source<T>, public Sink<T>
33 {
34   private:
35         typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
36
37   public:
38         
39         Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 1000)
40           : thread_pool (thread_pool)
41           , readers (0)
42           , wait_timeout (wait_timeout_milliseconds)
43         { }
44         
45         virtual ~Threader () {}
46         
47         void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
48         void clear_outputs () { outputs.clear (); }
49         void remove_output (typename Source<T>::SinkPtr output) {
50                 typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
51                 outputs.erase (new_end, outputs.end());
52         }
53         
54         /* The context has to be const, because this is working concurrently */
55         void process (ProcessContext<T> const & c)
56         {
57                 wait_mutex.lock();
58                 
59                 exception.reset();
60                 
61                 unsigned int outs = outputs.size();
62                 g_atomic_int_add (&readers, outs);
63                 for (unsigned int i = 0; i < outs; ++i) {
64                         thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
65                 }
66                 
67                 wait();
68         }
69         
70         using Sink<T>::process;
71         
72   private:
73
74         void wait()
75         {
76                 Glib::TimeVal wait_time;
77                 wait_time.assign_current_time();
78                 wait_time.add_milliseconds(wait_timeout);
79                 
80                 wait_cond.timed_wait(wait_mutex, wait_time);
81                 bool timed_out = (g_atomic_int_get (&readers) != 0);
82                 wait_mutex.unlock();
83                 if (timed_out) { throw Exception (*this, "wait timed out"); }
84                 
85                 if (exception) {
86                         throw *exception;
87                 }
88         }
89         
90         void process_output(ProcessContext<T> const & c, unsigned int output)
91         {
92                 try {
93                         outputs[output]->process (c);
94                 } catch (std::exception const & e) {
95                         // Only first exception will be passed on
96                         exception_mutex.lock();
97                         if(!exception) { exception.reset (new ThreaderException (*this, e)); }
98                         exception_mutex.unlock();
99                 }
100                 
101                 if (g_atomic_int_dec_and_test (&readers)) {
102                         wait_cond.signal();
103                 }
104         }
105
106         OutputVec outputs;
107
108         Glib::ThreadPool & thread_pool;
109         Glib::Mutex wait_mutex;
110         Glib::Cond  wait_cond;
111         gint        readers;
112         long        wait_timeout;
113         
114         Glib::Mutex exception_mutex;
115         boost::shared_ptr<ThreaderException> exception;
116
117 };
118
119 } // namespace
120
121 #endif //AUDIOGRAPHER_THREADER_H