Browse Source

BrokerClient/BrokerServer: correct notification handling

master
Denis Tereshkin 4 years ago
parent
commit
39df25fa9c
  1. 54
      src/ATrade/Broker/Client.hs
  2. 13
      src/ATrade/Broker/Protocol.hs
  3. 22
      src/ATrade/Broker/Server.hs

54
src/ATrade/Broker/Client.hs

@ -91,13 +91,11 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro @@ -91,13 +91,11 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro
isZMQError e = "ZMQError" `L.isPrefixOf` show e
notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> MVar () -> ClientSecurityParams -> IO ()
notificationThread clientIdentity callbacks ctx ep killMv secParams = flip finally (return ()) $ do
notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> ClientSecurityParams -> IO ()
notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams = flip finally (return ()) $ do
whileM_ (isNothing <$> tryReadMVar killMv) $
withSocket ctx Sub $ \sock -> do
setLinger (restrict 0) sock
case cspCertificate secParams of
Just clientCert -> zapApplyCertificate clientCert sock
Nothing -> return ()
@ -111,13 +109,57 @@ notificationThread clientIdentity callbacks ctx ep killMv secParams = flip final @@ -111,13 +109,57 @@ notificationThread clientIdentity callbacks ctx ep killMv secParams = flip final
connect sock $ T.unpack ep
debugM "Broker.Client" $ "Subscribing: [" <> T.unpack clientIdentity <> "]"
subscribe sock $ T.encodeUtf8 clientIdentity
initialSqnum <- requestCurrentSqnum cmdVar idCounter clientIdentity
notifSqnumRef <- newIORef initialSqnum
whileM_ (isNothing <$> tryReadMVar killMv) $ do
evs <- poll 5000 [Sock sock [In] Nothing]
if null . L.head $ evs
then do
respVar <- newEmptyMVar
sqnum <- nextId idCounter
notifSqnum <- readIORef notifSqnumRef
putMVar cmdVar (RequestNotifications sqnum clientIdentity notifSqnum, respVar)
resp <- takeMVar respVar
case resp of
(ResponseNotifications ns) -> do
case lastMay ns of
Just n -> atomicWriteIORef notifSqnumRef (nextSqnum $ getNotificationSqnum n)
Nothing -> return ()
return ()
(ResponseError msg) -> warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg
_ -> warningM "Broker.Client" $ "Unknown error when requesting notifications"
else do
msg <- receiveMulti sock
case msg of
[_, payload] -> case decode (BL.fromStrict payload) of
Just notification -> forM_ callbacks $ \c -> c notification
Just notification -> do
currentSqnum <- readIORef notifSqnumRef
if getNotificationSqnum notification /= currentSqnum
then
if currentSqnum > getNotificationSqnum notification
then debugM "Broker.Client" $ "Already processed notification: " <> show (getNotificationSqnum notification)
else warningM "Broker.Client" $ "Notification sqnum mismatch: " <> show currentSqnum <> " -> " <> show (getNotificationSqnum notification)
else do
atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum)
forM_ callbacks $ \c -> c notification
_ -> return ()
_ -> return ()
where
requestCurrentSqnum cmdVar idCounter clientIdentity = do
respVar <- newEmptyMVar
sqnum <- nextId idCounter
putMVar cmdVar (RequestCurrentSqnum sqnum clientIdentity, respVar)
resp <- takeMVar respVar
case resp of
(ResponseCurrentSqnum sqnum) -> return sqnum
(ResponseError msg) -> do
warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg
return (NotificationSqnum 1)
_ -> do
warningM "Broker.Client" "Unknown error when requesting notifications"
return (NotificationSqnum 1)
startBrokerClient :: B.ByteString -- ^ Socket Identity
@ -134,7 +176,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback @@ -134,7 +176,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback
cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse))
tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams)
notifSqnumRef <- newIORef (NotificationSqnum 0)
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint killMv secParams)
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams)
return BrokerClientHandle {
tid = tid,

13
src/ATrade/Broker/Protocol.hs

@ -80,12 +80,14 @@ instance ToJSON Notification where @@ -80,12 +80,14 @@ instance ToJSON Notification where
data BrokerServerRequest = RequestSubmitOrder RequestSqnum ClientIdentity Order
| RequestCancelOrder RequestSqnum ClientIdentity OrderId
| RequestNotifications RequestSqnum ClientIdentity NotificationSqnum
| RequestCurrentSqnum RequestSqnum ClientIdentity
deriving (Eq, Show)
requestSqnum :: BrokerServerRequest -> RequestSqnum
requestSqnum (RequestSubmitOrder sqnum _ _) = sqnum
requestSqnum (RequestCancelOrder sqnum _ _) = sqnum
requestSqnum (RequestNotifications sqnum _ _) = sqnum
requestSqnum (RequestCurrentSqnum sqnum _) = sqnum
instance FromJSON BrokerServerRequest where
parseJSON = withObject "object" (\obj -> do
@ -104,6 +106,8 @@ instance FromJSON BrokerServerRequest where @@ -104,6 +106,8 @@ instance FromJSON BrokerServerRequest where
| HM.member "request-notifications" obj = do
initialSqnum <- obj .: "initial-sqnum"
return (RequestNotifications sqnum clientIdentity (NotificationSqnum initialSqnum))
| HM.member "request-current-sqnum" obj =
return (RequestCurrentSqnum sqnum clientIdentity)
parseRequest _ _ _ = fail "Invalid request object"
instance ToJSON BrokerServerRequest where
@ -117,9 +121,14 @@ instance ToJSON BrokerServerRequest where @@ -117,9 +121,14 @@ instance ToJSON BrokerServerRequest where
"client-identity" .= clientIdentity,
"request-notifications" .= ("" :: T.Text),
"initial-sqnum" .= unNotificationSqnum initialNotificationSqnum]
toJSON (RequestCurrentSqnum sqnum clientIdentity) = object
["request-sqnum" .= sqnum,
"client-identity" .= clientIdentity,
"request-current-sqnum" .= ("" :: T.Text) ]
data BrokerServerResponse = ResponseOk
| ResponseNotifications [Notification]
| ResponseCurrentSqnum NotificationSqnum
| ResponseError T.Text
deriving (Eq, Show)
@ -135,11 +144,15 @@ instance FromJSON BrokerServerResponse where @@ -135,11 +144,15 @@ instance FromJSON BrokerServerResponse where
| HM.member "notifications" obj -> do
notifications <- obj .: "notifications"
ResponseNotifications <$> parseJSON notifications
| HM.member "current-sqnum" obj -> do
rawSqnum <- obj .: "current-sqnum"
return $ ResponseCurrentSqnum (NotificationSqnum rawSqnum)
| otherwise -> fail "Unable to parse BrokerServerResponse")
instance ToJSON BrokerServerResponse where
toJSON ResponseOk = object [ "result" .= ("success" :: T.Text) ]
toJSON (ResponseNotifications notifications) = object [ "notifications" .= notifications ]
toJSON (ResponseCurrentSqnum sqnum) = object [ "current-sqnum" .= unNotificationSqnum sqnum ]
toJSON (ResponseError errorMessage) = object [ "result" .= ("error" :: T.Text), "message" .= errorMessage ]
data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {

22
src/ATrade/Broker/Server.hs

@ -28,6 +28,7 @@ import Data.Maybe @@ -28,6 +28,7 @@ import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import Data.Time.Clock
import Safe (lastMay)
import System.Log.Logger
import System.Timeout
import System.ZMQ4
@ -156,8 +157,8 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -156,8 +157,8 @@ brokerServerThread state = finally brokerServerThread' cleanup
msg <- receiveMulti sock
case msg of
[peerId, _, payload] ->
case decode . BL.fromStrict $ payload of
Just request -> do
case eitherDecode . BL.fromStrict $ payload of
Right request -> do
let sqnum = requestSqnum request
-- Here, we should check if previous packet sequence number is the same
-- If it is, we should resend previous response
@ -173,10 +174,10 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -173,10 +174,10 @@ brokerServerThread state = finally brokerServerThread' cleanup
sendMessage sock peerId response
-- and store response in case we'll need to resend it
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)})
Nothing -> do
Left errmsg -> do
-- If we weren't able to parse request, we should send error
-- but shouldn't update lastPacket
let response = ResponseError "Invalid request"
let response = ResponseError $ "Invalid request: " <> T.pack errmsg
sendMessage sock peerId response
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg)
@ -229,6 +230,19 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -229,6 +230,19 @@ brokerServerThread state = finally brokerServerThread' cleanup
atomicMapIORef state (\s -> s { pendingNotifications = M.insert clientIdentity filtered (pendingNotifications s)})
return $ ResponseNotifications . L.reverse $ filtered
Nothing -> return $ ResponseNotifications []
RequestCurrentSqnum sqnum clientIdentity -> do
sqnumMap <- notificationSqnum <$> readIORef state
notifMap <- pendingNotifications <$> readIORef state
case M.lookup clientIdentity notifMap of
Just [] ->
case M.lookup clientIdentity sqnumMap of
Just sqnum -> return (ResponseCurrentSqnum sqnum)
_ -> return (ResponseCurrentSqnum (NotificationSqnum 1))
Just notifs -> case lastMay notifs of
Just v -> return (ResponseCurrentSqnum (getNotificationSqnum v))
_ -> return (ResponseCurrentSqnum (NotificationSqnum 1))
Nothing -> return (ResponseCurrentSqnum (NotificationSqnum 1))
sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp])

Loading…
Cancel
Save