diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index 253393b..006ed7b 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -4,6 +4,7 @@ module ATrade.Broker.Protocol ( BrokerServerRequest(..), BrokerServerResponse(..), Notification(..), + notificationOrderId, RequestSqnum(..) ) where @@ -75,6 +76,10 @@ instance ToJSON BrokerServerResponse where data Notification = OrderNotification OrderId OrderState | TradeNotification Trade deriving (Eq, Show) +notificationOrderId :: Notification -> OrderId +notificationOrderId (OrderNotification oid _) = oid +notificationOrderId (TradeNotification trade) = tradeOrderId trade + instance FromJSON Notification where parseJSON n = withObject "notification" (\obj -> case HM.lookup "trade" obj of diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index cbf44ce..6bffc6c 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -24,6 +24,7 @@ import Control.Monad import System.Log.Logger newtype OrderIdGenerator = IO OrderId +type PeerId = B.ByteString data BrokerInterface = BrokerInterface { accounts :: [T.Text], @@ -36,9 +37,9 @@ data BrokerInterface = BrokerInterface { data BrokerServerState = BrokerServerState { bsSocket :: Socket Router, orderToBroker :: M.Map OrderId BrokerInterface, - orderMap :: M.Map OrderId B.ByteString, -- Matches 0mq client identities with corresponding orders - lastPacket :: M.Map B.ByteString (RequestSqnum, B.ByteString), - pendingNotifications :: [(Notification, UTCTime)], -- List of tuples (Order with new state, Time when notification enqueued) + orderMap :: M.Map OrderId PeerId, -- Matches 0mq client identities with corresponding orders + lastPacket :: M.Map PeerId (RequestSqnum, B.ByteString), + pendingNotifications :: M.Map PeerId [Notification], brokers :: [BrokerInterface], completionMvar :: MVar (), orderIdCounter :: OrderId @@ -57,21 +58,35 @@ startBrokerServer brokers c ep = do orderMap = M.empty, orderToBroker = M.empty, lastPacket = M.empty, - pendingNotifications = [], + pendingNotifications = M.empty, brokers = brokers, completionMvar = compMv, orderIdCounter = 1 } + mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv +notificationCallback :: IORef BrokerServerState -> Notification -> IO () +notificationCallback state n = do + orders <- orderMap <$> readIORef state + case M.lookup (notificationOrderId n) orders of + Just peerId -> addNotification peerId n + Nothing -> warningM "Broker.Server" "Notification: unknown order" + + where + addNotification peerId n = atomicModifyIORef' state (\s -> + case M.lookup peerId . pendingNotifications $ s of + Just ns -> (s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)}, ()) + Nothing -> (s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}, ())) + brokerServerThread state = finally brokerServerThread' cleanup where brokerServerThread' = forever $ do sock <- bsSocket <$> readIORef state msg <- receiveMulti sock case msg of - [peerId, _, payload] -> handleMessage payload >>= sendMessage sock peerId + [peerId, _, payload] -> handleMessage peerId payload >>= sendMessage sock peerId _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) cleanup = do @@ -80,16 +95,18 @@ brokerServerThread state = finally brokerServerThread' cleanup mv <- completionMvar <$> readIORef state putMVar mv () - handleMessage :: B.ByteString -> IO BrokerServerResponse - handleMessage payload = do + handleMessage :: B.ByteString -> B.ByteString -> IO BrokerServerResponse + handleMessage peerId payload = do bros <- brokers <$> readIORef state case decode . BL.fromStrict $ payload of Just (RequestSubmitOrder sqnum order) -> case findBrokerForAccount (orderAccountId order) bros of Just bro -> do oid <- nextOrderId + atomicModifyIORef' state (\s -> (s { + orderToBroker = M.insert oid bro (orderToBroker s), + orderMap = M.insert oid peerId (orderMap s) }, ())) submitOrder bro order { orderId = oid } - atomicModifyIORef' state (\s -> (s { orderToBroker = M.insert oid bro (orderToBroker s)}, ())) return $ ResponseOrderSubmitted oid Nothing -> return $ ResponseError "Unknown account" @@ -100,7 +117,13 @@ brokerServerThread state = finally brokerServerThread' cleanup cancelOrder bro oid return $ ResponseOrderCancelled oid Nothing -> return $ ResponseError "Unknown order" - Just _ -> return $ ResponseError "Not implemented" + Just (RequestNotifications sqnum) -> do + maybeNs <- M.lookup peerId . pendingNotifications <$> readIORef state + case maybeNs of + Just ns -> do + atomicModifyIORef' state (\s -> (s { pendingNotifications = M.insert peerId [] (pendingNotifications s)}, ())) + return $ ResponseNotifications ns + Nothing -> return $ ResponseNotifications [] Nothing -> return $ ResponseError "Unable to parse request" sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index 4ab4abe..3406876 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -79,7 +79,8 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop , testBrokerServerSubmitOrderToUnknownAccount , testBrokerServerCancelOrder , testBrokerServerCancelUnknownOrder - , testBrokerServerCorruptedPacket ] + , testBrokerServerCorruptedPacket + , testBrokerServerGetNotifications ] testBrokerServerStartStop = testCase "Broker Server starts and stops" $ withContext (\ctx -> do ep <- toText <$> UV4.nextRandom @@ -194,7 +195,6 @@ testBrokerServerCancelUnknownOrder = testCaseSteps "Broker Server: order cancell Nothing -> assertFailure "Invalid response" ))) - testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet" $ \step -> withContext (\ctx -> do step "Setup" @@ -218,3 +218,61 @@ testBrokerServerCorruptedPacket = testCaseSteps "Broker Server: corrupted packet ))) where corrupt = B.drop 5 + +testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications request" $ + \step -> withContext (\ctx -> do + step "Setup" + ep <- makeEndpoint + (mockBroker, broState) <- mkMockBroker ["demo"] + bracket (startBrokerServer [mockBroker] ctx ep) stopBrokerServer (\broS -> + withSocket ctx Req (\sock -> do + -- We have to actually submit order, or else server won't know that we should + -- be notified about this order + connectAndSendOrder step sock defaultOrder ep + (Just (ResponseOrderSubmitted orderId)) <- decode . BL.fromStrict <$> receive sock + threadDelay 10000 + + (Just cb) <- notificationCallback <$> readIORef broState + cb (OrderNotification orderId Executed) + let trade = Trade { + tradeOrderId = orderId, + tradePrice = 19.82, + tradeQuantity = 1, + tradeVolume = 1982, + tradeVolumeCurrency = "TEST_CURRENCY", + tradeOperation = Buy, + tradeAccount = "demo", + tradeSecurity = "FOO", + tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, + tradeSignalId = SignalId "Foo" "bar" "baz" } + cb (TradeNotification trade) + + step "Sending notifications request" + send sock [] (BL.toStrict . encode $ RequestNotifications 2) + threadDelay 10000 + + step "Reading response" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseNotifications ns) -> do + length ns @=? 3 + let (TradeNotification newtrade) = head ns + let (OrderNotification oid newstate) = ns !! 1 + orderId @=? oid + Executed @=? newstate + trade @=? newtrade + Just _ -> assertFailure "Invalid response" + Nothing -> assertFailure "Invalid response" + + step "Sending second notifications request" + send sock [] (BL.toStrict . encode $ RequestNotifications 3) + threadDelay 10000 + + step "Reading response" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseNotifications ns) -> do + 0 @=? length ns + Just _ -> assertFailure "Invalid response" + Nothing -> assertFailure "Invalid response" + )))