From 46674b0d4910d88ef97c668ed22adba475140daa Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 28 Nov 2021 13:17:08 +0700 Subject: [PATCH] junction: order notifications routing --- robocom-zero.cabal | 1 + src/ATrade/Driver/Junction.hs | 47 ++++++++++++++++--- .../Driver/Junction/RobotDriverThread.hs | 13 ++++- 3 files changed, 53 insertions(+), 8 deletions(-) diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 5ca2859..921f893 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -85,6 +85,7 @@ library , stm , async , dhall + , extra default-language: Haskell2010 other-modules: ATrade.Exceptions diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index ac902a3..81580b7 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -14,6 +14,9 @@ module ATrade.Driver.Junction import ATrade.Broker.Client (BrokerClientHandle, startBrokerClient, stopBrokerClient) +import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), + NotificationSqnum, + getNotificationSqnum) import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (brokerEndpoint, brokerNotificationEndpoint, instances, qhpEndpoint, qtisEndpoint, redisSocket, robotsConfigsPath), ProgramOptions (ProgramOptions, configPath)) import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), @@ -27,7 +30,8 @@ import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (..), RobotM (..), createRobotDriverThread, - onStrategyInstance) + onStrategyInstance, + postNotificationEvent) import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), StrategyInstance (strategyInstanceId), StrategyInstanceDescriptor (..), @@ -37,11 +41,14 @@ import ATrade.Driver.Junction.Types (StrategyDescriptor import ATrade.Quotes.QHP (mkQHPHandle) import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) -import ATrade.Types (ClientSecurityParams (ClientSecurityParams)) +import ATrade.Types (ClientSecurityParams (ClientSecurityParams), + OrderId, + Trade (tradeOrderId)) import Control.Concurrent import Control.Exception.Safe (MonadThrow, bracket) import Control.Monad (forM_, forever) +import Control.Monad.Extra (whenM) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), asks) @@ -55,6 +62,8 @@ import Data.IORef (IORef, newIORef, readIORef) import qualified Data.Map.Strict as M +import Data.Set (notMember) +import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Text.IO (readFile) @@ -145,9 +154,11 @@ junctionMain descriptors = do redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) withContext $ \ctx -> do let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) - withBroker cfg ctx $ \bro -> + robotsMap <- newIORef M.empty + ordersMap <- newIORef M.empty + handledNotifications <- newIORef S.empty + withBroker cfg ctx robotsMap ordersMap handledNotifications $ \bro -> withQThread downloaderEnv barsMap cfg ctx $ \qt -> do - robotsMap <- newIORef M.empty let env = JunctionEnv { @@ -191,13 +202,37 @@ junctionMain descriptors = do withJunction :: JunctionEnv -> JunctionM () -> IO () withJunction env = (`runReaderT` env) . unJunctionM - withBroker cfg ctx f = bracket + handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) -> + IORef (M.Map OrderId T.Text) -> + IORef (S.Set NotificationSqnum) -> + Notification -> + IO () + handleBrokerNotification robotsRef ordersMapRef handled notification = + whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do + robotsMap <- readIORef robotsRef + ordersMap <- readIORef ordersMapRef + + case getNotificationTarget robotsMap ordersMap notification of + Just robot -> postNotificationEvent robot notification + Nothing -> warningM "Junction" "Unknown order" + + atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) + + getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle + getNotificationTarget robotsMap ordersMap notification = do + robotId <- M.lookup (notificationOrderId notification) ordersMap + M.lookup robotId robotsMap + + notificationOrderId (OrderNotification _ oid _) = oid + notificationOrderId (TradeNotification _ trade) = tradeOrderId trade + + withBroker cfg ctx robotsMap ordersMap handled f = bracket (startBrokerClient "broker" ctx (brokerEndpoint cfg) (brokerNotificationEndpoint cfg) - [] + [handleBrokerNotification robotsMap ordersMap handled] (ClientSecurityParams -- TODO load certificates from file Nothing Nothing)) diff --git a/src/ATrade/Driver/Junction/RobotDriverThread.hs b/src/ATrade/Driver/Junction/RobotDriverThread.hs index 1652f3f..ae7e235 100644 --- a/src/ATrade/Driver/Junction/RobotDriverThread.hs +++ b/src/ATrade/Driver/Junction/RobotDriverThread.hs @@ -10,11 +10,12 @@ module ATrade.Driver.Junction.RobotDriverThread RobotEnv(..), RobotM(..), RobotDriverHandle, - onStrategyInstance - ) where + onStrategyInstance, + postNotificationEvent) where import ATrade.Broker.Client (BrokerClientHandle) import qualified ATrade.Broker.Client as Bro +import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), QuoteSubscription (QuoteSubscription)) import ATrade.Driver.Junction.Types (BigConfig, @@ -155,3 +156,11 @@ instance MonadRobot (RobotM c s) c s where getTicker tid tf = do b <- asks bars >>= liftIO . readIORef return $ M.lookup (BarSeriesId tid tf) b + +postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () +postNotificationEvent (RobotDriverHandle _ _ _ eventQueue) notification = liftIO $ + case notification of + OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) + TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) + +