From f17fc3311653b0ab5bc3ce653f703e1d5fab1846 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 3 Jun 2021 23:35:23 +0700 Subject: [PATCH] Refactoring: history request --- robocom-zero.cabal | 2 +- src/ATrade/BarAggregator.hs | 6 +- src/ATrade/Driver/Real.hs | 126 ++++++++---------------------------- src/ATrade/Quotes.hs | 19 ++++++ src/ATrade/Quotes/QHP.hs | 46 +++++++++++-- 5 files changed, 95 insertions(+), 104 deletions(-) create mode 100644 src/ATrade/Quotes.hs diff --git a/robocom-zero.cabal b/robocom-zero.cabal index 6014371..130d55a 100644 --- a/robocom-zero.cabal +++ b/robocom-zero.cabal @@ -18,10 +18,10 @@ library ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults exposed-modules: ATrade.RoboCom.Indicators , ATrade.RoboCom.Monad - , ATrade.RoboCom.Persistence , ATrade.RoboCom.Positions , ATrade.RoboCom.Types , ATrade.RoboCom.Utils + , ATrade.Quotes , ATrade.Quotes.Finam , ATrade.Quotes.HAP , ATrade.Quotes.QHP diff --git a/src/ATrade/BarAggregator.hs b/src/ATrade/BarAggregator.hs index 1b14ac4..904ec74 100644 --- a/src/ATrade/BarAggregator.hs +++ b/src/ATrade/BarAggregator.hs @@ -22,7 +22,8 @@ module ATrade.BarAggregator ( handleTick, updateTime, handleBar, - hmsToDiffTime + hmsToDiffTime, + replaceHistory ) where import ATrade.RoboCom.Types @@ -47,6 +48,9 @@ mkAggregatorFromBars myBars timeWindows = BarAggregator { lastTicks = M.empty, tickTimeWindows = timeWindows } +replaceHistory :: BarAggregator -> M.Map TickerId BarSeries -> BarAggregator +replaceHistory agg bars' = agg { bars = bars' } + lBars :: (M.Map TickerId BarSeries -> Identity (M.Map TickerId BarSeries)) -> BarAggregator -> Identity BarAggregator lBars = lens bars (\s b -> s { bars = b }) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index db918c1..2d39f1e 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -5,6 +5,7 @@ {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE DeriveGeneric #-} module ATrade.Driver.Real ( Strategy(..), @@ -44,6 +45,7 @@ import Data.Time.Clock.POSIX import Data.Maybe import Database.Redis hiding (info, decode) import ATrade.Types +import ATrade.Quotes import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread @@ -55,6 +57,7 @@ import ATrade.Quotes.Finam as QF import ATrade.Quotes.QHP as QQ import ATrade.Quotes.HAP as QH import System.ZMQ4 hiding (Event(..)) +import GHC.Generics data Params = Params { instanceId :: String, @@ -110,7 +113,8 @@ paramsParser = Params ( long "source-timeframe" <> metavar "SECONDS" )) -data Env c s = Env { +data Env historySource c s = Env { + envHistorySource :: historySource, envStrategyInstanceParams :: StrategyInstanceParams, envStrategyEnvironment :: IORef StrategyEnvironment, envConfigRef :: IORef c, @@ -120,11 +124,11 @@ data Env c s = Env { envEventChan :: BC.BoundedChan Event, envAggregator :: IORef BarAggregator, envLastTimestamp :: IORef UTCTime -} +} deriving (Generic) -type App c s = ReaderT (Env c s) IO +type App historySource c s = ReaderT (Env historySource c s) IO -instance MonadRobot (App c s) c s where +instance MonadRobot (App historySource c s) c s where submitOrder order = do bc <- asks envBrokerChan lift $ BC.writeChan bc $ BrokerSubmitOrder order @@ -159,6 +163,11 @@ instance MonadRobot (App c s) c s where now <- lift $ readIORef nowRef return $ env & seBars .~ bars agg & seLastTimestamp .~ now +instance MonadHistory (App QQ.QHPHandle c s) where + getHistory tickerId timeframe fromTime toTime = do + qhp <- asks envHistorySource + QQ.requestHistoryFromQHP qhp tickerId timeframe fromTime toTime + data BigConfig c = BigConfig { confTickers :: [Ticker], strategyConfig :: c @@ -257,6 +266,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do withContext (\ctx -> do infoM "main" "Loading history" -- Load tickers data and create BarAggregator from them +{- historyBars <- if | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> @@ -266,9 +276,12 @@ robotMain dataDownloadDelta defaultState initCallback callback = do | otherwise -> M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) - agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] +-} + agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] now <- getCurrentTime >>= newIORef + let env = Env { + envHistorySource = mkQHPHandle ctx (strategyHistoryProvider . strategyInstanceParams $ strategy), envStrategyInstanceParams = instanceParams, envStrategyEnvironment = straEnv, envConfigRef = configRef, @@ -368,98 +381,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Nothing -> return defaultState ) `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) - loadTickerFromHAP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromHAP ctx ep delta t = do - debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t) - case parseHAPPeriod $ timeframeSeconds t of - Just tf -> do - now <- getCurrentTime - historyBars <- QH.getQuotes ctx QH.RequestParams { - QH.endpoint = ep, - QH.ticker = code t, - QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ delta) now, - QH.endDate = now, - QH.period = tf } - debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" - return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) - _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - - - loadTickerFromQHP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromQHP ctx ep delta t = do - debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t) - case parseQHPPeriod $ timeframeSeconds t of - Just tf -> do - now <- getCurrentTime - historyBars <- QQ.getQuotes ctx QQ.RequestParams { - QQ.endpoint = ep, - QQ.ticker = code t, - QQ.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now), - QQ.endDate = utctDay now, - QQ.period = tf } - debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" - debugM "Strategy" $ show (take 20 historyBars) - return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars }) - _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - - - loadTickerFromFinam :: DiffTime -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromFinam delta t = do - randDelay <- getStdRandom (randomR (1, 5)) - threadDelay $ randDelay * 1000000 - now <- getCurrentTime - debugM "Strategy" $ show (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) - case (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) of - (Just finamCode, Just per) -> do - debugM "Strategy" $ "Downloading ticker: " ++ finamCode - history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode, - QF.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now), - QF.endDate = utctDay now, - QF.period = per } - case history of - Just h -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = convertFromFinamHistory (code t) h }) - Nothing -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - - convertFromFinamHistory :: TickerId -> [Row] -> [Bar] - convertFromFinamHistory tid = L.reverse . fmap (\row -> Bar { barSecurity = tid, - barTimestamp = rowTime row, - barOpen = rowOpen row, - barHigh = rowHigh row, - barLow = rowLow row, - barClose = rowClose row, - barVolume = rowVolume row }) - - parseFinamPeriod x - | x == 0 = Just QF.PeriodTick - | x == 60 = Just QF.Period1Min - | x == 5 * 60 = Just QF.Period5Min - | x == 10 * 60 = Just QF.Period10Min - | x == 15 * 60 = Just QF.Period15Min - | x == 30 * 60 = Just QF.Period30Min - | x == 60 * 60 = Just QF.PeriodHour - | x == 24 * 60 * 60 = Just QF.PeriodDay - | otherwise = Nothing - - parseQHPPeriod x - | x == 60 = Just QQ.Period1Min - | x == 5 * 60 = Just QQ.Period5Min - | x == 15 * 60 = Just QQ.Period15Min - | x == 30 * 60 = Just QQ.Period30Min - | x == 60 * 60 = Just QQ.PeriodHour - | x == 24 * 60 * 60 = Just QQ.PeriodDay - | otherwise = Nothing - - parseHAPPeriod x - | x == 60 = Just QH.Period1Min - | x == 5 * 60 = Just QH.Period5Min - | x == 15 * 60 = Just QH.Period15Min - | x == 30 * 60 = Just QH.Period30Min - | x == 60 * 60 = Just QH.PeriodHour - | x == 24 * 60 * 60 = Just QH.PeriodDay - | otherwise = Nothing - - -- | Helper function to make 'Strategy' instances mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s mkBarStrategy instanceParams dd params initialState cb = BarStrategy { @@ -473,11 +394,15 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App c s () +barStrategyDriver :: (MonadHistory (App hs c s)) => Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App hs c s () barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do + now <- liftIO getCurrentTime + history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy) eventChan <- asks envEventChan brokerChan <- asks envBrokerChan agg <- asks envAggregator + liftIO $ atomicModifyIORef' agg (\s -> (replaceHistory s history, ())) + bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do lift $ debugM "Strategy" "QuoteSource thread forked" bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do @@ -502,6 +427,11 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy brEp = strategyBrokerEp . strategyInstanceParams $ strategy + loadTickerHistory now t = do + history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t)) + ((fromRational . toRational . negate $ downloadDelta strategy) `addUTCTime` now) now + return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history) + readAndHandleEvents agg strategy' = do eventChan <- asks envEventChan event <- lift $ readChan eventChan diff --git a/src/ATrade/Quotes.hs b/src/ATrade/Quotes.hs new file mode 100644 index 0000000..f322438 --- /dev/null +++ b/src/ATrade/Quotes.hs @@ -0,0 +1,19 @@ + +{- | + - Module : ATrade.Quotes + + - Various historical price series management stuff +-} + +module ATrade.Quotes +( + MonadHistory(..) +) where + +import ATrade.Types (Bar, BarTimeframe, TickerId) +import Data.Time.Clock (UTCTime) + +class (Monad m) => MonadHistory m where + -- | 'getHistory tickerId timeframe fromTime toTime' should return requested timeframe between 'fromTime' and 'toTime' + getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] + diff --git a/src/ATrade/Quotes/QHP.hs b/src/ATrade/Quotes/QHP.hs index 5b414ce..28ed63d 100644 --- a/src/ATrade/Quotes/QHP.hs +++ b/src/ATrade/Quotes/QHP.hs @@ -1,16 +1,21 @@ {-# LANGUAGE OverloadedStrings #-} module ATrade.Quotes.QHP ( - getQuotes, Period(..), - RequestParams(..) + RequestParams(..), + QHPHandle, + mkQHPHandle, + requestHistoryFromQHP ) where +import ATrade.Exceptions import ATrade.Types +import Control.Exception.Safe (MonadThrow, throw) +import Control.Monad.IO.Class (MonadIO, liftIO) import Data.Aeson import Data.Binary.Get -import qualified Data.ByteString.Lazy as BL -import qualified Data.Text as T +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T import Data.Time.Calendar import Data.Time.Clock import Data.Time.Clock.POSIX @@ -39,6 +44,39 @@ instance Show Period where show PeriodWeek = "W" show PeriodMonth = "MN" +data QHPHandle = QHPHandle + { + qhpContext :: Context + , qhpEndpoint :: T.Text + } + +mkQHPHandle :: Context -> T.Text -> QHPHandle +mkQHPHandle = QHPHandle + +requestHistoryFromQHP :: (MonadThrow m, MonadIO m) => QHPHandle -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] +requestHistoryFromQHP qhp tickerId timeframe fromTime toTime = + case parseQHPPeriod (unBarTimeframe timeframe) of + Just tf -> liftIO $ getQuotes (qhpContext qhp) (params tf) + _ -> throw $ BadParams "QHP: Unable to parse timeframe" + where + params tf = RequestParams + { + endpoint = qhpEndpoint qhp, + ticker = tickerId, + startDate = utctDay fromTime, + endDate = utctDay toTime, + period = tf + } + + parseQHPPeriod x + | x == 60 = Just Period1Min + | x == 5 * 60 = Just Period5Min + | x == 15 * 60 = Just Period15Min + | x == 30 * 60 = Just Period30Min + | x == 60 * 60 = Just PeriodHour + | x == 24 * 60 * 60 = Just PeriodDay + | otherwise = Nothing + data RequestParams = RequestParams {