Browse Source

BrokerProtocol: Asynchronous order submission

master
Denis Tereshkin 4 years ago
parent
commit
8edc931127
  1. 16
      src/ATrade/Broker/Client.hs
  2. 24
      src/ATrade/Broker/Protocol.hs
  3. 6
      src/ATrade/Broker/Server.hs
  4. 11
      test/ArbitraryInstances.hs
  5. 4
      test/TestBrokerClient.hs
  6. 55
      test/TestBrokerServer.hs

16
src/ATrade/Broker/Client.hs

@ -35,7 +35,7 @@ data BrokerClientHandle = BrokerClientHandle { @@ -35,7 +35,7 @@ data BrokerClientHandle = BrokerClientHandle {
tid :: ThreadId,
completionMvar :: MVar (),
killMvar :: MVar (),
submitOrder :: Order -> IO (Either T.Text OrderId),
submitOrder :: Order -> IO (Either T.Text ()),
cancelOrder :: OrderId -> IO (Either T.Text ()),
getNotifications :: IO (Either T.Text [Notification]),
cmdVar :: MVar (BrokerServerRequest, MVar BrokerServerResponse),
@ -108,16 +108,16 @@ stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (t @@ -108,16 +108,16 @@ stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (t
nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v))
bcSubmitOrder :: ClientIdentity -> IORef Int64 -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> Order -> IO (Either T.Text OrderId)
bcSubmitOrder :: ClientIdentity -> IORef Int64 -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> Order -> IO (Either T.Text ())
bcSubmitOrder clientIdentity idCounter cmdVar order = do
respVar <- newEmptyMVar
sqnum <- nextId idCounter
putMVar cmdVar (RequestSubmitOrder sqnum clientIdentity order, respVar)
resp <- takeMVar respVar
case resp of
(ResponseOrderSubmitted oid) -> return $ Right oid
(ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error"
ResponseOk -> return $ Right ()
(ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error"
bcCancelOrder :: ClientIdentity -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> OrderId -> IO (Either T.Text ())
bcCancelOrder clientIdentity idCounter cmdVar orderId = do
@ -126,9 +126,9 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do @@ -126,9 +126,9 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do
putMVar cmdVar (RequestCancelOrder sqnum clientIdentity orderId, respVar)
resp <- takeMVar respVar
case resp of
(ResponseOrderCancelled oid) -> return $ Right ()
(ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error"
ResponseOk -> return $ Right ()
(ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error"
bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> IORef NotificationSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification])
bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do

24
src/ATrade/Broker/Protocol.hs

@ -118,33 +118,29 @@ instance ToJSON BrokerServerRequest where @@ -118,33 +118,29 @@ instance ToJSON BrokerServerRequest where
"request-notifications" .= ("" :: T.Text),
"initial-sqnum" .= unNotificationSqnum initialNotificationSqnum]
data BrokerServerResponse = ResponseOrderSubmitted OrderId
| ResponseOrderCancelled OrderId
data BrokerServerResponse = ResponseOk
| ResponseNotifications [Notification]
| ResponseError T.Text
deriving (Eq, Show)
instance FromJSON BrokerServerResponse where
parseJSON = withObject "object" (\obj ->
if | HM.member "order-id" obj -> do
oid <- obj .: "order-id"
return $ ResponseOrderSubmitted oid
| HM.member "order-cancelled" obj -> do
oid <- obj .: "order-cancelled"
return $ ResponseOrderCancelled oid
if | HM.member "result" obj -> do
result <- obj .: "result"
if (result :: T.Text) == "success"
then return ResponseOk
else do
msg <- obj .:? "message" .!= ""
return (ResponseError msg)
| HM.member "notifications" obj -> do
notifications <- obj .: "notifications"
ResponseNotifications <$> parseJSON notifications
| HM.member "error" obj -> do
error <- obj .: "error"
ResponseError <$> parseJSON error
| otherwise -> fail "Unable to parse BrokerServerResponse")
instance ToJSON BrokerServerResponse where
toJSON (ResponseOrderSubmitted oid) = object [ "order-id" .= oid ]
toJSON (ResponseOrderCancelled oid) = object [ "order-cancelled" .= oid ]
toJSON ResponseOk = object [ "result" .= ("success" :: T.Text) ]
toJSON (ResponseNotifications notifications) = object [ "notifications" .= notifications ]
toJSON (ResponseError errorMessage) = object [ "error" .= errorMessage ]
toJSON (ResponseError errorMessage) = object [ "result" .= ("error" :: T.Text), "message" .= errorMessage ]
data TradeSinkMessage = TradeSinkHeartBeat | TradeSinkTrade {
tsAccountId :: T.Text,

6
src/ATrade/Broker/Server.hs

@ -196,12 +196,12 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -196,12 +196,12 @@ brokerServerThread state = finally brokerServerThread' cleanup
case findBrokerForAccount (orderAccountId order) bros of
Just bro -> do
globalOrderId <- nextOrderId
let fullOrderId = (FullOrderId clientIdentity (orderId order))
let fullOrderId = FullOrderId clientIdentity (orderId order)
atomicMapIORef state (\s -> s {
orderToBroker = M.insert fullOrderId bro (orderToBroker s),
orderMap = BM.insert fullOrderId globalOrderId (orderMap s) })
submitOrder bro order { orderId = globalOrderId }
return $ ResponseOrderSubmitted (orderId order)
return ResponseOk
Nothing -> do
warningM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order)
@ -213,7 +213,7 @@ brokerServerThread state = finally brokerServerThread' cleanup @@ -213,7 +213,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
case (M.lookup fullOrderId m, BM.lookup fullOrderId bm) of
(Just bro, Just globalOrderId) -> do
cancelOrder bro globalOrderId
return $ ResponseOrderCancelled localOrderId
return ResponseOk
_ -> return $ ResponseError "Unknown order"
RequestNotifications sqnum clientIdentity initialSqnum -> do
maybeNs <- M.lookup clientIdentity . pendingNotifications <$> readIORef state

11
test/ArbitraryInstances.hs

@ -113,14 +113,15 @@ instance Arbitrary BrokerServerRequest where @@ -113,14 +113,15 @@ instance Arbitrary BrokerServerRequest where
if | t == 1 -> RequestSubmitOrder <$> arbitrary <*> arbitrary <*> arbitrary
| t == 2 -> RequestCancelOrder <$> arbitrary <*> arbitrary <*> arbitrary
| t == 3 -> RequestNotifications <$> arbitrary <*> arbitrary <*> arbitrary
| otherwise -> error "Invalid argument"
instance Arbitrary BrokerServerResponse where
arbitrary = do
t <- choose (1, 4) :: Gen Int
if | t == 1 -> ResponseOrderSubmitted <$> arbitrary
| t == 2 -> ResponseOrderCancelled <$> arbitrary
| t == 3 -> ResponseNotifications <$> arbitrary
| t == 4 -> ResponseError <$> arbitrary
t <- choose (1, 3) :: Gen Int
if | t == 1 -> return ResponseOk
| t == 2 -> ResponseNotifications <$> arbitrary
| t == 3 -> ResponseError <$> arbitrary
| otherwise -> error "Invalid argument"
instance Arbitrary P.Price where
arbitrary = P.Price <$> (arbitrary `suchThat` (\p -> abs p < 1000000000 * 10000000))

4
test/TestBrokerClient.hs

@ -73,8 +73,8 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order" @@ -73,8 +73,8 @@ testBrokerClientCancelOrder = testCase "Broker client: submit and cancel order"
maybeOid <- submitOrder broC defaultOrder
case maybeOid of
Left err -> assertFailure "Invalid response"
Right oid -> do
rc <- cancelOrder broC oid
Right _ -> do
rc <- cancelOrder broC (orderId defaultOrder)
case rc of
Left err -> assertFailure "Invalid response"
Right _ -> return()

55
test/TestBrokerServer.hs

@ -115,9 +115,9 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste @@ -115,9 +115,9 @@ testBrokerServerSubmitOrder = testCaseSteps "Broker Server submits order" $ \ste
step "Reading response"
resp <- decode . BL.fromStrict <$> receive sock
case resp of
Just (ResponseOrderSubmitted _) -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
Just ResponseOk -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
testBrokerServerSubmitOrderDifferentIdentities :: TestTree
testBrokerServerSubmitOrderDifferentIdentities = testCaseSteps "Broker Server submits order: different identities" $ \step -> withContext $ \ctx -> do
@ -139,16 +139,16 @@ testBrokerServerSubmitOrderDifferentIdentities = testCaseSteps "Broker Server su @@ -139,16 +139,16 @@ testBrokerServerSubmitOrderDifferentIdentities = testCaseSteps "Broker Server su
step "Reading response for identity1"
resp <- decode . BL.fromStrict <$> receive sock1
case resp of
Just (ResponseOrderSubmitted localOrderId) -> localOrderId @=? orderId1
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
Just ResponseOk -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
step "Reading response for identity2"
resp <- decode . BL.fromStrict <$> receive sock2
case resp of
Just (ResponseOrderSubmitted localOrderId) -> localOrderId @=? orderId2
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
Just ResponseOk -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
testBrokerServerSubmitOrderToUnknownAccount :: TestTree
testBrokerServerSubmitOrderToUnknownAccount = testCaseSteps "Broker Server returns error if account is unknown" $
@ -178,11 +178,10 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc @@ -178,11 +178,10 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ ->
withSocket ctx Req $ \sock -> do
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock
localOrderId @=? (orderId defaultOrder)
Just ResponseOk <- decode . BL.fromStrict <$> receive sock
step "Sending order cancellation request"
send sock [] (BL.toStrict . encode $ RequestCancelOrder 2 "identity" localOrderId)
send sock [] (BL.toStrict . encode $ RequestCancelOrder 2 "identity" (orderId defaultOrder))
threadDelay 10000
step "Checking that order is cancelled in BrokerBackend"
@ -192,9 +191,9 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc @@ -192,9 +191,9 @@ testBrokerServerCancelOrder = testCaseSteps "Broker Server: submitted order canc
step "Reading response"
resp <- decode . BL.fromStrict <$> receive sock
case resp of
Just (ResponseOrderCancelled _) -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
Just ResponseOk -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
testBrokerServerCancelUnknownOrder :: TestTree
testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancellation: error if order is unknown" $
@ -255,8 +254,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r @@ -255,8 +254,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
-- We have to actually submit order, or else server won't know that we should
-- be notified about this order
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock
localOrderId @=? orderId defaultOrder
Just ResponseOk <- decode . BL.fromStrict <$> receive sock
threadDelay 10000
globalOrderId <- orderId . head . orders <$> readIORef broState
@ -292,9 +290,8 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r @@ -292,9 +290,8 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r
length ns @=? 3
let (OrderNotification orderNotificationSqnum oid newstate) = ns !! 1
let (TradeNotification tradeNotificationSqnum newtrade) = ns !! 2
localOrderId @=? oid
Executed @=? newstate
trade { tradeOrderId = localOrderId } @=? newtrade
trade { tradeOrderId = orderId defaultOrder } @=? newtrade
-- Check notification sqnums
step "Received notification sqnums are correct"
let sqnums = sort $ fmap (unNotificationSqnum . getNotificationSqnum) ns
@ -313,8 +310,7 @@ testBrokerServerGetNotificationsFromSameSqnum = testCaseSteps "Broker Server: no @@ -313,8 +310,7 @@ testBrokerServerGetNotificationsFromSameSqnum = testCaseSteps "Broker Server: no
bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ ->
withSocket ctx Req $ \sock -> do
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock
localOrderId @=? orderId defaultOrder
Just ResponseOk <- decode . BL.fromStrict <$> receive sock
threadDelay 10000
globalOrderId <- orderId . head . orders <$> readIORef broState
@ -372,8 +368,7 @@ testBrokerServerGetNotificationsRemovesEarlierNotifications = testCaseSteps "Bro @@ -372,8 +368,7 @@ testBrokerServerGetNotificationsRemovesEarlierNotifications = testCaseSteps "Bro
bracket (startBrokerServer [mockBroker] ctx ep notifEp [] defaultServerSecurityParams) stopBrokerServer $ \_ ->
withSocket ctx Req $ \sock -> do
connectAndSendOrder step sock defaultOrder ep
(Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock
localOrderId @=? orderId defaultOrder
Just ResponseOk <- decode . BL.fromStrict <$> receive sock
threadDelay 10000
globalOrderId <- orderId . head . orders <$> readIORef broState
@ -428,7 +423,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque @@ -428,7 +423,7 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
connectAndSendOrder step sock defaultOrder ep
step "Reading response"
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
Just ResponseOk <- decode . BL.fromStrict <$> receive sock
step "Sending duplicate request (with same sequence number)"
send sock [] (BL.toStrict . encode $ RequestSubmitOrder 1 "identity" defaultOrder)
@ -441,9 +436,9 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque @@ -441,9 +436,9 @@ testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate reque
step "Reading response from duplicate request"
resp <- decode . BL.fromStrict <$> receive sock
case resp of
Just (ResponseOrderSubmitted oid) -> orderId @?= oid
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
Just ResponseOk -> return ()
Just _ -> assertFailure "Invalid response"
Nothing -> assertFailure "Invalid response"
testBrokerServerNotificationSocket :: TestTree
testBrokerServerNotificationSocket = testCaseSteps "Broker Server: sends notification via notification socket" $ \step -> withContext $ \ctx -> do
@ -459,13 +454,13 @@ testBrokerServerNotificationSocket = testCaseSteps "Broker Server: sends notific @@ -459,13 +454,13 @@ testBrokerServerNotificationSocket = testCaseSteps "Broker Server: sends notific
connectAndSendOrderWithIdentity step sock defaultOrder "test-identity" ep
step "Reading response"
(Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock
Just ResponseOk <- decode . BL.fromStrict <$> receive sock
step "Reading order submitted notification"
[_, payload] <- receiveMulti nSocket
let (Just (OrderNotification notifSqnum1 notifOid newOrderState)) = decode . BL.fromStrict $ payload
notifOid @?= orderId
notifOid @?= orderId defaultOrder
newOrderState @?= Submitted
backendOrderId <- mockBrokerLastOrderId broState
@ -489,7 +484,7 @@ testBrokerServerNotificationSocket = testCaseSteps "Broker Server: sends notific @@ -489,7 +484,7 @@ testBrokerServerNotificationSocket = testCaseSteps "Broker Server: sends notific
step "Receiving trade notification"
[_, payload] <- receiveMulti nSocket
let (Just (TradeNotification notifSqnum2 incomingTrade)) = decode . BL.fromStrict $ payload
incomingTrade @?= trade { tradeOrderId = orderId }
incomingTrade @?= trade { tradeOrderId = orderId defaultOrder }
{-
testBrokerServerTradeSink :: TestTree

Loading…
Cancel
Save