From affc8d2fbeeedae1609248cd79e2508a94d5dc4c Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Fri, 27 Jan 2017 11:36:58 +0700 Subject: [PATCH] Added zeromq4-haskell-zap to dependencies --- .gitignore | 2 ++ libatrade.cabal | 2 ++ src/ATrade/Broker/Client.hs | 17 +++++++---- src/ATrade/Broker/Server.hs | 59 +++++++++++++++++++++---------------- stack.yaml | 1 + test/TestBrokerClient.hs | 13 ++++---- test/TestBrokerServer.hs | 19 ++++++------ 7 files changed, 67 insertions(+), 46 deletions(-) create mode 100644 .gitignore diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..67635a9 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ + +.* diff --git a/libatrade.cabal b/libatrade.cabal index 885a29a..aaf99b5 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -34,6 +34,7 @@ library , BoundedChan , hslogger , zeromq4-haskell + , zeromq4-haskell-zap , unordered-containers , containers , monad-loops @@ -77,6 +78,7 @@ test-suite libatrade-test , BoundedChan , hslogger , zeromq4-haskell + , zeromq4-haskell-zap , bytestring , monad-loops , uuid diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 41d5066..04981f6 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -26,6 +26,7 @@ import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL import Data.Text.Encoding import System.ZMQ4 +import System.ZMQ4.ZAP import System.Log.Logger import System.Timeout @@ -40,8 +41,8 @@ data BrokerClientHandle = BrokerClientHandle { respVar :: MVar BrokerServerResponse } -brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> IO () -brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cleanup +brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> Maybe (CurveCertificate, CurveCertificate) -> IO () +brokerClientThread ctx ep cmd resp comp killMv maybeCerts = finally brokerClientThread' cleanup where cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp () brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do @@ -55,6 +56,12 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle else do putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) + case maybeCerts of + Just (clientCert, serverCert) -> do + zapApplyCertificate clientCert sock + zapSetServerCertificate serverCert sock + Nothing -> return () + connect sock $ T.unpack ep debugM "Broker.Client" $ "Connected" isTimeout <- newIORef False @@ -74,14 +81,14 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle isZMQError e = "ZMQError" `L.isPrefixOf` show e -startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle -startBrokerClient ctx endpoint = do +startBrokerClient :: Context -> T.Text -> Maybe (CurveCertificate, CurveCertificate) -> IO BrokerClientHandle +startBrokerClient ctx endpoint maybeCerts = do idCounter <- newIORef 1 compMv <- newEmptyMVar killMv <- newEmptyMVar cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) - tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv) + tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv maybeCerts) return BrokerClientHandle { tid = tid, diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 447dba8..8f45972 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -9,6 +9,7 @@ module ATrade.Broker.Server ( import ATrade.Types import ATrade.Broker.Protocol import System.ZMQ4 +import System.ZMQ4.ZAP import Data.List.NonEmpty import qualified Data.Map as M import qualified Data.ByteString as B hiding (putStrLn) @@ -54,9 +55,14 @@ data BrokerServerState = BrokerServerState { data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ()) -startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> IO BrokerServerHandle -startBrokerServer brokers c ep tradeSinkEp = do +startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> Maybe CurveCertificate -> IO BrokerServerHandle +startBrokerServer brokers c ep tradeSinkEp maybeCert = do sock <- socket c Router + case maybeCert of + Just cert -> do + setCurveServer True sock + zapApplyCertificate cert sock + Nothing -> return () bind sock (T.unpack ep) tid <- myThreadId compMv <- newEmptyMVar @@ -137,29 +143,31 @@ brokerServerThread state = finally brokerServerThread' cleanup where brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do sock <- bsSocket <$> readIORef state - msg <- timeout 1000000 $ receiveMulti sock - case msg of - Just [peerId, _, payload] -> - case decode . BL.fromStrict $ payload of - Just request -> do - let sqnum = requestSqnum request - -- Here, we should check if previous packet sequence number is the same - -- If it is, we should resend previous response - lastPackMap <- lastPacket <$> readIORef state - case shouldResend sqnum peerId lastPackMap of - Just response -> sendMessage sock peerId response -- Resend - Nothing -> do - -- Handle incoming request, send response - response <- handleMessage peerId request - sendMessage sock peerId response - -- and store response in case we'll need to resend it - atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) - Nothing -> do - -- If we weren't able to parse request, we should send error - -- but shouldn't update lastPacket - let response = ResponseError "Invalid request" - sendMessage sock peerId response - _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) + events <- poll 100 [Sock sock [In] Nothing] + unless (null . L.head $ events) $ do + msg <- receiveMulti sock + case msg of + [peerId, _, payload] -> + case decode . BL.fromStrict $ payload of + Just request -> do + let sqnum = requestSqnum request + -- Here, we should check if previous packet sequence number is the same + -- If it is, we should resend previous response + lastPackMap <- lastPacket <$> readIORef state + case shouldResend sqnum peerId lastPackMap of + Just response -> sendMessage sock peerId response -- Resend + Nothing -> do + -- Handle incoming request, send response + response <- handleMessage peerId request + sendMessage sock peerId response + -- and store response in case we'll need to resend it + atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) + Nothing -> do + -- If we weren't able to parse request, we should send error + -- but shouldn't update lastPacket + let response = ResponseError "Invalid request" + sendMessage sock peerId response + _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of Just (lastSqnum, response) -> if sqnum == lastSqnum @@ -219,4 +227,3 @@ stopBrokerServer (BrokerServerHandle tid tstid compMv killMv) = do killThread tstid yield readMVar compMv - diff --git a/stack.yaml b/stack.yaml index 10d69fd..b976803 100644 --- a/stack.yaml +++ b/stack.yaml @@ -37,6 +37,7 @@ resolver: lts-7.7 # will not be run. This is useful for tweaking upstream packages. packages: - '.' +- '../zeromq4-haskell-zap' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) extra-deps: [ "datetime-0.3.1", "hexdump-0.1"] diff --git a/test/TestBrokerClient.hs b/test/TestBrokerClient.hs index 8eaf100..8067df6 100644 --- a/test/TestBrokerClient.hs +++ b/test/TestBrokerClient.hs @@ -25,6 +25,7 @@ import Control.Concurrent.BoundedChan import Control.Concurrent hiding (writeChan) import Control.Exception import System.ZMQ4 +import System.ZMQ4.ZAP import Data.Aeson import Data.Time.Clock import Data.Time.Calendar @@ -56,8 +57,8 @@ defaultOrder = mkOrder { testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> + bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do oid <- submitOrder broC defaultOrder case oid of Left err -> assertFailure "Invalid response" @@ -66,8 +67,8 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> + bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of Left err -> assertFailure "Invalid response" @@ -81,8 +82,8 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> - bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> + bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do maybeOid <- submitOrder broC defaultOrder case maybeOid of Left err -> assertFailure "Invalid response" diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index e942ccb..e0c0175 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -23,6 +23,7 @@ import Control.Concurrent.BoundedChan import Control.Concurrent hiding (writeChan) import Control.Exception import System.ZMQ4 +import System.ZMQ4.ZAP import Data.Aeson import Data.Time.Clock import Data.Time.Calendar @@ -73,14 +74,14 @@ defaultOrder = mkOrder { testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do ep <- toText <$> UV4.nextRandom - broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" + broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" Nothing stopBrokerServer broS) testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> do withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep @@ -102,7 +103,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep @@ -120,7 +121,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock @@ -146,7 +147,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep receive sock @@ -168,7 +169,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do step "Connecting" connect sock (T.unpack ep) @@ -192,7 +193,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r step "Setup" ep <- makeEndpoint (mockBroker, broState) <- mkMockBroker ["demo"] - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do -- We have to actually submit order, or else server won't know that we should -- be notified about this order @@ -253,7 +254,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque step "Setup" (mockBroker, broState) <- mkMockBroker ["demo"] ep <- makeEndpoint - bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS -> + bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do connectAndSendOrder step sock defaultOrder ep @@ -284,7 +285,7 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade withSocket ctx Rep (\tradeSock -> do bind tradeSock "inproc://trade-sink" setReceiveTimeout (restrict 1000) tradeSock - bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink") stopBrokerServer (\broS -> do + bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink" Nothing) stopBrokerServer (\broS -> do withSocket ctx Req (\sock -> do step "Connecting" connectAndSendOrder step sock defaultOrder ep