X-Git-Url: https://main.carlh.net/gitweb/?a=blobdiff_plain;f=src%2Flib%2Fnanomsg.cc;h=0fc0dd357edc7a5b61535922c4ac96b805cc08cd;hb=4b7185e4eda53534c4d71a1f31ba33ca3dd8dc8d;hp=57220cd546347d2fa05e718efd54c32a23771641;hpb=a1f7bf2d9e5610075fbd898cdf52f4f8373741f2;p=dcpomatic.git diff --git a/src/lib/nanomsg.cc b/src/lib/nanomsg.cc index 57220cd54..0fc0dd357 100644 --- a/src/lib/nanomsg.cc +++ b/src/lib/nanomsg.cc @@ -20,6 +20,7 @@ #include "nanomsg.h" #include "dcpomatic_log.h" +#include "exceptions.h" #include #include #include @@ -48,23 +49,16 @@ Nanomsg::Nanomsg (bool server) } } -void -Nanomsg::blocking_send (string s) +bool +Nanomsg::send (string s, int timeout) { - int const r = nn_send (_socket, s.c_str(), s.length(), 0); - if (r < 0) { - throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); - } else if (r != int(s.length())) { - throw runtime_error("Could not send to nanomsg socket (message too big)"); + if (timeout != 0) { + nn_setsockopt (_socket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int)); } -} -bool -Nanomsg::nonblocking_send (string s) -{ - int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT); + int const r = nn_send (_socket, s.c_str(), s.length(), timeout ? 0 : NN_DONTWAIT); if (r < 0) { - if (errno == EAGAIN) { + if (errno == ETIMEDOUT || errno == EAGAIN) { return false; } throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno)); @@ -88,17 +82,17 @@ Nanomsg::get_from_pending () } void -Nanomsg::recv_and_parse (bool blocking) +Nanomsg::recv_and_parse (int flags) { char* buf = 0; - int const received = nn_recv (_socket, &buf, NN_MSG, blocking ? 0 : NN_DONTWAIT); + int const received = nn_recv (_socket, &buf, NN_MSG, flags); if (received < 0) { - if (!blocking && errno == EAGAIN) { + if (errno == ETIMEDOUT || errno == EAGAIN) { return; } - throw runtime_error ("Could not communicate with subprocess"); + throw CommunicationFailedError (); } char* p = buf; @@ -114,32 +108,24 @@ Nanomsg::recv_and_parse (bool blocking) nn_freemsg (buf); } -string -Nanomsg::blocking_get () +optional +Nanomsg::receive (int timeout) { + if (timeout != 0) { + nn_setsockopt (_socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int)); + } + optional l = get_from_pending (); if (l) { return *l; } - recv_and_parse (true); + recv_and_parse (timeout ? 0 : NN_DONTWAIT); - l = get_from_pending (); + return get_from_pending (); if (!l) { - throw runtime_error ("Could not communicate with subprocess"); + throw CommunicationFailedError (); } return *l; } - -optional -Nanomsg::nonblocking_get () -{ - optional l = get_from_pending (); - if (l) { - return *l; - } - - recv_and_parse (false); - return get_from_pending (); -}