Browse Source

QuoteThread: Aggregate bars from ticks

master
Denis Tereshkin 3 years ago
parent
commit
bcb7d91805
  1. 1
      src/ATrade/Driver/Junction.hs
  2. 48
      src/ATrade/Driver/Junction/QuoteThread.hs

1
src/ATrade/Driver/Junction.hs

@ -6,7 +6,6 @@
{-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE RankNTypes #-} {-# LANGUAGE RankNTypes #-}
{-# OPTIONS_GHC -Wno-unused-imports #-}
module ATrade.Driver.Junction module ATrade.Driver.Junction
( (

48
src/ATrade/Driver/Junction/QuoteThread.hs

@ -21,6 +21,7 @@ module ATrade.Driver.Junction.QuoteThread
withQThread withQThread
) where ) where
import qualified ATrade.BarAggregator as BA
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..))
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..),
SubscriptionId (SubscriptionId)) SubscriptionId (SubscriptionId))
@ -60,7 +61,8 @@ import Control.Concurrent.BoundedChan (BoundedChan,
import Control.Exception.Safe (MonadMask, import Control.Exception.Safe (MonadMask,
MonadThrow, MonadThrow,
bracket) bracket)
import Control.Monad (forM, forever) import Control.Monad (forM, forM_,
forever)
import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT),
lift) lift)
import Control.Monad.Reader.Class (MonadReader, asks) import Control.Monad.Reader.Class (MonadReader, asks)
@ -88,7 +90,8 @@ data QuoteThreadEnv =
paramsCache :: IORef TickerInfoMap, paramsCache :: IORef TickerInfoMap,
downloaderChan :: BoundedChan QuoteSubscription, downloaderChan :: BoundedChan QuoteSubscription,
subscriptionIdCounter :: IORef Int, subscriptionIdCounter :: IORef Int,
subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription) subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription),
aggregators :: IORef (HM.HashMap (TickerId, BarTimeframe) BA.BarAggregator)
} }
startQuoteThread :: (MonadIO m, startQuoteThread :: (MonadIO m,
@ -108,7 +111,7 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
chan <- liftIO $ newBoundedChan 2000 chan <- liftIO $ newBoundedChan 2000
dChan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty <*> newIORef HM.empty
tid <- liftIO . forkIO $ quoteThread env chan tid <- liftIO . forkIO $ quoteThread env chan
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan)
return $ QuoteThreadHandle tid downloaderTid env return $ QuoteThreadHandle tid downloaderTid env
@ -140,6 +143,26 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ()))
_ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) _ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf)
pushToBarAggregators tick = forM_ (BarTimeframe <$> [60, 300, 900, 3600]) (pushTickToAggregator tick)
pushTickToAggregator tick tf = do
aggsRef <- asks aggregators
aggs <- liftIO . readIORef $ aggsRef
let key = (security tick, tf)
case HM.lookup key aggs of
Just agg -> do
let (mbar, agg') = BA.handleTick tick agg
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg' m, ()))
barsRef' <- asks bars
case mbar of
Just bar -> do
liftIO $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ()))
writeBarData bar tf (QDBar (tf, bar))
_ -> do
pure ()
_ -> do
let agg = BA.mkAggregatorFromBars (M.singleton (security tick) (BarSeries (security tick) tf [] (InstrumentParameters (security tick) 1 1))) [(0, 86400)]
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg m, ()))
quoteThread env chan = flip runReaderT env $ forever $ do quoteThread env chan = flip runReaderT env $ forever $ do
qssData <- lift $ readChan chan qssData <- lift $ readChan chan
@ -147,10 +170,21 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
QDBar (tf, bar) -> do QDBar (tf, bar) -> do
barsRef' <- asks bars barsRef' <- asks bars
lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ()))
_ -> return () -- TODO pass to bar aggregator writeBarData bar tf qssData
let key = case qssData of QDTick tick -> do
QDTick tick -> QuoteSubscription (security tick) (BarTimeframe 0) pushToBarAggregators tick
QDBar (tf, bar) -> QuoteSubscription (barSecurity bar) tf writeTickData tick qssData
writeTickData tick qssData = do
let key = QuoteSubscription (security tick) (BarTimeframe 0)
subs <- asks endpoints >>= (lift . readIORef)
case HM.lookup key subs of
Just clientChannels -> do
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels
Nothing -> return ()
writeBarData bar tf qssData = do
let key = QuoteSubscription (barSecurity bar) tf
subs <- asks endpoints >>= (lift . readIORef) subs <- asks endpoints >>= (lift . readIORef)
case HM.lookup key subs of case HM.lookup key subs of
Just clientChannels -> do Just clientChannels -> do

Loading…
Cancel
Save