diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 38e1253..b43e663 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE OverloadedStrings #-} module ATrade.Broker.Server ( startBrokerServer, @@ -34,6 +35,7 @@ data BrokerInterface = BrokerInterface { data BrokerServerState = BrokerServerState { bsSocket :: Socket Router, + orderToBroker :: M.Map OrderId BrokerInterface, orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) @@ -53,6 +55,7 @@ startBrokerServer brokers c ep = do state <- newIORef BrokerServerState { bsSocket = sock, orderMap = M.empty, + orderToBroker = M.empty, lastPacket = M.empty, pendingNotifications = [], brokers = brokers, @@ -79,23 +82,32 @@ brokerServerThread state = finally brokerServerThread' cleanup bros <- brokers <$> readIORef state case decode . BL.fromStrict $ payload of Just (RequestSubmitOrder sqnum order) -> - case findBroker (orderAccountId order) bros of + case findBrokerForAccount (orderAccountId order) bros of Just bro -> do oid <- nextOrderId submitOrder bro order { orderId = oid } + atomicModifyIORef' state (\s -> (s { orderToBroker = M.insert oid bro (orderToBroker s)}, ())) return (peerId, ResponseOrderSubmitted oid) - Nothing -> error "foobar" + Nothing -> return (peerId, ResponseError "Unknown account") + Just (RequestCancelOrder sqnum oid) -> do + m <- orderToBroker <$> readIORef state + case M.lookup oid m of + Just bro -> do + cancelOrder bro oid + return (peerId, ResponseOrderCancelled oid) + Nothing -> return (peerId, ResponseError "Unknown order") + Just _ -> return (peerId, ResponseError "Not implemented") Nothing -> error "foobar" handleMessage x = do warningM "Broker.Server" ("Invalid packet received: " ++ show x) error "foobar" sendMessage sock (peerId, resp) = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) - - findBroker account = L.find (L.elem account . accounts) + + findBrokerForAccount account = L.find (L.elem account . accounts) nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s)) - + stopBrokerServer :: BrokerServerHandle -> IO () stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index 55aa977..c817807 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -25,12 +25,14 @@ import Data.Aeson import Data.Time.Clock import Data.Time.Calendar import Data.Maybe +import qualified Data.List as L import Data.IORef import Data.UUID as U import Data.UUID.V4 as UV4 data MockBrokerState = MockBrokerState { orders :: [Order], + cancelledOrders :: [Order], notificationCallback :: Maybe (Notification -> IO ()) } @@ -45,7 +47,11 @@ mockSubmitOrder state order = do submittedOrder = order { orderState = Submitted } mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO () -mockCancelOrder state = undefined +mockCancelOrder state oid = do + ors <- orders <$> readIORef state + case L.find (\o -> orderId o == oid) ors of + Just order -> atomicModifyIORef' state (\s -> (s { cancelledOrders = order : cancelledOrders s}, ())) + Nothing -> return () mockStopBroker :: IORef MockBrokerState -> IO () mockStopBroker state = return () @@ -54,6 +60,7 @@ mockStopBroker state = return () mkMockBroker accs = do state <- newIORef MockBrokerState { orders = [], + cancelledOrders = [], notificationCallback = Nothing } @@ -66,18 +73,32 @@ mkMockBroker accs = do }, state) -unitTests = testGroup "Broker.Server" [testBrokerServerStartStop, testBrokerServerSubmitOrder] +unitTests = testGroup "Broker.Server" [testBrokerServerStartStop + , testBrokerServerSubmitOrder + , testBrokerServerSubmitOrderToUnknownAccount + , testBrokerServerCancelOrder ] testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do ep <- toText <$> UV4.nextRandom broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) stopBrokerServer broS) +makeEndpoint = do + uid <- toText <$> UV4.nextRandom + return $ "inproc://brokerserver" `T.append` uid + +connectAndSendOrder step sock order ep = do + step "Connecting" + connect sock (T.unpack ep) + + step "Sending request" + send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 order) + threadDelay 10000 + testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do step "Setup" - uid <- toText <$> UV4.nextRandom (mockBroker, broState) <- mkMockBroker ["demo"] - let ep = "inproc://brokerserver" `T.append` uid + ep <- makeEndpoint let order = mkOrder { orderAccountId = "demo", orderSecurity = "FOO", @@ -87,12 +108,7 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste } bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> withSocket ctx Req (\sock -> do - step "Connecting" - connect sock (T.unpack ep) - - step "Sending request" - send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 order) - threadDelay 10000 + connectAndSendOrder step sock order ep step "Checking that order is submitted to BrokerInterface" s <- readIORef broState @@ -106,3 +122,59 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste ))) +testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server returns error if account is unknown" $ + \step -> withContext (\ctx -> do + step "Setup" + ep <- makeEndpoint + (mockBroker, broState) <- mkMockBroker ["demo"] + let order = mkOrder { + orderAccountId = "foobar", + orderSecurity = "FOO", + orderPrice = Market, + orderQuantity = 10, + orderOperation = Buy + } + bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + withSocket ctx Req (\sock -> do + connectAndSendOrder step sock order ep + + step "Reading response" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseError _) -> return () + Just _ -> assertFailure "Invalid response" + Nothing -> assertFailure "Invalid response" + + ))) + +testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order cancellation" $ + \step -> withContext (\ctx -> do + step "Setup" + ep <- makeEndpoint + (mockBroker, broState) <- mkMockBroker ["demo"] + let order = mkOrder { + orderAccountId = "demo", + orderSecurity = "FOO", + orderPrice = Market, + orderQuantity = 10, + orderOperation = Buy + } + bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + withSocket ctx Req (\sock -> do + connectAndSendOrder step sock order ep + (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock + + step "Sending order cancellation request" + send sock [] (BL.toStrict . encode $ RequestCancelOrder 2 orderId) + threadDelay 10000 + + step "Checking that order is cancelled in BrokerInterface" + s <- readIORef broState + (length . cancelledOrders) s @?= 1 + + step "Reading response" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseOrderCancelled _) -> return () + Nothing -> assertFailure "Invalid response" + )))