1 #ifndef AUDIOGRAPHER_THREADER_H
2 #define AUDIOGRAPHER_THREADER_H
4 #include <glibmm/threadpool.h>
5 #include <sigc++/slot.h>
6 #include <boost/format.hpp>
14 #include "exception.h"
16 namespace AudioGrapher
/// Exception type (derived from Exception) whose message describes a
/// std::exception: per the format string below it records the exception's
/// demangled dynamic type name and its what() text.
19 class ThreaderException : public Exception
/// \param thrower the object reporting the error (how it is forwarded to the
///        base class is not visible in this excerpt)
/// \param e the std::exception whose type name and what() text go into the message
23 ThreaderException (T const & thrower, std::exception const & e)
// Message layout: "%1%" <- DebugUtils::demangled_name (e), "%2%" <- e.what()
25 boost::str ( boost::format
26 ("\n\t- Dynamic type: %1%\n\t- what(): %2%")
27 % DebugUtils::demangled_name (e) % e.what() ))
/// Source/Sink node that fans a ProcessContext out to its outputs by pushing
/// one task per output onto a Glib::ThreadPool (see process()).
32 class Threader : public Source<T>, public Sink<T>
/// Container type used for the registered output sinks.
35 typedef std::vector<typename Source<T>::SinkPtr> OutputVec;
/// \param thread_pool pool that receives the per-output tasks; it is held by
///        reference (see the member below), so it must outlive this object
/// \param wait_timeout_milliseconds number of milliseconds added to the current
///        time to form the deadline passed to timed_wait (default 1000)
39 Threader (Glib::ThreadPool & thread_pool, long wait_timeout_milliseconds = 1000)
40 : thread_pool (thread_pool)
42 , wait_timeout (wait_timeout_milliseconds)
/// Virtual destructor with an empty body.
45 virtual ~Threader () {}
/// Appends \a output to the list of outputs.
/// NOTE(review): no locking is visible here; presumably this must not be
/// called while process() tasks are running -- confirm.
47 void add_output (typename Source<T>::SinkPtr output) { outputs.push_back (output); }
/// Removes every registered output.
48 void clear_outputs () { outputs.clear (); }
/// Removes all entries that compare equal to \a output (std::remove followed
/// by erase of the tail).
49 void remove_output (typename Source<T>::SinkPtr output) {
50 typename OutputVec::iterator new_end = std::remove(outputs.begin(), outputs.end(), output);
51 outputs.erase (new_end, outputs.end());
54 /* The context has to be const, because this is working concurrently */
/// Schedules process_output (c, i) on the thread pool once for each output.
/// The reader counter is raised by the number of outputs before any task is
/// pushed; each task decrements it again in process_output.
55 void process (ProcessContext<T> const & c)
61 unsigned int outs = outputs.size();
62 g_atomic_int_add (&readers, outs);
63 for (unsigned int i = 0; i < outs; ++i) {
64 thread_pool.push (sigc::bind (sigc::mem_fun (this, &Threader::process_output), c, i));
// Keep the other Sink<T>::process overloads visible next to the one declared above.
70 using Sink<T>::process;
// Wait step (the enclosing function header is not visible in this excerpt):
// build an absolute deadline of "now + wait_timeout milliseconds" ...
76 Glib::TimeVal wait_time;
77 wait_time.assign_current_time();
78 wait_time.add_milliseconds(wait_timeout);
// ... block on the condition until signalled or the deadline passes ...
80 wait_cond.timed_wait(wait_mutex, wait_time);
// ... and treat "readers still non-zero" after the wait as a timeout.
// NOTE(review): no loop around timed_wait is visible here, so any wakeup that
// happens while readers != 0 would also be reported as a timeout -- confirm.
81 bool timed_out = (g_atomic_int_get (&readers) != 0);
83 if (timed_out) { throw Exception (*this, "wait timed out"); }
/// Task body run for a single output: forwards \a c to outputs[\a output].
/// A std::exception thrown by that call is caught and, if no exception has
/// been stored yet, wrapped in a ThreaderException and stored in `exception`
/// (guarded by exception_mutex). Afterwards the reader counter is decremented.
/// \param c context handed to the output's process()
/// \param output index into `outputs`
90 void process_output(ProcessContext<T> const & c, unsigned int output)
93 outputs[output]->process (c);
94 } catch (std::exception const & e) {
95 // Only first exception will be passed on
96 exception_mutex.lock();
97 if(!exception) { exception.reset (new ThreaderException (*this, e)); }
98 exception_mutex.unlock();
// True only for the task that brings the counter to zero; the body of this
// branch is not visible here (presumably it wakes the waiter -- confirm).
101 if (g_atomic_int_dec_and_test (&readers)) {
// Pool that receives the tasks; a reference, not owned by this object.
108 Glib::ThreadPool & thread_pool;
// Mutex/condition pair used by the timed wait above.
109 Glib::Mutex wait_mutex;
110 Glib::Cond wait_cond;
// Guards `exception`, which holds the first exception caught in process_output.
114 Glib::Mutex exception_mutex;
115 boost::shared_ptr<ThreaderException> exception;
121 #endif //AUDIOGRAPHER_THREADER_H