Browse Source

junction: RemoteControl: replace Rep socket with Router

master
Denis Tereshkin 4 years ago
parent
commit
acfaa3d51c
  1. 6
      src/ATrade/Driver/Junction.hs
  2. 4
      src/ATrade/Driver/Junction/JunctionMonad.hs
  3. 19
      src/ATrade/Driver/Junction/RemoteControl.hs

6
src/ATrade/Driver/Junction.hs

@ -86,8 +86,8 @@ import System.IO (BufferMode (LineBu
IOMode (AppendMode), IOMode (AppendMode),
hSetBuffering, hSetBuffering,
withFile) withFile)
import System.ZMQ4 (Rep (Rep), bind, import System.ZMQ4 (Router (Router),
withContext, bind, withContext,
withSocket) withSocket)
import System.ZMQ4.ZAP (loadCertificateFromFile) import System.ZMQ4.ZAP (loadCertificateFromFile)
import UnliftIO (MonadUnliftIO) import UnliftIO (MonadUnliftIO)
@ -133,7 +133,7 @@ junctionMain descriptors = do
handledNotifications <- newIORef S.empty handledNotifications <- newIORef S.empty
withBroker cfg ctx robotsMap ordersMap handledNotifications globalLogger $ \bro -> withBroker cfg ctx robotsMap ordersMap handledNotifications globalLogger $ \bro ->
withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt ->
withSocket ctx Rep $ \rcSocket -> do withSocket ctx Router $ \rcSocket -> do
liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg)
broService <- mkBrokerService bro ordersMap broService <- mkBrokerService bro ordersMap
let junctionLogAction = hoistLogAction liftIO globalLogger let junctionLogAction = hoistLogAction liftIO globalLogger

4
src/ATrade/Driver/Junction/JunctionMonad.hs

@ -92,7 +92,7 @@ import System.IO (BufferMode (LineBu
hSetBuffering, hSetBuffering,
openFile) openFile)
import System.IO (hClose) import System.IO (hClose)
import System.ZMQ4 (Rep, Socket) import System.ZMQ4 (Router, Socket)
import UnliftIO (MonadUnliftIO) import UnliftIO (MonadUnliftIO)
import UnliftIO.Exception (catchAny, import UnliftIO.Exception (catchAny,
onException) onException)
@ -105,7 +105,7 @@ data JunctionEnv =
peQuoteThread :: QuoteThreadHandle, peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle, peBroker :: BrokerClientHandle,
peRobots :: IORef (M.Map T.Text RobotDriverHandle), peRobots :: IORef (M.Map T.Text RobotDriverHandle),
peRemoteControlSocket :: Socket Rep, peRemoteControlSocket :: Socket Router,
peLogAction :: LogAction JunctionM Message, peLogAction :: LogAction JunctionM Message,
peIoLogAction :: LogAction IO Message, peIoLogAction :: LogAction IO Message,
peProgramConfiguration :: ProgramConfiguration, peProgramConfiguration :: ProgramConfiguration,

19
src/ATrade/Driver/Junction/RemoteControl.hs

@ -21,13 +21,15 @@ import Control.Monad.Reader (asks)
import Data.Aeson (decode) import Data.Aeson (decode)
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8', import Data.Text.Encoding (decodeUtf8',
encodeUtf8) encodeUtf8)
import System.ZMQ4 (Event (In), import System.ZMQ4 (Event (In),
Poll (Sock), poll, Poll (Sock), poll,
receive, send) receiveMulti,
sendMulti)
import UnliftIO (MonadIO (liftIO), import UnliftIO (MonadIO (liftIO),
atomicModifyIORef', atomicModifyIORef',
readIORef) readIORef)
@ -100,12 +102,15 @@ handleRemoteControl timeout = do
evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing] evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing]
case evs of case evs of
(x:_) -> unless (null x) $ do (x:_) -> unless (null x) $ do
rawRequest <- liftIO $ receive sock frames <- liftIO $ receiveMulti sock
case parseRemoteControlRequest rawRequest of case frames of
Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) [peerId, _, rawRequest] -> do
Right request -> do case parseRemoteControlRequest rawRequest of
response <- handleRequest request Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err)
liftIO $ send sock [] (makeRemoteControlResponse response) Right request -> do
response <- handleRequest request
liftIO $ sendMulti sock $ peerId :| [B.empty, makeRemoteControlResponse response]
_ -> logErrorWith logger "RemoteControl" "Invalid incoming request"
_ -> return () _ -> return ()
where where
handleRequest (StartRobot inst) = do handleRequest (StartRobot inst) = do

Loading…
Cancel
Save