From 2408c49dc8f9eccc56b62eec37c12d732363964e Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 11 Jul 2021 12:38:52 +0700 Subject: [PATCH] BrokerServer: create notification socket --- src/ATrade/Broker/Server.hs | 48 +++++++++++++--------- test/TestBrokerClient.hs | 79 +++++++++++++++++++------------------ test/TestBrokerServer.hs | 50 +++++++++++------------ 3 files changed, 95 insertions(+), 82 deletions(-) diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 3cc0146..6ba66b9 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -40,42 +40,51 @@ data FullOrderId = FullOrderId ClientIdentity OrderId deriving (Show, Eq, Ord) data BrokerServerState = BrokerServerState { - bsSocket :: Socket Router, - orderToBroker :: M.Map FullOrderId BrokerBackend, - orderMap :: BM.Bimap FullOrderId OrderId, - lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse), - pendingNotifications :: M.Map ClientIdentity [Notification], - notificationSqnum :: M.Map ClientIdentity NotificationSqnum, - brokers :: [BrokerBackend], - completionMvar :: MVar (), - killMvar :: MVar (), - orderIdCounter :: OrderId, - tradeSink :: BoundedChan Trade + bsSocket :: Socket Router, + bsNotificationsSocket :: Socket Pub, + orderToBroker :: M.Map FullOrderId BrokerBackend, + orderMap :: BM.Bimap FullOrderId OrderId, + lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse), + pendingNotifications :: M.Map ClientIdentity [Notification], + notificationSqnum :: M.Map ClientIdentity NotificationSqnum, + brokers :: [BrokerBackend], + completionMvar :: MVar (), + killMvar :: MVar (), + orderIdCounter :: OrderId, + tradeSink :: BoundedChan Trade } data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ()) type TradeSink = Trade -> IO () -startBrokerServer :: [BrokerBackend] -> Context -> T.Text -> [TradeSink] -> ServerSecurityParams -> IO BrokerServerHandle -startBrokerServer brokers c ep tradeSinks params = do +startBrokerServer :: [BrokerBackend] -> Context -> T.Text -> T.Text -> [TradeSink] -> ServerSecurityParams -> IO BrokerServerHandle +startBrokerServer brokers c ep notificationsEp tradeSinks params = do sock <- socket c Router + notificationsSock <- socket c Pub setLinger (restrict 0) sock + setLinger (restrict 0) notificationsSock case sspDomain params of - Just domain -> setZapDomain (restrict $ E.encodeUtf8 domain) sock + Just domain -> do + setZapDomain (restrict $ E.encodeUtf8 domain) sock + setZapDomain (restrict $ E.encodeUtf8 domain) notificationsSock Nothing -> return () case sspCertificate params of Just cert -> do setCurveServer True sock zapApplyCertificate cert sock + setCurveServer True notificationsSock + zapApplyCertificate cert notificationsSock Nothing -> return () bind sock (T.unpack ep) + bind notificationsSock (T.unpack notificationsEp) tid <- myThreadId compMv <- newEmptyMVar killMv <- newEmptyMVar tsChan <- newBoundedChan 100 state <- newIORef BrokerServerState { bsSocket = sock, + bsNotificationsSocket = notificationsSock, orderMap = BM.empty, orderToBroker = M.empty, lastPacket = M.empty, @@ -111,10 +120,13 @@ notificationCallback state n = do Nothing -> warningM "Broker.Server" "Notification: unknown order" where - addNotification clientIdentity n = atomicMapIORef state (\s -> - case M.lookup clientIdentity . pendingNotifications $ s of - Just ns -> s { pendingNotifications = M.insert clientIdentity (n : ns) (pendingNotifications s)} - Nothing -> s { pendingNotifications = M.insert clientIdentity [n] (pendingNotifications s)}) + addNotification clientIdentity n = do + atomicMapIORef state (\s -> + case M.lookup clientIdentity . pendingNotifications $ s of + Just ns -> s { pendingNotifications = M.insert clientIdentity (n : ns) (pendingNotifications s)} + Nothing -> s { pendingNotifications = M.insert clientIdentity [n] (pendingNotifications s)}) + sock <- bsNotificationsSocket <$> readIORef state + sendMulti sock (E.encodeUtf8 clientIdentity :| [BL.toStrict $ encode n]) tradeSinkHandler :: Context -> IORef BrokerServerState -> [TradeSink] -> IO () tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ diff --git a/test/TestBrokerClient.hs b/test/TestBrokerClient.hs index 9841de5..cbf682e 100644 --- a/test/TestBrokerClient.hs +++ b/test/TestBrokerClient.hs @@ -5,36 +5,37 @@ module TestBrokerClient ( unitTests ) where -import Test.Tasty -import Test.Tasty.SmallCheck as SC -import Test.Tasty.QuickCheck as QC -import Test.Tasty.HUnit +import Test.Tasty +import Test.Tasty.HUnit +import Test.Tasty.QuickCheck as QC +import Test.Tasty.SmallCheck as SC -import ATrade.Types -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL -import ATrade.Broker.Client -import ATrade.Broker.Server hiding (submitOrder, cancelOrder) -import ATrade.Broker.Protocol -import ATrade.Util -import qualified Data.Text as T -import Control.Monad -import Control.Monad.Loops -import Control.Concurrent.MVar -import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (writeChan) -import Control.Exception -import System.ZMQ4 -import System.ZMQ4.ZAP -import Data.Aeson -import Data.Time.Clock -import Data.Time.Calendar -import Data.Maybe -import qualified Data.List as L -import Data.IORef -import Data.UUID as U -import Data.UUID.V4 as UV4 -import MockBroker +import ATrade.Broker.Client +import ATrade.Broker.Protocol +import ATrade.Broker.Server hiding (cancelOrder, + submitOrder) +import ATrade.Types +import ATrade.Util +import Control.Concurrent hiding (writeChan) +import Control.Concurrent.BoundedChan +import Control.Concurrent.MVar +import Control.Exception +import Control.Monad +import Control.Monad.Loops +import Data.Aeson +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import Data.IORef +import qualified Data.List as L +import Data.Maybe +import qualified Data.Text as T +import Data.Time.Calendar +import Data.Time.Clock +import Data.UUID as U +import Data.UUID.V4 as UV4 +import MockBroker +import System.ZMQ4 +import System.ZMQ4.ZAP unitTests = testGroup "Broker.Client" [ testBrokerClientStartStop @@ -42,9 +43,9 @@ unitTests = testGroup "Broker.Client" [ , testBrokerClientGetNotifications ] -makeEndpoint = do +makeEndpoints = do uid <- toText <$> UV4.nextRandom - return $ "inproc://brokerserver" `T.append` uid + return ("inproc://brokerserver-" `T.append` uid, "inproc://brokerserver-notifications-" `T.append` uid) defaultOrder = mkOrder { orderAccountId = "demo", @@ -55,19 +56,19 @@ defaultOrder = mkOrder { } testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer (\broS -> bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do oid <- submitOrder broC defaultOrder case oid of Left err -> assertFailure "Invalid response" - Right _ -> return ()))) + Right _ -> return ()))) testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer (\broS -> bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of @@ -76,13 +77,13 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" rc <- cancelOrder broC oid case rc of Left err -> assertFailure "Invalid response" - Right _ -> return() + Right _ -> return() ))) testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer (\broS -> bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index e6e8b55..4a4303e 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -44,10 +44,10 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop -- Few helpers -- -makeEndpoint :: IO T.Text -makeEndpoint = do +makeEndpoints :: IO (T.Text, T.Text) +makeEndpoints = do uid <- toText <$> UV4.nextRandom - return $ "inproc://brokerserver" `T.append` uid + return ("inproc://brokerserver-" `T.append` uid, "inproc://brokerserver-notifications-" `T.append` uid) connectAndSendOrder :: (Sender a) => (String -> IO ()) -> Socket a -> Order -> T.Text -> IO () connectAndSendOrder step sock order ep = do @@ -91,16 +91,16 @@ makeTestTradeSink = do testBrokerServerStartStop :: TestTree testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do - ep <- toText <$> UV4.nextRandom - broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) [] defaultServerSecurityParams + (ep, notifEp) <- makeEndpoints + broS <- startBrokerServer [] ctx ep notifEp [] defaultServerSecurityParams stopBrokerServer broS) testBrokerServerSubmitOrder :: TestTree testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext $ \ctx -> do step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] - ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do + (ep, notifEp) <- makeEndpoints + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do withSocket ctx Req $ \sock -> do connectAndSendOrder step sock defaultOrder ep @@ -119,10 +119,10 @@ testBrokerServerSubmitOrderDifferentIdentities :: TestTree testBrokerServerSubmitOrderDifferentIdentities = testCaseSteps "Broker Server submits order: different identities" $ \step -> withContext $ \ctx -> do step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints let orderId1 = 42 let orderId2 = 76 - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do withSocket ctx Req $ \sock1 -> do withSocket ctx Req $ \sock2 -> do connectAndSendOrderWithIdentity step sock1 defaultOrder {orderId = orderId1} "identity1" ep @@ -150,9 +150,9 @@ testBrokerServerSubmitOrderToUnknownAccount :: TestTree testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server returns error if account is unknown" $ \step -> withContext (\ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, _) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer (\_ -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep @@ -169,9 +169,9 @@ testBrokerServerCancelOrder :: TestTree testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order cancellation" $ \step -> withContext $ \ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> withSocket ctx Req $ \sock -> do connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock @@ -196,9 +196,9 @@ testBrokerServerCancelUnknownOrder :: TestTree testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancellation: error if order is unknown" $ \step -> withContext (\ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, _) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer (\_ -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep receive sock @@ -219,9 +219,9 @@ testBrokerServerCorruptedPacket :: TestTree testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet" $ \step -> withContext (\ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, _) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer (\_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer (\_ -> withSocket ctx Req (\sock -> do step "Connecting" connect sock (T.unpack ep) @@ -244,9 +244,9 @@ testBrokerServerGetNotifications :: TestTree testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications request" $ \step -> withContext $ \ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> withSocket ctx Req $ \sock -> do -- We have to actually submit order, or else server won't know that we should -- be notified about this order @@ -304,9 +304,9 @@ testBrokerServerGetNotificationsFromSameSqnum :: TestTree testBrokerServerGetNotificationsFromSameSqnum = testCaseSteps "Broker Server: notifications request, twice from same sqnum" $ \step -> withContext $ \ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> withSocket ctx Req $ \sock -> do connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock @@ -363,9 +363,9 @@ testBrokerServerGetNotificationsRemovesEarlierNotifications :: TestTree testBrokerServerGetNotificationsRemovesEarlierNotifications = testCaseSteps "Broker Server: notifications request removes earlier notifications" $ \step -> withContext $ \ctx -> do step "Setup" - ep <- makeEndpoint + (ep, notifEp) <- makeEndpoints (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> withSocket ctx Req $ \sock -> do connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock @@ -418,8 +418,8 @@ testBrokerServerDuplicateRequest :: TestTree testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext $ \ctx -> do step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] - ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do + (ep, notifEp) <- makeEndpoints + bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ -> do withSocket ctx Req $ \sock -> do connectAndSendOrder step sock defaultOrder ep