Browse Source

Refactoring: history request

stable
Denis Tereshkin 5 years ago
parent
commit
f17fc33116
  1. 2
      robocom-zero.cabal
  2. 6
      src/ATrade/BarAggregator.hs
  3. 126
      src/ATrade/Driver/Real.hs
  4. 19
      src/ATrade/Quotes.hs
  5. 46
      src/ATrade/Quotes/QHP.hs

2
robocom-zero.cabal

@ -18,10 +18,10 @@ library
ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults ghc-options: -Wall -fno-warn-orphans -Wno-type-defaults
exposed-modules: ATrade.RoboCom.Indicators exposed-modules: ATrade.RoboCom.Indicators
, ATrade.RoboCom.Monad , ATrade.RoboCom.Monad
, ATrade.RoboCom.Persistence
, ATrade.RoboCom.Positions , ATrade.RoboCom.Positions
, ATrade.RoboCom.Types , ATrade.RoboCom.Types
, ATrade.RoboCom.Utils , ATrade.RoboCom.Utils
, ATrade.Quotes
, ATrade.Quotes.Finam , ATrade.Quotes.Finam
, ATrade.Quotes.HAP , ATrade.Quotes.HAP
, ATrade.Quotes.QHP , ATrade.Quotes.QHP

6
src/ATrade/BarAggregator.hs

@ -22,7 +22,8 @@ module ATrade.BarAggregator (
handleTick, handleTick,
updateTime, updateTime,
handleBar, handleBar,
hmsToDiffTime hmsToDiffTime,
replaceHistory
) where ) where
import ATrade.RoboCom.Types import ATrade.RoboCom.Types
@ -47,6 +48,9 @@ mkAggregatorFromBars myBars timeWindows = BarAggregator {
lastTicks = M.empty, lastTicks = M.empty,
tickTimeWindows = timeWindows } tickTimeWindows = timeWindows }
replaceHistory :: BarAggregator -> M.Map TickerId BarSeries -> BarAggregator
replaceHistory agg bars' = agg { bars = bars' }
lBars :: (M.Map TickerId BarSeries -> Identity (M.Map TickerId BarSeries)) -> BarAggregator -> Identity BarAggregator lBars :: (M.Map TickerId BarSeries -> Identity (M.Map TickerId BarSeries)) -> BarAggregator -> Identity BarAggregator
lBars = lens bars (\s b -> s { bars = b }) lBars = lens bars (\s b -> s { bars = b })

126
src/ATrade/Driver/Real.hs

@ -5,6 +5,7 @@
{-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE DeriveGeneric #-}
module ATrade.Driver.Real ( module ATrade.Driver.Real (
Strategy(..), Strategy(..),
@ -44,6 +45,7 @@ import Data.Time.Clock.POSIX
import Data.Maybe import Data.Maybe
import Database.Redis hiding (info, decode) import Database.Redis hiding (info, decode)
import ATrade.Types import ATrade.Types
import ATrade.Quotes
import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..)) import ATrade.RoboCom.Monad (EventCallback, Event(..), StrategyEnvironment(..), seBars, seLastTimestamp, Event(..), MonadRobot(..))
import ATrade.BarAggregator import ATrade.BarAggregator
import ATrade.Driver.Real.BrokerClientThread import ATrade.Driver.Real.BrokerClientThread
@ -55,6 +57,7 @@ import ATrade.Quotes.Finam as QF
import ATrade.Quotes.QHP as QQ import ATrade.Quotes.QHP as QQ
import ATrade.Quotes.HAP as QH import ATrade.Quotes.HAP as QH
import System.ZMQ4 hiding (Event(..)) import System.ZMQ4 hiding (Event(..))
import GHC.Generics
data Params = Params { data Params = Params {
instanceId :: String, instanceId :: String,
@ -110,7 +113,8 @@ paramsParser = Params
( long "source-timeframe" ( long "source-timeframe"
<> metavar "SECONDS" )) <> metavar "SECONDS" ))
data Env c s = Env { data Env historySource c s = Env {
envHistorySource :: historySource,
envStrategyInstanceParams :: StrategyInstanceParams, envStrategyInstanceParams :: StrategyInstanceParams,
envStrategyEnvironment :: IORef StrategyEnvironment, envStrategyEnvironment :: IORef StrategyEnvironment,
envConfigRef :: IORef c, envConfigRef :: IORef c,
@ -120,11 +124,11 @@ data Env c s = Env {
envEventChan :: BC.BoundedChan Event, envEventChan :: BC.BoundedChan Event,
envAggregator :: IORef BarAggregator, envAggregator :: IORef BarAggregator,
envLastTimestamp :: IORef UTCTime envLastTimestamp :: IORef UTCTime
} } deriving (Generic)
type App c s = ReaderT (Env c s) IO type App historySource c s = ReaderT (Env historySource c s) IO
instance MonadRobot (App c s) c s where instance MonadRobot (App historySource c s) c s where
submitOrder order = do submitOrder order = do
bc <- asks envBrokerChan bc <- asks envBrokerChan
lift $ BC.writeChan bc $ BrokerSubmitOrder order lift $ BC.writeChan bc $ BrokerSubmitOrder order
@ -159,6 +163,11 @@ instance MonadRobot (App c s) c s where
now <- lift $ readIORef nowRef now <- lift $ readIORef nowRef
return $ env & seBars .~ bars agg & seLastTimestamp .~ now return $ env & seBars .~ bars agg & seLastTimestamp .~ now
instance MonadHistory (App QQ.QHPHandle c s) where
getHistory tickerId timeframe fromTime toTime = do
qhp <- asks envHistorySource
QQ.requestHistoryFromQHP qhp tickerId timeframe fromTime toTime
data BigConfig c = BigConfig { data BigConfig c = BigConfig {
confTickers :: [Ticker], confTickers :: [Ticker],
strategyConfig :: c strategyConfig :: c
@ -257,6 +266,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
withContext (\ctx -> do withContext (\ctx -> do
infoM "main" "Loading history" infoM "main" "Loading history"
-- Load tickers data and create BarAggregator from them -- Load tickers data and create BarAggregator from them
{-
historyBars <- historyBars <-
if if
| (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" -> | (strategyHistoryProviderType . strategyInstanceParams) strategy == "finam" ->
@ -266,9 +276,12 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
| otherwise -> | otherwise ->
M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy) M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy) (downloadDelta strategy)) (tickers . strategyInstanceParams $ strategy)
agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)] -}
agg <- newIORef $ mkAggregatorFromBars M.empty [(hmsToDiffTime 3 50 0, hmsToDiffTime 21 10 0)]
now <- getCurrentTime >>= newIORef now <- getCurrentTime >>= newIORef
let env = Env { let env = Env {
envHistorySource = mkQHPHandle ctx (strategyHistoryProvider . strategyInstanceParams $ strategy),
envStrategyInstanceParams = instanceParams, envStrategyInstanceParams = instanceParams,
envStrategyEnvironment = straEnv, envStrategyEnvironment = straEnv,
envConfigRef = configRef, envConfigRef = configRef,
@ -368,98 +381,6 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
Nothing -> return defaultState ) `catch` Nothing -> return defaultState ) `catch`
(\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState) (\e -> warningM "main" ("Unable to load state: " ++ show (e :: IOException)) >> return defaultState)
loadTickerFromHAP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromHAP ctx ep delta t = do
debugM "Strategy" $ "Loading ticker from HAP: " ++ show (code t)
case parseHAPPeriod $ timeframeSeconds t of
Just tf -> do
now <- getCurrentTime
historyBars <- QH.getQuotes ctx QH.RequestParams {
QH.endpoint = ep,
QH.ticker = code t,
QH.startDate = addUTCTime (negate . (1 +) . fromRational . toRational $ delta) now,
QH.endDate = now,
QH.period = tf }
debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars"
return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars })
_ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] })
loadTickerFromQHP :: Context -> T.Text -> DiffTime -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromQHP ctx ep delta t = do
debugM "Strategy" $ "Loading ticker from QHP: " ++ show (code t)
case parseQHPPeriod $ timeframeSeconds t of
Just tf -> do
now <- getCurrentTime
historyBars <- QQ.getQuotes ctx QQ.RequestParams {
QQ.endpoint = ep,
QQ.ticker = code t,
QQ.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now),
QQ.endDate = utctDay now,
QQ.period = tf }
debugM "Strategy" $ "Obtained " ++ show (length historyBars) ++ " bars"
debugM "Strategy" $ show (take 20 historyBars)
return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = historyBars })
_ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] })
loadTickerFromFinam :: DiffTime -> Ticker -> IO (TickerId, BarSeries)
loadTickerFromFinam delta t = do
randDelay <- getStdRandom (randomR (1, 5))
threadDelay $ randDelay * 1000000
now <- getCurrentTime
debugM "Strategy" $ show (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t)
case (L.lookup "finam" (aliases t), parseFinamPeriod $ timeframeSeconds t) of
(Just finamCode, Just per) -> do
debugM "Strategy" $ "Downloading ticker: " ++ finamCode
history <- downloadAndParseQuotes $ defaultParams { QF.ticker = T.pack finamCode,
QF.startDate = addDays (negate . (1 +) . ceiling $ delta / 86400) (utctDay now),
QF.endDate = utctDay now,
QF.period = per }
case history of
Just h -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = convertFromFinamHistory (code t) h })
Nothing -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] })
_ -> return (code t, BarSeries { bsTickerId = code t, bsTimeframe = Timeframe (timeframeSeconds t), bsBars = [] })
convertFromFinamHistory :: TickerId -> [Row] -> [Bar]
convertFromFinamHistory tid = L.reverse . fmap (\row -> Bar { barSecurity = tid,
barTimestamp = rowTime row,
barOpen = rowOpen row,
barHigh = rowHigh row,
barLow = rowLow row,
barClose = rowClose row,
barVolume = rowVolume row })
parseFinamPeriod x
| x == 0 = Just QF.PeriodTick
| x == 60 = Just QF.Period1Min
| x == 5 * 60 = Just QF.Period5Min
| x == 10 * 60 = Just QF.Period10Min
| x == 15 * 60 = Just QF.Period15Min
| x == 30 * 60 = Just QF.Period30Min
| x == 60 * 60 = Just QF.PeriodHour
| x == 24 * 60 * 60 = Just QF.PeriodDay
| otherwise = Nothing
parseQHPPeriod x
| x == 60 = Just QQ.Period1Min
| x == 5 * 60 = Just QQ.Period5Min
| x == 15 * 60 = Just QQ.Period15Min
| x == 30 * 60 = Just QQ.Period30Min
| x == 60 * 60 = Just QQ.PeriodHour
| x == 24 * 60 * 60 = Just QQ.PeriodDay
| otherwise = Nothing
parseHAPPeriod x
| x == 60 = Just QH.Period1Min
| x == 5 * 60 = Just QH.Period5Min
| x == 15 * 60 = Just QH.Period15Min
| x == 30 * 60 = Just QH.Period30Min
| x == 60 * 60 = Just QH.PeriodHour
| x == 24 * 60 * 60 = Just QH.PeriodDay
| otherwise = Nothing
-- | Helper function to make 'Strategy' instances -- | Helper function to make 'Strategy' instances
mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s mkBarStrategy :: StrategyInstanceParams -> DiffTime -> c -> s -> EventCallback c s -> Strategy c s
mkBarStrategy instanceParams dd params initialState cb = BarStrategy { mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
@ -473,11 +394,15 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions -- and executes returned strategy actions
barStrategyDriver :: Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App c s () barStrategyDriver :: (MonadHistory (App hs c s)) => Context -> Maybe Int -> (Tick -> Bool) -> Strategy c s -> MVar () -> App hs c s ()
barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do
now <- liftIO getCurrentTime
history <- M.fromList <$> mapM (loadTickerHistory now) (tickers . strategyInstanceParams $ strategy)
eventChan <- asks envEventChan eventChan <- asks envEventChan
brokerChan <- asks envBrokerChan brokerChan <- asks envBrokerChan
agg <- asks envAggregator agg <- asks envAggregator
liftIO $ atomicModifyIORef' agg (\s -> (replaceHistory s history, ()))
bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do bracket (lift $ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) (lift . killThread) (\_ -> do
lift $ debugM "Strategy" "QuoteSource thread forked" lift $ debugM "Strategy" "QuoteSource thread forked"
bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do bracket (lift $ startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp brokerChan eventChan shutdownVar) (lift . killThread) (\_ -> do
@ -502,6 +427,11 @@ barStrategyDriver ctx mbSourceTimeframe tickFilter strategy shutdownVar = do
qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy qsEp = strategyQuotesourceEp . strategyInstanceParams $ strategy
brEp = strategyBrokerEp . strategyInstanceParams $ strategy brEp = strategyBrokerEp . strategyInstanceParams $ strategy
loadTickerHistory now t = do
history <- getHistory (code t) (BarTimeframe (fromInteger . timeframeSeconds $ t))
((fromRational . toRational . negate $ downloadDelta strategy) `addUTCTime` now) now
return (code t, BarSeries (code t) (Timeframe (timeframeSeconds t)) history)
readAndHandleEvents agg strategy' = do readAndHandleEvents agg strategy' = do
eventChan <- asks envEventChan eventChan <- asks envEventChan
event <- lift $ readChan eventChan event <- lift $ readChan eventChan

19
src/ATrade/Quotes.hs

@ -0,0 +1,19 @@
{- |
- Module : ATrade.Quotes
- Various historical price series management stuff
-}
module ATrade.Quotes
(
MonadHistory(..)
) where
import ATrade.Types (Bar, BarTimeframe, TickerId)
import Data.Time.Clock (UTCTime)
class (Monad m) => MonadHistory m where
-- | 'getHistory tickerId timeframe fromTime toTime' should return requested timeframe between 'fromTime' and 'toTime'
getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar]

46
src/ATrade/Quotes/QHP.hs

@ -1,16 +1,21 @@
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
module ATrade.Quotes.QHP ( module ATrade.Quotes.QHP (
getQuotes,
Period(..), Period(..),
RequestParams(..) RequestParams(..),
QHPHandle,
mkQHPHandle,
requestHistoryFromQHP
) where ) where
import ATrade.Exceptions
import ATrade.Types import ATrade.Types
import Control.Exception.Safe (MonadThrow, throw)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Data.Aeson import Data.Aeson
import Data.Binary.Get import Data.Binary.Get
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time.Calendar import Data.Time.Calendar
import Data.Time.Clock import Data.Time.Clock
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
@ -39,6 +44,39 @@ instance Show Period where
show PeriodWeek = "W" show PeriodWeek = "W"
show PeriodMonth = "MN" show PeriodMonth = "MN"
data QHPHandle = QHPHandle
{
qhpContext :: Context
, qhpEndpoint :: T.Text
}
mkQHPHandle :: Context -> T.Text -> QHPHandle
mkQHPHandle = QHPHandle
requestHistoryFromQHP :: (MonadThrow m, MonadIO m) => QHPHandle -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar]
requestHistoryFromQHP qhp tickerId timeframe fromTime toTime =
case parseQHPPeriod (unBarTimeframe timeframe) of
Just tf -> liftIO $ getQuotes (qhpContext qhp) (params tf)
_ -> throw $ BadParams "QHP: Unable to parse timeframe"
where
params tf = RequestParams
{
endpoint = qhpEndpoint qhp,
ticker = tickerId,
startDate = utctDay fromTime,
endDate = utctDay toTime,
period = tf
}
parseQHPPeriod x
| x == 60 = Just Period1Min
| x == 5 * 60 = Just Period5Min
| x == 15 * 60 = Just Period15Min
| x == 30 * 60 = Just Period30Min
| x == 60 * 60 = Just PeriodHour
| x == 24 * 60 * 60 = Just PeriodDay
| otherwise = Nothing
data RequestParams = data RequestParams =
RequestParams RequestParams
{ {

Loading…
Cancel
Save