From 98741da75c15465ae7ab480a0d20adc547af50e5 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 19 Jul 2021 13:12:42 +0700 Subject: [PATCH] BrokerClient: adjust to protocol changes --- src/ATrade/Broker/Client.hs | 28 ++++++++++++++++------------ 1 file changed, 16 insertions(+), 12 deletions(-) diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 4f659dd..43a6d20 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -32,13 +32,14 @@ import System.ZMQ4 import System.ZMQ4.ZAP data BrokerClientHandle = BrokerClientHandle { - tid :: ThreadId, - completionMvar :: MVar (), - killMvar :: MVar (), - submitOrder :: Order -> IO (Either T.Text OrderId), - cancelOrder :: OrderId -> IO (Either T.Text ()), - getNotifications :: IO (Either T.Text [Notification]), - cmdVar :: MVar (BrokerServerRequest, MVar BrokerServerResponse) + tid :: ThreadId, + completionMvar :: MVar (), + killMvar :: MVar (), + submitOrder :: Order -> IO (Either T.Text OrderId), + cancelOrder :: OrderId -> IO (Either T.Text ()), + getNotifications :: IO (Either T.Text [Notification]), + cmdVar :: MVar (BrokerServerRequest, MVar BrokerServerResponse), + lastKnownNotificationRef :: IORef NotificationSqnum } brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> MVar () -> ClientSecurityParams -> IO () @@ -89,6 +90,7 @@ startBrokerClient socketIdentity ctx endpoint secParams = do killMv <- newEmptyMVar cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) + notifSqnumRef <- newIORef (NotificationSqnum 0) return BrokerClientHandle { tid = tid, @@ -96,8 +98,9 @@ startBrokerClient socketIdentity ctx endpoint secParams = do killMvar = killMv, submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar, cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar, - getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter cmdVar, - cmdVar = cmdVar + getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter notifSqnumRef cmdVar, + cmdVar = cmdVar, + lastKnownNotificationRef = notifSqnumRef } stopBrokerClient :: BrokerClientHandle -> IO () @@ -127,11 +130,12 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error" -bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) -bcGetNotifications clientIdentity idCounter cmdVar = do +bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> IORef NotificationSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) +bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do respVar <- newEmptyMVar sqnum <- nextId idCounter - putMVar cmdVar (RequestNotifications sqnum clientIdentity (NotificationSqnum 0), respVar) + notifSqnum <- nextSqnum <$> readIORef notifSqnumRef + putMVar cmdVar (RequestNotifications sqnum clientIdentity notifSqnum, respVar) resp <- takeMVar respVar case resp of (ResponseNotifications ns) -> return $ Right ns