From 388c9e13971e99a02331bf51e3336b42ea9611a8 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 27 Jun 2017 00:13:22 +0700 Subject: [PATCH] BrokerServer: Support for multiple trade sinks --- libatrade.cabal | 10 ++- src/ATrade/Broker/Protocol.hs | 17 +++- src/ATrade/Broker/Server.hs | 67 ++++----------- .../Broker/TradeSinks/TelegramTradeSink.hs | 75 +++++++++++++++++ src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs | 75 +++++++++++++++++ test/Spec.hs | 4 +- test/TestBrokerClient.hs | 6 +- test/TestBrokerServer.hs | 83 ++++++++++--------- test/TestZMQTradeSink.hs | 47 +++++++++++ 9 files changed, 290 insertions(+), 94 deletions(-) create mode 100644 src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs create mode 100644 src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs create mode 100644 test/TestZMQTradeSink.hs diff --git a/libatrade.cabal b/libatrade.cabal index aaf99b5..74c3526 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -1,5 +1,5 @@ name: libatrade -version: 0.1.0.0 +version: 0.2.0.0 synopsis: ATrade infrastructure core library description: Please see README.md homepage: https://github.com/asakul/libatrade.git @@ -22,6 +22,8 @@ library , ATrade.Broker.Client , ATrade.Broker.Protocol , ATrade.Broker.Server + , ATrade.Broker.TradeSinks.TelegramTradeSink + , ATrade.Broker.TradeSinks.ZMQTradeSink , ATrade.Util build-depends: base >= 4.7 && < 5 , Decimal @@ -44,6 +46,11 @@ library , errors , text-format , parsec + , extra + , connection + , http-client + , http-client-tls + , utf8-string default-language: Haskell2010 executable libatrade-exe @@ -93,6 +100,7 @@ test-suite libatrade-test , TestQuoteSourceClient , TestQuoteSourceServer , TestTypes + , TestZMQTradeSink source-repository head type: git diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index c629ca1..5039232 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -7,7 +7,8 @@ module ATrade.Broker.Protocol ( notificationOrderId, RequestSqnum(..), requestSqnum, - TradeSinkMessage(..) + TradeSinkMessage(..), + mkTradeMessage ) where import Control.Error.Util @@ -123,6 +124,20 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { tsSignalId :: SignalId } deriving (Show, Eq) +mkTradeMessage trade = TradeSinkTrade { + tsAccountId = tradeAccount trade, + tsSecurity = tradeSecurity trade, + tsPrice = (toDouble . tradePrice) trade, + tsQuantity = (fromInteger . tradeQuantity) trade, + tsVolume = (toDouble . tradeVolume) trade, + tsCurrency = tradeVolumeCurrency trade, + tsOperation = tradeOperation trade, + tsExecutionTime = tradeTimestamp trade, + tsSignalId = tradeSignalId trade +} + where + toDouble = fromRational . toRational + getHMS :: UTCTime -> (Int, Int, Int, Int) getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec) where diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 68b537a..b16ac82 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -3,7 +3,8 @@ module ATrade.Broker.Server ( startBrokerServer, stopBrokerServer, - BrokerInterface(..) + BrokerInterface(..), + TradeSink ) where import ATrade.Types @@ -55,8 +56,10 @@ data BrokerServerState = BrokerServerState { data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ()) -startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> ServerSecurityParams -> IO BrokerServerHandle -startBrokerServer brokers c ep tradeSinkEp params = do +type TradeSink = Trade -> IO () + +startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> [TradeSink] -> ServerSecurityParams -> IO BrokerServerHandle +startBrokerServer brokers c ep tradeSinks params = do sock <- socket c Router setLinger (restrict 0) sock case sspDomain params of @@ -87,10 +90,11 @@ startBrokerServer brokers c ep tradeSinkEp params = do mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers debugM "Broker.Server" "Forking broker server thread" - BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv + BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinks) <*> pure compMv <*> pure killMv notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback state n = do + debugM "Broker.Server" $ "Notification: " ++ show n chan <- tradeSink <$> readIORef state case n of TradeNotification trade -> tryWriteChan chan trade @@ -106,52 +110,17 @@ notificationCallback state n = do Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) -tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () -tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ - whileM_ (not <$> wasKilled) $ - handle (\e -> do - warningM "Broker.Server" $ "Trade sink: exception: " ++ (show (e :: SomeException)) ++ "; isZMQ: " ++ show (isZMQError e) - when (isZMQError e) $ do - debugM "Broker.Server" "Rethrowing exception" - throwIO e) $ withSocket c Dealer (\sock -> do - debugM "Broker.Server" "Connecting trade sink socket" - chan <- tradeSink <$> readIORef state - connect sock $ T.unpack tradeSinkEp - timeoutMv <- newIORef False - threadDelay 1000000 - whileM_ (andM [not <$> wasKilled, not <$> readIORef timeoutMv]) $ do - maybeTrade <- tryReadChan chan - case maybeTrade of - Just trade -> do - sendMulti sock $ B.empty :| [encodeTrade trade] - _ <- receiveMulti sock - return () - Nothing -> do - threadDelay 1000000 - sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] - events <- poll 5000 [Sock sock [In] Nothing] - if not . L.null . L.head $ events - then void . receive $ sock -- anything will do - else do - writeIORef timeoutMv True - warningM "Broker.Server" "Trade sink timeout") - +tradeSinkHandler :: Context -> IORef BrokerServerState -> [TradeSink] -> IO () +tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ + whileM_ (not <$> wasKilled) $ do + chan <- tradeSink <$> readIORef state + maybeTrade <- tryReadChan chan + case maybeTrade of + Just trade -> mapM_ (\x -> x trade) tradeSinks + Nothing -> return () where - isZMQError e = "ZMQError" `L.isPrefixOf` show e - wasKilled = fmap killMvar (readIORef state) >>= fmap isJust . tryReadMVar - encodeTrade :: Trade -> B.ByteString - encodeTrade = BL.toStrict . encode . convertTrade - convertTrade trade = TradeSinkTrade { - tsAccountId = tradeAccount trade, - tsSecurity = tradeSecurity trade, - tsPrice = fromRational . toRational . tradePrice $ trade, - tsQuantity = fromInteger $ tradeQuantity trade, - tsVolume = fromRational . toRational . tradeVolume $ trade, - tsCurrency = tradeVolumeCurrency trade, - tsOperation = tradeOperation trade, - tsExecutionTime = tradeTimestamp trade, - tsSignalId = tradeSignalId trade - } + wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar) + brokerServerThread :: IORef BrokerServerState -> IO () brokerServerThread state = finally brokerServerThread' cleanup diff --git a/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs b/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs new file mode 100644 index 0000000..dc5aeba --- /dev/null +++ b/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs @@ -0,0 +1,75 @@ +{-# LANGUAGE OverloadedStrings #-} + +module ATrade.Broker.TradeSinks.TelegramTradeSink ( + withTelegramTradeSink +) where + +import Control.Exception +import Control.Concurrent +import qualified Control.Concurrent.BoundedChan as BC +import Data.Aeson +import Data.Aeson.Types +import Data.IORef +import Data.Maybe +import Data.List.NonEmpty +import qualified Data.List as L +import qualified Data.ByteString as B hiding (putStrLn) +import qualified Data.ByteString.Lazy as BL hiding (putStrLn) +import System.Log.Logger +import Control.Monad.Loops +import Control.Monad.Extra + +import ATrade.Types +import ATrade.Broker.Protocol +import Network.Connection +import Network.HTTP.Client +import Network.HTTP.Client.TLS + +import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import Data.Text.Format +import qualified Data.ByteString.UTF8 as BU8 + +withTelegramTradeSink apitoken chatId f = do + killMv <- newEmptyMVar + chan <- BC.newBoundedChan 1000 + bracket (forkIO $ sinkThread apitoken chatId killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan) + where + sink = BC.writeChan + +sinkThread apitoken chatId killMv chan = do + man <- newManager $ mkManagerSettings tlsSettings Nothing + whileM_ (not <$> wasKilled) $ do + maybeTrade <- BC.tryReadChan chan + whenJust maybeTrade (\trade -> sendMessage man apitoken chatId $ format "Trade: {} {} of {} at {} for {} ({}/{})" + (show (tradeOperation trade), + show (tradeQuantity trade), + tradeSecurity trade, + show (tradePrice trade), + tradeAccount trade, + (strategyId . tradeSignalId) trade, + (signalName . tradeSignalId) trade)) + where + tlsSettings = TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False } + wasKilled = isJust <$> tryReadMVar killMv + encodeTrade :: Trade -> B.ByteString + encodeTrade = BL.toStrict . encode . convertTrade + convertTrade trade = TradeSinkTrade { + tsAccountId = tradeAccount trade, + tsSecurity = tradeSecurity trade, + tsPrice = fromRational . toRational . tradePrice $ trade, + tsQuantity = fromInteger $ tradeQuantity trade, + tsVolume = fromRational . toRational . tradeVolume $ trade, + tsCurrency = tradeVolumeCurrency trade, + tsOperation = tradeOperation trade, + tsExecutionTime = tradeTimestamp trade, + tsSignalId = tradeSignalId trade + } + + sendMessage httpManager apitoken chatId text = do + req <- parseUrl $ "https://api.telegram.org/bot" ++ (T.unpack apitoken) ++ "/sendMessage" + void $ withResponse (req { method = "POST", requestHeaders = [("Content-Type", BU8.fromString "application/json")], requestBody = (RequestBodyLBS . encode) (object ["chat_id" .= chatId, "text" .= text]) }) httpManager (\resp -> brConsume (responseBody resp)) + + +stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId + diff --git a/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs b/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs new file mode 100644 index 0000000..76f4bdc --- /dev/null +++ b/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs @@ -0,0 +1,75 @@ + +module ATrade.Broker.TradeSinks.ZMQTradeSink ( + withZMQTradeSink +) where + +import Control.Exception +import Control.Concurrent +import qualified Control.Concurrent.BoundedChan as BC +import Data.Aeson +import Data.IORef +import Data.Maybe +import qualified Data.Text as T +import Data.List.NonEmpty +import qualified Data.List as L +import qualified Data.ByteString as B hiding (putStrLn) +import qualified Data.ByteString.Lazy as BL hiding (putStrLn) +import Control.Monad.Loops +import Control.Monad.Extra +import System.Log.Logger +import System.Timeout +import System.ZMQ4 + +import ATrade.Types +import ATrade.Broker.Protocol + +withZMQTradeSink ctx tradeSinkEp f = do + killMv <- newEmptyMVar + chan <- BC.newBoundedChan 1000 + bracket (forkIO $ sinkThread ctx tradeSinkEp killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan) + where + sink = BC.writeChan + +sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do + handle (\e -> do + warningM "Broker.Server" $ "Trade sink: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) + when (isZMQError e) $ do + debugM "Broker.Server" "Rethrowing exception" + throwIO e) sinkThread' + where + sinkThread' = withSocket ctx Dealer (\sock -> do + connect sock $ T.unpack tradeSinkEp + whenM (not <$> wasKilled) $ do + maybeTrade <- BC.tryReadChan chan + case maybeTrade of + Just trade -> do + sendMulti sock $ B.empty :| [encodeTrade trade] + void $ receiveMulti sock + Nothing -> do + sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] + events <- poll 1000 [Sock sock [In] Nothing] + if L.null . L.head $ events + then warningM "Broker.Server" "Trade sink timeout" + else do + void . receive $ sock -- anything will do + sinkThread') + + + isZMQError e = "ZMQError" `L.isPrefixOf` show e + wasKilled = isJust <$> tryReadMVar killMv + encodeTrade :: Trade -> B.ByteString + encodeTrade = BL.toStrict . encode . convertTrade + convertTrade trade = TradeSinkTrade { + tsAccountId = tradeAccount trade, + tsSecurity = tradeSecurity trade, + tsPrice = fromRational . toRational . tradePrice $ trade, + tsQuantity = fromInteger $ tradeQuantity trade, + tsVolume = fromRational . toRational . tradeVolume $ trade, + tsCurrency = tradeVolumeCurrency trade, + tsOperation = tradeOperation trade, + tsExecutionTime = tradeTimestamp trade, + tsSignalId = tradeSignalId trade + } + +stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId + diff --git a/test/Spec.hs b/test/Spec.hs index 8d57152..126cc8a 100644 --- a/test/Spec.hs +++ b/test/Spec.hs @@ -5,6 +5,7 @@ import qualified TestBrokerProtocol import qualified TestBrokerServer import qualified TestQuoteSourceClient import qualified TestQuoteSourceServer +import qualified TestZMQTradeSink import Test.Tasty @@ -19,5 +20,6 @@ unitTests = testGroup "Unit-tests" [ TestQuoteSourceClient.unitTests , TestQuoteSourceServer.unitTests , TestBrokerServer.unitTests - , TestBrokerClient.unitTests] + , TestBrokerClient.unitTests + , TestZMQTradeSink.unitTests ] diff --git a/test/TestBrokerClient.hs b/test/TestBrokerClient.hs index 37c0940..9841de5 100644 --- a/test/TestBrokerClient.hs +++ b/test/TestBrokerClient.hs @@ -57,7 +57,7 @@ defaultOrder = mkOrder { testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do oid <- submitOrder broC defaultOrder case oid of @@ -67,7 +67,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of @@ -82,7 +82,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index e245901..1b39724 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -68,20 +68,28 @@ defaultOrder = mkOrder { orderOperation = Buy } +makeTestTradeSink :: IO (IORef (Maybe Trade), TradeSink) +makeTestTradeSink = do + ref <- newIORef Nothing + return (ref, f ref) + where + f ref t = writeIORef ref $ Just t + + -- -- Tests -- testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do ep <- toText <$> UV4.nextRandom - broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" defaultServerSecurityParams + broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) [] defaultServerSecurityParams stopBrokerServer broS) testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> do withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep @@ -103,7 +111,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep @@ -121,7 +129,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock @@ -147,7 +155,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep receive sock @@ -169,7 +177,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do step "Connecting" connect sock (T.unpack ep) @@ -193,7 +201,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do -- We have to actually submit order, or else server won't know that we should -- be notified about this order @@ -256,7 +264,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint putStrLn "delta" - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> do putStrLn "gamma" withSocket ctx Req (\sock -> do putStrLn "alpha" @@ -287,35 +295,32 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - withSocket ctx Rep (\tradeSock -> do - bind tradeSock "inproc://trade-sink" - setReceiveTimeout (restrict 1000) tradeSock - bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink" defaultServerSecurityParams) stopBrokerServer (\broS -> do - withSocket ctx Req (\sock -> do - step "Connecting" - connectAndSendOrder step sock defaultOrder ep - (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock - - (Just cb) <- notificationCallback <$> readIORef broState - let trade = Trade { - tradeOrderId = orderId, - tradePrice = 19.82, - tradeQuantity = 1, - tradeVolume = 1982, - tradeVolumeCurrency = "TEST_CURRENCY", - tradeOperation = Buy, - tradeAccount = "demo", - tradeSecurity = "FOO", - tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, - tradeSignalId = SignalId "Foo" "bar" "baz" } - cb (TradeNotification trade) + (tradeRef, sink) <- makeTestTradeSink + bracket (startBrokerServer [mockBroker] ctx ep [sink] defaultServerSecurityParams) stopBrokerServer (\broS -> do + withSocket ctx Req (\sock -> do + step "Connecting" + connectAndSendOrder step sock defaultOrder ep + (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock - threadDelay 100000 - step "Testing" - tradeMsg <- receive tradeSock - case decode . BL.fromStrict $ tradeMsg of - Just tsTrade@TradeSinkTrade{..} -> do - tsAccountId @?= tradeAccount trade - tsPrice @?= (fromRational . toRational . tradePrice) trade - _ -> assertFailure "Invalid trade in sink" - )))) + (Just cb) <- notificationCallback <$> readIORef broState + let trade = Trade { + tradeOrderId = orderId, + tradePrice = 19.82, + tradeQuantity = 1, + tradeVolume = 1982, + tradeVolumeCurrency = "TEST_CURRENCY", + tradeOperation = Buy, + tradeAccount = "demo", + tradeSecurity = "FOO", + tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, + tradeSignalId = SignalId "Foo" "bar" "baz" } + cb (TradeNotification trade) + + threadDelay 100000 + step "Testing" + maybeTrade <- readIORef tradeRef + case maybeTrade of + Just trade' -> do + trade' @?= trade + _ -> assertFailure "Invalid trade in sink" + ))) diff --git a/test/TestZMQTradeSink.hs b/test/TestZMQTradeSink.hs new file mode 100644 index 0000000..1f9f107 --- /dev/null +++ b/test/TestZMQTradeSink.hs @@ -0,0 +1,47 @@ +{-# LANGUAGE OverloadedStrings #-} + +module TestZMQTradeSink ( + unitTests +) where + +import Test.Tasty +import Test.Tasty.SmallCheck as SC +import Test.Tasty.QuickCheck as QC +import Test.Tasty.HUnit + +import ATrade.Types +import ATrade.Broker.Protocol +import ATrade.Broker.TradeSinks.ZMQTradeSink +import Control.Concurrent +import System.ZMQ4 +import Data.Aeson +import Data.Time.Calendar +import Data.Time.Clock +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL + +unitTests = testGroup "Broker.Server.TradeSinks.ZMQTradeSink" [ testZMQTradeSink ] + +testZMQTradeSink = testCase "Test ZMQTradeSink trade serialization" $ + withContext (\ctx -> withSocket ctx Rep (\insock -> do + bind insock "inproc://test-sink" + withZMQTradeSink ctx "inproc://test-sink" (\f -> do + f trade + raw <- receive insock + send insock [] B.empty + case decode $ BL.fromStrict raw of + Just t -> mkTradeMessage trade @?= t + Nothing -> assertFailure "Unable to decode incoming message"))) + where + trade = Trade { + tradeOrderId = 0, + tradePrice = 10, + tradeQuantity = 20, + tradeVolume = 30, + tradeVolumeCurrency = "TEST", + tradeOperation = Buy, + tradeAccount = "FOO", + tradeSecurity = "BAR", + tradeTimestamp = UTCTime (fromGregorian 1970 1 1) 0, + tradeSignalId = SignalId "foo" "bar" "" } +