diff --git a/libatrade.cabal b/libatrade.cabal index 0c9ea96..78cd0cd 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -74,6 +74,7 @@ test-suite libatrade-test ghc-options: -threaded -rtsopts -with-rtsopts=-N -Wincomplete-patterns default-language: Haskell2010 other-modules: ArbitraryInstances + , MockBroker , TestBrokerProtocol , TestBrokerServer , TestQuoteSourceServer diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index 006ed7b..db2c511 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -5,7 +5,8 @@ module ATrade.Broker.Protocol ( BrokerServerResponse(..), Notification(..), notificationOrderId, - RequestSqnum(..) + RequestSqnum(..), + requestSqnum ) where import qualified Data.HashMap.Strict as HM @@ -22,6 +23,11 @@ data BrokerServerRequest = RequestSubmitOrder RequestSqnum Order | RequestNotifications RequestSqnum deriving (Eq, Show) +requestSqnum :: BrokerServerRequest -> RequestSqnum +requestSqnum (RequestSubmitOrder sqnum _) = sqnum +requestSqnum (RequestCancelOrder sqnum _) = sqnum +requestSqnum (RequestNotifications sqnum) = sqnum + instance FromJSON BrokerServerRequest where parseJSON = withObject "object" (\obj -> do sqnum <- obj .: "request-sqnum" diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 452d117..76cd2aa 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -39,7 +39,7 @@ data BrokerServerState = BrokerServerState { bsSocket :: Socket Router, orderToBroker :: M.Map OrderId BrokerInterface, orderMap :: M.Map OrderId PeerId, -- Matches 0mq client identities with corresponding orders - lastPacket :: M.Map PeerId (RequestSqnum, B.ByteString), + lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse), pendingNotifications :: M.Map PeerId [Notification], brokers :: [BrokerInterface], completionMvar :: MVar (), @@ -87,20 +87,45 @@ brokerServerThread state = finally brokerServerThread' cleanup sock <- bsSocket <$> readIORef state msg <- receiveMulti sock case msg of - [peerId, _, payload] -> handleMessage peerId payload >>= sendMessage sock peerId + [peerId, _, payload] -> + case decode . BL.fromStrict $ payload of + Just request -> do + let sqnum = requestSqnum request + -- Here, we should check if previous packet sequence number is the same + -- If it is, we should resend previous response + lastPackMap <- lastPacket <$> readIORef state + case shouldResend sqnum peerId lastPackMap of + Just response -> sendMessage sock peerId response -- Resend + Nothing -> do + -- Handle incoming request, send response + response <- handleMessage peerId request + sendMessage sock peerId response + -- and store response in case we'll need to resend it + atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)}) + Nothing -> do + -- If we weren't able to parse request, we should send error + -- but shouldn't update lastPacket + let response = ResponseError "Invalid request" + sendMessage sock peerId response _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) + shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of + Just (lastSqnum, response) -> if sqnum == lastSqnum + then Just response + else Nothing + Nothing -> Nothing + cleanup = do sock <- bsSocket <$> readIORef state close sock mv <- completionMvar <$> readIORef state putMVar mv () - handleMessage :: B.ByteString -> B.ByteString -> IO BrokerServerResponse - handleMessage peerId payload = do + handleMessage :: PeerId -> BrokerServerRequest -> IO BrokerServerResponse + handleMessage peerId request = do bros <- brokers <$> readIORef state - case decode . BL.fromStrict $ payload of - Just (RequestSubmitOrder sqnum order) -> + case request of + RequestSubmitOrder sqnum order -> case findBrokerForAccount (orderAccountId order) bros of Just bro -> do oid <- nextOrderId @@ -111,21 +136,20 @@ brokerServerThread state = finally brokerServerThread' cleanup return $ ResponseOrderSubmitted oid Nothing -> return $ ResponseError "Unknown account" - Just (RequestCancelOrder sqnum oid) -> do + RequestCancelOrder sqnum oid -> do m <- orderToBroker <$> readIORef state case M.lookup oid m of Just bro -> do cancelOrder bro oid return $ ResponseOrderCancelled oid Nothing -> return $ ResponseError "Unknown order" - Just (RequestNotifications sqnum) -> do + RequestNotifications sqnum -> do maybeNs <- M.lookup peerId . pendingNotifications <$> readIORef state case maybeNs of Just ns -> do atomicMapIORef state (\s -> s { pendingNotifications = M.insert peerId [] (pendingNotifications s)}) return $ ResponseNotifications ns Nothing -> return $ ResponseNotifications [] - Nothing -> return $ ResponseError "Unable to parse request" sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) diff --git a/test/MockBroker.hs b/test/MockBroker.hs new file mode 100644 index 0000000..a40b77b --- /dev/null +++ b/test/MockBroker.hs @@ -0,0 +1,58 @@ + +module MockBroker ( + MockBrokerState(..), + mockSubmitOrder, + mockCancelOrder, + mockStopBroker, + mkMockBroker +) where + +import ATrade.Types +import ATrade.Broker.Protocol +import ATrade.Broker.Server +import ATrade.Util +import Data.IORef +import qualified Data.List as L + +data MockBrokerState = MockBrokerState { + orders :: [Order], + cancelledOrders :: [Order], + notificationCallback :: Maybe (Notification -> IO ()) +} + +mockSubmitOrder :: IORef MockBrokerState -> Order -> IO () +mockSubmitOrder state order = do + atomicMapIORef state (\s -> s { orders = submittedOrder : orders s }) + maybeCb <- notificationCallback <$> readIORef state + case maybeCb of + Just cb -> cb $ OrderNotification (orderId order) Submitted + Nothing -> return () + where + submittedOrder = order { orderState = Submitted } + +mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO Bool +mockCancelOrder state oid = do + ors <- orders <$> readIORef state + case L.find (\o -> orderId o == oid) ors of + Just order -> atomicModifyIORef' state (\s -> (s { cancelledOrders = order : cancelledOrders s}, True)) + Nothing -> return False + +mockStopBroker :: IORef MockBrokerState -> IO () +mockStopBroker state = return () + + +mkMockBroker accs = do + state <- newIORef MockBrokerState { + orders = [], + cancelledOrders = [], + notificationCallback = Nothing + } + + return (BrokerInterface { + accounts = accs, + setNotificationCallback = \cb -> atomicMapIORef state (\s -> s { notificationCallback = cb }), + submitOrder = mockSubmitOrder state, + cancelOrder = mockCancelOrder state, + stopBroker = mockStopBroker state + }, state) + diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index da48b4e..7cc23a8 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -31,49 +31,7 @@ import qualified Data.List as L import Data.IORef import Data.UUID as U import Data.UUID.V4 as UV4 - -data MockBrokerState = MockBrokerState { - orders :: [Order], - cancelledOrders :: [Order], - notificationCallback :: Maybe (Notification -> IO ()) -} - -mockSubmitOrder :: IORef MockBrokerState -> Order -> IO () -mockSubmitOrder state order = do - atomicMapIORef state (\s -> s { orders = submittedOrder : orders s }) - maybeCb <- notificationCallback <$> readIORef state - case maybeCb of - Just cb -> cb $ OrderNotification (orderId order) Submitted - Nothing -> return () - where - submittedOrder = order { orderState = Submitted } - -mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO Bool -mockCancelOrder state oid = do - ors <- orders <$> readIORef state - case L.find (\o -> orderId o == oid) ors of - Just order -> atomicModifyIORef' state (\s -> (s { cancelledOrders = order : cancelledOrders s}, True)) - Nothing -> return False - -mockStopBroker :: IORef MockBrokerState -> IO () -mockStopBroker state = return () - - -mkMockBroker accs = do - state <- newIORef MockBrokerState { - orders = [], - cancelledOrders = [], - notificationCallback = Nothing - } - - return (BrokerInterface { - accounts = accs, - setNotificationCallback = \cb -> atomicMapIORef state (\s -> s { notificationCallback = cb }), - submitOrder = mockSubmitOrder state, - cancelOrder = mockCancelOrder state, - stopBroker = mockStopBroker state - }, state) - +import MockBroker unitTests = testGroup "Broker.Server" [testBrokerServerStartStop , testBrokerServerSubmitOrder @@ -81,12 +39,12 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop , testBrokerServerCancelOrder , testBrokerServerCancelUnknownOrder , testBrokerServerCorruptedPacket - , testBrokerServerGetNotifications ] + , testBrokerServerGetNotifications + , testBrokerServerDuplicateRequest ] -testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do - ep <- toText <$> UV4.nextRandom - broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) - stopBrokerServer broS) +-- +-- Few helpers +-- makeEndpoint = do uid <- toText <$> UV4.nextRandom @@ -108,6 +66,14 @@ defaultOrder = mkOrder { orderOperation = Buy } +-- +-- Tests +-- + +testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do + ep <- toText <$> UV4.nextRandom + broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) + stopBrokerServer broS) testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do step "Setup" @@ -252,6 +218,10 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r send sock [] (BL.toStrict . encode $ RequestNotifications 2) threadDelay 10000 + -- We should obtain 3 notifications: + -- 1. When order became Submitted (from Unsubmitted) + -- 2. When order became Executed (our first notificationCallback call) + -- 3. Corresponding Trade notificatiot (our second notificationCallback call) step "Reading response" resp <- decode . BL.fromStrict <$> receive sock case resp of @@ -260,8 +230,8 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r let (TradeNotification newtrade) = head ns let (OrderNotification oid newstate) = ns !! 1 orderId @=? oid - Executed @=? newstate - trade @=? newtrade + Executed @=? newstate + trade @=? newtrade Just _ -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response" @@ -277,3 +247,31 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r Just _ -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response" ))) + +testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext (\ctx -> do + step "Setup" + (mockBroker, broState) <- mkMockBroker ["demo"] + ep <- makeEndpoint + bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + withSocket ctx Req (\sock -> do + connectAndSendOrder step sock defaultOrder ep + + step "Reading response" + (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock + + step "Sending duplicate request (with same sequence number)" + send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 defaultOrder) + threadDelay 10000 + + step "Checking that only one order is submitted" + s <- readIORef broState + (length . orders) s @?= 1 + + step "Reading response from duplicate request" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseOrderSubmitted oid) -> orderId @?= oid + Just _ -> assertFailure "Invalid response" + Nothing -> assertFailure "Invalid response" + + )))