diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index e69c57f..429af2f 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -137,8 +137,9 @@ notificationThread :: ClientIdentity -> MVar () -> ClientSecurityParams -> LogAction IO Message -> + IORef NotificationSqnum -> IO () -notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams logger = flip finally (return ()) $ do +notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams logger lastKnownNotificationSqnum = flip finally (return ()) $ do whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub $ \sock -> do setLinger (restrict 0) sock @@ -192,6 +193,7 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa else do atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum) forM_ callbacks $ \c -> c notification + atomicWriteIORef lastKnownNotificationSqnum currentSqnum _ -> return () _ -> return () where @@ -226,7 +228,9 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams logger) notifSqnumRef <- newIORef (NotificationSqnum 0) - notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams logger) + lastKnownNotification <- newIORef (NotificationSqnum 0) + notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams logger + lastKnownNotification) return BrokerClientHandle { tid = tid, @@ -234,7 +238,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback killMvar = killMv, submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar, cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar, - getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter notifSqnumRef cmdVar, + getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter notifSqnumRef cmdVar lastKnownNotification, cmdVar = cmdVar, lastKnownNotificationRef = notifSqnumRef, notificationCallback = [], @@ -278,8 +282,9 @@ bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> IORef NotificationSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> + IORef NotificationSqnum -> IO (Either T.Text [Notification]) -bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do +bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar lastKnownNotification = do respVar <- newEmptyMVar sqnum <- nextId idCounter notifSqnum <- nextSqnum <$> readIORef notifSqnumRef @@ -289,7 +294,7 @@ bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do (ResponseNotifications ns) -> do case lastMay ns of Just n -> atomicWriteIORef notifSqnumRef (getNotificationSqnum n) - Nothing -> return () + Nothing -> readIORef lastKnownNotification >>= atomicWriteIORef notifSqnumRef return $ Right ns (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error"