Browse Source

Added zeromq4-haskell-zap to dependencies

master
Denis Tereshkin 9 years ago
parent
commit
affc8d2fbe
  1. 2
      .gitignore
  2. 2
      libatrade.cabal
  3. 17
      src/ATrade/Broker/Client.hs
  4. 59
      src/ATrade/Broker/Server.hs
  5. 1
      stack.yaml
  6. 13
      test/TestBrokerClient.hs
  7. 19
      test/TestBrokerServer.hs

2
.gitignore vendored

@ -0,0 +1,2 @@ @@ -0,0 +1,2 @@
.*

2
libatrade.cabal

@ -34,6 +34,7 @@ library @@ -34,6 +34,7 @@ library
, BoundedChan
, hslogger
, zeromq4-haskell
, zeromq4-haskell-zap
, unordered-containers
, containers
, monad-loops
@ -77,6 +78,7 @@ test-suite libatrade-test @@ -77,6 +78,7 @@ test-suite libatrade-test
, BoundedChan
, hslogger
, zeromq4-haskell
, zeromq4-haskell-zap
, bytestring
, monad-loops
, uuid

17
src/ATrade/Broker/Client.hs

@ -26,6 +26,7 @@ import qualified Data.Text as T @@ -26,6 +26,7 @@ import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import Data.Text.Encoding
import System.ZMQ4
import System.ZMQ4.ZAP
import System.Log.Logger
import System.Timeout
@ -40,8 +41,8 @@ data BrokerClientHandle = BrokerClientHandle { @@ -40,8 +41,8 @@ data BrokerClientHandle = BrokerClientHandle {
respVar :: MVar BrokerServerResponse
}
brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> IO ()
brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cleanup
brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> Maybe (CurveCertificate, CurveCertificate) -> IO ()
brokerClientThread ctx ep cmd resp comp killMv maybeCerts = finally brokerClientThread' cleanup
where
cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp ()
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do
@ -55,6 +56,12 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle @@ -55,6 +56,12 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
else do
putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do
debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep)
case maybeCerts of
Just (clientCert, serverCert) -> do
zapApplyCertificate clientCert sock
zapSetServerCertificate serverCert sock
Nothing -> return ()
connect sock $ T.unpack ep
debugM "Broker.Client" $ "Connected"
isTimeout <- newIORef False
@ -74,14 +81,14 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle @@ -74,14 +81,14 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
isZMQError e = "ZMQError" `L.isPrefixOf` show e
startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle
startBrokerClient ctx endpoint = do
startBrokerClient :: Context -> T.Text -> Maybe (CurveCertificate, CurveCertificate) -> IO BrokerClientHandle
startBrokerClient ctx endpoint maybeCerts = do
idCounter <- newIORef 1
compMv <- newEmptyMVar
killMv <- newEmptyMVar
cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest)
respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv maybeCerts)
return BrokerClientHandle {
tid = tid,

59
src/ATrade/Broker/Server.hs

@ -9,6 +9,7 @@ module ATrade.Broker.Server ( @@ -9,6 +9,7 @@ module ATrade.Broker.Server (
import ATrade.Types
import ATrade.Broker.Protocol
import System.ZMQ4
import System.ZMQ4.ZAP
import Data.List.NonEmpty
import qualified Data.Map as M
import qualified Data.ByteString as B hiding (putStrLn)
@ -54,9 +55,14 @@ data BrokerServerState = BrokerServerState { @@ -54,9 +55,14 @@ data BrokerServerState = BrokerServerState {
data BrokerServerHandle = BrokerServerHandle ThreadId ThreadId (MVar ()) (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinkEp = do
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> Maybe CurveCertificate -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinkEp maybeCert = do
sock <- socket c Router
case maybeCert of
Just cert -> do
setCurveServer True sock
zapApplyCertificate cert sock
Nothing -> return ()
bind sock (T.unpack ep)
tid <- myThreadId
compMv <- newEmptyMVar
@ -137,29 +143,31 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -137,29 +143,31 @@ brokerServerThread state = finally brokerServerThread' cleanup
where
brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do
sock <- bsSocket <$> readIORef state
msg <- timeout 1000000 $ receiveMulti sock
case msg of
Just [peerId, _, payload] ->
case decode . BL.fromStrict $ payload of
Just request -> do
let sqnum = requestSqnum request
-- Here, we should check if previous packet sequence number is the same
-- If it is, we should resend previous response
lastPackMap <- lastPacket <$> readIORef state
case shouldResend sqnum peerId lastPackMap of
Just response -> sendMessage sock peerId response -- Resend
Nothing -> do
-- Handle incoming request, send response
response <- handleMessage peerId request
sendMessage sock peerId response
-- and store response in case we'll need to resend it
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)})
Nothing -> do
-- If we weren't able to parse request, we should send error
-- but shouldn't update lastPacket
let response = ResponseError "Invalid request"
sendMessage sock peerId response
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg)
events <- poll 100 [Sock sock [In] Nothing]
unless (null . L.head $ events) $ do
msg <- receiveMulti sock
case msg of
[peerId, _, payload] ->
case decode . BL.fromStrict $ payload of
Just request -> do
let sqnum = requestSqnum request
-- Here, we should check if previous packet sequence number is the same
-- If it is, we should resend previous response
lastPackMap <- lastPacket <$> readIORef state
case shouldResend sqnum peerId lastPackMap of
Just response -> sendMessage sock peerId response -- Resend
Nothing -> do
-- Handle incoming request, send response
response <- handleMessage peerId request
sendMessage sock peerId response
-- and store response in case we'll need to resend it
atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)})
Nothing -> do
-- If we weren't able to parse request, we should send error
-- but shouldn't update lastPacket
let response = ResponseError "Invalid request"
sendMessage sock peerId response
_ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg)
shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of
Just (lastSqnum, response) -> if sqnum == lastSqnum
@ -219,4 +227,3 @@ stopBrokerServer (BrokerServerHandle tid tstid compMv killMv) = do @@ -219,4 +227,3 @@ stopBrokerServer (BrokerServerHandle tid tstid compMv killMv) = do
killThread tstid
yield
readMVar compMv

1
stack.yaml

@ -37,6 +37,7 @@ resolver: lts-7.7 @@ -37,6 +37,7 @@ resolver: lts-7.7
# will not be run. This is useful for tweaking upstream packages.
packages:
- '.'
- '../zeromq4-haskell-zap'
# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "hexdump-0.1"]

13
test/TestBrokerClient.hs

@ -25,6 +25,7 @@ import Control.Concurrent.BoundedChan @@ -25,6 +25,7 @@ import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (writeChan)
import Control.Exception
import System.ZMQ4
import System.ZMQ4.ZAP
import Data.Aeson
import Data.Time.Clock
import Data.Time.Calendar
@ -56,8 +57,8 @@ defaultOrder = mkOrder { @@ -56,8 +57,8 @@ defaultOrder = mkOrder {
testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do
oid <- submitOrder broC defaultOrder
case oid of
Left err -> assertFailure "Invalid response"
@ -66,8 +67,8 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext @@ -66,8 +67,8 @@ testBrokerClientStartStop = testCase "Broker client: submit order" $ withContext
testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
Left err -> assertFailure "Invalid response"
@ -81,8 +82,8 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" @@ -81,8 +82,8 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
testBrokerClientGetNotifications = testCase "Broker client: get notifications" $ withContext (\ctx -> do
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep) stopBrokerClient (\broC -> do
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
bracket (startBrokerClient ctx ep Nothing) stopBrokerClient (\broC -> do
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
Left err -> assertFailure "Invalid response"

19
test/TestBrokerServer.hs

@ -23,6 +23,7 @@ import Control.Concurrent.BoundedChan @@ -23,6 +23,7 @@ import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (writeChan)
import Control.Exception
import System.ZMQ4
import System.ZMQ4.ZAP
import Data.Aeson
import Data.Time.Clock
import Data.Time.Calendar
@ -73,14 +74,14 @@ defaultOrder = mkOrder { @@ -73,14 +74,14 @@ defaultOrder = mkOrder {
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) ""
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) "" Nothing
stopBrokerServer broS)
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
@ -102,7 +103,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur @@ -102,7 +103,7 @@ testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server retur
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock (defaultOrder { orderAccountId = "foobar" }) ep
@ -120,7 +121,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc @@ -120,7 +121,7 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
@ -146,7 +147,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell @@ -146,7 +147,7 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
receive sock
@ -168,7 +169,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet @@ -168,7 +169,7 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
step "Connecting"
connect sock (T.unpack ep)
@ -192,7 +193,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r @@ -192,7 +193,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
-- We have to actually submit order, or else server won't know that we should
-- be notified about this order
@ -253,7 +254,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque @@ -253,7 +254,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
step "Setup"
(mockBroker, broState) <- mkMockBroker ["demo"]
ep <- makeEndpoint
bracket (startBrokerServer [mockBroker] ctx ep "") stopBrokerServer (\broS ->
bracket (startBrokerServer [mockBroker] ctx ep "" Nothing) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock defaultOrder ep
@ -284,7 +285,7 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade @@ -284,7 +285,7 @@ testBrokerServerTradeSink = testCaseSteps "Broker Server: sends trades to trade
withSocket ctx Rep (\tradeSock -> do
bind tradeSock "inproc://trade-sink"
setReceiveTimeout (restrict 1000) tradeSock
bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink") stopBrokerServer (\broS -> do
bracket (startBrokerServer [mockBroker] ctx ep "inproc://trade-sink" Nothing) stopBrokerServer (\broS -> do
withSocket ctx Req (\sock -> do
step "Connecting"
connectAndSendOrder step sock defaultOrder ep

Loading…
Cancel
Save