From 13811c454e78e27efa29364e1981536c7b8f0fc3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 29 Nov 2016 15:51:52 +0700 Subject: [PATCH] Trade sink support --- libatrade.cabal | 3 + src/ATrade/Broker/Protocol.hs | 101 +++++++++++++++++++++++++++++++++- src/ATrade/Broker/Server.hs | 63 +++++++++++++++++---- test/TestBrokerClient.hs | 6 +- test/TestBrokerServer.hs | 59 ++++++++++++++++---- 5 files changed, 206 insertions(+), 26 deletions(-) diff --git a/libatrade.cabal b/libatrade.cabal index 86dbd66..1d95dca 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -39,6 +39,9 @@ library , monad-loops , safe , stm + , text-format + , errors + , parsec default-language: Haskell2010 executable libatrade-exe diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index db2c511..4cd0620 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE OverloadedStrings, MultiWayIf #-} +{-# LANGUAGE OverloadedStrings, MultiWayIf, RecordWildCards #-} module ATrade.Broker.Protocol ( BrokerServerRequest(..), @@ -6,15 +6,21 @@ module ATrade.Broker.Protocol ( Notification(..), notificationOrderId, RequestSqnum(..), - requestSqnum + requestSqnum, + TradeSinkMessage(..) ) where +import Control.Error.Util import qualified Data.HashMap.Strict as HM import qualified Data.Text as T +import Data.Text.Format import Data.Aeson -import Data.Aeson.Types +import Data.Aeson.Types hiding (parse) import Data.Int +import Data.Time.Clock +import Data.Time.Calendar import ATrade.Types +import Text.Parsec type RequestSqnum = Int64 @@ -104,3 +110,92 @@ instance FromJSON Notification where instance ToJSON Notification where toJSON (OrderNotification oid newState) = object ["order-state" .= object [ "order-id" .= oid, "new-state" .= newState] ] toJSON (TradeNotification trade) = object ["trade" .= toJSON trade] + +data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { + tsAccountId :: T.Text, + tsSecurity :: T.Text, + tsPrice :: Double, + tsQuantity :: Int, + tsVolume :: Double, + tsCurrency :: T.Text, + tsOperation :: Operation, + tsExecutionTime :: UTCTime, + tsSignalId :: SignalId +} + +getHMS :: UTCTime -> (Int, Int, Int, Int) +getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec) + where + intsec = floor diff + msec = floor $ (diff - fromIntegral intsec) * 1000 + +formatTimestamp dt = format "{}-{}-{} {}:{}:{}.{}" (left 4 '0' y, left 2 '0' m, left 2 '0' d, left 2 '0' hour, left 2 '0' min, left 2 '0' sec, left 3 '0' msec) + where + (y, m, d) = toGregorian $ utctDay dt + (hour, min, sec, msec) = getHMS dt + +parseTimestamp (String t) = case hush $ parse p "" t of + Just ts -> return ts + Nothing -> fail "Unable to parse timestamp" + where + p = do + year <- read <$> many1 digit + char '-' + mon <- read <$> many1 digit + char '-' + day <- read <$> many1 digit + char ' ' + hour <- read <$> many1 digit + char ':' + min <- read <$> many1 digit + char ':' + sec <- read <$> many1 digit + char '.' + msec <- many1 digit -- TODO use msec + return $ UTCTime (fromGregorian year mon day) (secondsToDiffTime $ hour * 3600 + min * 60 + sec) + +parseTimestamp _ = fail "Unable to parse timestamp: invalid type" + + +instance ToJSON TradeSinkMessage where + toJSON TradeSinkHeartBeat = object ["command" .= T.pack "heartbeat" ] + toJSON TradeSinkTrade { .. } = object ["account" .= tsAccountId, + "security" .= tsSecurity, + "price" .= tsPrice, + "quantity" .= tsQuantity, + "volume" .= tsVolume, + "volume-currency" .= tsCurrency, + "operation" .= tsOperation, + "execution-time" .= formatTimestamp tsExecutionTime, + "strategy" .= strategyId tsSignalId, + "signal-id" .= signalName tsSignalId, + "comment" .= comment tsSignalId] + +instance FromJSON TradeSinkMessage where + parseJSON = withObject "object" (\obj -> + case HM.lookup "command" obj of + Nothing -> parseTrade obj + Just cmd -> return TradeSinkHeartBeat) + where + parseTrade v = do + acc <- v .: "account" + sec <- v .: "security" + pr <- v .: "price" + q <- v .: "quantity" + vol <- v .: "volume" + cur <- v .: "volume-currency" + op <- v .: "operation" + extime <- v .: "execution-time" >>= parseTimestamp + strategy <- v .: "strategy" + sid <- v .: "signal-id" + com <- v .: "comment" + return TradeSinkTrade { + tsAccountId = acc, + tsSecurity = sec, + tsPrice = pr, + tsQuantity = q, + tsVolume = vol, + tsCurrency = cur, + tsOperation = op, + tsExecutionTime = extime, + tsSignalId = SignalId strategy sid com } diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 0c2f99e..2d4bdf3 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -19,11 +19,13 @@ import Data.Aeson import Data.Maybe import Data.Time.Clock import Data.IORef -import Control.Concurrent +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan import Control.Exception import Control.Monad import Control.Monad.Loops import System.Log.Logger +import System.Timeout import ATrade.Util newtype OrderIdGenerator = IO OrderId @@ -46,18 +48,20 @@ data BrokerServerState = BrokerServerState { brokers :: [BrokerInterface], completionMvar :: MVar (), killMvar :: MVar (), - orderIdCounter :: OrderId + orderIdCounter :: OrderId, + tradeSink :: BoundedChan Trade } -data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) (MVar ()) +data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ()) -startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle -startBrokerServer brokers c ep = do +startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> IO BrokerServerHandle +startBrokerServer brokers c ep tradeSinkEp = do sock <- socket c Router bind sock (T.unpack ep) tid <- myThreadId compMv <- newEmptyMVar killMv <- newEmptyMVar + tsChan <- newBoundedChan 100 state <- newIORef BrokerServerState { bsSocket = sock, orderMap = M.empty, @@ -67,15 +71,20 @@ startBrokerServer brokers c ep = do brokers = brokers, completionMvar = compMv, killMvar = killMv, - orderIdCounter = 1 + orderIdCounter = 1, + tradeSink = tsChan } mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers debugM "Broker.Server" "Forking broker server thread" - BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv <*> pure killMv + BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback state n = do + chan <- tradeSink <$> readIORef state + case n of + TradeNotification trade -> tryWriteChan chan trade + _ -> return False orders <- orderMap <$> readIORef state case M.lookup (notificationOrderId n) orders of Just peerId -> addNotification peerId n @@ -87,10 +96,42 @@ notificationCallback state n = do Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) +tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () +tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ + whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ + withSocket c Req (\sock -> do + chan <- tradeSink <$> readIORef state + connect sock $ T.unpack tradeSinkEp + setReceiveTimeout (restrict 5000) sock + whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do + threadDelay 500000 + maybeTrade <- tryReadChan chan + case maybeTrade of + Just trade -> send sock [] $ encodeTrade trade + Nothing -> do + send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat + void $ receive sock -- anything will do + ) + + where + encodeTrade :: Trade -> B.ByteString + encodeTrade = BL.toStrict . encode . convertTrade + convertTrade trade = TradeSinkTrade { + tsAccountId = tradeAccount trade, + tsSecurity = tradeSecurity trade, + tsPrice = fromRational . toRational . tradePrice $ trade, + tsQuantity = fromInteger $ tradeQuantity trade, + tsVolume = fromRational . toRational . tradeVolume $ trade, + tsCurrency = tradeVolumeCurrency trade, + tsOperation = tradeOperation trade, + tsExecutionTime = tradeTimestamp trade, + tsSignalId = tradeSignalId trade + } + brokerServerThread :: IORef BrokerServerState -> IO () brokerServerThread state = finally brokerServerThread' cleanup where - brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryTakeMVar) $ do + brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do sock <- bsSocket <$> readIORef state evs <- poll 200 [Sock sock [In] Nothing] when ((L.length . L.head) evs > 0) $ do @@ -168,7 +209,9 @@ brokerServerThread state = finally brokerServerThread' cleanup stopBrokerServer :: BrokerServerHandle -> IO () -stopBrokerServer (BrokerServerHandle tid compMv killMv) = do +stopBrokerServer (BrokerServerHandle tid tstid compMv killMv) = do putMVar killMv () - yield >> readMVar compMv + killThread tstid + yield + readMVar compMv diff --git a/test/TestBrokerClient.hs b/test/TestBrokerClient.hs index 846896e..8eaf100 100644 --- a/test/TestBrokerClient.hs +++ b/test/TestBrokerClient.hs @@ -56,7 +56,7 @@ defaultOrder = mkOrder { testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do oid <- submitOrder broC defaultOrder case oid of @@ -66,7 +66,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of @@ -81,7 +81,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index c431a4a..14f09d4 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE OverloadedStrings, RecordWildCards #-} module TestBrokerServer ( unitTests @@ -40,7 +40,8 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop , testBrokerServerCancelUnknownOrder , testBrokerServerCorruptedPacket , testBrokerServerGetNotifications - , testBrokerServerDuplicateRequest ] + , testBrokerServerDuplicateRequest + , testBrokerServerTradeSink ] -- -- Few helpers @@ -72,14 +73,14 @@ defaultOrder = mkOrder { testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do ep <- toText <$> UV4.nextRandom - broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) + broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" stopBrokerServer broS) testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep @@ -101,7 +102,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep @@ -119,7 +120,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock @@ -145,7 +146,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep receive sock @@ -167,7 +168,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do step "Connecting" connect sock (T.unpack ep) @@ -191,7 +192,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do -- We have to actually submit order, or else server won't know that we should -- be notified about this order @@ -252,7 +253,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep @@ -275,3 +276,41 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque Nothing -> assertFailure "Invalid response" ))) + +testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade sink" $ \step -> withContext (\ctx -> do + step "Setup" + (mockBroker, broState) <- mkMockBroker ["demo"] + ep <- makeEndpoint + withSocket ctx Rep (\tradeSock -> do + bind tradeSock "inproc://trade-sink" + setReceiveTimeout (restrict 1000) tradeSock + bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink") stopBrokerServer (\broS -> do + withSocket ctx Req (\sock -> do + step "Connecting" + connectAndSendOrder step sock defaultOrder ep + (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock + threadDelay 100000 + + (Just cb) <- notificationCallback <$> readIORef broState + let trade = Trade { + tradeOrderId = orderId, + tradePrice = 19.82, + tradeQuantity = 1, + tradeVolume = 1982, + tradeVolumeCurrency = "TEST_CURRENCY", + tradeOperation = Buy, + tradeAccount = "demo", + tradeSecurity = "FOO", + tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, + tradeSignalId = SignalId "Foo" "bar" "baz" } + cb (TradeNotification trade) + + threadDelay 100000 + step "Testing" + tradeMsg <- receive tradeSock + case decode . BL.fromStrict $ tradeMsg of + Just tsTrade@TradeSinkTrade{..} -> do + tsAccountId @?= tradeAccount trade + tsPrice @?= (fromRational . toRational . tradePrice) trade + _ -> assertFailure "Invalid trade in sink" + ))))