Denis Tereshkin 9 years ago
parent
commit
a7a825d28c
  1. 3
      libatrade.cabal
  2. 101
      src/ATrade/Broker/Protocol.hs
  3. 66
      src/ATrade/Broker/Server.hs
  4. 6
      test/TestBrokerClient.hs
  5. 59
      test/TestBrokerServer.hs

3
libatrade.cabal

@ -39,6 +39,9 @@ library @@ -39,6 +39,9 @@ library
, monad-loops
, safe
, stm
, text-format
, errors
, parsec
default-language: Haskell2010
executable libatrade-exe

101
src/ATrade/Broker/Protocol.hs

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
{-# LANGUAGE OverloadedStrings, MultiWayIf #-}
{-# LANGUAGE OverloadedStrings, MultiWayIf, RecordWildCards #-}
module ATrade.Broker.Protocol (
BrokerServerRequest(..),
@ -6,15 +6,21 @@ module ATrade.Broker.Protocol ( @@ -6,15 +6,21 @@ module ATrade.Broker.Protocol (
Notification(..),
notificationOrderId,
RequestSqnum(..),
requestSqnum
requestSqnum,
TradeSinkMessage(..)
) where
import Control.Error.Util
import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T
import Data.Text.Format
import Data.Aeson
import Data.Aeson.Types
import Data.Aeson.Types hiding (parse)
import Data.Int
import Data.Time.Clock
import Data.Time.Calendar
import ATrade.Types
import Text.Parsec
type RequestSqnum = Int64
@ -104,3 +110,92 @@ instance FromJSON Notification where @@ -104,3 +110,92 @@ instance FromJSON Notification where
instance ToJSON Notification where
toJSON (OrderNotification oid newState) = object ["order-state" .= object [ "order-id" .= oid, "new-state" .= newState] ]
toJSON (TradeNotification trade) = object ["trade" .= toJSON trade]
data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsAccountId :: T.Text,
tsSecurity :: T.Text,
tsPrice :: Double,
tsQuantity :: Int,
tsVolume :: Double,
tsCurrency :: T.Text,
tsOperation :: Operation,
tsExecutionTime :: UTCTime,
tsSignalId :: SignalId
}
getHMS :: UTCTime -> (Int, Int, Int, Int)
getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec)
where
intsec = floor diff
msec = floor $ (diff - fromIntegral intsec) * 1000
formatTimestamp dt = format "{}-{}-{} {}:{}:{}.{}" (left 4 '0' y, left 2 '0' m, left 2 '0' d, left 2 '0' hour, left 2 '0' min, left 2 '0' sec, left 3 '0' msec)
where
(y, m, d) = toGregorian $ utctDay dt
(hour, min, sec, msec) = getHMS dt
parseTimestamp (String t) = case hush $ parse p "" t of
Just ts -> return ts
Nothing -> fail "Unable to parse timestamp"
where
p = do
year <- read <$> many1 digit
char '-'
mon <- read <$> many1 digit
char '-'
day <- read <$> many1 digit
char ' '
hour <- read <$> many1 digit
char ':'
min <- read <$> many1 digit
char ':'
sec <- read <$> many1 digit
char '.'
msec <- many1 digit -- TODO use msec
return $ UTCTime (fromGregorian year mon day) (secondsToDiffTime $ hour * 3600 + min * 60 + sec)
parseTimestamp _ = fail "Unable to parse timestamp: invalid type"
instance ToJSON TradeSinkMessage where
toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ]
toJSON TradeSinkTrade { .. } = object ["account" .= tsAccountId,
"security" .= tsSecurity,
"price" .= tsPrice,
"quantity" .= tsQuantity,
"volume" .= tsVolume,
"volume-currency" .= tsCurrency,
"operation" .= tsOperation,
"execution-time" .= formatTimestamp tsExecutionTime,
"strategy" .= strategyId tsSignalId,
"signal-id" .= signalName tsSignalId,
"comment" .= comment tsSignalId]
instance FromJSON TradeSinkMessage where
parseJSON = withObject "object" (\obj ->
case HM.lookup "command" obj of
Nothing -> parseTrade obj
Just cmd -> return TradeSinkHeartBeat)
where
parseTrade v = do
acc <- v .: "account"
sec <- v .: "security"
pr <- v .: "price"
q <- v .: "quantity"
vol <- v .: "volume"
cur <- v .: "volume-currency"
op <- v .: "operation"
extime <- v .: "execution-time" >>= parseTimestamp
strategy <- v .: "strategy"
sid <- v .: "signal-id"
com <- v .: "comment"
return TradeSinkTrade {
tsAccountId = acc,
tsSecurity = sec,
tsPrice = pr,
tsQuantity = q,
tsVolume = vol,
tsCurrency = cur,
tsOperation = op,
tsExecutionTime = extime,
tsSignalId = SignalId strategy sid com }

66
src/ATrade/Broker/Server.hs

@ -19,11 +19,13 @@ import Data.Aeson @@ -19,11 +19,13 @@ import Data.Aeson
import Data.Maybe
import Data.Time.Clock
import Data.IORef
import Control.Concurrent
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import System.Log.Logger
import System.Timeout
import ATrade.Util
newtype OrderIdGenerator = IO OrderId
@ -46,18 +48,20 @@ data BrokerServerState = BrokerServerState { @@ -46,18 +48,20 @@ data BrokerServerState = BrokerServerState {
brokers :: [BrokerInterface],
completionMvar :: MVar (),
killMvar :: MVar (),
orderIdCounter :: OrderId
orderIdCounter :: OrderId,
tradeSink :: BoundedChan Trade
}
data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) (MVar ())
data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle
startBrokerServer brokers c ep = do
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinkEp = do
sock <- socket c Router
bind sock (T.unpack ep)
tid <- myThreadId
compMv <- newEmptyMVar
killMv <- newEmptyMVar
tsChan <- newBoundedChan 100
state <- newIORef BrokerServerState {
bsSocket = sock,
orderMap = M.empty,
@ -67,15 +71,20 @@ startBrokerServer brokers c ep = do @@ -67,15 +71,20 @@ startBrokerServer brokers c ep = do
brokers = brokers,
completionMvar = compMv,
killMvar = killMv,
orderIdCounter = 1
orderIdCounter = 1,
tradeSink = tsChan
}
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv <*> pure killMv
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do
chan <- tradeSink <$> readIORef state
case n of
TradeNotification trade -> tryWriteChan chan trade
_ -> return False
orders <- orderMap <$> readIORef state
case M.lookup (notificationOrderId n) orders of
Just peerId -> addNotification peerId n
@ -87,10 +96,45 @@ notificationCallback state n = do @@ -87,10 +96,45 @@ notificationCallback state n = do
Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)}
Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)})
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
whileM_ (not <$> wasKilled) $
withSocket c Req (\sock -> do
chan <- tradeSink <$> readIORef state
connect sock $ T.unpack tradeSinkEp
timeoutMv <- newEmptyMVar
whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do
threadDelay 500000
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> send sock [] $ encodeTrade trade
Nothing -> do
send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat
events <- poll 5000 [Sock sock [In] Nothing]
if not . L.null . L.head $ events
then void . receive $ sock -- anything will do
else putMVar timeoutMv ())
where
wasKilled = fmap killMvar (readIORef state) >>= fmap isJust . tryReadMVar
encodeTrade :: Trade -> B.ByteString
encodeTrade = BL.toStrict . encode . convertTrade
convertTrade trade = TradeSinkTrade {
tsAccountId = tradeAccount trade,
tsSecurity = tradeSecurity trade,
tsPrice = fromRational . toRational . tradePrice $ trade,
tsQuantity = fromInteger $ tradeQuantity trade,
tsVolume = fromRational . toRational . tradeVolume $ trade,
tsCurrency = tradeVolumeCurrency trade,
tsOperation = tradeOperation trade,
tsExecutionTime = tradeTimestamp trade,
tsSignalId = tradeSignalId trade
}
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup
where
brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryTakeMVar) $ do
brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do
sock <- bsSocket <$> readIORef state
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
@ -171,7 +215,9 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -171,7 +215,9 @@ brokerServerThread state = finally brokerServerThread' cleanup
stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle tid compMv killMv) = do
stopBrokerServer (BrokerServerHandle tid tstid compMv killMv) = do
putMVar killMv ()
yield >> readMVar compMv
killThread tstid
yield
readMVar compMv

6
test/TestBrokerClient.hs

@ -56,7 +56,7 @@ defaultOrder = mkOrder { @@ -56,7 +56,7 @@ defaultOrder = mkOrder {
testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
oid <- submitOrder broC defaultOrder
case oid of
@ -66,7 +66,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext @@ -66,7 +66,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext
testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
@ -81,7 +81,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" @@ -81,7 +81,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of

59
test/TestBrokerServer.hs

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE OverloadedStrings, RecordWildCards #-}
module TestBrokerServer (
unitTests
@ -40,7 +40,8 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop @@ -40,7 +40,8 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop
, testBrokerServerCancelUnknownOrder
, testBrokerServerCorruptedPacket
, testBrokerServerGetNotifications
, testBrokerServerDuplicateRequest ]
, testBrokerServerDuplicateRequest
, testBrokerServerTradeSink ]
--
-- Few helpers
@ -72,14 +73,14 @@ defaultOrder = mkOrder { @@ -72,14 +73,14 @@ defaultOrder = mkOrder {
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep)
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) ""
stopBrokerServer broS)
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
@ -101,7 +102,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur @@ -101,7 +102,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep
@ -119,7 +120,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc @@ -119,7 +120,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
@ -145,7 +146,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell @@ -145,7 +146,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
receive sock
@ -167,7 +168,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet @@ -167,7 +168,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
step "Connecting"
connect sock (T.unpack ep)
@ -191,7 +192,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r @@ -191,7 +192,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
-- We have to actually submit order, or else server won't know that we should
-- be notified about this order
@ -252,7 +253,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque @@ -252,7 +253,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
@ -275,3 +276,41 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque @@ -275,3 +276,41 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
Nothing -> assertFailure "Invalid response"
)))
testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade sink" $ \step -> withContext (\ctx -> do
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
withSocket ctx Rep (\tradeSock -> do
bind tradeSock "inproc://trade-sink"
setReceiveTimeout (restrict 1000) tradeSock
bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink") stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
step "Connecting"
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
threadDelay 100000
(Just cb) <- notificationCallback <$> readIORef broState
let trade = Trade {
tradeOrderId = orderId,
tradePrice = 19.82,
tradeQuantity = 1,
tradeVolume = 1982,
tradeVolumeCurrency = "TEST_CURRENCY",
tradeOperation = Buy,
tradeAccount = "demo",
tradeSecurity = "FOO",
tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000,
tradeSignalId = SignalId "Foo" "bar" "baz" }
cb (TradeNotification trade)
threadDelay 100000
step "Testing"
tradeMsg <- receive tradeSock
case decode . BL.fromStrict $ tradeMsg of
Just tsTrade@TradeSinkTrade{..} -> do
tsAccountId @?= tradeAccount trade
tsPrice @?= (fromRational . toRational . tradePrice) trade
_ -> assertFailure "Invalid trade in sink"
))))

Loading…
Cancel
Save