Browse Source

Driver.Real refactoring

stable
Denis Tereshkin 6 years ago
parent
commit
04e53b9f0d
  1. 195
      src/ATrade/Driver/Real.hs

195
src/ATrade/Driver/Real.hs

@ -3,6 +3,10 @@ @@ -3,6 +3,10 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE CPP #-}
{-# LANGUAGE RankNTypes #-}
{-# LANGUAGE TypeSynonymInstances #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleContexts #-}
module ATrade.Driver.Real (
Strategy(..),
@ -23,9 +27,10 @@ import System.Log.Handler.Simple @@ -23,9 +27,10 @@ import System.Log.Handler.Simple
import System.Log.Handler (setFormatter)
import System.Log.Formatter
import Control.Monad
import Control.Monad.Reader
import Control.Concurrent hiding (writeChan, readChan, writeList2Chan, yield)
import Control.Concurrent.BoundedChan as BC
import Control.Exception
import Control.Exception.Safe
import qualified Data.ByteString as BS
import qualified Data.ByteString.Lazy as BL
import qualified Data.List as L
@ -41,7 +46,7 @@ import Data.Maybe @@ -41,7 +46,7 @@ import Data.Maybe
import Data.Monoid
import Database.Redis hiding (info, decode)
import ATrade.Types
import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..))
import ATrade.RoboCom.Monad (StrategyMonad, StrategyAction(..), EventCallback, Event(..), runStrategyElement, StrategyEnvironment(..), Event(..), MonadRobot(..))
import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread
import ATrade.Driver.Real.QuoteSourceThread
@ -107,6 +112,44 @@ paramsParser = Params @@ -107,6 +112,44 @@ paramsParser = Params
( long "source-timeframe"
<> metavar "SECONDS" ))
data Env c s = Env {
envStrategyInstanceParams :: StrategyInstanceParams,
envStrategyEnvironment :: IORef StrategyEnvironment,
envConfigRef :: IORef c,
envStateRef :: IORef s,
envBrokerChan :: BC.BoundedChan BrokerCommand,
envTimers :: IORef [UTCTime],
envEventChan :: BC.BoundedChan Event
}
type App c s = ReaderT (Env c s) IO
instance MonadRobot (App c s) c s where
submitOrder order = do
bc <- asks envBrokerChan
lift $ BC.writeChan bc $ BrokerSubmitOrder order
cancelOrder oId = do
bc <- asks envBrokerChan
lift $ BC.writeChan bc $ BrokerCancelOrder oId
appendToLog = lift . debugM "Strategy" . T.unpack
setupTimer t = do
timers <- asks envTimers
lift $ atomicModifyIORef' timers (\s -> (t : s, ()))
enqueueIOAction actionId action = do
eventChan <- asks envEventChan
lift $ void $ forkIO $ do
v <- action
BC.writeChan eventChan $ ActionCompleted actionId v
getConfig = asks envConfigRef >>= lift . readIORef
getState = asks envStateRef >>= lift . readIORef
setState s = do
ref <- asks envStateRef
lift $ writeIORef ref s
getEnvironment = asks envStrategyEnvironment >>= lift . readIORef
data BigConfig c = BigConfig {
confTickers :: [Ticker],
@ -190,8 +233,30 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -190,8 +233,30 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
threadDelay 1000000
storeState params stateRef timersRef
straEnv <- newIORef StrategyEnvironment {
seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy,
seAccount = strategyAccount . strategyInstanceParams $ strategy,
seVolume = strategyVolume . strategyInstanceParams $ strategy,
seBars = M.empty,
seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0
}
-- Event channel is for strategy events, like new tick arrival, or order execution notification
eventChan <- BC.newBoundedChan 1000
-- Orders channel passes strategy orders to broker thread
brokerChan <- BC.newBoundedChan 1000
debugM "main" "Starting strategy driver"
barStrategyDriver (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv `finally` killThread stateSavingThread
let env = Env {
envStrategyInstanceParams = instanceParams,
envStrategyEnvironment = straEnv,
envConfigRef = configRef,
envStateRef = stateRef,
envBrokerChan = brokerChan,
envTimers = timersRef,
envEventChan = eventChan
}
withContext (\ctx ->
runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread)
where
tickFilter :: Tick -> Bool
tickFilter tick =
@ -293,102 +358,68 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { @@ -293,102 +358,68 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions
barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> IO ()
barStrategyDriver mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do
-- Make channels
-- Event channel is for strategy events, like new tick arrival, or order execution notification
eventChan <- BC.newBoundedChan 1000
-- Orders channel passes strategy orders to broker thread
ordersChan <- BC.newBoundedChan 1000
withContext (\ctx -> do
-- Load tickers data and create BarAggregator from them
historyBars <-
if
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" ->
M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy)
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" ->
M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
| otherwise ->
M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)]
bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do
debugM "Strategy" "QuoteSource thread forked"
bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do
debugM "Strategy" "Broker thread forked"
wakeupTid <- forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan ordersChan BrokerRequestNotifications
debugM "Strategy" "Wakeup thread forked"
let env = StrategyEnvironment {
seInstanceId = strategyInstanceId . strategyInstanceParams $ strategy,
seAccount = strategyAccount . strategyInstanceParams $ strategy,
seVolume = strategyVolume . strategyInstanceParams $ strategy,
seBars = M.empty,
seLastTimestamp = UTCTime (fromGregorian 1970 1 1) 0
}
readAndHandleEvents agg ordersChan eventChan strategy env
debugM "Strategy" "Stopping strategy driver"
killThread wakeupTid)))
debugM "Strategy" "Strategy done"
barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s ()
barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do
-- Load tickers data and create BarAggregator from them
historyBars <-
lift $ if
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" ->
M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy)
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" ->
M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
| otherwise ->
M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)]
eventChan <- asks envEventChan
brokerChan <- asks envBrokerChan
bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "QuoteSource thread forked"
bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "Broker thread forked"
wakeupTid <- lift . forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan brokerChan BrokerRequestNotifications
lift $ debugM "Strategy" "Wakeup thread forked"
readAndHandleEvents agg strategy
lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid))
lift $ debugM "Strategy" "Strategy done"
where
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy
readAndHandleEvents agg ordersChan eventChan strategy' env = do
event <- readChan eventChan
readAndHandleEvents agg strategy' = do
eventChan <- asks envEventChan
event <- lift $ readChan eventChan
if event /= Shutdown
then do
currentBars <- bars <$> readIORef agg
params <- readIORef configRef
curState <- readIORef stateRef
let instId = strategyInstanceId . strategyInstanceParams $ strategy'
let acc = strategyAccount . strategyInstanceParams $ strategy'
let vol = strategyVolume . strategyInstanceParams $ strategy'
let oldTimestamp = seLastTimestamp env
env <- getEnvironment
let newTimestamp = case event of
NewTick tick -> timestamp tick
_ -> seLastTimestamp env
newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy')
(eventCallback strategy) event
lift $ writeIORef timersRef newTimers
let !newenv = env { seBars = currentBars, seLastTimestamp = newTimestamp }
let (!newState, !actions, _) = runStrategyElement params curState newenv $ (eventCallback strategy) event
writeIORef stateRef newState
writeIORef timersRef newTimers
newTimers' <- catMaybes <$> mapM handleTimerActions actions
mapM_ (handleActions ordersChan) actions
readAndHandleEvents agg ordersChan eventChan (strategy' { currentState = newState, strategyTimers = newTimers ++ newTimers' }) newenv
else debugM "Strategy" "Shutdown requested"
readAndHandleEvents agg strategy'
else
lift $ debugM "Strategy" "Shutdown requested"
where
handleTimerActions action =
case action of
ActionSetupTimer timerTime -> return $ Just timerTime
_ -> return Nothing
handleActions ordersChan' action =
case action of
(ActionLog logText) -> debugM "Strategy" $ T.unpack logText
(ActionOrder order) -> writeChan ordersChan' $ BrokerSubmitOrder order
(ActionCancelOrder oid) -> writeChan ordersChan' $ BrokerCancelOrder oid
(ActionSetupTimer _) -> return ()
(ActionIO tag io) -> void $ forkIO $ do
v <- io
writeChan eventChan (ActionCompleted tag v)
checkTimer eventChan' newTimestamp timerTime =
if newTimestamp >= timerTime
then do
writeChan eventChan' $ TimerFired timerTime
lift $ writeChan eventChan' $ TimerFired timerTime
return Nothing
else
return $ Just timerTime

Loading…
Cancel
Save