Browse Source

Trade sink support

master
Denis Tereshkin 9 years ago
parent
commit
13811c454e
  1. 3
      libatrade.cabal
  2. 101
      src/ATrade/Broker/Protocol.hs
  3. 63
      src/ATrade/Broker/Server.hs
  4. 6
      test/TestBrokerClient.hs
  5. 59
      test/TestBrokerServer.hs

3
libatrade.cabal

@ -39,6 +39,9 @@ library
, monad-loops , monad-loops
, safe , safe
, stm , stm
, text-format
, errors
, parsec
default-language: Haskell2010 default-language: Haskell2010
executable libatrade-exe executable libatrade-exe

101
src/ATrade/Broker/Protocol.hs

@ -1,4 +1,4 @@
{-# LANGUAGE OverloadedStrings, MultiWayIf #-} {-# LANGUAGE OverloadedStrings, MultiWayIf, RecordWildCards #-}
module ATrade.Broker.Protocol ( module ATrade.Broker.Protocol (
BrokerServerRequest(..), BrokerServerRequest(..),
@ -6,15 +6,21 @@ module ATrade.Broker.Protocol (
Notification(..), Notification(..),
notificationOrderId, notificationOrderId,
RequestSqnum(..), RequestSqnum(..),
requestSqnum requestSqnum,
TradeSinkMessage(..)
) where ) where
import Control.Error.Util
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Format
import Data.Aeson import Data.Aeson
import Data.Aeson.Types import Data.Aeson.Types hiding (parse)
import Data.Int import Data.Int
import Data.Time.Clock
import Data.Time.Calendar
import ATrade.Types import ATrade.Types
import Text.Parsec
type RequestSqnum = Int64 type RequestSqnum = Int64
@ -104,3 +110,92 @@ instance FromJSON Notification where
instance ToJSON Notification where instance ToJSON Notification where
toJSON (OrderNotification oid newState) = object ["order-state" .= object [ "order-id" .= oid, "new-state" .= newState] ] toJSON (OrderNotification oid newState) = object ["order-state" .= object [ "order-id" .= oid, "new-state" .= newState] ]
toJSON (TradeNotification trade) = object ["trade" .= toJSON trade] toJSON (TradeNotification trade) = object ["trade" .= toJSON trade]
data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsAccountId :: T.Text,
tsSecurity :: T.Text,
tsPrice :: Double,
tsQuantity :: Int,
tsVolume :: Double,
tsCurrency :: T.Text,
tsOperation :: Operation,
tsExecutionTime :: UTCTime,
tsSignalId :: SignalId
}
getHMS :: UTCTime -> (Int, Int, Int, Int)
getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec)
where
intsec = floor diff
msec = floor $ (diff - fromIntegral intsec) * 1000
formatTimestamp dt = format "{}-{}-{} {}:{}:{}.{}" (left 4 '0' y, left 2 '0' m, left 2 '0' d, left 2 '0' hour, left 2 '0' min, left 2 '0' sec, left 3 '0' msec)
where
(y, m, d) = toGregorian $ utctDay dt
(hour, min, sec, msec) = getHMS dt
parseTimestamp (String t) = case hush $ parse p "" t of
Just ts -> return ts
Nothing -> fail "Unable to parse timestamp"
where
p = do
year <- read <$> many1 digit
char '-'
mon <- read <$> many1 digit
char '-'
day <- read <$> many1 digit
char ' '
hour <- read <$> many1 digit
char ':'
min <- read <$> many1 digit
char ':'
sec <- read <$> many1 digit
char '.'
msec <- many1 digit -- TODO use msec
return $ UTCTime (fromGregorian year mon day) (secondsToDiffTime $ hour * 3600 + min * 60 + sec)
parseTimestamp _ = fail "Unable to parse timestamp: invalid type"
instance ToJSON TradeSinkMessage where
toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ]
toJSON TradeSinkTrade { .. } = object ["account" .= tsAccountId,
"security" .= tsSecurity,
"price" .= tsPrice,
"quantity" .= tsQuantity,
"volume" .= tsVolume,
"volume-currency" .= tsCurrency,
"operation" .= tsOperation,
"execution-time" .= formatTimestamp tsExecutionTime,
"strategy" .= strategyId tsSignalId,
"signal-id" .= signalName tsSignalId,
"comment" .= comment tsSignalId]
instance FromJSON TradeSinkMessage where
parseJSON = withObject "object" (\obj ->
case HM.lookup "command" obj of
Nothing -> parseTrade obj
Just cmd -> return TradeSinkHeartBeat)
where
parseTrade v = do
acc <- v .: "account"
sec <- v .: "security"
pr <- v .: "price"
q <- v .: "quantity"
vol <- v .: "volume"
cur <- v .: "volume-currency"
op <- v .: "operation"
extime <- v .: "execution-time" >>= parseTimestamp
strategy <- v .: "strategy"
sid <- v .: "signal-id"
com <- v .: "comment"
return TradeSinkTrade {
tsAccountId = acc,
tsSecurity = sec,
tsPrice = pr,
tsQuantity = q,
tsVolume = vol,
tsCurrency = cur,
tsOperation = op,
tsExecutionTime = extime,
tsSignalId = SignalId strategy sid com }

63
src/ATrade/Broker/Server.hs

@ -19,11 +19,13 @@ import Data.Aeson
import Data.Maybe import Data.Maybe
import Data.Time.Clock import Data.Time.Clock
import Data.IORef import Data.IORef
import Control.Concurrent import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Exception import Control.Exception
import Control.Monad import Control.Monad
import Control.Monad.Loops import Control.Monad.Loops
import System.Log.Logger import System.Log.Logger
import System.Timeout
import ATrade.Util import ATrade.Util
newtype OrderIdGenerator = IO OrderId newtype OrderIdGenerator = IO OrderId
@ -46,18 +48,20 @@ data BrokerServerState = BrokerServerState {
brokers :: [BrokerInterface], brokers :: [BrokerInterface],
completionMvar :: MVar (), completionMvar :: MVar (),
killMvar :: MVar (), killMvar :: MVar (),
orderIdCounter :: OrderId orderIdCounter :: OrderId,
tradeSink :: BoundedChan Trade
} }
data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) (MVar ()) data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> IO BrokerServerHandle
startBrokerServer brokers c ep = do startBrokerServer brokers c ep tradeSinkEp = do
sock <- socket c Router sock <- socket c Router
bind sock (T.unpack ep) bind sock (T.unpack ep)
tid <- myThreadId tid <- myThreadId
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
tsChan <- newBoundedChan 100
state <- newIORef BrokerServerState { state <- newIORef BrokerServerState {
bsSocket = sock, bsSocket = sock,
orderMap = M.empty, orderMap = M.empty,
@ -67,15 +71,20 @@ startBrokerServer brokers c ep = do
brokers = brokers, brokers = brokers,
completionMvar = compMv, completionMvar = compMv,
killMvar = killMv, killMvar = killMv,
orderIdCounter = 1 orderIdCounter = 1,
tradeSink = tsChan
} }
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread" debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv <*> pure killMv BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do notificationCallback state n = do
chan <- tradeSink <$> readIORef state
case n of
TradeNotification trade -> tryWriteChan chan trade
_ -> return False
orders <- orderMap <$> readIORef state orders <- orderMap <$> readIORef state
case M.lookup (notificationOrderId n) orders of case M.lookup (notificationOrderId n) orders of
Just peerId -> addNotification peerId n Just peerId -> addNotification peerId n
@ -87,10 +96,42 @@ notificationCallback state n = do
Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)}
Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)})
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $
withSocket c Req (\sock -> do
chan <- tradeSink <$> readIORef state
connect sock $ T.unpack tradeSinkEp
setReceiveTimeout (restrict 5000) sock
whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do
threadDelay 500000
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> send sock [] $ encodeTrade trade
Nothing -> do
send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat
void $ receive sock -- anything will do
)
where
encodeTrade :: Trade -> B.ByteString
encodeTrade = BL.toStrict . encode . convertTrade
convertTrade trade = TradeSinkTrade {
tsAccountId = tradeAccount trade,
tsSecurity = tradeSecurity trade,
tsPrice = fromRational . toRational . tradePrice $ trade,
tsQuantity = fromInteger $ tradeQuantity trade,
tsVolume = fromRational . toRational . tradeVolume $ trade,
tsCurrency = tradeVolumeCurrency trade,
tsOperation = tradeOperation trade,
tsExecutionTime = tradeTimestamp trade,
tsSignalId = tradeSignalId trade
}
brokerServerThread :: IORef BrokerServerState -> IO () brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup brokerServerThread state = finally brokerServerThread' cleanup
where where
brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryTakeMVar) $ do brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do
sock <- bsSocket <$> readIORef state sock <- bsSocket <$> readIORef state
evs <- poll 200 [Sock sock [In] Nothing] evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do when ((L.length . L.head) evs > 0) $ do
@ -168,7 +209,9 @@ brokerServerThread state = finally brokerServerThread' cleanup
stopBrokerServer :: BrokerServerHandle -> IO () stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle tid compMv killMv) = do stopBrokerServer (BrokerServerHandle tid tstid compMv killMv) = do
putMVar killMv () putMVar killMv ()
yield >> readMVar compMv killThread tstid
yield
readMVar compMv

6
test/TestBrokerClient.hs

@ -56,7 +56,7 @@ defaultOrder = mkOrder {
testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
oid <- submitOrder broC defaultOrder oid <- submitOrder broC defaultOrder
case oid of case oid of
@ -66,7 +66,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext
testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder maybeOid <- submitOrder broC defaultOrder
case maybeOid of case maybeOid of
@ -81,7 +81,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder maybeOid <- submitOrder broC defaultOrder
case maybeOid of case maybeOid of

59
test/TestBrokerServer.hs

@ -1,4 +1,4 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings, RecordWildCards #-}
module TestBrokerServer ( module TestBrokerServer (
unitTests unitTests
@ -40,7 +40,8 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop
, testBrokerServerCancelUnknownOrder , testBrokerServerCancelUnknownOrder
, testBrokerServerCorruptedPacket , testBrokerServerCorruptedPacket
, testBrokerServerGetNotifications , testBrokerServerGetNotifications
, testBrokerServerDuplicateRequest ] , testBrokerServerDuplicateRequest
, testBrokerServerTradeSink ]
-- --
-- Few helpers -- Few helpers
@ -72,14 +73,14 @@ defaultOrder = mkOrder {
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) ""
stopBrokerServer broS) stopBrokerServer broS)
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do
step "Setup" step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
@ -101,7 +102,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep
@ -119,7 +120,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
@ -145,7 +146,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
receive sock receive sock
@ -167,7 +168,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
step "Connecting" step "Connecting"
connect sock (T.unpack ep) connect sock (T.unpack ep)
@ -191,7 +192,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
-- We have to actually submit order, or else server won't know that we should -- We have to actually submit order, or else server won't know that we should
-- be notified about this order -- be notified about this order
@ -252,7 +253,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
step "Setup" step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
@ -275,3 +276,41 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
))) )))
testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade sink" $ \step -> withContext (\ctx -> do
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
withSocket ctx Rep (\tradeSock -> do
bind tradeSock "inproc://trade-sink"
setReceiveTimeout (restrict 1000) tradeSock
bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink") stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
step "Connecting"
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
threadDelay 100000
(Just cb) <- notificationCallback <$> readIORef broState
let trade = Trade {
tradeOrderId = orderId,
tradePrice = 19.82,
tradeQuantity = 1,
tradeVolume = 1982,
tradeVolumeCurrency = "TEST_CURRENCY",
tradeOperation = Buy,
tradeAccount = "demo",
tradeSecurity = "FOO",
tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000,
tradeSignalId = SignalId "Foo" "bar" "baz" }
cb (TradeNotification trade)
threadDelay 100000
step "Testing"
tradeMsg <- receive tradeSock
case decode . BL.fromStrict $ tradeMsg of
Just tsTrade@TradeSinkTrade{..} -> do
tsAccountId @?= tradeAccount trade
tsPrice @?= (fromRational . toRational . tradePrice) trade
_ -> assertFailure "Invalid trade in sink"
))))

Loading…
Cancel
Save