Browse Source

Fix for tradesink messages

master
Denis Tereshkin 9 years ago
parent
commit
b8ffcd8888
  1. 52
      src/ATrade/Broker/Protocol.hs
  2. 8
      src/ATrade/Broker/Server.hs
  3. 1
      test/TestBrokerServer.hs

52
src/ATrade/Broker/Protocol.hs

@ -121,7 +121,7 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsOperation :: Operation, tsOperation :: Operation,
tsExecutionTime :: UTCTime, tsExecutionTime :: UTCTime,
tsSignalId :: SignalId tsSignalId :: SignalId
} } deriving (Show, Eq)
getHMS :: UTCTime -> (Int, Int, Int, Int) getHMS :: UTCTime -> (Int, Int, Int, Int)
getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec) getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec)
@ -159,7 +159,7 @@ parseTimestamp _ = fail "Unable to parse timestamp: invalid type"
instance ToJSON TradeSinkMessage where instance ToJSON TradeSinkMessage where
toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ] toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ]
toJSON TradeSinkTrade { .. } = object ["account" .= tsAccountId, toJSON TradeSinkTrade { .. } = object ["trade" .= object ["account" .= tsAccountId,
"security" .= tsSecurity, "security" .= tsSecurity,
"price" .= tsPrice, "price" .= tsPrice,
"quantity" .= tsQuantity, "quantity" .= tsQuantity,
@ -169,7 +169,7 @@ instance ToJSON TradeSinkMessage where
"execution-time" .= formatTimestamp tsExecutionTime, "execution-time" .= formatTimestamp tsExecutionTime,
"strategy" .= strategyId tsSignalId, "strategy" .= strategyId tsSignalId,
"signal-id" .= signalName tsSignalId, "signal-id" .= signalName tsSignalId,
"comment" .= comment tsSignalId] "comment" .= comment tsSignalId]]
instance FromJSON TradeSinkMessage where instance FromJSON TradeSinkMessage where
parseJSON = withObject "object" (\obj -> parseJSON = withObject "object" (\obj ->
@ -177,25 +177,27 @@ instance FromJSON TradeSinkMessage where
Nothing -> parseTrade obj Nothing -> parseTrade obj
Just cmd -> return TradeSinkHeartBeat) Just cmd -> return TradeSinkHeartBeat)
where where
parseTrade v = do parseTrade obj = case HM.lookup "trade" obj of
acc <- v .: "account" Just (Object v) -> do
sec <- v .: "security" acc <- v .: "account"
pr <- v .: "price" sec <- v .: "security"
q <- v .: "quantity" pr <- v .: "price"
vol <- v .: "volume" q <- v .: "quantity"
cur <- v .: "volume-currency" vol <- v .: "volume"
op <- v .: "operation" cur <- v .: "volume-currency"
extime <- v .: "execution-time" >>= parseTimestamp op <- v .: "operation"
strategy <- v .: "strategy" extime <- v .: "execution-time" >>= parseTimestamp
sid <- v .: "signal-id" strategy <- v .: "strategy"
com <- v .: "comment" sid <- v .: "signal-id"
return TradeSinkTrade { com <- v .: "comment"
tsAccountId = acc, return TradeSinkTrade {
tsSecurity = sec, tsAccountId = acc,
tsPrice = pr, tsSecurity = sec,
tsQuantity = q, tsPrice = pr,
tsVolume = vol, tsQuantity = q,
tsCurrency = cur, tsVolume = vol,
tsOperation = op, tsCurrency = cur,
tsExecutionTime = extime, tsOperation = op,
tsSignalId = SignalId strategy sid com } tsExecutionTime = extime,
tsSignalId = SignalId strategy sid com }
_ -> fail "Should've been trade object"

8
src/ATrade/Broker/Server.hs

@ -99,17 +99,17 @@ notificationCallback state n = do
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
whileM_ (not <$> wasKilled) $ whileM_ (not <$> wasKilled) $
withSocket c Req (\sock -> do withSocket c Dealer (\sock -> do
chan <- tradeSink <$> readIORef state chan <- tradeSink <$> readIORef state
connect sock $ T.unpack tradeSinkEp connect sock $ T.unpack tradeSinkEp
timeoutMv <- newEmptyMVar timeoutMv <- newEmptyMVar
threadDelay 1000000
whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do
threadDelay 500000
maybeTrade <- tryReadChan chan maybeTrade <- tryReadChan chan
case maybeTrade of case maybeTrade of
Just trade -> send sock [] $ encodeTrade trade Just trade -> sendMulti sock $ B.empty :| [encodeTrade trade]
Nothing -> do Nothing -> do
send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat]
events <- poll 5000 [Sock sock [In] Nothing] events <- poll 5000 [Sock sock [In] Nothing]
if not . L.null . L.head $ events if not . L.null . L.head $ events
then void . receive $ sock -- anything will do then void . receive $ sock -- anything will do

1
test/TestBrokerServer.hs

@ -289,7 +289,6 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
step "Connecting" step "Connecting"
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
threadDelay 100000
(Just cb) <- notificationCallback <$> readIORef broState (Just cb) <- notificationCallback <$> readIORef broState
let trade = Trade { let trade = Trade {

Loading…
Cancel
Save