Browse Source

BrokerServer: Support for multiple trade sinks

master
Denis Tereshkin 9 years ago
parent
commit
388c9e1397
  1. 10
      libatrade.cabal
  2. 17
      src/ATrade/Broker/Protocol.hs
  3. 67
      src/ATrade/Broker/Server.hs
  4. 75
      src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs
  5. 75
      src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs
  6. 4
      test/Spec.hs
  7. 6
      test/TestBrokerClient.hs
  8. 83
      test/TestBrokerServer.hs
  9. 47
      test/TestZMQTradeSink.hs

10
libatrade.cabal

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
name: libatrade
version: 0.1.0.0
version: 0.2.0.0
synopsis: ATrade infrastructure core library
description: Please see README.md
homepage: https://github.com/asakul/libatrade.git
@ -22,6 +22,8 @@ library @@ -22,6 +22,8 @@ library
, ATrade.Broker.Client
, ATrade.Broker.Protocol
, ATrade.Broker.Server
, ATrade.Broker.TradeSinks.TelegramTradeSink
, ATrade.Broker.TradeSinks.ZMQTradeSink
, ATrade.Util
build-depends: base >= 4.7 && < 5
, Decimal
@ -44,6 +46,11 @@ library @@ -44,6 +46,11 @@ library
, errors
, text-format
, parsec
, extra
, connection
, http-client
, http-client-tls
, utf8-string
default-language: Haskell2010
executable libatrade-exe
@ -93,6 +100,7 @@ test-suite libatrade-test @@ -93,6 +100,7 @@ test-suite libatrade-test
, TestQuoteSourceClient
, TestQuoteSourceServer
, TestTypes
, TestZMQTradeSink
source-repository head
type: git

17
src/ATrade/Broker/Protocol.hs

@ -7,7 +7,8 @@ module ATrade.Broker.Protocol ( @@ -7,7 +7,8 @@ module ATrade.Broker.Protocol (
notificationOrderId,
RequestSqnum(..),
requestSqnum,
TradeSinkMessage(..)
TradeSinkMessage(..),
mkTradeMessage
) where
import Control.Error.Util
@ -123,6 +124,20 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { @@ -123,6 +124,20 @@ data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsSignalId :: SignalId
} deriving (Show, Eq)
mkTradeMessage trade = TradeSinkTrade {
tsAccountId = tradeAccount trade,
tsSecurity = tradeSecurity trade,
tsPrice = (toDouble . tradePrice) trade,
tsQuantity = (fromInteger . tradeQuantity) trade,
tsVolume = (toDouble . tradeVolume) trade,
tsCurrency = tradeVolumeCurrency trade,
tsOperation = tradeOperation trade,
tsExecutionTime = tradeTimestamp trade,
tsSignalId = tradeSignalId trade
}
where
toDouble = fromRational . toRational
getHMS :: UTCTime -> (Int, Int, Int, Int)
getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, intsec `mod` 60, msec)
where

67
src/ATrade/Broker/Server.hs

@ -3,7 +3,8 @@ @@ -3,7 +3,8 @@
module ATrade.Broker.Server (
startBrokerServer,
stopBrokerServer,
BrokerInterface(..)
BrokerInterface(..),
TradeSink
) where
import ATrade.Types
@ -55,8 +56,10 @@ data BrokerServerState = BrokerServerState { @@ -55,8 +56,10 @@ data BrokerServerState = BrokerServerState {
data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> ServerSecurityParams -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinkEp params = do
type TradeSink = Trade -> IO ()
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> [TradeSink] -> ServerSecurityParams -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinks params = do
sock <- socket c Router
setLinger (restrict 0) sock
case sspDomain params of
@ -87,10 +90,11 @@ startBrokerServer brokers c ep tradeSinkEp params = do @@ -87,10 +90,11 @@ startBrokerServer brokers c ep tradeSinkEp params = do
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinks) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do
debugM "Broker.Server" $ "Notification: " ++ show n
chan <- tradeSink <$> readIORef state
case n of
TradeNotification trade -> tryWriteChan chan trade
@ -106,52 +110,17 @@ notificationCallback state n = do @@ -106,52 +110,17 @@ notificationCallback state n = do
Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)}
Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)})
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
whileM_ (not <$> wasKilled) $
handle (\e -> do
warningM "Broker.Server" $ "Trade sink: exception: " ++ (show (e :: SomeException)) ++ "; isZMQ: " ++ show (isZMQError e)
when (isZMQError e) $ do
debugM "Broker.Server" "Rethrowing exception"
throwIO e) $ withSocket c Dealer (\sock -> do
debugM "Broker.Server" "Connecting trade sink socket"
chan <- tradeSink <$> readIORef state
connect sock $ T.unpack tradeSinkEp
timeoutMv <- newIORef False
threadDelay 1000000
whileM_ (andM [not <$> wasKilled, not <$> readIORef timeoutMv]) $ do
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> do
sendMulti sock $ B.empty :| [encodeTrade trade]
_ <- receiveMulti sock
return ()
Nothing -> do
threadDelay 1000000
sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat]
events <- poll 5000 [Sock sock [In] Nothing]
if not . L.null . L.head $ events
then void . receive $ sock -- anything will do
else do
writeIORef timeoutMv True
warningM "Broker.Server" "Trade sink timeout")
tradeSinkHandler :: Context -> IORef BrokerServerState -> [TradeSink] -> IO ()
tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $
whileM_ (not <$> wasKilled) $ do
chan <- tradeSink <$> readIORef state
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> mapM_ (\x -> x trade) tradeSinks
Nothing -> return ()
where
isZMQError e = "ZMQError" `L.isPrefixOf` show e
wasKilled = fmap killMvar (readIORef state) >>= fmap isJust . tryReadMVar
encodeTrade :: Trade -> B.ByteString
encodeTrade = BL.toStrict . encode . convertTrade
convertTrade trade = TradeSinkTrade {
tsAccountId = tradeAccount trade,
tsSecurity = tradeSecurity trade,
tsPrice = fromRational . toRational . tradePrice $ trade,
tsQuantity = fromInteger $ tradeQuantity trade,
tsVolume = fromRational . toRational . tradeVolume $ trade,
tsCurrency = tradeVolumeCurrency trade,
tsOperation = tradeOperation trade,
tsExecutionTime = tradeTimestamp trade,
tsSignalId = tradeSignalId trade
}
wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar)
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup

75
src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Broker.TradeSinks.TelegramTradeSink (
withTelegramTradeSink
) where
import Control.Exception
import Control.Concurrent
import qualified Control.Concurrent.BoundedChan as BC
import Data.Aeson
import Data.Aeson.Types
import Data.IORef
import Data.Maybe
import Data.List.NonEmpty
import qualified Data.List as L
import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import System.Log.Logger
import Control.Monad.Loops
import Control.Monad.Extra
import ATrade.Types
import ATrade.Broker.Protocol
import Network.Connection
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Text.Format
import qualified Data.ByteString.UTF8 as BU8
withTelegramTradeSink apitoken chatId f = do
killMv <- newEmptyMVar
chan <- BC.newBoundedChan 1000
bracket (forkIO $ sinkThread apitoken chatId killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan)
where
sink = BC.writeChan
sinkThread apitoken chatId killMv chan = do
man <- newManager $ mkManagerSettings tlsSettings Nothing
whileM_ (not <$> wasKilled) $ do
maybeTrade <- BC.tryReadChan chan
whenJust maybeTrade (\trade -> sendMessage man apitoken chatId $ format "Trade: {} {} of {} at {} for {} ({}/{})"
(show (tradeOperation trade),
show (tradeQuantity trade),
tradeSecurity trade,
show (tradePrice trade),
tradeAccount trade,
(strategyId . tradeSignalId) trade,
(signalName . tradeSignalId) trade))
where
tlsSettings = TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }
wasKilled = isJust <$> tryReadMVar killMv
encodeTrade :: Trade -> B.ByteString
encodeTrade = BL.toStrict . encode . convertTrade
convertTrade trade = TradeSinkTrade {
tsAccountId = tradeAccount trade,
tsSecurity = tradeSecurity trade,
tsPrice = fromRational . toRational . tradePrice $ trade,
tsQuantity = fromInteger $ tradeQuantity trade,
tsVolume = fromRational . toRational . tradeVolume $ trade,
tsCurrency = tradeVolumeCurrency trade,
tsOperation = tradeOperation trade,
tsExecutionTime = tradeTimestamp trade,
tsSignalId = tradeSignalId trade
}
sendMessage httpManager apitoken chatId text = do
req <- parseUrl $ "https://api.telegram.org/bot" ++ (T.unpack apitoken) ++ "/sendMessage"
void $ withResponse (req { method = "POST", requestHeaders = [("Content-Type", BU8.fromString "application/json")], requestBody = (RequestBodyLBS . encode) (object ["chat_id" .= chatId, "text" .= text]) }) httpManager (\resp -> brConsume (responseBody resp))
stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId

75
src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs

@ -0,0 +1,75 @@ @@ -0,0 +1,75 @@
module ATrade.Broker.TradeSinks.ZMQTradeSink (
withZMQTradeSink
) where
import Control.Exception
import Control.Concurrent
import qualified Control.Concurrent.BoundedChan as BC
import Data.Aeson
import Data.IORef
import Data.Maybe
import qualified Data.Text as T
import Data.List.NonEmpty
import qualified Data.List as L
import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import Control.Monad.Loops
import Control.Monad.Extra
import System.Log.Logger
import System.Timeout
import System.ZMQ4
import ATrade.Types
import ATrade.Broker.Protocol
withZMQTradeSink ctx tradeSinkEp f = do
killMv <- newEmptyMVar
chan <- BC.newBoundedChan 1000
bracket (forkIO $ sinkThread ctx tradeSinkEp killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan)
where
sink = BC.writeChan
sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do
handle (\e -> do
warningM "Broker.Server" $ "Trade sink: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e)
when (isZMQError e) $ do
debugM "Broker.Server" "Rethrowing exception"
throwIO e) sinkThread'
where
sinkThread' = withSocket ctx Dealer (\sock -> do
connect sock $ T.unpack tradeSinkEp
whenM (not <$> wasKilled) $ do
maybeTrade <- BC.tryReadChan chan
case maybeTrade of
Just trade -> do
sendMulti sock $ B.empty :| [encodeTrade trade]
void $ receiveMulti sock
Nothing -> do
sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat]
events <- poll 1000 [Sock sock [In] Nothing]
if L.null . L.head $ events
then warningM "Broker.Server" "Trade sink timeout"
else do
void . receive $ sock -- anything will do
sinkThread')
isZMQError e = "ZMQError" `L.isPrefixOf` show e
wasKilled = isJust <$> tryReadMVar killMv
encodeTrade :: Trade -> B.ByteString
encodeTrade = BL.toStrict . encode . convertTrade
convertTrade trade = TradeSinkTrade {
tsAccountId = tradeAccount trade,
tsSecurity = tradeSecurity trade,
tsPrice = fromRational . toRational . tradePrice $ trade,
tsQuantity = fromInteger $ tradeQuantity trade,
tsVolume = fromRational . toRational . tradeVolume $ trade,
tsCurrency = tradeVolumeCurrency trade,
tsOperation = tradeOperation trade,
tsExecutionTime = tradeTimestamp trade,
tsSignalId = tradeSignalId trade
}
stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId

4
test/Spec.hs

@ -5,6 +5,7 @@ import qualified TestBrokerProtocol @@ -5,6 +5,7 @@ import qualified TestBrokerProtocol
import qualified TestBrokerServer
import qualified TestQuoteSourceClient
import qualified TestQuoteSourceServer
import qualified TestZMQTradeSink
import Test.Tasty
@ -19,5 +20,6 @@ unitTests = testGroup "Unit-tests" [ @@ -19,5 +20,6 @@ unitTests = testGroup "Unit-tests" [
TestQuoteSourceClient.unitTests
, TestQuoteSourceServer.unitTests
, TestBrokerServer.unitTests
, TestBrokerClient.unitTests]
, TestBrokerClient.unitTests
, TestZMQTradeSink.unitTests ]

6
test/TestBrokerClient.hs

@ -57,7 +57,7 @@ defaultOrder = mkOrder { @@ -57,7 +57,7 @@ defaultOrder = mkOrder {
testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
oid <- submitOrder broC defaultOrder
case oid of
@ -67,7 +67,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext @@ -67,7 +67,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext
testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
@ -82,7 +82,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" @@ -82,7 +82,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of

83
test/TestBrokerServer.hs

@ -68,20 +68,28 @@ defaultOrder = mkOrder { @@ -68,20 +68,28 @@ defaultOrder = mkOrder {
orderOperation = Buy
}
makeTestTradeSink :: IO (IORef (Maybe Trade), TradeSink)
makeTestTradeSink = do
ref <- newIORef Nothing
return (ref, f ref)
where
f ref t = writeIORef ref $ Just t
--
-- Tests
--
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" defaultServerSecurityParams
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) [] defaultServerSecurityParams
stopBrokerServer broS)
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
@ -103,7 +111,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur @@ -103,7 +111,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep
@ -121,7 +129,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc @@ -121,7 +129,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
@ -147,7 +155,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell @@ -147,7 +155,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
receive sock
@ -169,7 +177,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet @@ -169,7 +177,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
step "Connecting"
connect sock (T.unpack ep)
@ -193,7 +201,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r @@ -193,7 +201,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
-- We have to actually submit order, or else server won't know that we should
-- be notified about this order
@ -256,7 +264,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque @@ -256,7 +264,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
putStrLn "delta"
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> do
putStrLn "gamma"
withSocket ctx Req (\sock -> do
putStrLn "alpha"
@ -287,35 +295,32 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade @@ -287,35 +295,32 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
withSocket ctx Rep (\tradeSock -> do
bind tradeSock "inproc://trade-sink"
setReceiveTimeout (restrict 1000) tradeSock
bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink" defaultServerSecurityParams) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
step "Connecting"
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
(Just cb) <- notificationCallback <$> readIORef broState
let trade = Trade {
tradeOrderId = orderId,
tradePrice = 19.82,
tradeQuantity = 1,
tradeVolume = 1982,
tradeVolumeCurrency = "TEST_CURRENCY",
tradeOperation = Buy,
tradeAccount = "demo",
tradeSecurity = "FOO",
tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000,
tradeSignalId = SignalId "Foo" "bar" "baz" }
cb (TradeNotification trade)
(tradeRef, sink) <- makeTestTradeSink
bracket (startBrokerServer [mockBroker] ctx ep [sink] defaultServerSecurityParams) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
step "Connecting"
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
threadDelay 100000
step "Testing"
tradeMsg <- receive tradeSock
case decode . BL.fromStrict $ tradeMsg of
Just tsTrade@TradeSinkTrade{..} -> do
tsAccountId @?= tradeAccount trade
tsPrice @?= (fromRational . toRational . tradePrice) trade
_ -> assertFailure "Invalid trade in sink"
))))
(Just cb) <- notificationCallback <$> readIORef broState
let trade = Trade {
tradeOrderId = orderId,
tradePrice = 19.82,
tradeQuantity = 1,
tradeVolume = 1982,
tradeVolumeCurrency = "TEST_CURRENCY",
tradeOperation = Buy,
tradeAccount = "demo",
tradeSecurity = "FOO",
tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000,
tradeSignalId = SignalId "Foo" "bar" "baz" }
cb (TradeNotification trade)
threadDelay 100000
step "Testing"
maybeTrade <- readIORef tradeRef
case maybeTrade of
Just trade' -> do
trade' @?= trade
_ -> assertFailure "Invalid trade in sink"
)))

47
test/TestZMQTradeSink.hs

@ -0,0 +1,47 @@ @@ -0,0 +1,47 @@
{-# LANGUAGE OverloadedStrings #-}
module TestZMQTradeSink (
unitTests
) where
import Test.Tasty
import Test.Tasty.SmallCheck as SC
import Test.Tasty.QuickCheck as QC
import Test.Tasty.HUnit
import ATrade.Types
import ATrade.Broker.Protocol
import ATrade.Broker.TradeSinks.ZMQTradeSink
import Control.Concurrent
import System.ZMQ4
import Data.Aeson
import Data.Time.Calendar
import Data.Time.Clock
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
unitTests = testGroup "Broker.Server.TradeSinks.ZMQTradeSink" [ testZMQTradeSink ]
testZMQTradeSink = testCase "Test ZMQTradeSink trade serialization" $
withContext (\ctx -> withSocket ctx Rep (\insock -> do
bind insock "inproc://test-sink"
withZMQTradeSink ctx "inproc://test-sink" (\f -> do
f trade
raw <- receive insock
send insock [] B.empty
case decode $ BL.fromStrict raw of
Just t -> mkTradeMessage trade @?= t
Nothing -> assertFailure "Unable to decode incoming message")))
where
trade = Trade {
tradeOrderId = 0,
tradePrice = 10,
tradeQuantity = 20,
tradeVolume = 30,
tradeVolumeCurrency = "TEST",
tradeOperation = Buy,
tradeAccount = "FOO",
tradeSecurity = "BAR",
tradeTimestamp = UTCTime (fromGregorian 1970 1 1) 0,
tradeSignalId = SignalId "foo" "bar" "" }
Loading…
Cancel
Save