Browse Source

Separated orderId spaces for backends and BrokerServer

master
Denis Tereshkin 4 years ago
parent
commit
e6708ce928
  1. 38
      libatrade.cabal
  2. 28
      src/ATrade/Broker/Backend.hs
  3. 53
      src/ATrade/Broker/Protocol.hs
  4. 123
      src/ATrade/Broker/Server.hs
  5. 61
      src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs
  6. 31
      test/MockBroker.hs
  7. 132
      test/TestBrokerServer.hs

38
libatrade.cabal

@ -20,6 +20,7 @@ library
, ATrade.Price , ATrade.Price
, ATrade.QuoteSource.Client , ATrade.QuoteSource.Client
, ATrade.QuoteSource.Server , ATrade.QuoteSource.Server
, ATrade.Broker.Backend
, ATrade.Broker.Client , ATrade.Broker.Client
, ATrade.Broker.Protocol , ATrade.Broker.Protocol
, ATrade.Broker.Server , ATrade.Broker.Server
@ -29,32 +30,33 @@ library
, ATrade , ATrade
other-modules: Paths_libatrade other-modules: Paths_libatrade
build-depends: base >= 4.7 && < 5 build-depends: base >= 4.7 && < 5
, time
, datetime
, bytestring
, text
, binary
, aeson
, BoundedChan , BoundedChan
, hslogger , aeson
, zeromq4-haskell , bimap
, zeromq4-haskell-zap , binary
, unordered-containers , bytestring
, connection
, containers , containers
, monad-loops , datetime
, safe
, stm
, deepseq , deepseq
, errors , errors
, text-format
, parsec
, extra , extra
, connection , gitrev
, hslogger
, http-client , http-client
, http-client-tls , http-client-tls
, utf8-string , monad-loops
, parsec
, safe
, scientific , scientific
, gitrev , stm
, text
, th-printf
, time
, unordered-containers
, utf8-string
, zeromq4-haskell
, zeromq4-haskell-zap
default-language: Haskell2010 default-language: Haskell2010

28
src/ATrade/Broker/Backend.hs

@ -0,0 +1,28 @@
module ATrade.Broker.Backend
(
BrokerBackend(..),
BrokerBackendNotification(..),
backendNotificationOrderId
) where
import ATrade.Types
import qualified Data.Text as T
data BrokerBackendNotification =
BackendTradeNotification Trade |
BackendOrderNotification OrderId OrderState
deriving (Show, Eq)
backendNotificationOrderId :: BrokerBackendNotification -> OrderId
backendNotificationOrderId (BackendOrderNotification oid _) = oid
backendNotificationOrderId (BackendTradeNotification trade) = tradeOrderId trade
data BrokerBackend = BrokerBackend
{
accounts :: [T.Text],
setNotificationCallback :: (Maybe (BrokerBackendNotification -> IO ())) -> IO (),
submitOrder :: Order -> IO (),
cancelOrder :: OrderId -> IO (),
stop :: IO ()
}

53
src/ATrade/Broker/Protocol.hs

@ -1,4 +1,7 @@
{-# LANGUAGE OverloadedStrings, MultiWayIf, RecordWildCards #-} {-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE RecordWildCards #-}
module ATrade.Broker.Protocol ( module ATrade.Broker.Protocol (
BrokerServerRequest(..), BrokerServerRequest(..),
@ -12,18 +15,18 @@ module ATrade.Broker.Protocol (
ClientIdentity(..) ClientIdentity(..)
) where ) where
import Control.Error.Util import ATrade.Types
import qualified Data.HashMap.Strict as HM import Control.Error.Util
import qualified Data.Text as T import Data.Aeson
import Data.Text.Format import Data.Aeson.Types hiding (parse)
import Data.Text.Encoding import qualified Data.HashMap.Strict as HM
import Data.Aeson import Data.Int
import Data.Aeson.Types hiding (parse) import qualified Data.Text as T
import Data.Int import Data.Text.Encoding
import Data.Time.Clock import Data.Time.Calendar
import Data.Time.Calendar import Data.Time.Clock
import ATrade.Types import Language.Haskell.Printf
import Text.Parsec import Text.Parsec
type ClientIdentity = T.Text type ClientIdentity = T.Text
type RequestSqnum = Int64 type RequestSqnum = Int64
@ -103,7 +106,7 @@ notificationOrderId (TradeNotification trade) = tradeOrderId trade
instance FromJSON Notification where instance FromJSON Notification where
parseJSON n = withObject "notification" (\obj -> parseJSON n = withObject "notification" (\obj ->
case HM.lookup "trade" obj of case HM.lookup "trade" obj of
Just v -> parseTrade v Just v -> parseTrade v
Nothing -> parseOrder n) n Nothing -> parseOrder n) n
where where
parseTrade v = TradeNotification <$> parseJSON v parseTrade v = TradeNotification <$> parseJSON v
@ -120,16 +123,16 @@ instance ToJSON Notification where
toJSON (TradeNotification trade) = object ["trade" .= toJSON trade] toJSON (TradeNotification trade) = object ["trade" .= toJSON trade]
data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade { data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsAccountId :: T.Text, tsAccountId :: T.Text,
tsSecurity :: T.Text, tsSecurity :: T.Text,
tsPrice :: Double, tsPrice :: Double,
tsQuantity :: Int, tsQuantity :: Int,
tsVolume :: Double, tsVolume :: Double,
tsCurrency :: T.Text, tsCurrency :: T.Text,
tsOperation :: Operation, tsOperation :: Operation,
tsExecutionTime :: UTCTime, tsExecutionTime :: UTCTime,
tsCommission :: Double, tsCommission :: Double,
tsSignalId :: SignalId tsSignalId :: SignalId
} deriving (Show, Eq) } deriving (Show, Eq)
mkTradeMessage trade = TradeSinkTrade { mkTradeMessage trade = TradeSinkTrade {
@ -153,7 +156,7 @@ getHMS (UTCTime _ diff) = (intsec `div` 3600, (intsec `mod` 3600) `div` 60, ints
intsec = floor diff intsec = floor diff
msec = floor $ (diff - fromIntegral intsec) * 1000 msec = floor $ (diff - fromIntegral intsec) * 1000
formatTimestamp dt = format "{}-{}-{} {}:{}:{}.{}" (left 4 '0' y, left 2 '0' m, left 2 '0' d, left 2 '0' hour, left 2 '0' min, left 2 '0' sec, left 3 '0' msec) formatTimestamp dt = [t|%04d-%02d-%02d %02d:%02d:%02d.%03d|] y m d hour min sec msec
where where
(y, m, d) = toGregorian $ utctDay dt (y, m, d) = toGregorian $ utctDay dt
(hour, min, sec, msec) = getHMS dt (hour, min, sec, msec) = getHMS dt
@ -199,7 +202,7 @@ instance ToJSON TradeSinkMessage where
instance FromJSON TradeSinkMessage where instance FromJSON TradeSinkMessage where
parseJSON = withObject "object" (\obj -> parseJSON = withObject "object" (\obj ->
case HM.lookup "command" obj of case HM.lookup "command" obj of
Nothing -> parseTrade obj Nothing -> parseTrade obj
Just cmd -> return TradeSinkHeartBeat) Just cmd -> return TradeSinkHeartBeat)
where where
parseTrade obj = case HM.lookup "trade" obj of parseTrade obj = case HM.lookup "trade" obj of

123
src/ATrade/Broker/Server.hs

@ -3,69 +3,66 @@
module ATrade.Broker.Server ( module ATrade.Broker.Server (
startBrokerServer, startBrokerServer,
stopBrokerServer, stopBrokerServer,
BrokerInterface(..), BrokerBackend(..),
TradeSink TradeSink
) where ) where
import ATrade.Types import ATrade.Broker.Backend
import ATrade.Broker.Protocol import ATrade.Broker.Protocol
import System.ZMQ4 import ATrade.Types
import System.ZMQ4.ZAP import ATrade.Util
import Data.List.NonEmpty import Control.Concurrent hiding (readChan, writeChan)
import qualified Data.Map as M import Control.Concurrent.BoundedChan
import qualified Data.ByteString as B hiding (putStrLn) import Control.Exception
import qualified Data.ByteString.Lazy as BL hiding (putStrLn) import Control.Monad
import qualified Data.Text as T import Control.Monad.Loops
import qualified Data.Text.Encoding as E import Data.Aeson
import qualified Data.List as L import qualified Data.Bimap as BM
import Data.Aeson import qualified Data.ByteString as B hiding (putStrLn)
import Data.Maybe import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import Data.Time.Clock import Data.IORef
import Data.IORef import qualified Data.List as L
import Control.Concurrent hiding (readChan, writeChan) import Data.List.NonEmpty
import Control.Concurrent.BoundedChan import qualified Data.Map as M
import Control.Exception import Data.Maybe
import Control.Monad import qualified Data.Text as T
import Control.Monad.Loops import qualified Data.Text.Encoding as E
import System.Log.Logger import Data.Time.Clock
import System.Timeout import System.Log.Logger
import ATrade.Util import System.Timeout
import System.ZMQ4
import System.ZMQ4.ZAP
newtype OrderIdGenerator = IO OrderId newtype OrderIdGenerator = IO OrderId
type PeerId = B.ByteString type PeerId = B.ByteString
data BrokerInterface = BrokerInterface { data FullOrderId = FullOrderId ClientIdentity OrderId
accounts :: [T.Text], deriving (Show, Eq, Ord)
setNotificationCallback :: Maybe (Notification -> IO()) -> IO (),
submitOrder :: Order -> IO (),
cancelOrder :: OrderId -> IO Bool,
stopBroker :: IO ()
}
data BrokerServerState = BrokerServerState { data BrokerServerState = BrokerServerState {
bsSocket :: Socket Router, bsSocket :: Socket Router,
orderToBroker :: M.Map OrderId BrokerInterface, orderToBroker :: M.Map FullOrderId BrokerBackend,
orderMap :: M.Map OrderId ClientIdentity, -- Matches 0mq client identities with corresponding orders orderMap :: BM.Bimap FullOrderId OrderId,
lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse), lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse),
pendingNotifications :: M.Map ClientIdentity [Notification], pendingNotifications :: M.Map ClientIdentity [Notification],
brokers :: [BrokerInterface], brokers :: [BrokerBackend],
completionMvar :: MVar (), completionMvar :: MVar (),
killMvar :: MVar (), killMvar :: MVar (),
orderIdCounter :: OrderId, orderIdCounter :: OrderId,
tradeSink :: BoundedChan Trade tradeSink :: BoundedChan Trade
} }
data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ()) data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ())
type TradeSink = Trade -> IO () type TradeSink = Trade -> IO ()
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> [TradeSink] -> ServerSecurityParams -> IO BrokerServerHandle startBrokerServer :: [BrokerBackend] -> Context -> T.Text -> [TradeSink] -> ServerSecurityParams -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinks params = do startBrokerServer brokers c ep tradeSinks params = do
sock <- socket c Router sock <- socket c Router
setLinger (restrict 0) sock setLinger (restrict 0) sock
case sspDomain params of case sspDomain params of
Just domain -> setZapDomain (restrict $ E.encodeUtf8 domain) sock Just domain -> setZapDomain (restrict $ E.encodeUtf8 domain) sock
Nothing -> return () Nothing -> return ()
case sspCertificate params of case sspCertificate params of
Just cert -> do Just cert -> do
setCurveServer True sock setCurveServer True sock
@ -78,7 +75,7 @@ startBrokerServer brokers c ep tradeSinks params = do
tsChan <- newBoundedChan 100 tsChan <- newBoundedChan 100
state <- newIORef BrokerServerState { state <- newIORef BrokerServerState {
bsSocket = sock, bsSocket = sock,
orderMap = M.empty, orderMap = BM.empty,
orderToBroker = M.empty, orderToBroker = M.empty,
lastPacket = M.empty, lastPacket = M.empty,
pendingNotifications = M.empty, pendingNotifications = M.empty,
@ -93,16 +90,19 @@ startBrokerServer brokers c ep tradeSinks params = do
debugM "Broker.Server" "Forking broker server thread" debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinks) <*> pure compMv <*> pure killMv BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinks) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback :: IORef BrokerServerState -> BrokerBackendNotification -> IO ()
notificationCallback state n = do notificationCallback state n = do
debugM "Broker.Server" $ "Notification: " ++ show n debugM "Broker.Server" $ "Notification: " ++ show n
chan <- tradeSink <$> readIORef state chan <- tradeSink <$> readIORef state
case n of case n of
TradeNotification trade -> tryWriteChan chan trade BackendTradeNotification trade -> tryWriteChan chan trade
_ -> return False _ -> return False
orders <- orderMap <$> readIORef state orders <- orderMap <$> readIORef state
case M.lookup (notificationOrderId n) orders of case BM.lookupR (backendNotificationOrderId n) orders of
Just clientIdentity -> addNotification clientIdentity n Just (FullOrderId clientIdentity localOrderId) ->
case n of
BackendTradeNotification trade -> addNotification clientIdentity (TradeNotification trade { tradeOrderId = localOrderId })
BackendOrderNotification globalOrderId newstate -> addNotification clientIdentity (OrderNotification localOrderId newstate)
Nothing -> warningM "Broker.Server" "Notification: unknown order" Nothing -> warningM "Broker.Server" "Notification: unknown order"
where where
@ -118,7 +118,7 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $
maybeTrade <- tryReadChan chan maybeTrade <- tryReadChan chan
case maybeTrade of case maybeTrade of
Just trade -> mapM_ (\x -> x trade) tradeSinks Just trade -> mapM_ (\x -> x trade) tradeSinks
Nothing -> threadDelay 100000 Nothing -> threadDelay 100000
where where
wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar) wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar)
@ -178,23 +178,26 @@ brokerServerThread state = finally brokerServerThread' cleanup
debugM "Broker.Server" $ "Request: submit order:" ++ show request debugM "Broker.Server" $ "Request: submit order:" ++ show request
case findBrokerForAccount (orderAccountId order) bros of case findBrokerForAccount (orderAccountId order) bros of
Just bro -> do Just bro -> do
oid <- nextOrderId globalOrderId <- nextOrderId
let fullOrderId = (FullOrderId clientIdentity (orderId order))
atomicMapIORef state (\s -> s { atomicMapIORef state (\s -> s {
orderToBroker = M.insert oid bro (orderToBroker s), orderToBroker = M.insert fullOrderId bro (orderToBroker s),
orderMap = M.insert oid clientIdentity (orderMap s) }) orderMap = BM.insert fullOrderId globalOrderId (orderMap s) })
submitOrder bro order { orderId = oid } submitOrder bro order { orderId = globalOrderId }
return $ ResponseOrderSubmitted oid return $ ResponseOrderSubmitted (orderId order)
Nothing -> do Nothing -> do
debugM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order) debugM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order)
return $ ResponseError "Unknown account" return $ ResponseError "Unknown account"
RequestCancelOrder sqnum clientIdentity oid -> do RequestCancelOrder sqnum clientIdentity localOrderId -> do
m <- orderToBroker <$> readIORef state m <- orderToBroker <$> readIORef state
case M.lookup oid m of bm <- orderMap <$> readIORef state
Just bro -> do let fullOrderId = FullOrderId clientIdentity localOrderId
cancelOrder bro oid case (M.lookup fullOrderId m, BM.lookup fullOrderId bm) of
return $ ResponseOrderCancelled oid (Just bro, Just globalOrderId) -> do
Nothing -> return $ ResponseError "Unknown order" cancelOrder bro globalOrderId
return $ ResponseOrderCancelled localOrderId
_ -> return $ ResponseError "Unknown order"
RequestNotifications sqnum clientIdentity -> do RequestNotifications sqnum clientIdentity -> do
maybeNs <- M.lookup clientIdentity . pendingNotifications <$> readIORef state maybeNs <- M.lookup clientIdentity . pendingNotifications <$> readIORef state
case maybeNs of case maybeNs of

61
src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs

@ -1,34 +1,35 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE QuasiQuotes #-}
module ATrade.Broker.TradeSinks.TelegramTradeSink ( module ATrade.Broker.TradeSinks.TelegramTradeSink (
withTelegramTradeSink withTelegramTradeSink
) where ) where
import Control.Exception import Control.Concurrent
import Control.Concurrent
import qualified Control.Concurrent.BoundedChan as BC import qualified Control.Concurrent.BoundedChan as BC
import Data.Aeson import Control.Exception
import Data.Aeson.Types import Control.Monad.Extra
import Data.IORef import Control.Monad.Loops
import Data.Maybe import Data.Aeson
import Data.List.NonEmpty import Data.Aeson.Types
import qualified Data.List as L import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString as B hiding (putStrLn) import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL hiding (putStrLn) import Data.IORef
import System.Log.Logger import qualified Data.List as L
import Control.Monad.Loops import Data.List.NonEmpty
import Control.Monad.Extra import Data.Maybe
import System.Log.Logger
import ATrade.Types import ATrade.Broker.Protocol
import ATrade.Broker.Protocol import ATrade.Types
import Network.Connection import Network.Connection
import Network.HTTP.Client import Network.HTTP.Client
import Network.HTTP.Client.TLS import Network.HTTP.Client.TLS
import qualified Data.Text as T import qualified Data.ByteString.UTF8 as BU8
import qualified Data.Text.Lazy as TL import qualified Data.Text as T
import Data.Text.Format import qualified Data.Text.Lazy as TL
import qualified Data.ByteString.UTF8 as BU8 import Language.Haskell.Printf
withTelegramTradeSink apitoken chatId f = do withTelegramTradeSink apitoken chatId f = do
killMv <- newEmptyMVar killMv <- newEmptyMVar
@ -42,14 +43,14 @@ sinkThread apitoken chatId killMv chan = do
whileM_ (not <$> wasKilled) $ do whileM_ (not <$> wasKilled) $ do
maybeTrade <- BC.tryReadChan chan maybeTrade <- BC.tryReadChan chan
case maybeTrade of case maybeTrade of
Just trade -> sendMessage man apitoken chatId $ format "Trade: {} {} of {} at {} for {} ({}/{})" Just trade -> sendMessage man apitoken chatId $ [t|Trade: %? %? of %? at %? for %? (%?/%?)|]
(show (tradeOperation trade), (show $ tradeOperation trade)
show (tradeQuantity trade), (tradeQuantity trade)
tradeSecurity trade, (tradeSecurity trade)
show (tradePrice trade), (show $ tradePrice trade)
tradeAccount trade, (tradeAccount trade)
(strategyId . tradeSignalId) trade, ((strategyId . tradeSignalId) trade)
(signalName . tradeSignalId) trade) ((signalName . tradeSignalId) trade)
Nothing -> threadDelay 1000000 Nothing -> threadDelay 1000000
where where
tlsSettings = TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False } tlsSettings = TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }

31
test/MockBroker.hs

@ -7,17 +7,18 @@ module MockBroker (
mkMockBroker mkMockBroker
) where ) where
import ATrade.Types import ATrade.Broker.Backend
import ATrade.Broker.Protocol import ATrade.Broker.Protocol
import ATrade.Broker.Server import ATrade.Broker.Server
import ATrade.Util import ATrade.Types
import Data.IORef import ATrade.Util
import qualified Data.List as L import Data.IORef
import qualified Data.List as L
data MockBrokerState = MockBrokerState { data MockBrokerState = MockBrokerState {
orders :: [Order], orders :: [Order],
cancelledOrders :: [Order], cancelledOrders :: [Order],
notificationCallback :: Maybe (Notification -> IO ()) notificationCallback :: Maybe (BrokerBackendNotification -> IO ())
} }
mockSubmitOrder :: IORef MockBrokerState -> Order -> IO () mockSubmitOrder :: IORef MockBrokerState -> Order -> IO ()
@ -25,17 +26,17 @@ mockSubmitOrder state order = do
atomicMapIORef state (\s -> s { orders = submittedOrder : orders s }) atomicMapIORef state (\s -> s { orders = submittedOrder : orders s })
maybeCb <- notificationCallback <$> readIORef state maybeCb <- notificationCallback <$> readIORef state
case maybeCb of case maybeCb of
Just cb -> cb $ OrderNotification (orderId order) Submitted Just cb -> cb $ BackendOrderNotification (orderId order) Submitted
Nothing -> return () Nothing -> return ()
where where
submittedOrder = order { orderState = Submitted } submittedOrder = order { orderState = Submitted }
mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO Bool mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO ()
mockCancelOrder state oid = do mockCancelOrder state oid = do
ors <- orders <$> readIORef state ors <- orders <$> readIORef state
case L.find (\o -> orderId o == oid) ors of case L.find (\o -> orderId o == oid) ors of
Just order -> atomicModifyIORef' state (\s -> (s { cancelledOrders = order : cancelledOrders s}, True)) Just order -> atomicModifyIORef' state (\s -> (s { cancelledOrders = order : cancelledOrders s}, ()))
Nothing -> return False Nothing -> return ()
mockStopBroker :: IORef MockBrokerState -> IO () mockStopBroker :: IORef MockBrokerState -> IO ()
mockStopBroker state = return () mockStopBroker state = return ()
@ -48,11 +49,11 @@ mkMockBroker accs = do
notificationCallback = Nothing notificationCallback = Nothing
} }
return (BrokerInterface { return (BrokerBackend {
accounts = accs, accounts = accs,
setNotificationCallback = \cb -> atomicMapIORef state (\s -> s { notificationCallback = cb }), setNotificationCallback = \cb -> atomicMapIORef state (\s -> s { notificationCallback = cb }),
submitOrder = mockSubmitOrder state, submitOrder = mockSubmitOrder state,
cancelOrder = mockCancelOrder state, cancelOrder = mockCancelOrder state,
stopBroker = mockStopBroker state stop = mockStopBroker state
}, state) }, state)

132
test/TestBrokerServer.hs

@ -1,28 +1,30 @@
{-# LANGUAGE OverloadedStrings, RecordWildCards #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RecordWildCards #-}
module TestBrokerServer ( module TestBrokerServer (
unitTests unitTests
) where ) where
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
import ATrade.Types import ATrade.Broker.Backend
import qualified Data.ByteString as B import ATrade.Broker.Protocol
import qualified Data.ByteString.Lazy as BL import ATrade.Broker.Server
import ATrade.Broker.Server import ATrade.Types
import ATrade.Broker.Protocol import Control.Concurrent hiding (writeChan)
import qualified Data.Text as T import Control.Exception
import Control.Concurrent hiding (writeChan) import Data.Aeson
import Control.Exception import qualified Data.ByteString as B
import System.ZMQ4 import qualified Data.ByteString.Lazy as BL
import Data.Aeson import Data.IORef
import Data.Time.Clock import qualified Data.Text as T
import Data.Time.Calendar import Data.Time.Calendar
import Data.IORef import Data.Time.Clock
import Data.UUID as U import Data.UUID as U
import Data.UUID.V4 as UV4 import Data.UUID.V4 as UV4
import MockBroker import MockBroker
import System.ZMQ4
unitTests :: TestTree unitTests :: TestTree
unitTests = testGroup "Broker.Server" [testBrokerServerStartStop unitTests = testGroup "Broker.Server" [testBrokerServerStartStop
@ -32,8 +34,7 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop
, testBrokerServerCancelUnknownOrder , testBrokerServerCancelUnknownOrder
, testBrokerServerCorruptedPacket , testBrokerServerCorruptedPacket
, testBrokerServerGetNotifications , testBrokerServerGetNotifications
, testBrokerServerDuplicateRequest , testBrokerServerDuplicateRequest ]
, testBrokerServerTradeSink ]
-- --
-- Few helpers -- Few helpers
@ -55,6 +56,7 @@ connectAndSendOrder step sock order ep = do
defaultOrder :: Order defaultOrder :: Order
defaultOrder = mkOrder { defaultOrder = mkOrder {
orderId = 25,
orderAccountId = "demo", orderAccountId = "demo",
orderSecurity = "FOO", orderSecurity = "FOO",
orderPrice = Market, orderPrice = Market,
@ -81,12 +83,12 @@ testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withCont
stopBrokerServer broS) stopBrokerServer broS)
testBrokerServerSubmitOrder :: TestTree testBrokerServerSubmitOrder :: TestTree
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext $ \ctx -> do
step "Setup" step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> do bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do
withSocket ctx Req (\sock -> do withSocket ctx Req $ \sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
step "Checking that order is submitted to BrokerInterface" step "Checking that order is submitted to BrokerInterface"
@ -97,10 +99,8 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste
resp <- decode . BL.fromStrict <$> receive sock resp <- decode . BL.fromStrict <$> receive sock
case resp of case resp of
Just (ResponseOrderSubmitted _) -> return () Just (ResponseOrderSubmitted _) -> return ()
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
)))
testBrokerServerSubmitOrderToUnknownAccount :: TestTree testBrokerServerSubmitOrderToUnknownAccount :: TestTree
testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server returns error if account is unknown" $ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server returns error if account is unknown" $
@ -116,27 +116,28 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur
resp <- decode . BL.fromStrict <$> receive sock resp <- decode . BL.fromStrict <$> receive sock
case resp of case resp of
Just (ResponseError _) -> return () Just (ResponseError _) -> return ()
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
))) )))
testBrokerServerCancelOrder :: TestTree testBrokerServerCancelOrder :: TestTree
testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order cancellation" $ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order cancellation" $
\step -> withContext (\ctx -> do \step -> withContext $ \ctx -> do
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ ->
withSocket ctx Req (\sock -> do withSocket ctx Req $ \sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock
localOrderId @=? (orderId defaultOrder)
step "Sending order cancellation request" step "Sending order cancellation request"
send sock [] (BL.toStrict . encode $ RequestCancelOrder 2 "identity" orderId) send sock [] (BL.toStrict . encode $ RequestCancelOrder 2 "identity" localOrderId)
threadDelay 10000 threadDelay 10000
step "Checking that order is cancelled in BrokerInterface" step "Checking that order is cancelled in BrokerBackend"
s <- readIORef broState s <- readIORef broState
(length . cancelledOrders) s @?= 1 (length . cancelledOrders) s @?= 1
@ -144,9 +145,8 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
resp <- decode . BL.fromStrict <$> receive sock resp <- decode . BL.fromStrict <$> receive sock
case resp of case resp of
Just (ResponseOrderCancelled _) -> return () Just (ResponseOrderCancelled _) -> return ()
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
)))
testBrokerServerCancelUnknownOrder :: TestTree testBrokerServerCancelUnknownOrder :: TestTree
testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancellation: error if order is unknown" $ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancellation: error if order is unknown" $
@ -167,8 +167,8 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell
resp <- decode . BL.fromStrict <$> receive sock resp <- decode . BL.fromStrict <$> receive sock
case resp of case resp of
Just (ResponseError _) -> return () Just (ResponseError _) -> return ()
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
))) )))
testBrokerServerCorruptedPacket :: TestTree testBrokerServerCorruptedPacket :: TestTree
@ -190,30 +190,33 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet
resp <- decode . BL.fromStrict <$> receive sock resp <- decode . BL.fromStrict <$> receive sock
case resp of case resp of
Just (ResponseError _) -> return () Just (ResponseError _) -> return ()
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
))) )))
where where
corrupt = B.drop 5 corrupt = B.drop 5
testBrokerServerGetNotifications :: TestTree testBrokerServerGetNotifications :: TestTree
testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications request" $ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications request" $
\step -> withContext (\ctx -> do \step -> withContext $ \ctx -> do
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ ->
withSocket ctx Req (\sock -> do withSocket ctx Req $ \sock -> do
-- We have to actually submit order, or else server won't know that we should -- We have to actually submit order, or else server won't know that we should
-- be notified about this order -- be notified about this order
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock
localOrderId @=? orderId defaultOrder
threadDelay 10000 threadDelay 10000
globalOrderId <- orderId . head . orders <$> readIORef broState
(Just cb) <- notificationCallback <$> readIORef broState (Just cb) <- notificationCallback <$> readIORef broState
cb (OrderNotification orderId Executed) cb (BackendOrderNotification globalOrderId Executed)
let trade = Trade { let trade = Trade {
tradeOrderId = orderId, tradeOrderId = globalOrderId,
tradePrice = 19.82, tradePrice = 19.82,
tradeQuantity = 1, tradeQuantity = 1,
tradeVolume = 1982, tradeVolume = 1982,
@ -224,7 +227,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000,
tradeCommission = 0, tradeCommission = 0,
tradeSignalId = SignalId "Foo" "bar" "baz" } tradeSignalId = SignalId "Foo" "bar" "baz" }
cb (TradeNotification trade) cb (BackendTradeNotification trade)
step "Sending notifications request" step "Sending notifications request"
send sock [] (BL.toStrict . encode $ RequestNotifications 2 "identity") send sock [] (BL.toStrict . encode $ RequestNotifications 2 "identity")
@ -241,9 +244,10 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
length ns @=? 3 length ns @=? 3
let (OrderNotification oid newstate) = ns !! 1 let (OrderNotification oid newstate) = ns !! 1
let (TradeNotification newtrade) = ns !! 2 let (TradeNotification newtrade) = ns !! 2
orderId @=? oid localOrderId @=? oid
Executed @=? newstate Executed @=? newstate
trade @=? newtrade trade { tradeOrderId = localOrderId } @=? newtrade
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
@ -258,21 +262,15 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
0 @=? length ns 0 @=? length ns
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
)))
testBrokerServerDuplicateRequest :: TestTree testBrokerServerDuplicateRequest :: TestTree
testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext (\ctx -> do testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext $ \ctx -> do
putStrLn "epsilon"
step "Setup" step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint ep <- makeEndpoint
putStrLn "delta" bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do
bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> do withSocket ctx Req $ \sock -> do
putStrLn "gamma"
withSocket ctx Req (\sock -> do
putStrLn "alpha"
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
putStrLn "beta"
step "Reading response" step "Reading response"
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
@ -289,11 +287,10 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
resp <- decode . BL.fromStrict <$> receive sock resp <- decode . BL.fromStrict <$> receive sock
case resp of case resp of
Just (ResponseOrderSubmitted oid) -> orderId @?= oid Just (ResponseOrderSubmitted oid) -> orderId @?= oid
Just _ -> assertFailure "Invalid response" Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response"
)))
{-
testBrokerServerTradeSink :: TestTree testBrokerServerTradeSink :: TestTree
testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade sink" $ \step -> withContext (\ctx -> do testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade sink" $ \step -> withContext (\ctx -> do
step "Setup" step "Setup"
@ -319,7 +316,7 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000,
tradeCommission = 0, tradeCommission = 0,
tradeSignalId = SignalId "Foo" "bar" "baz" } tradeSignalId = SignalId "Foo" "bar" "baz" }
cb (TradeNotification trade) cb (BackendTradeNotification trade)
threadDelay 100000 threadDelay 100000
step "Testing" step "Testing"
@ -329,3 +326,4 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
trade' @?= trade trade' @?= trade
_ -> assertFailure "Invalid trade in sink" _ -> assertFailure "Invalid trade in sink"
))) )))
-}

Loading…
Cancel
Save