From 04e53b9f0d57b6bf07260d92f05bed7555bfe209 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 28 Aug 2019 16:35:02 +0700 Subject: [PATCH] Driver.Real refactoring --- src/ATrade/Driver/Real.hs | 195 ++++++++++++++++++++++---------------- 1 file changed, 113 insertions(+), 82 deletions(-) diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 9a667db..95c02cc 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -3,6 +3,10 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-} {-# LANGUAGE RankNTypes #-} +{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE FlexibleContexts #-} module ATrade.Driver.Real ( Strategy(..), @@ -23,9 +27,10 @@ import System.Log.Handler.Simple import System.Log.Handler (setFormatter) import System.Log.Formatter import Control.Monad +import Control.Monad.Reader import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield) import Control.Concurrent.BoundedChan as BC -import Control.Exception +import Control.Exception.Safe import qualified Data.ByteString as BS import qualified Data.ByteString.Lazy as BL import qualified Data.List as L @@ -41,7 +46,7 @@ import Data.Maybe import Data.Monoid import Database.Redis hiding (info, decode) import ATrade.Types -import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..)) +import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..)) import ATrade.BarAggregator import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.QuoteSourceThread @@ -107,6 +112,44 @@ paramsParser = Params ( long "source-timeframe" <> metavar "SECONDS" )) +data Env c s = Env { + envStrategyInstanceParams :: StrategyInstanceParams, + envStrategyEnvironment :: IORef StrategyEnvironment, + envConfigRef :: IORef c, + envStateRef :: IORef s, + envBrokerChan :: BC.BoundedChan BrokerCommand, + envTimers :: IORef [UTCTime], + envEventChan :: BC.BoundedChan Event +} + +type App c s = ReaderT (Env c s) IO + +instance MonadRobot (App c s) c s where + submitOrder order = do + bc <- asks envBrokerChan + lift $ BC.writeChan bc $ BrokerSubmitOrder order + + cancelOrder oId = do + bc <- asks envBrokerChan + lift $ BC.writeChan bc $ BrokerCancelOrder oId + + appendToLog = lift . debugM "Strategy" . T.unpack + setupTimer t = do + timers <- asks envTimers + lift $ atomicModifyIORef' timers (\s -> (t : s, ())) + + enqueueIOAction actionId action = do + eventChan <- asks envEventChan + lift $ void $ forkIO $ do + v <- action + BC.writeChan eventChan $ ActionCompleted actionId v + + getConfig = asks envConfigRef >>= lift . readIORef + getState = asks envStateRef >>= lift . readIORef + setState s = do + ref <- asks envStateRef + lift $ writeIORef ref s + getEnvironment = asks envStrategyEnvironment >>= lift . readIORef data BigConfig c = BigConfig { confTickers :: [Ticker], @@ -190,8 +233,30 @@ robotMain dataDownloadDelta defaultState initCallback callback = do threadDelay 1000000 storeState params stateRef timersRef + straEnv <- newIORef StrategyEnvironment { + seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, + seAccount = strategyAccount . strategyInstanceParams $ strategy, + seVolume = strategyVolume . strategyInstanceParams $ strategy, + seBars = M.empty, + seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 + } + -- Event channel is for strategy events, like new tick arrival, or order execution notification + eventChan <- BC.newBoundedChan 1000 + -- Orders channel passes strategy orders to broker thread + brokerChan <- BC.newBoundedChan 1000 + debugM "main" "Starting strategy driver" - barStrategyDriver (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv `finally` killThread stateSavingThread + let env = Env { + envStrategyInstanceParams = instanceParams, + envStrategyEnvironment = straEnv, + envConfigRef = configRef, + envStateRef = stateRef, + envBrokerChan = brokerChan, + envTimers = timersRef, + envEventChan = eventChan + } + withContext (\ctx -> + runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool tickFilter tick = @@ -293,102 +358,68 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> IO () -barStrategyDriver mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do - -- Make channels - -- Event channel is for strategy events, like new tick arrival, or order execution notification - eventChan <- BC.newBoundedChan 1000 - -- Orders channel passes strategy orders to broker thread - ordersChan <- BC.newBoundedChan 1000 - - withContext (\ctx -> do - -- Load tickers data and create BarAggregator from them - historyBars <- - if - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> - M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> - M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - | otherwise -> - M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] - bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do - debugM "Strategy" "QuoteSource thread forked" - bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do - debugM "Strategy" "Broker thread forked" - - wakeupTid <- forkIO $ forever $ do - maybeShutdown <- tryTakeMVar shutdownVar - if isJust maybeShutdown - then writeChan eventChan Shutdown - else do - threadDelay 1000000 - writeChan ordersChan BrokerRequestNotifications - debugM "Strategy" "Wakeup thread forked" - - let env = StrategyEnvironment { - seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy, - seAccount = strategyAccount . strategyInstanceParams $ strategy, - seVolume = strategyVolume . strategyInstanceParams $ strategy, - seBars = M.empty, - seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0 - } - readAndHandleEvents agg ordersChan eventChan strategy env - debugM "Strategy" "Stopping strategy driver" - killThread wakeupTid))) - - debugM "Strategy" "Strategy done" +barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () +barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do + -- Load tickers data and create BarAggregator from them + historyBars <- + lift $ if + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> + M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> + M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) + | otherwise -> + M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) + + agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] + eventChan <- asks envEventChan + brokerChan <- asks envBrokerChan + bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "QuoteSource thread forked" + bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "Broker thread forked" + + wakeupTid <- lift . forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan brokerChan BrokerRequestNotifications + lift $ debugM "Strategy" "Wakeup thread forked" + + readAndHandleEvents agg strategy + lift $ debugM "Strategy" "Stopping strategy driver" + lift $ killThread wakeupTid)) + + lift $ debugM "Strategy" "Strategy done" where qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy brEp = strategyBrokerEp . strategyInstanceParams $ strategy - readAndHandleEvents agg ordersChan eventChan strategy' env = do - event <- readChan eventChan + + readAndHandleEvents agg strategy' = do + eventChan <- asks envEventChan + event <- lift $ readChan eventChan if event /= Shutdown then do - currentBars <- bars <$> readIORef agg - params <- readIORef configRef - curState <- readIORef stateRef - let instId = strategyInstanceId . strategyInstanceParams $ strategy' - let acc = strategyAccount . strategyInstanceParams $ strategy' - let vol = strategyVolume . strategyInstanceParams $ strategy' - - let oldTimestamp = seLastTimestamp env + env <- getEnvironment let newTimestamp = case event of NewTick tick -> timestamp tick _ -> seLastTimestamp env newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') + (eventCallback strategy) event + lift $ writeIORef timersRef newTimers - let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp } - let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event - writeIORef stateRef newState - writeIORef timersRef newTimers - - newTimers' <- catMaybes <$> mapM handleTimerActions actions - mapM_ (handleActions ordersChan) actions - readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv - else debugM "Strategy" "Shutdown requested" + readAndHandleEvents agg strategy' + else + lift $ debugM "Strategy" "Shutdown requested" where - handleTimerActions action = - case action of - ActionSetupTimer timerTime -> return $ Just timerTime - _ -> return Nothing - - handleActions ordersChan' action = - case action of - (ActionLog logText) -> debugM "Strategy" $ T.unpack logText - (ActionOrder order) -> writeChan ordersChan' $ BrokerSubmitOrder order - (ActionCancelOrder oid) -> writeChan ordersChan' $ BrokerCancelOrder oid - (ActionSetupTimer _) -> return () - (ActionIO tag io) -> void $ forkIO $ do - v <- io - writeChan eventChan (ActionCompleted tag v) checkTimer eventChan' newTimestamp timerTime = if newTimestamp >= timerTime then do - writeChan eventChan' $ TimerFired timerTime + lift $ writeChan eventChan' $ TimerFired timerTime return Nothing else return $ Just timerTime