diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index ce2720d..7d870f9 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -119,7 +119,9 @@ data Env c s = Env { envStateRef :: IORef s, envBrokerChan :: BC.BoundedChan BrokerCommand, envTimers :: IORef [UTCTime], - envEventChan :: BC.BoundedChan Event + envEventChan :: BC.BoundedChan Event, + envAggregator :: IORef BarAggregator, + envLastTimestamp :: IORef UTCTime } type App c s = ReaderT (Env c s) IO @@ -149,7 +151,15 @@ instance MonadRobot (App c s) c s where setState s = do ref <- asks envStateRef lift $ writeIORef ref s - getEnvironment = asks envStrategyEnvironment >>= lift . readIORef + + getEnvironment = do + aggRef <- asks envAggregator + envRef <- asks envStrategyEnvironment + agg <- lift $ readIORef aggRef + env <- lift $ readIORef envRef + nowRef <- asks envLastTimestamp + now <- lift $ readIORef nowRef + return $ env { seBars = bars agg, seLastTimestamp = now } data BigConfig c = BigConfig { confTickers :: [Ticker], @@ -246,16 +256,31 @@ robotMain dataDownloadDelta defaultState initCallback callback = do brokerChan <- BC.newBoundedChan 1000 debugM "main" "Starting strategy driver" - let env = Env { - envStrategyInstanceParams = instanceParams, - envStrategyEnvironment = straEnv, - envConfigRef = configRef, - envStateRef = stateRef, - envBrokerChan = brokerChan, - envTimers = timersRef, - envEventChan = eventChan - } - withContext (\ctx -> + withContext (\ctx -> do + infoM "main" "Loading history" + -- Load tickers data and create BarAggregator from them + historyBars <- + if + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> + M.fromList <$> mapM (loadTickerFromFinam (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) + | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> + M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) + | otherwise -> + M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) + + agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] + now <- getCurrentTime >>= newIORef + let env = Env { + envStrategyInstanceParams = instanceParams, + envStrategyEnvironment = straEnv, + envConfigRef = configRef, + envStateRef = stateRef, + envBrokerChan = brokerChan, + envTimers = timersRef, + envEventChan = eventChan, + envAggregator = agg, + envLastTimestamp = now + } runReaderT (barStrategyDriver ctx (sourceBarTimeframe params) tickFilter strategy configRef stateRef timersRef shutdownMv) env `finally` killThread stateSavingThread) where tickFilter :: Tick -> Bool @@ -344,89 +369,9 @@ robotMain dataDownloadDelta defaultState initCallback callback = do Just st -> return st Nothing -> return defaultState ) `catch` (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) - --- | Helper function to make 'Strategy' instances -mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s -mkBarStrategy instanceParams dd params initialState cb = BarStrategy { - downloadDelta = dd, - eventCallback = cb, - currentState = initialState, - strategyParams = params, - strategyTimers = [], - - strategyInstanceParams = instanceParams } - --- | Main function which handles incoming events (ticks/orders), passes them to strategy callback --- and executes returned strategy actions -barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () -barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do - -- Load tickers data and create BarAggregator from them - historyBars <- - lift $ if - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> - M.fromList <$> mapM loadTickerFromFinam (tickers . strategyInstanceParams $ strategy) - | (strategyHistoryProviderType . strategyInstanceParams) strategy == "hap" -> - M.fromList <$> mapM (loadTickerFromHAP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - | otherwise -> - M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) - - agg <- lift . newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 10 0)] - eventChan <- asks envEventChan - brokerChan <- asks envBrokerChan - bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do - lift $ debugM "Strategy" "QuoteSource thread forked" - bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do - lift $ debugM "Strategy" "Broker thread forked" - - wakeupTid <- lift . forkIO $ forever $ do - maybeShutdown <- tryTakeMVar shutdownVar - if isJust maybeShutdown - then writeChan eventChan Shutdown - else do - threadDelay 1000000 - writeChan brokerChan BrokerRequestNotifications - lift $ debugM "Strategy" "Wakeup thread forked" - - readAndHandleEvents agg strategy - lift $ debugM "Strategy" "Stopping strategy driver" - lift $ killThread wakeupTid)) - - lift $ debugM "Strategy" "Strategy done" - - where - qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy - brEp = strategyBrokerEp . strategyInstanceParams $ strategy - - readAndHandleEvents agg strategy' = do - eventChan <- asks envEventChan - event <- lift $ readChan eventChan - if event /= Shutdown - then do - lift $ debugM "Strategy" $ "event: " ++ show event - env <- getEnvironment - let newTimestamp = case event of - NewTick tick -> timestamp tick - _ -> seLastTimestamp env - - newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') - (eventCallback strategy) event - lift $ writeIORef timersRef newTimers - - readAndHandleEvents agg strategy' - else - lift $ debugM "Strategy" "Shutdown requested" - where - - checkTimer eventChan' newTimestamp timerTime = - if newTimestamp >= timerTime - then do - lift $ writeChan eventChan' $ TimerFired timerTime - return Nothing - else - return $ Just timerTime - - loadTickerFromHAP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromHAP ctx ep t = do + + loadTickerFromHAP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromHAP ctx ep delta t = do debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t) case parseHAPPeriod $ timeframeSeconds t of Just tf -> do @@ -434,7 +379,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t historyBars <- QH.getQuotes ctx QH.RequestParams { QH.endpoint = ep, QH.ticker = code t, - QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ downloadDelta strategy) now, + QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ delta) now, QH.endDate = now, QH.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" @@ -442,8 +387,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - loadTickerFromQHP :: Context -> T.Text -> Ticker -> IO (TickerId, BarSeries) - loadTickerFromQHP ctx ep t = do + loadTickerFromQHP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromQHP ctx ep delta t = do debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t) case parseQHPPeriod $ timeframeSeconds t of Just tf -> do @@ -451,7 +396,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t historyBars <- QQ.getQuotes ctx QQ.RequestParams { QQ.endpoint = ep, QQ.ticker = code t, - QQ.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), + QQ.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now), QQ.endDate = utctDay now, QQ.period = tf } debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars" @@ -460,8 +405,8 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t _ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] }) - loadTickerFromFinam :: Ticker -> IO (TickerId, BarSeries) - loadTickerFromFinam t = do + loadTickerFromFinam :: DiffTime -> Ticker -> IO (TickerId, BarSeries) + loadTickerFromFinam delta t = do randDelay <- getStdRandom (randomR (1, 5)) threadDelay $ randDelay * 1000000 now <- getCurrentTime @@ -470,7 +415,7 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t (Just finamCode, Just per) -> do debugM "Strategy" $ "Downloading ticker: " ++ finamCode history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode, - QF.startDate = addDays (negate . (1 +) . ceiling $ downloadDelta strategy / 86400) (utctDay now), + QF.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now), QF.endDate = utctDay now, QF.period = per } case history of @@ -516,3 +461,78 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef t | x == 24 * 60 * 60 = Just QH.PeriodDay | otherwise = Nothing + +-- | Helper function to make 'Strategy' instances +mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s +mkBarStrategy instanceParams dd params initialState cb = BarStrategy { + downloadDelta = dd, + eventCallback = cb, + currentState = initialState, + strategyParams = params, + strategyTimers = [], + + strategyInstanceParams = instanceParams } + +-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback +-- and executes returned strategy actions +barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef c -> IORef s -> IORef [UTCTime] -> MVar () -> App c s () +barStrategyDriver ctx mbSourceTimeframe tickFilter strategy configRef stateRef timersRef shutdownVar = do + eventChan <- asks envEventChan + brokerChan <- asks envBrokerChan + agg <- asks envAggregator + bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "QuoteSource thread forked" + bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do + lift $ debugM "Strategy" "Broker thread forked" + + wakeupTid <- lift . forkIO $ forever $ do + maybeShutdown <- tryTakeMVar shutdownVar + if isJust maybeShutdown + then writeChan eventChan Shutdown + else do + threadDelay 1000000 + writeChan brokerChan BrokerRequestNotifications + lift $ debugM "Strategy" "Wakeup thread forked" + + readAndHandleEvents agg strategy + lift $ debugM "Strategy" "Stopping strategy driver" + lift $ killThread wakeupTid)) + + lift $ debugM "Strategy" "Strategy done" + + where + qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy + brEp = strategyBrokerEp . strategyInstanceParams $ strategy + + readAndHandleEvents agg strategy' = do + eventChan <- asks envEventChan + event <- lift $ readChan eventChan + if event /= Shutdown + then do + case event of + NewTick _ -> return () + _ -> lift $ debugM "Strategy" $ "event: " ++ show event + env <- getEnvironment + let newTimestamp = case event of + NewTick tick -> timestamp tick + _ -> seLastTimestamp env + nowRef <- asks envLastTimestamp + lift $ writeIORef nowRef newTimestamp + + newTimers <- catMaybes <$> (mapM (checkTimer eventChan newTimestamp) $ strategyTimers strategy') + (eventCallback strategy) event + lift $ writeIORef timersRef newTimers + + readAndHandleEvents agg strategy' + else + lift $ debugM "Strategy" "Shutdown requested" + where + + checkTimer eventChan' newTimestamp timerTime = + if newTimestamp >= timerTime + then do + lift $ writeChan eventChan' $ TimerFired timerTime + return Nothing + else + return $ Just timerTime +