diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index ae856a5..b54364a 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -8,38 +8,38 @@ module ATrade.Broker.Client ( getNotifications ) where -import ATrade.Types -import ATrade.Broker.Protocol -import Control.Concurrent hiding (readChan, writeChan) -import Control.Concurrent.BoundedChan -import Control.Concurrent.MVar -import Control.Exception -import Control.Monad -import Control.Monad.Loops -import Data.Aeson -import Data.Int -import Data.IORef -import Data.Maybe -import Data.List.NonEmpty -import qualified Data.List as L -import qualified Data.Text as T -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL -import Data.Text.Encoding -import System.ZMQ4 -import System.ZMQ4.ZAP -import System.Log.Logger -import System.Timeout +import ATrade.Broker.Protocol +import ATrade.Types +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan +import Control.Concurrent.MVar +import Control.Exception +import Control.Monad +import Control.Monad.Loops +import Data.Aeson +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import Data.Int +import Data.IORef +import qualified Data.List as L +import Data.List.NonEmpty +import Data.Maybe +import qualified Data.Text as T +import Data.Text.Encoding +import System.Log.Logger +import System.Timeout +import System.ZMQ4 +import System.ZMQ4.ZAP data BrokerClientHandle = BrokerClientHandle { - tid :: ThreadId, - completionMvar :: MVar (), - killMvar :: MVar (), - submitOrder :: Order -> IO (Either T.Text OrderId), - cancelOrder :: OrderId -> IO (Either T.Text ()), + tid :: ThreadId, + completionMvar :: MVar (), + killMvar :: MVar (), + submitOrder :: Order -> IO (Either T.Text OrderId), + cancelOrder :: OrderId -> IO (Either T.Text ()), getNotifications :: IO (Either T.Text [Notification]), - cmdVar :: MVar BrokerServerRequest, - respVar :: MVar BrokerServerResponse + cmdVar :: MVar BrokerServerRequest, + respVar :: MVar BrokerServerResponse } brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO () @@ -60,10 +60,10 @@ brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finall debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) case cspCertificate secParams of Just clientCert -> zapApplyCertificate clientCert sock - Nothing -> return () + Nothing -> return () case cspServerCertificate secParams of Just serverCert -> zapSetServerCertificate serverCert sock - Nothing -> return () + Nothing -> return () connect sock $ T.unpack ep debugM "Broker.Client" $ "Connected" @@ -115,8 +115,8 @@ bcSubmitOrder clientIdentity idCounter cmdVar respVar order = do resp <- takeMVar respVar case resp of (ResponseOrderSubmitted oid) -> return $ Right oid - (ResponseError msg) -> return $ Left msg - _ -> return $ Left "Unknown error" + (ResponseError msg) -> return $ Left msg + _ -> return $ Left "Unknown error" bcCancelOrder :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> OrderId -> IO (Either T.Text ()) bcCancelOrder clientIdentity idCounter cmdVar respVar orderId = do @@ -125,15 +125,15 @@ bcCancelOrder clientIdentity idCounter cmdVar respVar orderId = do resp <- takeMVar respVar case resp of (ResponseOrderCancelled oid) -> return $ Right () - (ResponseError msg) -> return $ Left msg - _ -> return $ Left "Unknown error" + (ResponseError msg) -> return $ Left msg + _ -> return $ Left "Unknown error" bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> IO (Either T.Text [Notification]) bcGetNotifications clientIdentity idCounter cmdVar respVar = do sqnum <- nextId idCounter - putMVar cmdVar (RequestNotifications sqnum clientIdentity) + putMVar cmdVar (RequestNotifications sqnum clientIdentity (NotificationSqnum 0)) resp <- takeMVar respVar case resp of (ResponseNotifications ns) -> return $ Right ns - (ResponseError msg) -> return $ Left msg - _ -> return $ Left "Unknown error" + (ResponseError msg) -> return $ Left msg + _ -> return $ Left "Unknown error" diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index 3366397..423fac7 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -9,6 +9,7 @@ module ATrade.Broker.Protocol ( Notification(..), NotificationSqnum(..), nextSqnum, + getNotificationSqnum, notificationOrderId, RequestSqnum(..), requestSqnum, @@ -35,7 +36,7 @@ type ClientIdentity = T.Text type RequestSqnum = Int64 newtype NotificationSqnum = NotificationSqnum { unNotificationSqnum :: Int64 } - deriving (Eq, Show) + deriving (Eq, Show, Ord) nextSqnum :: NotificationSqnum -> NotificationSqnum nextSqnum (NotificationSqnum n) = NotificationSqnum (n + 1) @@ -43,9 +44,9 @@ nextSqnum (NotificationSqnum n) = NotificationSqnum (n + 1) data Notification = OrderNotification NotificationSqnum OrderId OrderState | TradeNotification NotificationSqnum Trade deriving (Eq, Show) -notificationSqnum :: Notification -> NotificationSqnum -notificationSqnum (OrderNotification sqnum _ _) = sqnum -notificationSqnum (TradeNotification sqnum _) = sqnum +getNotificationSqnum :: Notification -> NotificationSqnum +getNotificationSqnum (OrderNotification sqnum _ _) = sqnum +getNotificationSqnum (TradeNotification sqnum _) = sqnum notificationOrderId :: Notification -> OrderId notificationOrderId (OrderNotification _ oid _) = oid @@ -78,13 +79,13 @@ instance ToJSON Notification where data BrokerServerRequest = RequestSubmitOrder RequestSqnum ClientIdentity Order | RequestCancelOrder RequestSqnum ClientIdentity OrderId - | RequestNotifications RequestSqnum ClientIdentity + | RequestNotifications RequestSqnum ClientIdentity NotificationSqnum deriving (Eq, Show) requestSqnum :: BrokerServerRequest -> RequestSqnum -requestSqnum (RequestSubmitOrder sqnum _ _) = sqnum -requestSqnum (RequestCancelOrder sqnum _ _) = sqnum -requestSqnum (RequestNotifications sqnum _) = sqnum +requestSqnum (RequestSubmitOrder sqnum _ _) = sqnum +requestSqnum (RequestCancelOrder sqnum _ _) = sqnum +requestSqnum (RequestNotifications sqnum _ _) = sqnum instance FromJSON BrokerServerRequest where parseJSON = withObject "object" (\obj -> do @@ -100,7 +101,9 @@ instance FromJSON BrokerServerRequest where | HM.member "cancel-order" obj = do orderId <- obj .: "cancel-order" RequestCancelOrder sqnum clientIdentity <$> parseJSON orderId - | HM.member "request-notifications" obj = return (RequestNotifications sqnum clientIdentity) + | HM.member "request-notifications" obj = do + initialSqnum <- obj .: "initial-sqnum" + return (RequestNotifications sqnum clientIdentity (NotificationSqnum initialSqnum)) parseRequest _ _ _ = fail "Invalid request object" instance ToJSON BrokerServerRequest where @@ -110,9 +113,10 @@ instance ToJSON BrokerServerRequest where toJSON (RequestCancelOrder sqnum clientIdentity oid) = object ["request-sqnum" .= sqnum, "client-identity" .= clientIdentity, "cancel-order" .= oid ] - toJSON (RequestNotifications sqnum clientIdentity) = object ["request-sqnum" .= sqnum, + toJSON (RequestNotifications sqnum clientIdentity initialNotificationSqnum) = object ["request-sqnum" .= sqnum, "client-identity" .= clientIdentity, - "request-notifications" .= ("" :: T.Text) ] + "request-notifications" .= ("" :: T.Text), + "initial-sqnum" .= unNotificationSqnum initialNotificationSqnum] data BrokerServerResponse = ResponseOrderSubmitted OrderId | ResponseOrderCancelled OrderId diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 9874591..3cc0146 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -203,12 +203,13 @@ brokerServerThread state = finally brokerServerThread' cleanup cancelOrder bro globalOrderId return $ ResponseOrderCancelled localOrderId _ -> return $ ResponseError "Unknown order" - RequestNotifications sqnum clientIdentity -> do + RequestNotifications sqnum clientIdentity initialSqnum -> do maybeNs <- M.lookup clientIdentity . pendingNotifications <$> readIORef state case maybeNs of Just ns -> do - atomicMapIORef state (\s -> s { pendingNotifications = M.insert clientIdentity [] (pendingNotifications s)}) - return $ ResponseNotifications . L.reverse $ ns + let filtered = L.filter (\n -> getNotificationSqnum n >= initialSqnum) ns + atomicMapIORef state (\s -> s { pendingNotifications = M.insert clientIdentity filtered (pendingNotifications s)}) + return $ ResponseNotifications . L.reverse $ filtered Nothing -> return $ ResponseNotifications [] sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp]) diff --git a/test/ArbitraryInstances.hs b/test/ArbitraryInstances.hs index 5405721..0f0d58d 100644 --- a/test/ArbitraryInstances.hs +++ b/test/ArbitraryInstances.hs @@ -112,7 +112,7 @@ instance Arbitrary BrokerServerRequest where t <- choose (1, 3) :: Gen Int if | t == 1 -> RequestSubmitOrder <$> arbitrary <*> arbitrary <*> arbitrary | t == 2 -> RequestCancelOrder <$> arbitrary <*> arbitrary <*> arbitrary - | t == 3 -> RequestNotifications <$> arbitrary <*> arbitrary + | t == 3 -> RequestNotifications <$> arbitrary <*> arbitrary <*> arbitrary instance Arbitrary BrokerServerResponse where arbitrary = do diff --git a/test/TestBrokerServer.hs b/test/TestBrokerServer.hs index fd976c7..e6e8b55 100644 --- a/test/TestBrokerServer.hs +++ b/test/TestBrokerServer.hs @@ -18,6 +18,7 @@ import Data.Aeson import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import Data.IORef +import Data.List (sort) import qualified Data.Text as T import Data.Time.Calendar import Data.Time.Clock @@ -35,6 +36,8 @@ unitTests = testGroup "Broker.Server" [testBrokerServerStartStop , testBrokerServerCancelUnknownOrder , testBrokerServerCorruptedPacket , testBrokerServerGetNotifications + , testBrokerServerGetNotificationsFromSameSqnum + , testBrokerServerGetNotificationsRemovesEarlierNotifications , testBrokerServerDuplicateRequest ] -- @@ -271,7 +274,7 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r cb (BackendTradeNotification trade) step "Sending notifications request" - send sock [] (BL.toStrict . encode $ RequestNotifications 2 "identity") + send sock [] (BL.toStrict . encode $ RequestNotifications 2 "identity" (NotificationSqnum 1)) threadDelay 10000 -- We should obtain 3 notifications: @@ -288,21 +291,128 @@ testBrokerServerGetNotifications = testCaseSteps "Broker Server: notifications r localOrderId @=? oid Executed @=? newstate trade { tradeOrderId = localOrderId } @=? newtrade + -- Check notification sqnums + step "Received notification sqnums are correct" + let sqnums = sort $ fmap (unNotificationSqnum . getNotificationSqnum) ns + sqnums @=? [1, 2, 3] + Just _ -> assertFailure "Invalid response" Nothing -> assertFailure "Invalid response" +testBrokerServerGetNotificationsFromSameSqnum :: TestTree +testBrokerServerGetNotificationsFromSameSqnum = testCaseSteps "Broker Server: notifications request, twice from same sqnum" $ + \step -> withContext $ \ctx -> do + step "Setup" + ep <- makeEndpoint + (mockBroker, broState) <- mkMockBroker ["demo"] + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> + withSocket ctx Req $ \sock -> do + connectAndSendOrder step sock defaultOrder ep + (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock + localOrderId @=? orderId defaultOrder + threadDelay 10000 + + globalOrderId <- orderId . head . orders <$> readIORef broState + + (Just cb) <- notificationCallback <$> readIORef broState + cb (BackendOrderNotification globalOrderId Executed) + let trade = Trade { + tradeOrderId = globalOrderId, + tradePrice = 19.82, + tradeQuantity = 1, + tradeVolume = 1982, + tradeVolumeCurrency = "TEST_CURRENCY", + tradeOperation = Buy, + tradeAccount = "demo", + tradeSecurity = "FOO", + tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, + tradeCommission = 0, + tradeSignalId = SignalId "Foo" "bar" "baz" } + cb (BackendTradeNotification trade) + + step "Sending notifications request" + send sock [] (BL.toStrict . encode $ RequestNotifications 2 "identity" (NotificationSqnum 1)) + threadDelay 10000 + + step "Reading response" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseNotifications ns) -> do + step "Received notification sqnums are correct" + let sqnums = sort $ fmap (unNotificationSqnum . getNotificationSqnum) ns + sqnums @=? [1, 2, 3] + + + _ -> assertFailure "Invalid response" + step "Sending second notifications request" - send sock [] (BL.toStrict . encode $ RequestNotifications 3 "identity") + send sock [] (BL.toStrict . encode $ RequestNotifications 3 "identity" (NotificationSqnum 1)) threadDelay 10000 step "Reading response" resp' <- decode . BL.fromStrict <$> receive sock case resp' of Just (ResponseNotifications ns) -> do - 0 @=? length ns - Just _ -> assertFailure "Invalid response" - Nothing -> assertFailure "Invalid response" + step "Received notification sqnums are correct" + let sqnums = sort $ fmap (unNotificationSqnum . getNotificationSqnum) ns + [1, 2, 3] @=? sqnums + _ -> assertFailure "Invalid response" + +testBrokerServerGetNotificationsRemovesEarlierNotifications :: TestTree +testBrokerServerGetNotificationsRemovesEarlierNotifications = testCaseSteps "Broker Server: notifications request removes earlier notifications" $ + \step -> withContext $ \ctx -> do + step "Setup" + ep <- makeEndpoint + (mockBroker, broState) <- mkMockBroker ["demo"] + bracket (startBrokerServer [mockBroker] ctx ep [] defaultServerSecurityParams) stopBrokerServer $ \_ -> + withSocket ctx Req $ \sock -> do + connectAndSendOrder step sock defaultOrder ep + (Just (ResponseOrderSubmitted localOrderId)) <- decode . BL.fromStrict <$> receive sock + localOrderId @=? orderId defaultOrder + threadDelay 10000 + + globalOrderId <- orderId . head . orders <$> readIORef broState + + (Just cb) <- notificationCallback <$> readIORef broState + cb (BackendOrderNotification globalOrderId Executed) + let trade = Trade { + tradeOrderId = globalOrderId, + tradePrice = 19.82, + tradeQuantity = 1, + tradeVolume = 1982, + tradeVolumeCurrency = "TEST_CURRENCY", + tradeOperation = Buy, + tradeAccount = "demo", + tradeSecurity = "FOO", + tradeTimestamp = UTCTime (fromGregorian 2016 9 28) 16000, + tradeCommission = 0, + tradeSignalId = SignalId "Foo" "bar" "baz" } + cb (BackendTradeNotification trade) + + step "Sending notifications request" + send sock [] (BL.toStrict . encode $ RequestNotifications 2 "identity" (NotificationSqnum 4)) + threadDelay 10000 + + step "Reading response" + resp <- decode . BL.fromStrict <$> receive sock + case resp of + Just (ResponseNotifications ns) -> do + step "Checking that request list is empty" + [] @=? ns + _ -> assertFailure "Invalid response" + + step "Sending second notifications request" + send sock [] (BL.toStrict . encode $ RequestNotifications 3 "identity" (NotificationSqnum 1)) + threadDelay 10000 + + step "Reading response" + resp' <- decode . BL.fromStrict <$> receive sock + case resp' of + Just (ResponseNotifications ns) -> do + step "Checking that request list is empty" + [] @=? ns + _ -> assertFailure "Invalid response" testBrokerServerDuplicateRequest :: TestTree testBrokerServerDuplicateRequest = testCaseSteps "Broker Server: duplicate request" $ \step -> withContext $ \ctx -> do