Add accessor for _playlist.
[dcpomatic.git] / src / lib / nanomsg.cc
index 57220cd546347d2fa05e718efd54c32a23771641..0fc0dd357edc7a5b61535922c4ac96b805cc08cd 100644 (file)
@@ -20,6 +20,7 @@
 
 #include "nanomsg.h"
 #include "dcpomatic_log.h"
+#include "exceptions.h"
 #include <nanomsg/nn.h>
 #include <nanomsg/pair.h>
 #include <stdexcept>
@@ -48,23 +49,16 @@ Nanomsg::Nanomsg (bool server)
        }
 }
 
-void
-Nanomsg::blocking_send (string s)
+bool
+Nanomsg::send (string s, int timeout)
 {
-       int const r = nn_send (_socket, s.c_str(), s.length(), 0);
-       if (r < 0) {
-               throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
-       } else if (r != int(s.length())) {
-               throw runtime_error("Could not send to nanomsg socket (message too big)");
+       if (timeout != 0) {
+               nn_setsockopt (_socket, NN_SOL_SOCKET, NN_SNDTIMEO, &timeout, sizeof(int));
        }
-}
 
-bool
-Nanomsg::nonblocking_send (string s)
-{
-       int const r = nn_send (_socket, s.c_str(), s.length(), NN_DONTWAIT);
+       int const r = nn_send (_socket, s.c_str(), s.length(), timeout ? 0 : NN_DONTWAIT);
        if (r < 0) {
-               if (errno == EAGAIN) {
+               if (errno == ETIMEDOUT || errno == EAGAIN) {
                        return false;
                }
                throw runtime_error(String::compose("Could not send to nanomsg socket (%1)", errno));
@@ -88,17 +82,17 @@ Nanomsg::get_from_pending ()
 }
 
 void
-Nanomsg::recv_and_parse (bool blocking)
+Nanomsg::recv_and_parse (int flags)
 {
        char* buf = 0;
-       int const received = nn_recv (_socket, &buf, NN_MSG, blocking ? 0 : NN_DONTWAIT);
+       int const received = nn_recv (_socket, &buf, NN_MSG, flags);
        if (received < 0)
        {
-               if (!blocking && errno == EAGAIN) {
+               if (errno == ETIMEDOUT || errno == EAGAIN) {
                        return;
                }
 
-               throw runtime_error ("Could not communicate with subprocess");
+               throw CommunicationFailedError ();
        }
 
        char* p = buf;
@@ -114,32 +108,24 @@ Nanomsg::recv_and_parse (bool blocking)
        nn_freemsg (buf);
 }
 
-string
-Nanomsg::blocking_get ()
+optional<string>
+Nanomsg::receive (int timeout)
 {
+       if (timeout != 0) {
+               nn_setsockopt (_socket, NN_SOL_SOCKET, NN_RCVTIMEO, &timeout, sizeof(int));
+       }
+
        optional<string> l = get_from_pending ();
        if (l) {
                return *l;
        }
 
-       recv_and_parse (true);
+       recv_and_parse (timeout ? 0 : NN_DONTWAIT);
 
-       l = get_from_pending ();
+       return get_from_pending ();
        if (!l) {
-               throw runtime_error ("Could not communicate with subprocess");
+               throw CommunicationFailedError ();
        }
 
        return *l;
 }
-
-optional<string>
-Nanomsg::nonblocking_get ()
-{
-       optional<string> l = get_from_pending ();
-       if (l) {
-               return *l;
-       }
-
-       recv_and_parse (false);
-       return get_from_pending ();
-}