From 9c5cce031f17dbd5a40bed191d460c4071aa9416 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 6 Jun 2021 14:34:57 +0700 Subject: [PATCH] Refactoring: make QTIS requests in driver --- src/ATrade/Driver/Backtest.hs | 38 ++++++++++++++++++--------- src/ATrade/Driver/Real.hs | 48 ++++++++++++++++++++++------------- src/ATrade/Exceptions.hs | 1 + src/ATrade/Quotes.hs | 10 +++++--- src/ATrade/Quotes/QTIS.hs | 39 +++++++++++++--------------- src/ATrade/RoboCom/Types.hs | 12 +++++++-- 6 files changed, 93 insertions(+), 55 deletions(-) diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index 46a8350..d924dd9 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -16,14 +16,16 @@ module ATrade.Driver.Backtest ( import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) import ATrade.Exceptions +import ATrade.Quotes import ATrade.Quotes.Finam as QF +import ATrade.Quotes.QTIS import ATrade.RoboCom.Monad (Event (..), EventCallback, MonadRobot (..), StrategyEnvironment (..), appendToLog, seBars, seLastTimestamp) import ATrade.RoboCom.Positions -import ATrade.RoboCom.Types (BarSeries (..), Ticker (..), - Timeframe (..)) +import ATrade.RoboCom.Types (BarSeries (..), Bars, InstrumentParameters (InstrumentParameters), + Ticker (..), Timeframe (..)) import ATrade.Types import Conduit (awaitForever, runConduit, yield, (.|)) @@ -52,13 +54,14 @@ import qualified Data.Vector as V import Options.Applicative hiding (Success) import Prelude hiding (lookup, putStrLn, readFile) import Safe (headMay) +import System.ZMQ4 hiding (Event) data Feed = Feed TickerId FilePath deriving (Show, Eq) data Params = Params { strategyConfigFile :: FilePath, - qtisEndpoint :: Maybe String, + qtisEndpoint :: String, paramsFeeds :: [Feed] } deriving (Show, Eq) @@ -82,8 +85,8 @@ paramsParser = Params <$> strOption ( long "config" <> short 'c' ) - <*> optional ( strOption - ( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID" )) + <*> strOption + ( long "qtis" <> short 'q' <> metavar "ENDPOINT/ID" ) <*> some (option feedArgParser ( long "feed" <> short 'f' )) @@ -103,7 +106,7 @@ backtestMain _dataDownloadDelta defaultState initCallback callback = do strategyAccount = "foo", strategyVolume = 1, tickers = tickerList, - strategyQTISEp = T.pack <$> qtisEndpoint params} + strategyQTISEp = Nothing } updatedConfig <- case initCallback of Just cb -> cb config instanceParams @@ -111,11 +114,24 @@ backtestMain _dataDownloadDelta defaultState initCallback callback = do feeds <- loadFeeds (paramsFeeds params) - runBacktestDriver feeds updatedConfig tickerList + bars <- makeBars (T.pack $ qtisEndpoint params) tickerList + + runBacktestDriver feeds updatedConfig bars where opts = info (helper <*> paramsParser) ( fullDesc <> header "ATrade strategy backtesting framework" ) + makeBars :: T.Text -> [Ticker] -> IO (M.Map TickerId BarSeries) + makeBars qtisEp tickersList = + withContext $ \ctx -> + M.fromList <$> mapM (mkBarEntry ctx qtisEp) tickersList + + mkBarEntry ctx qtisEp tickerEntry = do + info <- qtisGetTickersInfo ctx qtisEp (code tickerEntry) + return (code tickerEntry, BarSeries (code tickerEntry) (Timeframe (timeframeSeconds tickerEntry)) [] (InstrumentParameters (fromInteger $ tiLotSize info) (tiTickSize info))) + + + runBacktestDriver feeds params tickerList = do let s = runConduit $ barStreamFromFeeds feeds .| backtestLoop let finalState = execState (unBacktestingMonad s) $ defaultBacktestState defaultState params tickerList @@ -286,12 +302,10 @@ backtestMain _dataDownloadDelta defaultState initCallback callback = do instance (Default c, Default s) => Default (BacktestState c s) where - def = defaultBacktestState def def [] + def = defaultBacktestState def def def -defaultBacktestState :: s -> c -> [Ticker] -> BacktestState c s -defaultBacktestState s c tickerList = BacktestState 0 s c (StrategyEnvironment "" "" 1 tickers' (UTCTime (fromGregorian 1970 1 1) 0)) [] Seq.empty [] 1 [] [] - where - tickers' = M.fromList $ map (\x -> (code x, BarSeries (code x) (Timeframe (timeframeSeconds x)) [])) tickerList +defaultBacktestState :: s -> c -> Bars -> BacktestState c s +defaultBacktestState s c bars = BacktestState 0 s c (StrategyEnvironment "" "" 1 bars (UTCTime (fromGregorian 1970 1 1) 0)) [] Seq.empty [] 1 [] [] newtype BacktestingMonad s c a = BacktestingMonad { unBacktestingMonad :: State (BacktestState s c) a } deriving (Functor, Applicative, Monad, MonadState (BacktestState s c)) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 8fcc815..6064561 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -19,14 +19,16 @@ import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread import ATrade.Driver.Types (InitializationCallback, StrategyInstanceParams (..)) import ATrade.Exceptions -import ATrade.Quotes +import ATrade.Quotes (MonadHistory (..), MonadInstrumentParametersSource (..)) import ATrade.Quotes.QHP as QQ +import ATrade.Quotes.QTIS (TickerInfo (..), + qtisGetTickersInfo) import ATrade.RoboCom.Monad (Event (..), EventCallback, MonadRobot (..), StrategyEnvironment (..), seBars, seLastTimestamp) -import ATrade.RoboCom.Types (BarSeries (..), +import ATrade.RoboCom.Types (BarSeries (..), InstrumentParameters (..), Ticker (..), Timeframe (..)) import ATrade.RoboCom.Utils (fromHMS) @@ -72,7 +74,7 @@ data Params = Params { historyProviderType :: Maybe String, historyProvider :: Maybe String, redisSocket :: Maybe String, - qtisSocket :: Maybe String, + qtisEndpoint :: String, accountId :: String, volumeFactor :: Int, sourceBarTimeframe :: Maybe Int @@ -104,9 +106,9 @@ paramsParser = Params <*> optional ( strOption ( long "redis-socket" <> metavar "ADDRESS" )) - <*> optional ( strOption + <*> strOption ( long "qtis" - <> metavar "ENDPOINT/ID" )) + <> metavar "ENDPOINT/ID" ) <*> strOption ( long "account" <> metavar "ACCOUNT" ) @@ -118,7 +120,9 @@ paramsParser = Params <> metavar "SECONDS" )) data Env historySource c s = Env { + envZeromqContext :: Context, envHistorySource :: historySource, + envQtisEndpoint :: T.Text, envStrategyInstanceParams :: StrategyInstanceParams, envStrategyEnvironment :: IORef StrategyEnvironment, envConfigRef :: IORef c, @@ -172,6 +176,20 @@ instance MonadHistory (App QQ.QHPHandle c s) where qhp <- asks envHistorySource QQ.requestHistoryFromQHP qhp tickerId timeframe fromTime toTime +instance MonadInstrumentParametersSource (App hs c s) where + getInstrumentParameters tickerIds = do + ctx <- asks envZeromqContext + ep <- asks envQtisEndpoint + info <- liftIO $ qtisGetTickersInfo ctx ep tickerIds + return $ (tiTicker info, convert info) + where + convert info = InstrumentParameters + { + ipLotSize = fromInteger $ tiLotSize info, + ipTickSize = tiTickSize info + } + + data BigConfig c = BigConfig { confTickers :: [Ticker], strategyConfig :: c @@ -205,9 +223,6 @@ storeState params stateRef timersRef = do Left _ -> warningM "main" "Unable to save state" Right _ -> return () - - - gracefulShutdown :: (ToJSON s) => Params -> IORef s -> IORef [UTCTime] -> MVar () -> Signal -> IO () gracefulShutdown params stateRef timersRef shutdownMv _ = do infoM "main" "Shutdown, saving state" @@ -215,8 +230,8 @@ gracefulShutdown params stateRef timersRef shutdownMv _ = do putMVar shutdownMv () exitSuccess -robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> Maybe (InitializationCallback c) -> EventCallback c s -> IO () -robotMain dataDownloadDelta defaultState initCallback callback = do +robotMain :: (ToJSON s, FromJSON s, FromJSON c) => DiffTime -> s -> EventCallback c s -> IO () +robotMain dataDownloadDelta defaultState callback = do params <- execParser opts initLogging params infoM "main" "Starting" @@ -230,14 +245,10 @@ robotMain dataDownloadDelta defaultState initCallback callback = do strategyAccount = T.pack . accountId $ params, strategyVolume = volumeFactor params, tickers = tickerList, - strategyQTISEp = T.pack <$> qtisSocket params} - - updatedConfig <- case initCallback of - Just cb -> cb config instanceParams - Nothing -> return config + strategyQTISEp = Nothing } stateRef <- newIORef stratState - configRef <- newIORef updatedConfig + configRef <- newIORef config timersRef <- newIORef timersState shutdownMv <- newEmptyMVar installHandler sigINT (gracefulShutdown params stateRef timersRef shutdownMv) @@ -273,6 +284,8 @@ robotMain dataDownloadDelta defaultState initCallback callback = do now <- getCurrentTime >>= newIORef let env = Env { + envZeromqContext = ctx, + envQtisEndpoint = T.pack . qtisEndpoint $ params, envHistorySource = mkQHPHandle ctx (T.pack . fromMaybe "" . historyProvider $ params), envStrategyInstanceParams = instanceParams, envStrategyEnvironment = straEnv, @@ -393,7 +406,8 @@ barStrategyDriver downloadDelta instanceParams callback shutdownVar = do loadTickerHistory now t = do history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t)) ((fromRational . toRational . negate $ downloadDelta) `addUTCTime` now) now - return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history) + instrumentParams <- snd <$> getInstrumentParameters (code t) + return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history instrumentParams) readAndHandleEvents agg instanceParams' = do eventChan <- asks envEventChan diff --git a/src/ATrade/Exceptions.hs b/src/ATrade/Exceptions.hs index 42aec43..6fd7d12 100644 --- a/src/ATrade/Exceptions.hs +++ b/src/ATrade/Exceptions.hs @@ -13,6 +13,7 @@ data RoboComException = UnableToLoadConfig T.Text | UnableToLoadState T.Text | UnableToSaveState T.Text | BadParams T.Text + | QTISFailure T.Text deriving (Show, Generic) instance Exception RoboComException diff --git a/src/ATrade/Quotes.hs b/src/ATrade/Quotes.hs index f322438..0c11452 100644 --- a/src/ATrade/Quotes.hs +++ b/src/ATrade/Quotes.hs @@ -7,13 +7,17 @@ module ATrade.Quotes ( - MonadHistory(..) + MonadHistory(..) + , MonadInstrumentParametersSource(..) ) where -import ATrade.Types (Bar, BarTimeframe, TickerId) -import Data.Time.Clock (UTCTime) +import ATrade.RoboCom.Types (InstrumentParameters (..)) +import ATrade.Types (Bar, BarTimeframe, TickerId) +import Data.Time.Clock (UTCTime) class (Monad m) => MonadHistory m where -- | 'getHistory tickerId timeframe fromTime toTime' should return requested timeframe between 'fromTime' and 'toTime' getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] +class (Monad m) => MonadInstrumentParametersSource m where + getInstrumentParameters :: TickerId -> m (TickerId, InstrumentParameters) diff --git a/src/ATrade/Quotes/QTIS.hs b/src/ATrade/Quotes/QTIS.hs index 19a01a7..b4d2163 100644 --- a/src/ATrade/Quotes/QTIS.hs +++ b/src/ATrade/Quotes/QTIS.hs @@ -3,17 +3,16 @@ module ATrade.Quotes.QTIS ( TickerInfo(..), - qtisGetTickersInfo, - qtisGetTickersInfo' + qtisGetTickersInfo ) where +import ATrade.Exceptions import ATrade.Types -import Control.Monad +import Control.Exception.Safe import Data.Aeson -import qualified Data.ByteString.Char8 as BC8 -import qualified Data.ByteString.Lazy as BL -import Data.Maybe -import qualified Data.Text as T +import qualified Data.ByteString.Char8 as BC8 +import qualified Data.ByteString.Lazy as BL +import qualified Data.Text as T import System.Log.Logger import System.ZMQ4 @@ -35,23 +34,21 @@ instance ToJSON TickerInfo where "lot_size" .= tiLotSize ti, "tick_size" .= tiTickSize ti ] -qtisGetTickersInfo' :: T.Text -> [TickerId] -> IO [TickerInfo] -qtisGetTickersInfo' endpoint tickers = withContext (\ctx -> qtisGetTickersInfo ctx endpoint tickers) - -qtisGetTickersInfo :: Context -> T.Text -> [TickerId] -> IO [TickerInfo] -qtisGetTickersInfo ctx endpoint tickers = - withSocket ctx Req (\sock -> do +qtisGetTickersInfo :: Context -> T.Text -> TickerId -> IO TickerInfo +qtisGetTickersInfo ctx endpoint tickerId = + withSocket ctx Req $ \sock -> do debugM "QTIS" $ "Connecting to: " ++ T.unpack endpoint connect sock $ T.unpack endpoint - catMaybes <$> forM tickers (\tickerId -> do - debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId - send sock [] $ BL.toStrict (tickerRequest tickerId) - response <- receiveMulti sock - let r = parseResponse response - debugM "QTIS" $ "Got response: " ++ show r - return r)) + debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId + send sock [] $ BL.toStrict tickerRequest + response <- receiveMulti sock + let r = parseResponse response + debugM "QTIS" $ "Got response: " ++ show r + case r of + Just resp -> return resp + Nothing -> throw $ QTISFailure "Can't parse response" where - tickerRequest tickerId = encode $ object ["ticker" .= tickerId] + tickerRequest = encode $ object ["ticker" .= tickerId] parseResponse :: [BC8.ByteString] -> Maybe TickerInfo parseResponse (header:payload:_) = if header == "OK" then decode $ BL.fromStrict payload diff --git a/src/ATrade/RoboCom/Types.hs b/src/ATrade/RoboCom/Types.hs index 1c8e8e2..935e798 100644 --- a/src/ATrade/RoboCom/Types.hs +++ b/src/ATrade/RoboCom/Types.hs @@ -10,7 +10,8 @@ module ATrade.RoboCom.Types ( Timeframe(..), tfSeconds, Ticker(..), - Bars + Bars, + InstrumentParameters(..) ) where import ATrade.Types @@ -26,11 +27,18 @@ newtype Timeframe = tfSeconds :: (Num a) => Timeframe -> a tfSeconds (Timeframe s) = fromInteger s +data InstrumentParameters = + InstrumentParameters { + ipLotSize :: Int, + ipTickSize :: Price + } deriving (Show, Eq) + data BarSeries = BarSeries { bsTickerId :: TickerId, bsTimeframe :: Timeframe, - bsBars :: [Bar] + bsBars :: [Bar], + bsParams :: InstrumentParameters } deriving (Show, Eq) -- | Ticker description record