Browse Source

BrokerServer: order cancellation

master
Denis Tereshkin 9 years ago
parent
commit
03242a30cc
  1. 22
      src/ATrade/Broker/Server.hs
  2. 92
      test/TestBrokerServer.hs

22
src/ATrade/Broker/Server.hs

@ -1,3 +1,4 @@
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Broker.Server ( module ATrade.Broker.Server (
startBrokerServer, startBrokerServer,
@ -34,6 +35,7 @@ data BrokerInterface = BrokerInterface {
data BrokerServerState = BrokerServerState { data BrokerServerState = BrokerServerState {
bsSocket :: Socket Router, bsSocket :: Socket Router,
orderToBroker :: M.Map OrderId BrokerInterface,
orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders
lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString),
pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued)
@ -53,6 +55,7 @@ startBrokerServer brokers c ep = do
state <- newIORef BrokerServerState { state <- newIORef BrokerServerState {
bsSocket = sock, bsSocket = sock,
orderMap = M.empty, orderMap = M.empty,
orderToBroker = M.empty,
lastPacket = M.empty, lastPacket = M.empty,
pendingNotifications = [], pendingNotifications = [],
brokers = brokers, brokers = brokers,
@ -79,23 +82,32 @@ brokerServerThread state = finally brokerServerThread' cleanup
bros <- brokers <$> readIORef state bros <- brokers <$> readIORef state
case decode . BL.fromStrict $ payload of case decode . BL.fromStrict $ payload of
Just (RequestSubmitOrder sqnum order) -> Just (RequestSubmitOrder sqnum order) ->
case findBroker (orderAccountId order) bros of case findBrokerForAccount (orderAccountId order) bros of
Just bro -> do Just bro -> do
oid <- nextOrderId oid <- nextOrderId
submitOrder bro order { orderId = oid } submitOrder bro order { orderId = oid }
atomicModifyIORef' state (\s -> (s { orderToBroker = M.insert oid bro (orderToBroker s)}, ()))
return (peerId, ResponseOrderSubmitted oid) return (peerId, ResponseOrderSubmitted oid)
Nothing -> error "foobar" Nothing -> return (peerId, ResponseError "Unknown account")
Just (RequestCancelOrder sqnum oid) -> do
m <- orderToBroker <$> readIORef state
case M.lookup oid m of
Just bro -> do
cancelOrder bro oid
return (peerId, ResponseOrderCancelled oid)
Nothing -> return (peerId, ResponseError "Unknown order")
Just _ -> return (peerId, ResponseError "Not implemented")
Nothing -> error "foobar" Nothing -> error "foobar"
handleMessage x = do handleMessage x = do
warningM "Broker.Server" ("Invalid packet received: " ++ show x) warningM "Broker.Server" ("Invalid packet received: " ++ show x)
error "foobar" error "foobar"
sendMessage sock (peerId, resp) = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) sendMessage sock (peerId, resp) = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp])
findBroker account = L.find (L.elem account . accounts) findBrokerForAccount account = L.find (L.elem account . accounts)
nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s)) nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s))
stopBrokerServer :: BrokerServerHandle -> IO () stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv

92
test/TestBrokerServer.hs

@ -25,12 +25,14 @@ import Data.Aeson
import Data.Time.Clock import Data.Time.Clock
import Data.Time.Calendar import Data.Time.Calendar
import Data.Maybe import Data.Maybe
import qualified Data.List as L
import Data.IORef import Data.IORef
import Data.UUID as U import Data.UUID as U
import Data.UUID.V4 as UV4 import Data.UUID.V4 as UV4
data MockBrokerState = MockBrokerState { data MockBrokerState = MockBrokerState {
orders :: [Order], orders :: [Order],
cancelledOrders :: [Order],
notificationCallback :: Maybe (Notification -> IO ()) notificationCallback :: Maybe (Notification -> IO ())
} }
@ -45,7 +47,11 @@ mockSubmitOrder state order = do
submittedOrder = order { orderState = Submitted } submittedOrder = order { orderState = Submitted }
mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO () mockCancelOrder :: IORef MockBrokerState -> OrderId -> IO ()
mockCancelOrder state = undefined mockCancelOrder state oid = do
ors <- orders <$> readIORef state
case L.find (\o -> orderId o == oid) ors of
Just order -> atomicModifyIORef' state (\s -> (s { cancelledOrders = order : cancelledOrders s}, ()))
Nothing -> return ()
mockStopBroker :: IORef MockBrokerState -> IO () mockStopBroker :: IORef MockBrokerState -> IO ()
mockStopBroker state = return () mockStopBroker state = return ()
@ -54,6 +60,7 @@ mockStopBroker state = return ()
mkMockBroker accs = do mkMockBroker accs = do
state <- newIORef MockBrokerState { state <- newIORef MockBrokerState {
orders = [], orders = [],
cancelledOrders = [],
notificationCallback = Nothing notificationCallback = Nothing
} }
@ -66,18 +73,32 @@ mkMockBroker accs = do
}, state) }, state)
unitTests = testGroup "Broker.Server" [testBrokerServerStartStop, testBrokerServerSubmitOrder] unitTests = testGroup "Broker.Server" [testBrokerServerStartStop
, testBrokerServerSubmitOrder
, testBrokerServerSubmitOrderToUnknownAccount
, testBrokerServerCancelOrder ]
testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do
ep <- toText <$> UV4.nextRandom ep <- toText <$> UV4.nextRandom
broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep) broS <- startBrokerServer [] ctx ("inproc://brokerserver" `T.append` ep)
stopBrokerServer broS) stopBrokerServer broS)
makeEndpoint = do
uid <- toText <$> UV4.nextRandom
return $ "inproc://brokerserver" `T.append` uid
connectAndSendOrder step sock order ep = do
step "Connecting"
connect sock (T.unpack ep)
step "Sending request"
send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 order)
threadDelay 10000
testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \step -> withContext (\ctx -> do
step "Setup" step "Setup"
uid <- toText <$> UV4.nextRandom
(mockBroker, broState) <- mkMockBroker ["demo"] (mockBroker, broState) <- mkMockBroker ["demo"]
let ep = "inproc://brokerserver" `T.append` uid ep <- makeEndpoint
let order = mkOrder { let order = mkOrder {
orderAccountId = "demo", orderAccountId = "demo",
orderSecurity = "FOO", orderSecurity = "FOO",
@ -87,12 +108,7 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste
} }
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do withSocket ctx Req (\sock -> do
step "Connecting" connectAndSendOrder step sock order ep
connect sock (T.unpack ep)
step "Sending request"
send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 order)
threadDelay 10000
step "Checking that order is submitted to BrokerInterface" step "Checking that order is submitted to BrokerInterface"
s <- readIORef broState s <- readIORef broState
@ -106,3 +122,59 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste
))) )))
testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server returns error if account is unknown" $
\step -> withContext (\ctx -> do
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
let order = mkOrder {
orderAccountId = "foobar",
orderSecurity = "FOO",
orderPrice = Market,
orderQuantity = 10,
orderOperation = Buy
}
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock order ep
step "Reading response"
resp <- decode . BL.fromStrict <$> receive sock
case resp of
Just (ResponseError _) -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
)))
testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order cancellation" $
\step -> withContext (\ctx -> do
step "Setup"
ep <- makeEndpoint
(mockBroker, broState) <- mkMockBroker ["demo"]
let order = mkOrder {
orderAccountId = "demo",
orderSecurity = "FOO",
orderPrice = Market,
orderQuantity = 10,
orderOperation = Buy
}
bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS ->
withSocket ctx Req (\sock -> do
connectAndSendOrder step sock order ep
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
step "Sending order cancellation request"
send sock [] (BL.toStrict . encode $ RequestCancelOrder 2 orderId)
threadDelay 10000
step "Checking that order is cancelled in BrokerInterface"
s <- readIORef broState
(length . cancelledOrders) s @?= 1
step "Reading response"
resp <- decode . BL.fromStrict <$> receive sock
case resp of
Just (ResponseOrderCancelled _) -> return ()
Nothing -> assertFailure "Invalid response"
)))

Loading…
Cancel
Save