+#include <poll.h>
+
CrossThreadChannel::CrossThreadChannel (bool non_blocking)
: receive_channel (0)
+ , receive_source (0)
{
fds[0] = -1;
fds[1] = -1;
error << "cannot set non-blocking mode for x-thread pipe (read) (" << ::strerror (errno) << ')' << endmsg;
return;
}
-
+
if (fcntl (fds[1], F_SETFL, O_NONBLOCK)) {
error << "cannot set non-blocking mode for x-thread pipe (write) (%2)" << ::strerror (errno) << ')' << endmsg;
return;
CrossThreadChannel::~CrossThreadChannel ()
{
- if (receive_channel) {
+ if (receive_source) {
+ g_source_destroy (receive_source);
+ receive_source = 0;
+ }
+
+ if (receive_channel) {
g_io_channel_unref (receive_channel);
+ receive_channel = 0;
}
if (fds[0] >= 0) {
close (fds[0]);
fds[0] = -1;
- }
+ }
if (fds[1] >= 0) {
close (fds[1]);
fds[1] = -1;
- }
+ }
}
void
return ::write (fds[1], &msg, 1);
}
-int
-CrossThreadChannel::receive (char& msg)
+bool
+CrossThreadChannel::poll_for_request()
{
+ struct pollfd pfd[1];
+ pfd[0].fd = fds[0];
+ pfd[0].events = POLLIN|POLLERR|POLLHUP;
+ while(true) {
+ if (poll (pfd, 1, -1) < 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ break;
+ }
+ if (pfd[0].revents & ~POLLIN) {
+ break;
+ }
+
+ if (pfd[0].revents & POLLIN) {
+ return true;
+ }
+ }
+ return false;
+}
+
+int
+CrossThreadChannel::receive (char& msg, bool wait)
+{
+ if (wait) {
+ if (!poll_for_request ()) {
+ return -1;
+ }
+ }
return ::read (fds[0], &msg, 1);
}