Browse Source

BrokerClient: track last known sqnum

master
Denis Tereshkin 4 years ago
parent
commit
ab699ff0e8
  1. 15
      src/ATrade/Broker/Client.hs

15
src/ATrade/Broker/Client.hs

@ -137,8 +137,9 @@ notificationThread :: ClientIdentity ->
MVar () -> MVar () ->
ClientSecurityParams -> ClientSecurityParams ->
LogAction IO Message -> LogAction IO Message ->
IORef NotificationSqnum ->
IO () IO ()
notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams logger = flip finally (return ()) $ do notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams logger lastKnownNotificationSqnum = flip finally (return ()) $ do
whileM_ (isNothing <$> tryReadMVar killMv) $ whileM_ (isNothing <$> tryReadMVar killMv) $
withSocket ctx Sub $ \sock -> do withSocket ctx Sub $ \sock -> do
setLinger (restrict 0) sock setLinger (restrict 0) sock
@ -192,6 +193,7 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa
else do else do
atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum) atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum)
forM_ callbacks $ \c -> c notification forM_ callbacks $ \c -> c notification
atomicWriteIORef lastKnownNotificationSqnum currentSqnum
_ -> return () _ -> return ()
_ -> return () _ -> return ()
where where
@ -226,7 +228,9 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback
cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse))
tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams logger) tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams logger)
notifSqnumRef <- newIORef (NotificationSqnum 0) notifSqnumRef <- newIORef (NotificationSqnum 0)
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams logger) lastKnownNotification <- newIORef (NotificationSqnum 0)
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams logger
lastKnownNotification)
return BrokerClientHandle { return BrokerClientHandle {
tid = tid, tid = tid,
@ -234,7 +238,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback
killMvar = killMv, killMvar = killMv,
submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar, submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar,
cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar, cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar,
getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter notifSqnumRef cmdVar, getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter notifSqnumRef cmdVar lastKnownNotification,
cmdVar = cmdVar, cmdVar = cmdVar,
lastKnownNotificationRef = notifSqnumRef, lastKnownNotificationRef = notifSqnumRef,
notificationCallback = [], notificationCallback = [],
@ -278,8 +282,9 @@ bcGetNotifications :: ClientIdentity ->
IORef RequestSqnum -> IORef RequestSqnum ->
IORef NotificationSqnum -> IORef NotificationSqnum ->
MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar (BrokerServerRequest, MVar BrokerServerResponse) ->
IORef NotificationSqnum ->
IO (Either T.Text [Notification]) IO (Either T.Text [Notification])
bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar lastKnownNotification = do
respVar <- newEmptyMVar respVar <- newEmptyMVar
sqnum <- nextId idCounter sqnum <- nextId idCounter
notifSqnum <- nextSqnum <$> readIORef notifSqnumRef notifSqnum <- nextSqnum <$> readIORef notifSqnumRef
@ -289,7 +294,7 @@ bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do
(ResponseNotifications ns) -> do (ResponseNotifications ns) -> do
case lastMay ns of case lastMay ns of
Just n -> atomicWriteIORef notifSqnumRef (getNotificationSqnum n) Just n -> atomicWriteIORef notifSqnumRef (getNotificationSqnum n)
Nothing -> return () Nothing -> readIORef lastKnownNotification >>= atomicWriteIORef notifSqnumRef
return $ Right ns return $ Right ns
(ResponseError msg) -> return $ Left msg (ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error" _ -> return $ Left "Unknown error"

Loading…
Cancel
Save