From 3cca37812f64e2a4fdd02716f2ec9ac584b5e265 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 23 Dec 2021 19:01:51 +0700 Subject: [PATCH] MonadRobot: new method: getTickerInfo --- src/ATrade/Driver/Junction.hs | 14 +++++----- src/ATrade/Driver/Junction/QuoteThread.hs | 27 ++++++++++++++----- .../Driver/Junction/RobotDriverThread.hs | 7 ++++- src/ATrade/RoboCom/Monad.hs | 1 + src/ATrade/RoboCom/Types.hs | 3 +++ 5 files changed, 38 insertions(+), 14 deletions(-) diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index 45276be..c69db44 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -52,7 +52,7 @@ import ATrade.RoboCom.ConfigStorage (ConfigStorage (loa import ATrade.RoboCom.Monad (StrategyEnvironment (..)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), - Bars) + Bars, TickerInfoMap) import ATrade.Types (ClientSecurityParams (ClientSecurityParams), OrderId, Trade (tradeOrderId)) @@ -192,6 +192,7 @@ junctionMain descriptors = do let log = logWith (logger h) barsMap <- newIORef M.empty + tickerInfoMap <- newIORef M.empty log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) @@ -204,7 +205,7 @@ junctionMain descriptors = do ordersMap <- newIORef M.empty handledNotifications <- newIORef S.empty withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro -> - withQThread downloaderEnv barsMap cfg ctx (logger h) $ \qt -> do + withQThread downloaderEnv barsMap tickerInfoMap cfg ctx (logger h) $ \qt -> do broService <- mkBrokerService bro ordersMap let junctionLogAction = logger h let env = @@ -218,7 +219,7 @@ junctionMain descriptors = do peLogAction = junctionLogAction } withJunction env $ do - startRobots h cfg barsMap broService + startRobots h cfg barsMap tickerInfoMap broService forever $ do notifications <- liftIO $ getNotifications broService forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h)) @@ -237,8 +238,8 @@ junctionMain descriptors = do currentTimers <- liftIO $ readIORef (strategyTimers inst) saveState currentTimers (strategyInstanceId inst <> ":timers") - startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> BrokerService -> JunctionM () - startRobots logHandle cfg barsMap broService = forM_ (instances cfg) $ \inst -> do + startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> BrokerService -> JunctionM () + startRobots logHandle cfg barsMap tickerInfoMap broService = forM_ (instances cfg) $ \inst -> do now <- liftIO getCurrentTime case M.lookup (strategyBaseName inst) descriptors of Just (StrategyDescriptorE desc) -> do @@ -258,7 +259,8 @@ junctionMain descriptors = do _seVolume = 1, _seLastTimestamp = now } - let robotEnv = RobotEnv rState rConf rTimers barsMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) + let robotEnv = + RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers robotsMap' <- asks peRobots liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) diff --git a/src/ATrade/Driver/Junction/QuoteThread.hs b/src/ATrade/Driver/Junction/QuoteThread.hs index baa5230..11825d9 100644 --- a/src/ATrade/Driver/Junction/QuoteThread.hs +++ b/src/ATrade/Driver/Junction/QuoteThread.hs @@ -36,7 +36,8 @@ import ATrade.RoboCom.Types (Bar (barSecurity), BarSeries (..), BarSeriesId (BarSeriesId), Bars, - InstrumentParameters (InstrumentParameters)) + InstrumentParameters (InstrumentParameters), + TickerInfoMap) import ATrade.Types (BarTimeframe (BarTimeframe), ClientSecurityParams (ClientSecurityParams), Tick (security), @@ -78,7 +79,7 @@ data QuoteThreadEnv = bars :: IORef Bars, endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), qsclient :: QuoteSourceClientHandle, - paramsCache :: IORef (M.Map TickerId InstrumentParameters), + paramsCache :: IORef TickerInfoMap, downloaderChan :: BoundedChan QuoteSubscription } @@ -88,17 +89,18 @@ startQuoteThread :: (MonadIO m, HistoryProvider m1, TickerInfoProvider m1) => IORef Bars -> + IORef TickerInfoMap -> Context -> T.Text -> ClientSecurityParams -> (m1 () -> IO ()) -> LogAction IO Message -> m QuoteThreadHandle -startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do +startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do chan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000 qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger - env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan + env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan tid <- liftIO . forkIO $ quoteThread env chan downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) return $ QuoteThreadHandle tid downloaderTid env @@ -119,7 +121,9 @@ startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do Nothing -> case mbParams of Just params -> do now <- liftIO getCurrentTime - barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now + -- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. + -- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. + barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) let barSeries = BarSeries tickerid tf barsData params liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) _ -> return () -- TODO log @@ -200,12 +204,21 @@ instance TickerInfoProvider DownloaderM where (fromInteger $ tiLotSize ti) (tiTickSize ti) -withQThread :: DownloaderEnv -> IORef Bars -> ProgramConfiguration -> Context -> LogAction IO Message -> (QuoteThreadHandle -> IO ()) -> IO () -withQThread env barsMap cfg ctx logger f = do +withQThread :: + DownloaderEnv + -> IORef Bars + -> IORef TickerInfoMap + -> ProgramConfiguration + -> Context + -> LogAction IO Message + -> (QuoteThreadHandle -> IO ()) + -> IO () +withQThread env barsMap tiMap cfg ctx logger f = do securityParameters <- loadSecurityParameters bracket (startQuoteThread barsMap + tiMap ctx (quotesourceEndpoint cfg) securityParameters diff --git a/src/ATrade/Driver/Junction/RobotDriverThread.hs b/src/ATrade/Driver/Junction/RobotDriverThread.hs index ae40d36..2746f60 100644 --- a/src/ATrade/Driver/Junction/RobotDriverThread.hs +++ b/src/ATrade/Driver/Junction/RobotDriverThread.hs @@ -37,7 +37,7 @@ import ATrade.RoboCom.Monad (Event (NewBar, NewTick, N StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp)) import ATrade.RoboCom.Persistence (MonadPersistence) import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), - Bars) + Bars, TickerInfoMap) import ATrade.Types (Order (orderId), OrderId, OrderState, Trade) import Colog (HasLog (getLogAction, setLogAction), @@ -139,6 +139,7 @@ data RobotEnv c s = configRef :: IORef c, timersRef :: IORef [UTCTime], bars :: IORef Bars, + tickerInfoMap :: IORef TickerInfoMap, env :: IORef StrategyEnvironment, logAction :: LogAction (RobotM c s) Message, brokerService :: Bro.BrokerService, @@ -183,6 +184,10 @@ instance MonadRobot (RobotM c s) c s where b <- asks bars >>= liftIO . readIORef return $ M.lookup (BarSeriesId tid tf) b + getTickerInfo tid = do + b <- asks tickerInfoMap >>= liftIO . readIORef + return $ M.lookup tid b + getAvailableTickers = asks tickers postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () diff --git a/src/ATrade/RoboCom/Monad.hs b/src/ATrade/RoboCom/Monad.hs index c30c18c..4466c03 100644 --- a/src/ATrade/RoboCom/Monad.hs +++ b/src/ATrade/RoboCom/Monad.hs @@ -51,6 +51,7 @@ class (Monad m) => MonadRobot m c s | m -> c, m -> s where setState (f oldState) getEnvironment :: m StrategyEnvironment getTicker :: TickerId -> BarTimeframe -> m (Maybe BarSeries) + getTickerInfo :: TickerId -> m (Maybe InstrumentParameters) getAvailableTickers :: m (NonEmpty BarSeriesId) getFirstTickerId :: forall c s m. (Monad m, MonadRobot m c s) => m BarSeriesId diff --git a/src/ATrade/RoboCom/Types.hs b/src/ATrade/RoboCom/Types.hs index 136ebcf..305eebb 100644 --- a/src/ATrade/RoboCom/Types.hs +++ b/src/ATrade/RoboCom/Types.hs @@ -11,6 +11,7 @@ module ATrade.RoboCom.Types ( BarSeries(..), Ticker(..), Bars, + TickerInfoMap, InstrumentParameters(..), bsidTickerId ) where @@ -31,6 +32,8 @@ data InstrumentParameters = ipTickSize :: Price } deriving (Show, Eq) +type TickerInfoMap = M.Map TickerId InstrumentParameters + data BarSeries = BarSeries { bsTickerId :: TickerId,