From bcb7d9180571e2a75b4c4c0ba6bc87851366a9b2 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 25 Apr 2023 10:47:03 +0700 Subject: [PATCH] QuoteThread: Aggregate bars from ticks --- src/ATrade/Driver/Junction.hs | 1 - src/ATrade/Driver/Junction/QuoteThread.hs | 48 +++++++++++++++++++---- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/src/ATrade/Driver/Junction.hs b/src/ATrade/Driver/Junction.hs index b3e112f..b5de445 100644 --- a/src/ATrade/Driver/Junction.hs +++ b/src/ATrade/Driver/Junction.hs @@ -6,7 +6,6 @@ {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE RankNTypes #-} -{-# OPTIONS_GHC -Wno-unused-imports #-} module ATrade.Driver.Junction ( diff --git a/src/ATrade/Driver/Junction/QuoteThread.hs b/src/ATrade/Driver/Junction/QuoteThread.hs index 82ffc24..76c23fa 100644 --- a/src/ATrade/Driver/Junction/QuoteThread.hs +++ b/src/ATrade/Driver/Junction/QuoteThread.hs @@ -21,6 +21,7 @@ module ATrade.Driver.Junction.QuoteThread withQThread ) where +import qualified ATrade.BarAggregator as BA import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), SubscriptionId (SubscriptionId)) @@ -60,7 +61,8 @@ import Control.Concurrent.BoundedChan (BoundedChan, import Control.Exception.Safe (MonadMask, MonadThrow, bracket) -import Control.Monad (forM, forever) +import Control.Monad (forM, forM_, + forever) import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), lift) import Control.Monad.Reader.Class (MonadReader, asks) @@ -88,7 +90,8 @@ data QuoteThreadEnv = paramsCache :: IORef TickerInfoMap, downloaderChan :: BoundedChan QuoteSubscription, subscriptionIdCounter :: IORef Int, - subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription) + subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription), + aggregators :: IORef (HM.HashMap (TickerId, BarTimeframe) BA.BarAggregator) } startQuoteThread :: (MonadIO m, @@ -108,7 +111,7 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do chan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000 qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger - env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty + env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty <*> newIORef HM.empty tid <- liftIO . forkIO $ quoteThread env chan downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) return $ QuoteThreadHandle tid downloaderTid env @@ -140,6 +143,26 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) _ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) + pushToBarAggregators tick = forM_ (BarTimeframe <$> [60, 300, 900, 3600]) (pushTickToAggregator tick) + + pushTickToAggregator tick tf = do + aggsRef <- asks aggregators + aggs <- liftIO . readIORef $ aggsRef + let key = (security tick, tf) + case HM.lookup key aggs of + Just agg -> do + let (mbar, agg') = BA.handleTick tick agg + liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg' m, ())) + barsRef' <- asks bars + case mbar of + Just bar -> do + liftIO $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) + writeBarData bar tf (QDBar (tf, bar)) + _ -> do + pure () + _ -> do + let agg = BA.mkAggregatorFromBars (M.singleton (security tick) (BarSeries (security tick) tf [] (InstrumentParameters (security tick) 1 1))) [(0, 86400)] + liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg m, ())) quoteThread env chan = flip runReaderT env $ forever $ do qssData <- lift $ readChan chan @@ -147,10 +170,21 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do QDBar (tf, bar) -> do barsRef' <- asks bars lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) - _ -> return () -- TODO pass to bar aggregator - let key = case qssData of - QDTick tick -> QuoteSubscription (security tick) (BarTimeframe 0) - QDBar (tf, bar) -> QuoteSubscription (barSecurity bar) tf + writeBarData bar tf qssData + QDTick tick -> do + pushToBarAggregators tick + writeTickData tick qssData + + writeTickData tick qssData = do + let key = QuoteSubscription (security tick) (BarTimeframe 0) + subs <- asks endpoints >>= (lift . readIORef) + case HM.lookup key subs of + Just clientChannels -> do + lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels + Nothing -> return () + + writeBarData bar tf qssData = do + let key = QuoteSubscription (barSecurity bar) tf subs <- asks endpoints >>= (lift . readIORef) case HM.lookup key subs of Just clientChannels -> do