diff --git a/src/ATrade/Driver/Real.hs b/src/ATrade/Driver/Real.hs index 0da5eca..e415dea 100644 --- a/src/ATrade/Driver/Real.hs +++ b/src/ATrade/Driver/Real.hs @@ -64,7 +64,8 @@ data Params = Params { redisSocket :: Maybe String, qtisSocket :: Maybe String, accountId :: String, - volumeFactor :: Int + volumeFactor :: Int, + sourceBarTimeframe :: Maybe Int } deriving (Show, Eq) paramsParser :: Parser Params @@ -102,6 +103,9 @@ paramsParser = Params <*> option auto ( long "volume" <> metavar "VOLUME" ) + <*> optional ( option auto + ( long "source-timeframe" + <> metavar "SECONDS" )) data BigConfig c = BigConfig { @@ -187,7 +191,7 @@ robotMain dataDownloadDelta defaultState initCallback callback = do storeState params stateRef timersRef debugM "main" "Starting strategy driver" - barStrategyDriver tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread + barStrategyDriver (sourceBarTimeframe params) tickFilter strategy stateRef timersRef shutdownMv `finally` killThread stateSavingThread where tickFilter :: Tick -> Bool tickFilter tick = @@ -289,8 +293,8 @@ mkBarStrategy instanceParams dd params initialState cb = BarStrategy { -- | Main function which handles incoming events (ticks/orders), passes them to strategy callback -- and executes returned strategy actions -barStrategyDriver :: (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO () -barStrategyDriver tickFilter strategy stateRef timersRef shutdownVar = do +barStrategyDriver :: Maybe Int -> (Tick -> Bool) -> Strategy c s -> IORef s -> IORef [UTCTime] -> MVar () -> IO () +barStrategyDriver mbSourceTimeframe tickFilter strategy stateRef timersRef shutdownVar = do -- Make channels -- Event channel is for strategy events, like new tick arrival, or order execution notification eventChan <- BC.newBoundedChan 1000 @@ -308,7 +312,7 @@ barStrategyDriver tickFilter strategy stateRef timersRef shutdownVar = do | otherwise -> M.fromList <$> mapM (loadTickerFromQHP ctx ((strategyHistoryProvider . strategyInstanceParams) strategy)) (tickers . strategyInstanceParams $ strategy) agg <- newIORef $ mkAggregatorFromBars historyBars [(hmsToDiffTime 6 50 0, hmsToDiffTime 21 0 0)] - bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter) killThread (\_ -> do + bracket (startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter mbSourceTimeframe) killThread (\_ -> do debugM "Strategy" "QuoteSource thread forked" bracket (startBrokerClientThread (strategyInstanceId . strategyInstanceParams $ strategy) ctx brEp ordersChan eventChan shutdownVar) killThread (\_ -> do debugM "Strategy" "Broker thread forked" diff --git a/src/ATrade/Driver/Real/QuoteSourceThread.hs b/src/ATrade/Driver/Real/QuoteSourceThread.hs index 80ac1a9..32617d6 100644 --- a/src/ATrade/Driver/Real/QuoteSourceThread.hs +++ b/src/ATrade/Driver/Real/QuoteSourceThread.hs @@ -24,10 +24,10 @@ import Control.Monad import System.Log.Logger import System.ZMQ4 hiding (Event) -startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> IO ThreadId -startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter = forkIO $ do +startQuoteSourceThread :: Context -> T.Text -> Strategy c s -> BoundedChan Event -> IORef BarAggregator -> (Tick -> Bool) -> Maybe Int -> IO ThreadId +startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter maybeSourceTimeframe = forkIO $ do tickChan <- newBoundedChan 1000 - bracket (startQuoteSourceClient tickChan (fmap code . (tickers . strategyInstanceParams) $ strategy) ctx qsEp) + bracket (startQuoteSourceClient tickChan tickersList ctx qsEp) (\qs -> do stopQuoteSourceClient qs debugM "Strategy" "Quotesource client: stop") @@ -40,9 +40,17 @@ startQuoteSourceThread ctx qsEp strategy eventChan agg tickFilter = forkIO $ do case handleTick tick aggValue of (Just bar, !newAggValue) -> writeChan eventChan (NewBar bar) >> writeIORef agg newAggValue (Nothing, !newAggValue) -> writeIORef agg newAggValue - QDBar (tf, bar) -> return () -- TODO - ) + QDBar (_, bar) -> do + aggValue <- readIORef agg + case handleBar bar aggValue of + (Just bar', !newAggValue) -> writeChan eventChan (NewBar bar') >> writeIORef agg newAggValue + (Nothing, !newAggValue) -> writeIORef agg newAggValue) where goodTick tick = tickFilter tick && (datatype tick /= LastTradePrice || (datatype tick == LastTradePrice && volume tick > 0)) + tickersList' = fmap code . (tickers . strategyInstanceParams) $ strategy + tickersList = case maybeSourceTimeframe of + Just tf -> fmap (\x -> T.append x (T.pack $ ":" ++ show tf ++ ";")) tickersList' + _ -> tickersList' +