From a245a01a660be92f25ee8239f67158763ddfbc96 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 4 Jan 2022 14:38:19 +0700 Subject: [PATCH] QuoteStream unsubscription --- src/ATrade/Driver/Junction/JunctionMonad.hs | 5 +-- src/ATrade/Driver/Junction/QuoteStream.hs | 3 ++ src/ATrade/Driver/Junction/QuoteThread.hs | 48 +++++++++++++++------ 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/src/ATrade/Driver/Junction/JunctionMonad.hs b/src/ATrade/Driver/Junction/JunctionMonad.hs index 43be49b..ae7e3af 100644 --- a/src/ATrade/Driver/Junction/JunctionMonad.hs +++ b/src/ATrade/Driver/Junction/JunctionMonad.hs @@ -138,10 +138,9 @@ instance MonadPersistence JunctionM where return def instance QuoteStream JunctionM where - addSubscription (QuoteSubscription ticker timeframe) chan = do + addSubscription (QuoteSubscription ticker tf) chan = do qt <- asks peQuoteThread - QT.addSubscription qt ticker timeframe chan - return (SubscriptionId 0) -- TODO subscription Ids + QT.addSubscription qt ticker tf chan removeSubscription _ = undefined startRobot :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> diff --git a/src/ATrade/Driver/Junction/QuoteStream.hs b/src/ATrade/Driver/Junction/QuoteStream.hs index d391147..31fd105 100644 --- a/src/ATrade/Driver/Junction/QuoteStream.hs +++ b/src/ATrade/Driver/Junction/QuoteStream.hs @@ -21,6 +21,9 @@ instance Hashable BarTimeframe instance Hashable QuoteSubscription newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } + deriving (Show, Eq, Generic) + +instance Hashable SubscriptionId class (Monad m) => QuoteStream m where addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId diff --git a/src/ATrade/Driver/Junction/QuoteThread.hs b/src/ATrade/Driver/Junction/QuoteThread.hs index 11825d9..779a6f4 100644 --- a/src/ATrade/Driver/Junction/QuoteThread.hs +++ b/src/ATrade/Driver/Junction/QuoteThread.hs @@ -13,6 +13,7 @@ module ATrade.Driver.Junction.QuoteThread startQuoteThread, stopQuoteThread, addSubscription, + removeSubscription, DownloaderM, DownloaderEnv(..), runDownloaderM, @@ -20,7 +21,8 @@ module ATrade.Driver.Junction.QuoteThread ) where import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) -import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..)) +import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), + SubscriptionId (SubscriptionId)) import ATrade.Logging (Message) import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) @@ -50,6 +52,7 @@ import Control.Concurrent (ThreadId, forkIO, import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, + tryWriteChan, writeChan) import Control.Exception.Safe (MonadMask, MonadThrow, @@ -77,10 +80,12 @@ data QuoteThreadEnv = QuoteThreadEnv { bars :: IORef Bars, - endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), + endpoints :: IORef (HM.HashMap QuoteSubscription [(SubscriptionId, BoundedChan QuoteData)]), qsclient :: QuoteSourceClientHandle, paramsCache :: IORef TickerInfoMap, - downloaderChan :: BoundedChan QuoteSubscription + downloaderChan :: BoundedChan QuoteSubscription, + subscriptionIdCounter :: IORef Int, + subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription) } startQuoteThread :: (MonadIO m, @@ -100,7 +105,7 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do chan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000 qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger - env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan + env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty tid <- liftIO . forkIO $ quoteThread env chan downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) return $ QuoteThreadHandle tid downloaderTid env @@ -142,7 +147,7 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do subs <- asks endpoints >>= (lift . readIORef) case HM.lookup key subs of Just clientChannels -> do - lift $ mapM_ (`writeChan` qssData) clientChannels + lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels Nothing -> return () stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () @@ -151,19 +156,36 @@ stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do killThread dtid stopQuoteSourceClient (qsclient env) -addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m () +addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m SubscriptionId addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do - writeChan (downloaderChan env) (QuoteSubscription tid tf) - atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m tid, ())) + cnt <- atomicModifyIORef' (subscriptionIdCounter env) (\c -> (c + 1, c)) + let subscription = QuoteSubscription tid tf + let subid = SubscriptionId cnt + writeChan (downloaderChan env) subscription + atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m subid tid, ())) + atomicModifyIORef' (subscriptions env) (\m -> (HM.insert subid subscription m, ())) quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] + return subid where - doAddSubscription m tickerid = + doAddSubscription m subid tickerid = let m1 = HM.alter (\case - Just chans -> Just (chan : chans) - _ -> Just [chan]) (QuoteSubscription tickerid tf) m in + Just chans -> Just ((subid, chan) : chans) + _ -> Just [(subid, chan)]) (QuoteSubscription tickerid tf) m in HM.alter (\case - Just chans -> Just (chan : chans) - _ -> Just [chan]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 + Just chans -> Just ((subid, chan) : chans) + _ -> Just [(subid, chan)]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 + +removeSubscription :: (MonadIO m) => QuoteThreadHandle -> SubscriptionId -> m () +removeSubscription (QuoteThreadHandle _ _ env) subId = liftIO $ do + subs <- readIORef (subscriptions env) + case HM.lookup subId subs of + Just sub -> atomicModifyIORef' (endpoints env) (\m -> (doRemoveSubscription m sub, ())) + Nothing -> return () + where + doRemoveSubscription m sub = + let m1 = HM.adjust (filter (\(subId', _) -> subId' == subId)) sub m in + HM.adjust (filter (\(subId', _) -> subId' == subId)) (sub0 sub) m1 + sub0 sub = let QuoteSubscription tid _ = sub in QuoteSubscription tid (BarTimeframe 0) updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap