Browse Source

More real driver refactoring

stable
Denis Tereshkin 5 years ago
parent
commit
9d5cb7f076
  1. 4
      src/ATrade/Driver/Backtest.hs
  2. 77
      src/ATrade/Driver/Real.hs
  3. 14
      src/ATrade/Driver/Types.hs

4
src/ATrade/Driver/Backtest.hs

@ -102,10 +102,6 @@ backtestMain _dataDownloadDelta defaultState initCallback callback = do
strategyAccount = "foo", strategyAccount = "foo",
strategyVolume = 1, strategyVolume = 1,
tickers = tickerList, tickers = tickerList,
strategyQuotesourceEp = "",
strategyBrokerEp = "",
strategyHistoryProviderType = "",
strategyHistoryProvider = "",
strategyQTISEp = T.pack <$> qtisEndpoint params} strategyQTISEp = T.pack <$> qtisEndpoint params}
updatedConfig <- case initCallback of updatedConfig <- case initCallback of

77
src/ATrade/Driver/Real.hs

@ -223,10 +223,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
strategyAccount = T.pack . accountId $ params, strategyAccount = T.pack . accountId $ params,
strategyVolume = volumeFactor params, strategyVolume = volumeFactor params,
tickers = tickerList, tickers = tickerList,
strategyQuotesourceEp = T.pack . quotesourceEp $ params,
strategyBrokerEp = T.pack . brokerEp $ params,
strategyHistoryProviderType = T.pack $ fromMaybe "finam" $ historyProviderType params,
strategyHistoryProvider = T.pack $ fromMaybe "" $ historyProvider params,
strategyQTISEp = T.pack <$> qtisSocket params} strategyQTISEp = T.pack <$> qtisSocket params}
updatedConfig <- case initCallback of updatedConfig <- case initCallback of
@ -261,22 +257,30 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
debugM "main" "Starting strategy driver" debugM "main" "Starting strategy driver"
withContext (\ctx -> do withContext (\ctx -> do
let qsEp = T.pack $ quotesourceEp params
let brEp = T.pack $ brokerEp params
agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)]
now <- getCurrentTime >>= newIORef bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter (sourceBarTimeframe params)) killThread $ \_ -> do
debugM "Strategy" "QuoteSource thread forked"
let env = Env { bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownMv) killThread $ \_ -> do
envHistorySource = mkQHPHandle ctx (strategyHistoryProvider . strategyInstanceParams $ strategy), debugM "Strategy" "Broker thread forked"
envStrategyInstanceParams = instanceParams,
envStrategyEnvironment = straEnv, now <- getCurrentTime >>= newIORef
envConfigRef = configRef,
envStateRef = stateRef, let env = Env {
envBrokerChan = brokerChan, envHistorySource = mkQHPHandle ctx (T.pack . fromMaybe "" . historyProvider $ params),
envTimers = timersRef, envStrategyInstanceParams = instanceParams,
envEventChan = eventChan, envStrategyEnvironment = straEnv,
envAggregator = agg, envConfigRef = configRef,
envLastTimestamp = now envStateRef = stateRef,
} envBrokerChan = brokerChan,
runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy shutdownMv) env `finally` killThread stateSavingThread) envTimers = timersRef,
envEventChan = eventChan,
envAggregator = agg,
envLastTimestamp = now
}
runReaderT (barStrategyDriver strategy shutdownMv) env `finally` killThread stateSavingThread)
where where
tickFilter :: Tick -> Bool tickFilter :: Tick -> Bool
tickFilter tick = tickFilter tick =
@ -378,8 +382,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions -- and executes returned strategy actions
barStrategyDriver :: (MonadHistory (App hs c s)) => Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App hs c s () barStrategyDriver :: (MonadHistory (App hs c s)) => Strategy c s -> MVar () -> App hs c s ()
barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do barStrategyDriver strategy shutdownVar = do
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy) history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy)
eventChan <- asks envEventChan eventChan <- asks envEventChan
@ -387,29 +391,20 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do
agg <- asks envAggregator agg <- asks envAggregator
liftIO $ atomicModifyIORef' agg (\s -> (replaceHistory s history, ())) liftIO $ atomicModifyIORef' agg (\s -> (replaceHistory s history, ()))
bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do wakeupTid <- lift . forkIO $ forever $ do
lift $ debugM "Strategy" "QuoteSource thread forked" maybeShutdown <- tryTakeMVar shutdownVar
bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do if isJust maybeShutdown
lift $ debugM "Strategy" "Broker thread forked" then writeChan eventChan Shutdown
else do
wakeupTid <- lift . forkIO $ forever $ do threadDelay 1000000
maybeShutdown <- tryTakeMVar shutdownVar writeChan brokerChan BrokerRequestNotifications
if isJust maybeShutdown lift $ debugM "Strategy" "Wakeup thread forked"
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan brokerChan BrokerRequestNotifications
lift $ debugM "Strategy" "Wakeup thread forked"
readAndHandleEvents agg strategy
lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid))
lift $ debugM "Strategy" "Strategy done" readAndHandleEvents agg strategy
lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid
where where
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy
loadTickerHistory now t = do loadTickerHistory now t = do
history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t)) history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t))

14
src/ATrade/Driver/Types.hs

@ -26,15 +26,11 @@ data Strategy c s = BarStrategy {
-- | Strategy instance params store few params which are common for all strategies -- | Strategy instance params store few params which are common for all strategies
data StrategyInstanceParams = StrategyInstanceParams { data StrategyInstanceParams = StrategyInstanceParams {
strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable) strategyInstanceId :: T.Text, -- ^ Strategy instance identifier. Should be unique among all strategies (very desirable)
strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent strategyAccount :: T.Text, -- ^ Account string to use for this strategy instance. Broker-dependent
strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts) strategyVolume :: Int, -- ^ Volume to use for this instance (in lots/contracts)
tickers :: [Ticker], -- ^ List of tickers which is used by this strategy tickers :: [Ticker], -- ^ List of tickers which is used by this strategy
strategyQuotesourceEp :: T.Text, -- ^ QuoteSource server endpoint strategyQTISEp :: Maybe T.Text
strategyBrokerEp :: T.Text, -- ^ Broker server endpoint
strategyHistoryProviderType :: T.Text,
strategyHistoryProvider :: T.Text,
strategyQTISEp :: Maybe T.Text
} }
type InitializationCallback c = c -> StrategyInstanceParams -> IO c type InitializationCallback c = c -> StrategyInstanceParams -> IO c

Loading…
Cancel
Save