diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index da9cc73..b3e112f 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -86,8 +86,8 @@ import System.IO (BufferMode (LineBu IOMode (AppendMode), hSetBuffering, withFile) -import System.ZMQ4 (Rep (Rep), bind, - withContext, +import System.ZMQ4 (Router (Router), + bind, withContext, withSocket) import System.ZMQ4.ZAP (loadCertificateFromFile) import UnliftIO (MonadUnliftIO) @@ -133,7 +133,7 @@ junctionMain descriptors = do handledNotifications <- newIORef S.empty withBroker cfg ctx robotsMap ordersMap handledNotifications globalLogger $ \bro -> withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> - withSocket ctx Rep $ \rcSocket -> do + withSocket ctx Router $ \rcSocket -> do liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) broService <- mkBrokerService bro ordersMap let junctionLogAction = hoistLogAction liftIO globalLogger diff --git a/src/ATrade/Driver/Junction/JunctionMonad.hs b/src/ATrade/Driver/Junction/JunctionMonad.hs index b8f87ac..4666638 100644 --- a/src/ATrade/Driver/Junction/JunctionMonad.hs +++ b/src/ATrade/Driver/Junction/JunctionMonad.hs @@ -92,7 +92,7 @@ import System.IO (BufferMode (LineBu hSetBuffering, openFile) import System.IO (hClose) -import System.ZMQ4 (Rep, Socket) +import System.ZMQ4 (Router, Socket) import UnliftIO (MonadUnliftIO) import UnliftIO.Exception (catchAny, onException) @@ -105,7 +105,7 @@ data JunctionEnv = peQuoteThread :: QuoteThreadHandle, peBroker :: BrokerClientHandle, peRobots :: IORef (M.Map T.Text RobotDriverHandle), - peRemoteControlSocket :: Socket Rep, + peRemoteControlSocket :: Socket Router, peLogAction :: LogAction JunctionM Message, peIoLogAction :: LogAction IO Message, peProgramConfiguration :: ProgramConfiguration, diff --git a/src/ATrade/Driver/Junction/RemoteControl.hs b/src/ATrade/Driver/Junction/RemoteControl.hs index 2a5fa94..92cbb09 100644 --- a/src/ATrade/Driver/Junction/RemoteControl.hs +++ b/src/ATrade/Driver/Junction/RemoteControl.hs @@ -21,13 +21,15 @@ import Control.Monad.Reader (asks) import Data.Aeson (decode) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL +import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Map.Strict as M import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8', encodeUtf8) import System.ZMQ4 (Event (In), Poll (Sock), poll, - receive, send) + receiveMulti, + sendMulti) import UnliftIO (MonadIO (liftIO), atomicModifyIORef', readIORef) @@ -100,12 +102,15 @@ handleRemoteControl timeout = do evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing] case evs of (x:_) -> unless (null x) $ do - rawRequest <- liftIO $ receive sock - case parseRemoteControlRequest rawRequest of - Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) - Right request -> do - response <- handleRequest request - liftIO $ send sock [] (makeRemoteControlResponse response) + frames <- liftIO $ receiveMulti sock + case frames of + [peerId, _, rawRequest] -> do + case parseRemoteControlRequest rawRequest of + Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) + Right request -> do + response <- handleRequest request + liftIO $ sendMulti sock $ peerId :| [B.empty, makeRemoteControlResponse response] + _ -> logErrorWith logger "RemoteControl" "Invalid incoming request" _ -> return () where handleRequest (StartRobot inst) = do