Browse Source

bugfix: update strategy environment

stable
Denis Tereshkin 6 years ago
parent
commit
f4c4fc811c
  1. 222
      src/ATrade/Driver/Real.hs

222
src/ATrade/Driver/Real.hs

@ -119,7 +119,9 @@ data Env c s = Env { @@ -119,7 +119,9 @@ data Env c s = Env {
envStateRef :: IORef s,
envBrokerChan :: BC.BoundedChan BrokerCommand,
envTimers :: IORef [UTCTime],
envEventChan :: BC.BoundedChan Event
envEventChan :: BC.BoundedChan Event,
envAggregator :: IORef BarAggregator,
envLastTimestamp :: IORef UTCTime
}
type App c s = ReaderT (Env c s) IO
@ -149,7 +151,15 @@ instance MonadRobot (App c s) c s where @@ -149,7 +151,15 @@ instance MonadRobot (App c s) c s where
setState s = do
ref <- asks envStateRef
lift $ writeIORef ref s
getEnvironment = asks envStrategyEnvironment >>= lift . readIORef
getEnvironment = do
aggRef <- asks envAggregator
envRef <- asks envStrategyEnvironment
agg <- lift $ readIORef aggRef
env <- lift $ readIORef envRef
nowRef <- asks envLastTimestamp
now <- lift $ readIORef nowRef
return $ env { seBars = bars agg, seLastTimestamp = now }
data BigConfig c = BigConfig {
confTickers :: [Ticker],
@ -246,16 +256,31 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -246,16 +256,31 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
brokerChan <- BC.newBoundedChan 1000
debugM "main" "Starting strategy driver"
let env = Env {
envStrategyInstanceParams = instanceParams,
envStrategyEnvironment = straEnv,
envConfigRef = configRef,
envStateRef = stateRef,
envBrokerChan = brokerChan,
envTimers = timersRef,
envEventChan = eventChan
}
withContext (\ctx ->
withContext (\ctx -> do
infoM "main" "Loading history"
-- Load tickers data and create BarAggregator from them
historyBars <-
if
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" ->
M.fromList <$> mapM (loadTickerFromFinam (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy)
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" ->
M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy)
| otherwise ->
M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy)
agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)]
now <- getCurrentTime >>= newIORef
let env = Env {
envStrategyInstanceParams = instanceParams,
envStrategyEnvironment = straEnv,
envConfigRef = configRef,
envStateRef = stateRef,
envBrokerChan = brokerChan,
envTimers = timersRef,
envEventChan = eventChan,
envAggregator = agg,
envLastTimestamp = now
}
runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread)
where
tickFilter :: Tick -> Bool
@ -345,88 +370,8 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -345,88 +370,8 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> return defaultState ) `catch`
(\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState)
-- | Helper function to make 'Strategy' instances
mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s
mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
downloadDelta = dd,
eventCallback = cb,
currentState = initialState,
strategyParams = params,
strategyTimers = [],
strategyInstanceParams = instanceParams }
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions
barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s ()
barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do
-- Load tickers data and create BarAggregator from them
historyBars <-
lift $ if
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" ->
M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy)
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" ->
M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
| otherwise ->
M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)]
eventChan <- asks envEventChan
brokerChan <- asks envBrokerChan
bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "QuoteSource thread forked"
bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "Broker thread forked"
wakeupTid <- lift . forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan brokerChan BrokerRequestNotifications
lift $ debugM "Strategy" "Wakeup thread forked"
readAndHandleEvents agg strategy
lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid))
lift $ debugM "Strategy" "Strategy done"
where
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy
readAndHandleEvents agg strategy' = do
eventChan <- asks envEventChan
event <- lift $ readChan eventChan
if event /= Shutdown
then do
lift $ debugM "Strategy" $ "event: " ++ show event
env <- getEnvironment
let newTimestamp = case event of
NewTick tick -> timestamp tick
_ -> seLastTimestamp env
newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy')
(eventCallback strategy) event
lift $ writeIORef timersRef newTimers
readAndHandleEvents agg strategy'
else
lift $ debugM "Strategy" "Shutdown requested"
where
checkTimer eventChan' newTimestamp timerTime =
if newTimestamp >= timerTime
then do
lift $ writeChan eventChan' $ TimerFired timerTime
return Nothing
else
return $ Just timerTime
loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromHAP ctx ep t = do
loadTickerFromHAP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromHAP ctx ep delta t = do
debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t)
case parseHAPPeriod $ timeframeSeconds t of
Just tf -> do
@ -434,7 +379,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t @@ -434,7 +379,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t
historyBars <- QH.getQuotes ctx QH.RequestParams {
QH.endpoint = ep,
QH.ticker = code t,
QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now,
QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ delta) now,
QH.endDate = now,
QH.period = tf }
debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars"
@ -442,8 +387,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t @@ -442,8 +387,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t
_ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] })
loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromQHP ctx ep t = do
loadTickerFromQHP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromQHP ctx ep delta t = do
debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t)
case parseQHPPeriod $ timeframeSeconds t of
Just tf -> do
@ -451,7 +396,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t @@ -451,7 +396,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t
historyBars <- QQ.getQuotes ctx QQ.RequestParams {
QQ.endpoint = ep,
QQ.ticker = code t,
QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now),
QQ.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now),
QQ.endDate = utctDay now,
QQ.period = tf }
debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars"
@ -460,8 +405,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t @@ -460,8 +405,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t
_ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] })
loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries)
loadTickerFromFinam t = do
loadTickerFromFinam :: DiffTime -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromFinam delta t = do
randDelay <- getStdRandom (randomR (1, 5))
threadDelay $ randDelay * 1000000
now <- getCurrentTime
@ -470,7 +415,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t @@ -470,7 +415,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t
(Just finamCode, Just per) -> do
debugM "Strategy" $ "Downloading ticker: " ++ finamCode
history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode,
QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now),
QF.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now),
QF.endDate = utctDay now,
QF.period = per }
case history of
@ -516,3 +461,78 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t @@ -516,3 +461,78 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t
| x == 24 * 60 * 60 = Just QH.PeriodDay
| otherwise = Nothing
-- | Helper function to make 'Strategy' instances
mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s
mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
downloadDelta = dd,
eventCallback = cb,
currentState = initialState,
strategyParams = params,
strategyTimers = [],
strategyInstanceParams = instanceParams }
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions
barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s ()
barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do
eventChan <- asks envEventChan
brokerChan <- asks envBrokerChan
agg <- asks envAggregator
bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "QuoteSource thread forked"
bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "Broker thread forked"
wakeupTid <- lift . forkIO $ forever $ do
maybeShutdown <- tryTakeMVar shutdownVar
if isJust maybeShutdown
then writeChan eventChan Shutdown
else do
threadDelay 1000000
writeChan brokerChan BrokerRequestNotifications
lift $ debugM "Strategy" "Wakeup thread forked"
readAndHandleEvents agg strategy
lift $ debugM "Strategy" "Stopping strategy driver"
lift $ killThread wakeupTid))
lift $ debugM "Strategy" "Strategy done"
where
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy
readAndHandleEvents agg strategy' = do
eventChan <- asks envEventChan
event <- lift $ readChan eventChan
if event /= Shutdown
then do
case event of
NewTick _ -> return ()
_ -> lift $ debugM "Strategy" $ "event: " ++ show event
env <- getEnvironment
let newTimestamp = case event of
NewTick tick -> timestamp tick
_ -> seLastTimestamp env
nowRef <- asks envLastTimestamp
lift $ writeIORef nowRef newTimestamp
newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy')
(eventCallback strategy) event
lift $ writeIORef timersRef newTimers
readAndHandleEvents agg strategy'
else
lift $ debugM "Strategy" "Shutdown requested"
where
checkTimer eventChan' newTimestamp timerTime =
if newTimestamp >= timerTime
then do
lift $ writeChan eventChan' $ TimerFired timerTime
return Nothing
else
return $ Just timerTime

Loading…
Cancel
Save