From 39df25fa9c6a8906ef86c5c2489878957687a645 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 12 Oct 2021 22:05:05 +0700 Subject: [PATCH] BrokerClient/BrokerServer: correct notification handling --- src/ATrade/Broker/Client.hs | 64 +++++++++++++++++++++++++++++------ src/ATrade/Broker/Protocol.hs | 13 +++++++ src/ATrade/Broker/Server.hs | 22 +++++++++--- 3 files changed, 84 insertions(+), 15 deletions(-) diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 9090a6b..0a658dc 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -91,13 +91,11 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro isZMQError e = "ZMQError" `L.isPrefixOf` show e -notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> MVar () -> ClientSecurityParams -> IO () -notificationThread clientIdentity callbacks ctx ep killMv secParams = flip finally (return ()) $ do +notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> ClientSecurityParams -> IO () +notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams = flip finally (return ()) $ do whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub $ \sock -> do - setLinger (restrict 0) sock - case cspCertificate secParams of Just clientCert -> zapApplyCertificate clientCert sock Nothing -> return () @@ -111,13 +109,57 @@ notificationThread clientIdentity callbacks ctx ep killMv secParams = flip final connect sock $ T.unpack ep debugM "Broker.Client" $ "Subscribing: [" <> T.unpack clientIdentity <> "]" subscribe sock $ T.encodeUtf8 clientIdentity + + initialSqnum <- requestCurrentSqnum cmdVar idCounter clientIdentity + + notifSqnumRef <- newIORef initialSqnum whileM_ (isNothing <$> tryReadMVar killMv) $ do - msg <- receiveMulti sock - case msg of - [_, payload] -> case decode (BL.fromStrict payload) of - Just notification -> forM_ callbacks $ \c -> c notification - _ -> return () - _ -> return () + evs <- poll 5000 [Sock sock [In] Nothing] + if null . L.head $ evs + then do + respVar <- newEmptyMVar + sqnum <- nextId idCounter + notifSqnum <- readIORef notifSqnumRef + putMVar cmdVar (RequestNotifications sqnum clientIdentity notifSqnum, respVar) + resp <- takeMVar respVar + case resp of + (ResponseNotifications ns) -> do + case lastMay ns of + Just n -> atomicWriteIORef notifSqnumRef (nextSqnum $ getNotificationSqnum n) + Nothing -> return () + return () + (ResponseError msg) -> warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg + _ -> warningM "Broker.Client" $ "Unknown error when requesting notifications" + else do + msg <- receiveMulti sock + case msg of + [_, payload] -> case decode (BL.fromStrict payload) of + Just notification -> do + currentSqnum <- readIORef notifSqnumRef + if getNotificationSqnum notification /= currentSqnum + then + if currentSqnum > getNotificationSqnum notification + then debugM "Broker.Client" $ "Already processed notification: " <> show (getNotificationSqnum notification) + else warningM "Broker.Client" $ "Notification sqnum mismatch: " <> show currentSqnum <> " -> " <> show (getNotificationSqnum notification) + else do + atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum) + forM_ callbacks $ \c -> c notification + _ -> return () + _ -> return () + where + requestCurrentSqnum cmdVar idCounter clientIdentity = do + respVar <- newEmptyMVar + sqnum <- nextId idCounter + putMVar cmdVar (RequestCurrentSqnum sqnum clientIdentity, respVar) + resp <- takeMVar respVar + case resp of + (ResponseCurrentSqnum sqnum) -> return sqnum + (ResponseError msg) -> do + warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg + return (NotificationSqnum 1) + _ -> do + warningM "Broker.Client" "Unknown error when requesting notifications" + return (NotificationSqnum 1) startBrokerClient :: B.ByteString -- ^ Socket Identity @@ -134,7 +176,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) notifSqnumRef <- newIORef (NotificationSqnum 0) - notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint killMv secParams) + notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams) return BrokerClientHandle { tid = tid, diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index b07d8c1..917eb39 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -80,12 +80,14 @@ instance ToJSON Notification where data BrokerServerRequest = RequestSubmitOrder RequestSqnum ClientIdentity Order | RequestCancelOrder RequestSqnum ClientIdentity OrderId | RequestNotifications RequestSqnum ClientIdentity NotificationSqnum + | RequestCurrentSqnum RequestSqnum ClientIdentity deriving (Eq, Show) requestSqnum :: BrokerServerRequest -> RequestSqnum requestSqnum (RequestSubmitOrder sqnum _ _) = sqnum requestSqnum (RequestCancelOrder sqnum _ _) = sqnum requestSqnum (RequestNotifications sqnum _ _) = sqnum +requestSqnum (RequestCurrentSqnum sqnum _) = sqnum instance FromJSON BrokerServerRequest where parseJSON = withObject "object" (\obj -> do @@ -104,6 +106,8 @@ instance FromJSON BrokerServerRequest where | HM.member "request-notifications" obj = do initialSqnum <- obj .: "initial-sqnum" return (RequestNotifications sqnum clientIdentity (NotificationSqnum initialSqnum)) + | HM.member "request-current-sqnum" obj = + return (RequestCurrentSqnum sqnum clientIdentity) parseRequest _ _ _ = fail "Invalid request object" instance ToJSON BrokerServerRequest where @@ -117,9 +121,14 @@ instance ToJSON BrokerServerRequest where "client-identity" .= clientIdentity, "request-notifications" .= ("" :: T.Text), "initial-sqnum" .= unNotificationSqnum initialNotificationSqnum] + toJSON (RequestCurrentSqnum sqnum clientIdentity) = object + ["request-sqnum" .= sqnum, + "client-identity" .= clientIdentity, + "request-current-sqnum" .= ("" :: T.Text) ] data BrokerServerResponse = ResponseOk | ResponseNotifications [Notification] + | ResponseCurrentSqnum NotificationSqnum | ResponseError T.Text deriving (Eq, Show) @@ -135,11 +144,15 @@ instance FromJSON BrokerServerResponse where | HM.member "notifications" obj -> do notifications <- obj .: "notifications" ResponseNotifications <$> parseJSON notifications + | HM.member "current-sqnum" obj -> do + rawSqnum <- obj .: "current-sqnum" + return $ ResponseCurrentSqnum (NotificationSqnum rawSqnum) | otherwise -> fail "Unable to parse BrokerServerResponse") instance ToJSON BrokerServerResponse where toJSON ResponseOk = object [ "result" .= ("success" :: T.Text) ] toJSON (ResponseNotifications notifications) = object [ "notifications" .= notifications ] + toJSON (ResponseCurrentSqnum sqnum) = object [ "current-sqnum" .= unNotificationSqnum sqnum ] toJSON (ResponseError errorMessage) = object [ "result" .= ("error" :: T.Text), "message" .= errorMessage ] data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index f531d89..0735cdf 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -28,6 +28,7 @@ import Data.Maybe import qualified Data.Text as T import qualified Data.Text.Encoding as E import Data.Time.Clock +import Safe (lastMay) import System.Log.Logger import System.Timeout import System.ZMQ4 @@ -156,8 +157,8 @@ brokerServerThread state = finally brokerServerThread' cleanup msg <- receiveMulti sock case msg of [peerId, _, payload] -> - case decode . BL.fromStrict $ payload of - Just request -> do + case eitherDecode . BL.fromStrict $ payload of + Right request -> do let sqnum = requestSqnum request -- Here, we should check if previous packet sequence number is the same -- If it is, we should resend previous response @@ -173,10 +174,10 @@ brokerServerThread state = finally brokerServerThread' cleanup sendMessage sock peerId response -- and store response in case we'll need to resend it atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) - Nothing -> do + Left errmsg -> do -- If we weren't able to parse request, we should send error -- but shouldn't update lastPacket - let response = ResponseError "Invalid request" + let response = ResponseError $ "Invalid request: " <> T.pack errmsg sendMessage sock peerId response _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) @@ -229,6 +230,19 @@ brokerServerThread state = finally brokerServerThread' cleanup atomicMapIORef state (\s -> s { pendingNotifications = M.insert clientIdentity filtered (pendingNotifications s)}) return $ ResponseNotifications . L.reverse $ filtered Nothing -> return $ ResponseNotifications [] + RequestCurrentSqnum sqnum clientIdentity -> do + sqnumMap <- notificationSqnum <$> readIORef state + notifMap <- pendingNotifications <$> readIORef state + case M.lookup clientIdentity notifMap of + Just [] -> + case M.lookup clientIdentity sqnumMap of + Just sqnum -> return (ResponseCurrentSqnum sqnum) + _ -> return (ResponseCurrentSqnum (NotificationSqnum 1)) + Just notifs -> case lastMay notifs of + Just v -> return (ResponseCurrentSqnum (getNotificationSqnum v)) + _ -> return (ResponseCurrentSqnum (NotificationSqnum 1)) + Nothing -> return (ResponseCurrentSqnum (NotificationSqnum 1)) + sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp])