diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index edcace2..1917882 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -23,6 +23,7 @@ import Data.Maybe import Data.List.NonEmpty import qualified Data.List as L import qualified Data.Text as T +import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import Data.Text.Encoding import System.ZMQ4 @@ -41,8 +42,8 @@ data BrokerClientHandle = BrokerClientHandle { respVar :: MVar BrokerServerResponse } -brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO () -brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientThread' cleanup +brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO () +brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finally brokerClientThread' cleanup where cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp () brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do @@ -56,6 +57,7 @@ brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientT else do putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do setLinger (restrict 0) sock + setIdentity (restrict socketIdentity) sock debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) case cspCertificate secParams of Just clientCert -> zapApplyCertificate clientCert sock @@ -82,15 +84,14 @@ brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientT threadDelay 1000000) isZMQError e = "ZMQError" `L.isPrefixOf` show e - -startBrokerClient :: Context -> T.Text -> ClientSecurityParams -> IO BrokerClientHandle -startBrokerClient ctx endpoint secParams = do +startBrokerClient :: B.ByteString -> Context -> T.Text -> ClientSecurityParams -> IO BrokerClientHandle +startBrokerClient socketIdentity ctx endpoint secParams = do idCounter <- newIORef 1 compMv <- newEmptyMVar killMv <- newEmptyMVar cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) - tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv secParams) + tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar respVar compMv killMv secParams) return BrokerClientHandle { tid = tid, diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index a59b40b..aec9d21 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -167,7 +167,10 @@ brokerServerThread state = finally brokerServerThread' cleanup -- If it is, we should resend previous response lastPackMap <- lastPacket <$> readIORef state case shouldResend sqnum peerId lastPackMap of - Just response -> sendMessage sock peerId response -- Resend + Just response -> do + sendMessage sock peerId response -- Resend + _ <- receiveMulti sock + return () Nothing -> do -- Handle incoming request, send response response <- handleMessage peerId request diff --git a/test/TestBrokerClient.hs b/test/TestBrokerClient.hs index c2dcc71..37c0940 100644 --- a/test/TestBrokerClient.hs +++ b/test/TestBrokerClient.hs @@ -58,7 +58,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do + bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do oid <- submitOrder broC defaultOrder case oid of Left err -> assertFailure "Invalid response" @@ -68,7 +68,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do + bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of Left err -> assertFailure "Invalid response" @@ -83,7 +83,7 @@ testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do + bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of Left err -> assertFailure "Invalid response" diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index cee895e..e245901 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -251,12 +251,17 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r ))) testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext (\ctx -> do + putStrLn "epsilon" step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> + putStrLn "delta" + bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do + putStrLn "gamma" withSocket ctx Req (\sock -> do + putStrLn "alpha" connectAndSendOrder step sock defaultOrder ep + putStrLn "beta" step "Reading response" (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock