Denis Tereshkin 9 years ago
parent
commit
f37ec1da8a
  1. 52
      src/ATrade/Broker/Protocol.hs
  2. 8
      src/ATrade/Broker/Server.hs
  3. 1
      test/TestBrokerServer.hs

52
src/ATrade/Broker/Protocol.hs

@ -121,7 +121,7 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { @@ -121,7 +121,7 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsOperation :: Operation,
tsExecutionTime :: UTCTime,
tsSignalId :: SignalId
}
} deriving (Show, Eq)
getHMS :: UTCTime -> (Int, Int, Int, Int)
getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec)
@ -159,7 +159,7 @@ parseTimestamp _ = fail "Unable to parse timestamp: invalid type" @@ -159,7 +159,7 @@ parseTimestamp _ = fail "Unable to parse timestamp: invalid type"
instance ToJSON TradeSinkMessage where
toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ]
toJSON TradeSinkTrade { .. } = object ["account" .= tsAccountId,
toJSON TradeSinkTrade { .. } = object ["trade" .= object ["account" .= tsAccountId,
"security" .= tsSecurity,
"price" .= tsPrice,
"quantity" .= tsQuantity,
@ -169,7 +169,7 @@ instance ToJSON TradeSinkMessage where @@ -169,7 +169,7 @@ instance ToJSON TradeSinkMessage where
"execution-time" .= formatTimestamp tsExecutionTime,
"strategy" .= strategyId tsSignalId,
"signal-id" .= signalName tsSignalId,
"comment" .= comment tsSignalId]
"comment" .= comment tsSignalId]]
instance FromJSON TradeSinkMessage where
parseJSON = withObject "object" (\obj ->
@ -177,25 +177,27 @@ instance FromJSON TradeSinkMessage where @@ -177,25 +177,27 @@ instance FromJSON TradeSinkMessage where
Nothing -> parseTrade obj
Just cmd -> return TradeSinkHeartBeat)
where
parseTrade v = do
acc <- v .: "account"
sec <- v .: "security"
pr <- v .: "price"
q <- v .: "quantity"
vol <- v .: "volume"
cur <- v .: "volume-currency"
op <- v .: "operation"
extime <- v .: "execution-time" >>= parseTimestamp
strategy <- v .: "strategy"
sid <- v .: "signal-id"
com <- v .: "comment"
return TradeSinkTrade {
tsAccountId = acc,
tsSecurity = sec,
tsPrice = pr,
tsQuantity = q,
tsVolume = vol,
tsCurrency = cur,
tsOperation = op,
tsExecutionTime = extime,
tsSignalId = SignalId strategy sid com }
parseTrade obj = case HM.lookup "trade" obj of
Just (Object v) -> do
acc <- v .: "account"
sec <- v .: "security"
pr <- v .: "price"
q <- v .: "quantity"
vol <- v .: "volume"
cur <- v .: "volume-currency"
op <- v .: "operation"
extime <- v .: "execution-time" >>= parseTimestamp
strategy <- v .: "strategy"
sid <- v .: "signal-id"
com <- v .: "comment"
return TradeSinkTrade {
tsAccountId = acc,
tsSecurity = sec,
tsPrice = pr,
tsQuantity = q,
tsVolume = vol,
tsCurrency = cur,
tsOperation = op,
tsExecutionTime = extime,
tsSignalId = SignalId strategy sid com }
_ -> fail "Should've been trade object"

8
src/ATrade/Broker/Server.hs

@ -99,17 +99,17 @@ notificationCallback state n = do @@ -99,17 +99,17 @@ notificationCallback state n = do
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
whileM_ (not <$> wasKilled) $
withSocket c Req (\sock -> do
withSocket c Dealer (\sock -> do
chan <- tradeSink <$> readIORef state
connect sock $ T.unpack tradeSinkEp
timeoutMv <- newEmptyMVar
threadDelay 1000000
whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do
threadDelay 500000
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> send sock [] $ encodeTrade trade
Just trade -> sendMulti sock $ B.empty :| [encodeTrade trade]
Nothing -> do
send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat
sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat]
events <- poll 5000 [Sock sock [In] Nothing]
if not . L.null . L.head $ events
then void . receive $ sock -- anything will do

1
test/TestBrokerServer.hs

@ -289,7 +289,6 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade @@ -289,7 +289,6 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
step "Connecting"
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
threadDelay 100000
(Just cb) <- notificationCallback <$> readIORef broState
let trade = Trade {

Loading…
Cancel
Save