Browse Source

BrokerClient,BrokerServer: better connection loss handling

master
Denis Tereshkin 9 years ago
parent
commit
f933215583
  1. 13
      src/ATrade/Broker/Client.hs
  2. 5
      src/ATrade/Broker/Server.hs
  3. 6
      test/TestBrokerClient.hs
  4. 7
      test/TestBrokerServer.hs

13
src/ATrade/Broker/Client.hs

@ -23,6 +23,7 @@ import Data.Maybe @@ -23,6 +23,7 @@ import Data.Maybe
import Data.List.NonEmpty
import qualified Data.List as L
import qualified Data.Text as T
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.Text.Encoding
import System.ZMQ4
@ -41,8 +42,8 @@ data BrokerClientHandle = BrokerClientHandle { @@ -41,8 +42,8 @@ data BrokerClientHandle = BrokerClientHandle {
respVar :: MVar BrokerServerResponse
}
brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO ()
brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientThread' cleanup
brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO ()
brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finally brokerClientThread' cleanup
where
cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp ()
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do
@ -56,6 +57,7 @@ brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientT @@ -56,6 +57,7 @@ brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientT
else do
putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do
setLinger (restrict 0) sock
setIdentity (restrict socketIdentity) sock
debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep)
case cspCertificate secParams of
Just clientCert -> zapApplyCertificate clientCert sock
@ -82,15 +84,14 @@ brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientT @@ -82,15 +84,14 @@ brokerClientThread ctx ep cmd resp comp killMv secParams = finally brokerClientT
threadDelay 1000000)
isZMQError e = "ZMQError" `L.isPrefixOf` show e
startBrokerClient :: Context -> T.Text -> ClientSecurityParams -> IO BrokerClientHandle
startBrokerClient ctx endpoint secParams = do
startBrokerClient :: B.ByteString -> Context -> T.Text -> ClientSecurityParams -> IO BrokerClientHandle
startBrokerClient socketIdentity ctx endpoint secParams = do
idCounter <- newIORef 1
compMv <- newEmptyMVar
killMv <- newEmptyMVar
cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest)
respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv secParams)
tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar respVar compMv killMv secParams)
return BrokerClientHandle {
tid = tid,

5
src/ATrade/Broker/Server.hs

@ -167,7 +167,10 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -167,7 +167,10 @@ brokerServerThread state = finally brokerServerThread' cleanup
-- If it is, we should resend previous response
lastPackMap <- lastPacket <$> readIORef state
case shouldResend sqnum peerId lastPackMap of
Just response -> sendMessage sock peerId response -- Resend
Just response -> do
sendMessage sock peerId response -- Resend
_ <- receiveMulti sock
return ()
Nothing -> do
-- Handle incoming request, send response
response <- handleMessage peerId request

6
test/TestBrokerClient.hs

@ -58,7 +58,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext @@ -58,7 +58,7 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
oid <- submitOrder broC defaultOrder
case oid of
Left err -> assertFailure "Invalid response"
@ -68,7 +68,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" @@ -68,7 +68,7 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
Left err -> assertFailure "Invalid response"
@ -83,7 +83,7 @@ testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ @@ -83,7 +83,7 @@ testBrokerClientGetNotifications = testCase "Broker client: get notifications" $
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
bracket (startBrokerClient "foo" ctx ep defaultClientSecurityParams) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
Left err -> assertFailure "Invalid response"

7
test/TestBrokerServer.hs

@ -251,12 +251,17 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r @@ -251,12 +251,17 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
)))
testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext (\ctx -> do
putStrLn "epsilon"
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS ->
putStrLn "delta"
bracket (startBrokerServer [mockBroker] ctx ep "" defaultServerSecurityParams) stopBrokerServer (\broS -> do
putStrLn "gamma"
withSocket ctx Req (\sock -> do
putStrLn "alpha"
connectAndSendOrder step sock defaultOrder ep
putStrLn "beta"
step "Reading response"
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock

Loading…
Cancel
Save