From b8ffcd8888f502c14b3eb5dfead58cfe3ee8646f Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 1 Dec 2016 14:33:21 +0700 Subject: [PATCH] Fix for tradesink messages --- src/ATrade/Broker/Protocol.hs | 52 ++++++++++++++++++----------------- src/ATrade/Broker/Server.hs | 8 +++--- test/TestBrokerServer.hs | 1 - 3 files changed, 31 insertions(+), 30 deletions(-) diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index 4cd0620..c629ca1 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -121,7 +121,7 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { tsOperation :: Operation, tsExecutionTime :: UTCTime, tsSignalId :: SignalId -} +} deriving (Show, Eq) getHMS :: UTCTime -> (Int, Int, Int, Int) getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec) @@ -159,7 +159,7 @@ parseTimestamp _ = fail "Unable to parse timestamp: invalid type" instance ToJSON TradeSinkMessage where toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ] - toJSON TradeSinkTrade { .. } = object ["account" .= tsAccountId, + toJSON TradeSinkTrade { .. } = object ["trade" .= object ["account" .= tsAccountId, "security" .= tsSecurity, "price" .= tsPrice, "quantity" .= tsQuantity, @@ -169,7 +169,7 @@ instance ToJSON TradeSinkMessage where "execution-time" .= formatTimestamp tsExecutionTime, "strategy" .= strategyId tsSignalId, "signal-id" .= signalName tsSignalId, - "comment" .= comment tsSignalId] + "comment" .= comment tsSignalId]] instance FromJSON TradeSinkMessage where parseJSON = withObject "object" (\obj -> @@ -177,25 +177,27 @@ instance FromJSON TradeSinkMessage where Nothing -> parseTrade obj Just cmd -> return TradeSinkHeartBeat) where - parseTrade v = do - acc <- v .: "account" - sec <- v .: "security" - pr <- v .: "price" - q <- v .: "quantity" - vol <- v .: "volume" - cur <- v .: "volume-currency" - op <- v .: "operation" - extime <- v .: "execution-time" >>= parseTimestamp - strategy <- v .: "strategy" - sid <- v .: "signal-id" - com <- v .: "comment" - return TradeSinkTrade { - tsAccountId = acc, - tsSecurity = sec, - tsPrice = pr, - tsQuantity = q, - tsVolume = vol, - tsCurrency = cur, - tsOperation = op, - tsExecutionTime = extime, - tsSignalId = SignalId strategy sid com } + parseTrade obj = case HM.lookup "trade" obj of + Just (Object v) -> do + acc <- v .: "account" + sec <- v .: "security" + pr <- v .: "price" + q <- v .: "quantity" + vol <- v .: "volume" + cur <- v .: "volume-currency" + op <- v .: "operation" + extime <- v .: "execution-time" >>= parseTimestamp + strategy <- v .: "strategy" + sid <- v .: "signal-id" + com <- v .: "comment" + return TradeSinkTrade { + tsAccountId = acc, + tsSecurity = sec, + tsPrice = pr, + tsQuantity = q, + tsVolume = vol, + tsCurrency = cur, + tsOperation = op, + tsExecutionTime = extime, + tsSignalId = SignalId strategy sid com } + _ -> fail "Should've been trade object" diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index f9c9945..1b50fe1 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -99,17 +99,17 @@ notificationCallback state n = do tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ whileM_ (not <$> wasKilled) $ - withSocket c Req (\sock -> do + withSocket c Dealer (\sock -> do chan <- tradeSink <$> readIORef state connect sock $ T.unpack tradeSinkEp timeoutMv <- newEmptyMVar + threadDelay 1000000 whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do - threadDelay 500000 maybeTrade <- tryReadChan chan case maybeTrade of - Just trade -> send sock [] $ encodeTrade trade + Just trade -> sendMulti sock $ B.empty :| [encodeTrade trade] Nothing -> do - send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat + sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] events <- poll 5000 [Sock sock [In] Nothing] if not . L.null . L.head $ events then void . receive $ sock -- anything will do diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index 14f09d4..e942ccb 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -289,7 +289,6 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade step "Connecting" connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock - threadDelay 100000 (Just cb) <- notificationCallback <$> readIORef broState let trade = Trade {