Browse Source

Experimental support for bar aggregation

master
Denis Tereshkin 7 years ago
parent
commit
4f72e23337
  1. 14
      src/ATrade/Driver/Real.hs
  2. 18
      src/ATrade/Driver/Real/QuoteSourceThread.hs

14
src/ATrade/Driver/Real.hs

@ -64,7 +64,8 @@ data Params = Params { @@ -64,7 +64,8 @@ data Params = Params {
redisSocket :: Maybe String,
qtisSocket :: Maybe String,
accountId :: String,
volumeFactor :: Int
volumeFactor :: Int,
sourceBarTimeframe :: Maybe Int
} deriving (Show, Eq)
paramsParser :: Parser Params
@ -102,6 +103,9 @@ paramsParser = Params @@ -102,6 +103,9 @@ paramsParser = Params
<*> option auto
( long "volume"
<> metavar "VOLUME" )
<*> optional ( option auto
( long "source-timeframe"
<> metavar "SECONDS" ))
data BigConfig c = BigConfig {
@ -187,7 +191,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do @@ -187,7 +191,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do
storeState params stateRef timersRef
debugM "main" "Starting strategy driver"
barStrategyDriver tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread
barStrategyDriver (sourceBarTimeframe params) tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread
where
tickFilter :: Tick -> Bool
tickFilter tick =
@ -289,8 +293,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { @@ -289,8 +293,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy {
-- | Main function which handles incoming events (ticks/orders), passes them to strategy callback
-- and executes returned strategy actions
barStrategyDriver :: (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO ()
barStrategyDriver tickFilter strategy stateRef timersRef shutdownVar = do
barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO ()
barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutdownVar = do
-- Make channels
-- Event channel is for strategy events, like new tick arrival, or order execution notification
eventChan <- BC.newBoundedChan 1000
@ -308,7 +312,7 @@ barStrategyDriver tickFilter strategy stateRef timersRef shutdownVar = do @@ -308,7 +312,7 @@ barStrategyDriver tickFilter strategy stateRef timersRef shutdownVar = do
| otherwise ->
M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy)
agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)]
bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter) killThread (\_ -> do
bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do
debugM "Strategy" "QuoteSource thread forked"
bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do
debugM "Strategy" "Broker thread forked"

18
src/ATrade/Driver/Real/QuoteSourceThread.hs

@ -24,10 +24,10 @@ import Control.Monad @@ -24,10 +24,10 @@ import Control.Monad
import System.Log.Logger
import System.ZMQ4 hiding (Event)
startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> IO ThreadId
startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter = forkIO $ do
startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId
startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do
tickChan <- newBoundedChan 1000
bracket (startQuoteSourceClient tickChan (fmap code . (tickers . strategyInstanceParams) $ strategy) ctx qsEp)
bracket (startQuoteSourceClient tickChan tickersList ctx qsEp)
(\qs -> do
stopQuoteSourceClient qs
debugM "Strategy" "Quotesource client: stop")
@ -40,9 +40,17 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter = forkIO $ do @@ -40,9 +40,17 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter = forkIO $ do
case handleTick tick aggValue of
(Just bar, !newAggValue) -> writeChan eventChan (NewBar bar) >> writeIORef agg newAggValue
(Nothing, !newAggValue) -> writeIORef agg newAggValue
QDBar (tf, bar) -> return () -- TODO
)
QDBar (_, bar) -> do
aggValue <- readIORef agg
case handleBar bar aggValue of
(Just bar', !newAggValue) -> writeChan eventChan (NewBar bar') >> writeIORef agg newAggValue
(Nothing, !newAggValue) -> writeIORef agg newAggValue)
where
goodTick tick = tickFilter tick &&
(datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0))
tickersList' = fmap code . (tickers . strategyInstanceParams) $ strategy
tickersList = case maybeSourceTimeframe of
Just tf -> fmap (\x -> T.append x (T.pack $ ":" ++ show tf ++ ";")) tickersList'
_ -> tickersList'

Loading…
Cancel
Save