From 9d5cb7f076f4c9fa54e1187e29aa77aeb52709cd Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Fri, 4 Jun 2021 08:27:58 +0700 Subject: [PATCH] More real driver refactoring --- src/ATrade/Driver/Backtest.hs | 4 -- src/ATrade/Driver/Real.hs | 77 ++++++++++++++++------------------- src/ATrade/Driver/Types.hs | 14 +++---- 3 files changed, 41 insertions(+), 54 deletions(-) diff --git a/src/ATrade/Driver/Backtest.hs b/src/ATrade/Driver/Backtest.hs index d93bc47..e4c94c5 100644 --- a/src/ATrade/Driver/Backtest.hs +++ b/src/ATrade/Driver/Backtest.hs @@ -102,10 +102,6 @@ backtestMain _dataDownloadDelta defaultState initCallback callback = do strategyAccount = "foo", strategyVolume = 1, tickers = tickerList, - strategyQuotesourceEp = "", - strategyBrokerEp = "", - strategyHistoryProviderType = "", - strategyHistoryProvider = "", strategyQTISEp = T.pack <$> qtisEndpoint params} updatedConfig <- case initCallback of diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 55efe45..83f11a9 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -223,10 +223,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do strategyAccount = T.pack . accountId $ params, strategyVolume = volumeFactor params, tickers = tickerList, - strategyQuotesourceEp = T.pack . quotesourceEp $ params, - strategyBrokerEp = T.pack . brokerEp $ params, - strategyHistoryProviderType = T.pack $ fromMaybe "finam" $ historyProviderType params, - strategyHistoryProvider = T.pack $ fromMaybe "" $ historyProvider params, strategyQTISEp = T.pack <$> qtisSocket params} updatedConfig <- case initCallback of @@ -261,22 +257,30 @@ robotMain dataDownloadDelta defaultState initCallback callback = do debugM "main" "Starting strategy driver" withContext (\ctx -> do + + let qsEp = T.pack $ quotesourceEp params + let brEp = T.pack $ brokerEp params agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] - now <- getCurrentTime >>= newIORef - - let env = Env { - envHistorySource = mkQHPHandle ctx (strategyHistoryProvider . strategyInstanceParams $ strategy), - envStrategyInstanceParams = instanceParams, - envStrategyEnvironment = straEnv, - envConfigRef = configRef, - envStateRef = stateRef, - envBrokerChan = brokerChan, - envTimers = timersRef, - envEventChan = eventChan, - envAggregator = agg, - envLastTimestamp = now - } - runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy shutdownMv) env `finally` killThread stateSavingThread) + bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do + debugM "Strategy" "QuoteSource thread forked" + bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do + debugM "Strategy" "Broker thread forked" + + now <- getCurrentTime >>= newIORef + + let env = Env { + envHistorySource = mkQHPHandle ctx (T.pack . fromMaybe "" . historyProvider $ params), + envStrategyInstanceParams = instanceParams, + envStrategyEnvironment = straEnv, + envConfigRef = configRef, + envStateRef = stateRef, + envBrokerChan = brokerChan, + envTimers = timersRef, + envEventChan = eventChan, + envAggregator = agg, + envLastTimestamp = now + } + runReaderT (barStrategyDriver strategy shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = @@ -378,8 +382,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: (MonadHistory (App hs c s)) => Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App hs c s () -barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do +barStrategyDriver :: (MonadHistory (App hs c s)) => Strategy c s -> MVar () -> App hs c s () +barStrategyDriver strategy shutdownVar = do now <- liftIO getCurrentTime history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy) eventChan <- asks envEventChan @@ -387,29 +391,20 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do agg <- asks envAggregator liftIO $ atomicModifyIORef' agg (\s -> (replaceHistory s history, ())) - bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do - lift $ debugM "Strategy" "QuoteSource thread forked" - bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do - lift $ debugM "Strategy" "Broker thread forked" - - wakeupTid <- lift . forkIO $ forever $ do - maybeShutdown <- tryTakeMVar shutdownVar - if isJust maybeShutdown - then writeChan eventChan Shutdown - else do - threadDelay 1000000 - writeChan brokerChan BrokerRequestNotifications - lift $ debugM "Strategy" "Wakeup thread forked" - - readAndHandleEvents agg strategy - lift $ debugM "Strategy" "Stopping strategy driver" - lift $ killThread wakeupTid)) + wakeupTid <- lift . forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan brokerChan BrokerRequestNotifications + lift $ debugM "Strategy" "Wakeup thread forked" - lift $ debugM "Strategy" "Strategy done" + readAndHandleEvents agg strategy + lift $ debugM "Strategy" "Stopping strategy driver" + lift $ killThread wakeupTid where - qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy - brEp = strategyBrokerEp . strategyInstanceParams $ strategy loadTickerHistory now t = do history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t)) diff --git a/src/ATrade/Driver/Types.hs b/src/ATrade/Driver/Types.hs index ce247f6..74b8393 100644 --- a/src/ATrade/Driver/Types.hs +++ b/src/ATrade/Driver/Types.hs @@ -26,15 +26,11 @@ data Strategy c s = BarStrategy { -- | Strategy instance params store few params which are common for all strategies data StrategyInstanceParams = StrategyInstanceParams { - strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) - strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent - strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts) - tickers :: [Ticker], -- ^ List of tickers which is used by this strategy - strategyQuotesourceEp :: T.Text, -- ^ QuoteSource server endpoint - strategyBrokerEp :: T.Text, -- ^ Broker server endpoint - strategyHistoryProviderType :: T.Text, - strategyHistoryProvider :: T.Text, - strategyQTISEp :: Maybe T.Text + strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) + strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent + strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts) + tickers :: [Ticker], -- ^ List of tickers which is used by this strategy + strategyQTISEp :: Maybe T.Text } type InitializationCallback c = c -> StrategyInstanceParams -> IO c