Browse Source

MonadRobot: new method: getTickerInfo

master
Denis Tereshkin 4 years ago
parent
commit
3cca37812f
  1. 14
      src/ATrade/Driver/Junction.hs
  2. 27
      src/ATrade/Driver/Junction/QuoteThread.hs
  3. 7
      src/ATrade/Driver/Junction/RobotDriverThread.hs
  4. 1
      src/ATrade/RoboCom/Monad.hs
  5. 3
      src/ATrade/RoboCom/Types.hs

14
src/ATrade/Driver/Junction.hs

@ -52,7 +52,7 @@ import ATrade.RoboCom.ConfigStorage (ConfigStorage (loa
import ATrade.RoboCom.Monad (StrategyEnvironment (..)) import ATrade.RoboCom.Monad (StrategyEnvironment (..))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars) Bars, TickerInfoMap)
import ATrade.Types (ClientSecurityParams (ClientSecurityParams), import ATrade.Types (ClientSecurityParams (ClientSecurityParams),
OrderId, OrderId,
Trade (tradeOrderId)) Trade (tradeOrderId))
@ -192,6 +192,7 @@ junctionMain descriptors = do
let log = logWith (logger h) let log = logWith (logger h)
barsMap <- newIORef M.empty barsMap <- newIORef M.empty
tickerInfoMap <- newIORef M.empty
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
@ -204,7 +205,7 @@ junctionMain descriptors = do
ordersMap <- newIORef M.empty ordersMap <- newIORef M.empty
handledNotifications <- newIORef S.empty handledNotifications <- newIORef S.empty
withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro -> withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro ->
withQThread downloaderEnv barsMap cfg ctx (logger h) $ \qt -> do withQThread downloaderEnv barsMap tickerInfoMap cfg ctx (logger h) $ \qt -> do
broService <- mkBrokerService bro ordersMap broService <- mkBrokerService bro ordersMap
let junctionLogAction = logger h let junctionLogAction = logger h
let env = let env =
@ -218,7 +219,7 @@ junctionMain descriptors = do
peLogAction = junctionLogAction peLogAction = junctionLogAction
} }
withJunction env $ do withJunction env $ do
startRobots h cfg barsMap broService startRobots h cfg barsMap tickerInfoMap broService
forever $ do forever $ do
notifications <- liftIO $ getNotifications broService notifications <- liftIO $ getNotifications broService
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h)) forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h))
@ -237,8 +238,8 @@ junctionMain descriptors = do
currentTimers <- liftIO $ readIORef (strategyTimers inst) currentTimers <- liftIO $ readIORef (strategyTimers inst)
saveState currentTimers (strategyInstanceId inst <> ":timers") saveState currentTimers (strategyInstanceId inst <> ":timers")
startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> BrokerService -> JunctionM () startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> BrokerService -> JunctionM ()
startRobots logHandle cfg barsMap broService = forM_ (instances cfg) $ \inst -> do startRobots logHandle cfg barsMap tickerInfoMap broService = forM_ (instances cfg) $ \inst -> do
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
case M.lookup (strategyBaseName inst) descriptors of case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> do Just (StrategyDescriptorE desc) -> do
@ -258,7 +259,8 @@ junctionMain descriptors = do
_seVolume = 1, _seVolume = 1,
_seLastTimestamp = now _seLastTimestamp = now
} }
let robotEnv = RobotEnv rState rConf rTimers barsMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) let robotEnv =
RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers))
robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers
robotsMap' <- asks peRobots robotsMap' <- asks peRobots
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ()))

27
src/ATrade/Driver/Junction/QuoteThread.hs

@ -36,7 +36,8 @@ import ATrade.RoboCom.Types (Bar (barSecurity),
BarSeries (..), BarSeries (..),
BarSeriesId (BarSeriesId), BarSeriesId (BarSeriesId),
Bars, Bars,
InstrumentParameters (InstrumentParameters)) InstrumentParameters (InstrumentParameters),
TickerInfoMap)
import ATrade.Types (BarTimeframe (BarTimeframe), import ATrade.Types (BarTimeframe (BarTimeframe),
ClientSecurityParams (ClientSecurityParams), ClientSecurityParams (ClientSecurityParams),
Tick (security), Tick (security),
@ -78,7 +79,7 @@ data QuoteThreadEnv =
bars :: IORef Bars, bars :: IORef Bars,
endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]),
qsclient :: QuoteSourceClientHandle, qsclient :: QuoteSourceClientHandle,
paramsCache :: IORef (M.Map TickerId InstrumentParameters), paramsCache :: IORef TickerInfoMap,
downloaderChan :: BoundedChan QuoteSubscription downloaderChan :: BoundedChan QuoteSubscription
} }
@ -88,17 +89,18 @@ startQuoteThread :: (MonadIO m,
HistoryProvider m1, HistoryProvider m1,
TickerInfoProvider m1) => TickerInfoProvider m1) =>
IORef Bars -> IORef Bars ->
IORef TickerInfoMap ->
Context -> Context ->
T.Text -> T.Text ->
ClientSecurityParams -> ClientSecurityParams ->
(m1 () -> IO ()) -> (m1 () -> IO ()) ->
LogAction IO Message -> LogAction IO Message ->
m QuoteThreadHandle m QuoteThreadHandle
startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
chan <- liftIO $ newBoundedChan 2000 chan <- liftIO $ newBoundedChan 2000
dChan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan
tid <- liftIO . forkIO $ quoteThread env chan tid <- liftIO . forkIO $ quoteThread env chan
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan)
return $ QuoteThreadHandle tid downloaderTid env return $ QuoteThreadHandle tid downloaderTid env
@ -119,7 +121,9 @@ startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do
Nothing -> case mbParams of Nothing -> case mbParams of
Just params -> do Just params -> do
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now -- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time.
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day.
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now)
let barSeries = BarSeries tickerid tf barsData params let barSeries = BarSeries tickerid tf barsData params
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ()))
_ -> return () -- TODO log _ -> return () -- TODO log
@ -200,12 +204,21 @@ instance TickerInfoProvider DownloaderM where
(fromInteger $ tiLotSize ti) (fromInteger $ tiLotSize ti)
(tiTickSize ti) (tiTickSize ti)
withQThread :: DownloaderEnv -> IORef Bars -> ProgramConfiguration -> Context -> LogAction IO Message -> (QuoteThreadHandle -> IO ()) -> IO () withQThread ::
withQThread env barsMap cfg ctx logger f = do DownloaderEnv
-> IORef Bars
-> IORef TickerInfoMap
-> ProgramConfiguration
-> Context
-> LogAction IO Message
-> (QuoteThreadHandle -> IO ())
-> IO ()
withQThread env barsMap tiMap cfg ctx logger f = do
securityParameters <- loadSecurityParameters securityParameters <- loadSecurityParameters
bracket bracket
(startQuoteThread (startQuoteThread
barsMap barsMap
tiMap
ctx ctx
(quotesourceEndpoint cfg) (quotesourceEndpoint cfg)
securityParameters securityParameters

7
src/ATrade/Driver/Junction/RobotDriverThread.hs

@ -37,7 +37,7 @@ import ATrade.RoboCom.Monad (Event (NewBar, NewTick, N
StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp)) StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp))
import ATrade.RoboCom.Persistence (MonadPersistence) import ATrade.RoboCom.Persistence (MonadPersistence)
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId),
Bars) Bars, TickerInfoMap)
import ATrade.Types (Order (orderId), OrderId, import ATrade.Types (Order (orderId), OrderId,
OrderState, Trade) OrderState, Trade)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
@ -139,6 +139,7 @@ data RobotEnv c s =
configRef :: IORef c, configRef :: IORef c,
timersRef :: IORef [UTCTime], timersRef :: IORef [UTCTime],
bars :: IORef Bars, bars :: IORef Bars,
tickerInfoMap :: IORef TickerInfoMap,
env :: IORef StrategyEnvironment, env :: IORef StrategyEnvironment,
logAction :: LogAction (RobotM c s) Message, logAction :: LogAction (RobotM c s) Message,
brokerService :: Bro.BrokerService, brokerService :: Bro.BrokerService,
@ -183,6 +184,10 @@ instance MonadRobot (RobotM c s) c s where
b <- asks bars >>= liftIO . readIORef b <- asks bars >>= liftIO . readIORef
return $ M.lookup (BarSeriesId tid tf) b return $ M.lookup (BarSeriesId tid tf) b
getTickerInfo tid = do
b <- asks tickerInfoMap >>= liftIO . readIORef
return $ M.lookup tid b
getAvailableTickers = asks tickers getAvailableTickers = asks tickers
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m ()

1
src/ATrade/RoboCom/Monad.hs

@ -51,6 +51,7 @@ class (Monad m) => MonadRobot m c s | m -> c, m -> s where
setState (f oldState) setState (f oldState)
getEnvironment :: m StrategyEnvironment getEnvironment :: m StrategyEnvironment
getTicker :: TickerId -> BarTimeframe -> m (Maybe BarSeries) getTicker :: TickerId -> BarTimeframe -> m (Maybe BarSeries)
getTickerInfo :: TickerId -> m (Maybe InstrumentParameters)
getAvailableTickers :: m (NonEmpty BarSeriesId) getAvailableTickers :: m (NonEmpty BarSeriesId)
getFirstTickerId :: forall c s m. (Monad m, MonadRobot m c s) => m BarSeriesId getFirstTickerId :: forall c s m. (Monad m, MonadRobot m c s) => m BarSeriesId

3
src/ATrade/RoboCom/Types.hs

@ -11,6 +11,7 @@ module ATrade.RoboCom.Types (
BarSeries(..), BarSeries(..),
Ticker(..), Ticker(..),
Bars, Bars,
TickerInfoMap,
InstrumentParameters(..), InstrumentParameters(..),
bsidTickerId bsidTickerId
) where ) where
@ -31,6 +32,8 @@ data InstrumentParameters =
ipTickSize :: Price ipTickSize :: Price
} deriving (Show, Eq) } deriving (Show, Eq)
type TickerInfoMap = M.Map TickerId InstrumentParameters
data BarSeries = data BarSeries =
BarSeries { BarSeries {
bsTickerId :: TickerId, bsTickerId :: TickerId,

Loading…
Cancel
Save