Browse Source

Refactoring

master
Denis Tereshkin 9 years ago
parent
commit
3a702302e5
  1. 19
      src/ATrade/Broker/Client.hs
  2. 9
      src/ATrade/Broker/Server.hs
  3. 30
      src/ATrade/Types.hs
  4. 12
      test/TestBrokerClient.hs
  5. 18
      test/TestBrokerServer.hs

19
src/ATrade/Broker/Client.hs

@ -41,8 +41,8 @@ data BrokerClientHandle = BrokerClientHandle {
respVar :: MVar BrokerServerResponse respVar :: MVar BrokerServerResponse
} }
brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> Maybe (CurveCertificate, CurveCertificate) -> IO () brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO ()
brokerClientThread ctx ep cmd resp comp killMv maybeCerts = finally brokerClientThread' cleanup brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientThread' cleanup
where where
cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp () cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp ()
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do
@ -56,10 +56,11 @@ brokerClientThread ctx ep cmd resp comp killMv maybeCerts = finally brokerClient
else do else do
putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do
debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep)
case maybeCerts of case cspCertificate secParams of
Just (clientCert, serverCert) -> do Just clientCert -> zapApplyCertificate clientCert sock
zapApplyCertificate clientCert sock Nothing -> return ()
zapSetServerCertificate serverCert sock case cspServerCertificate secParams of
Just serverCert -> zapSetServerCertificate serverCert sock
Nothing -> return () Nothing -> return ()
connect sock $ T.unpack ep connect sock $ T.unpack ep
@ -81,14 +82,14 @@ brokerClientThread ctx ep cmd resp comp killMv maybeCerts = finally brokerClient
isZMQError e = "ZMQError" `L.isPrefixOf` show e isZMQError e = "ZMQError" `L.isPrefixOf` show e
startBrokerClient :: Context -> T.Text -> Maybe (CurveCertificate, CurveCertificate) -> IO BrokerClientHandle startBrokerClient :: Context -> T.Text -> ClientSecurityParams -> IO BrokerClientHandle
startBrokerClient ctx endpoint maybeCerts = do startBrokerClient ctx endpoint secParams = do
idCounter <- newIORef 1 idCounter <- newIORef 1
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest)
respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv maybeCerts) tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv secParams)
return BrokerClientHandle { return BrokerClientHandle {
tid = tid, tid = tid,

9
src/ATrade/Broker/Server.hs

@ -55,10 +55,13 @@ data BrokerServerState = BrokerServerState {
data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ()) data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> Maybe CurveCertificate -> IO BrokerServerHandle startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> ServerSecurityParams -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinkEp maybeCert = do startBrokerServer brokers c ep tradeSinkEp params = do
sock <- socket c Router sock <- socket c Router
case maybeCert of case sspDomain params of
Just domain -> setZapDomain domain sock
Nothing -> return ()
case sspCertificate params of
Just cert -> do Just cert -> do
setCurveServer True sock setCurveServer True sock
zapApplyCertificate cert sock zapApplyCertificate cert sock

30
src/ATrade/Types.hs

@ -16,7 +16,11 @@ module ATrade.Types (
Order(..), Order(..),
mkOrder, mkOrder,
Trade(..), Trade(..),
OrderId(..) OrderId(..),
ServerSecurityParams(..),
defaultServerSecurityParams,
ClientSecurityParams(..),
defaultClientSecurityParams
) where ) where
import Control.Monad import Control.Monad
@ -37,6 +41,8 @@ import Data.Time.Clock
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Word import Data.Word
import System.ZMQ4.ZAP
type TickerId = T.Text type TickerId = T.Text
data DataType = Unknown data DataType = Unknown
@ -345,3 +351,25 @@ instance ToJSON Trade where
"security" .= tradeSecurity trade, "security" .= tradeSecurity trade,
"execution-time" .= tradeTimestamp trade, "execution-time" .= tradeTimestamp trade,
"signal-id" .= tradeSignalId trade] "signal-id" .= tradeSignalId trade]
data ServerSecurityParams = ServerSecurityParams {
sspDomain :: Maybe T.Text,
sspCertificate :: Maybe CurveCertificate
} deriving (Show, Eq)
defaultServerSecurityParams = ServerSecurityParams {
sspDomain = Nothing,
sspCertificate = Nothing
}
data ClientSecurityParams = ClientSecurityParams {
cspDomain :: Maybe T.Text,
cspCertificate :: Maybe CurveCertificate,
cspServerCertificate :: Maybe CurveCertificate
} deriving (Show, Eq)
defaultClientSecurityParams = ClientSecurityParams {
cspCertificate = Nothing,
cspServerCertificate = Nothing
}

12
test/TestBrokerClient.hs

@ -57,8 +57,8 @@ defaultOrder = mkOrder {
testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
oid <- submitOrder broC defaultOrder oid <- submitOrder broC defaultOrder
case oid of case oid of
Left err -> assertFailure "Invalid response" Left err -> assertFailure "Invalid response"
@ -67,8 +67,8 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext
testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder maybeOid <- submitOrder broC defaultOrder
case maybeOid of case maybeOid of
Left err -> assertFailure "Invalid response" Left err -> assertFailure "Invalid response"
@ -82,8 +82,8 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder maybeOid <- submitOrder broC defaultOrder
case maybeOid of case maybeOid of
Left err -> assertFailure "Invalid response" Left err -> assertFailure "Invalid response"

18
test/TestBrokerServer.hs

@ -74,14 +74,14 @@ defaultOrder = mkOrder {
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" Nothing broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" defaultServerSecurityParams
stopBrokerServer broS) stopBrokerServer broS)
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do
step "Setup" step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> do bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
@ -103,7 +103,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep
@ -121,7 +121,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
@ -147,7 +147,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
receive sock receive sock
@ -169,7 +169,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
step "Connecting" step "Connecting"
connect sock (T.unpack ep) connect sock (T.unpack ep)
@ -193,7 +193,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
step "Setup" step "Setup"
ep <- makeEndpoint ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
-- We have to actually submit order, or else server won't know that we should -- We have to actually submit order, or else server won't know that we should
-- be notified about this order -- be notified about this order
@ -254,7 +254,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
step "Setup" step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep
@ -285,7 +285,7 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
withSocket ctx Rep (\tradeSock -> do withSocket ctx Rep (\tradeSock -> do
bind tradeSock "inproc://trade-sink" bind tradeSock "inproc://trade-sink"
setReceiveTimeout (restrict 1000) tradeSock setReceiveTimeout (restrict 1000) tradeSock
bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink" Nothing) stopBrokerServer (\broS -> do bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink" defaultServerSecurityParams) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
step "Connecting" step "Connecting"
connectAndSendOrder step sock defaultOrder ep connectAndSendOrder step sock defaultOrder ep

Loading…
Cancel
Save