Browse Source

QuoteStream unsubscription

master
Denis Tereshkin 4 years ago
parent
commit
a245a01a66
  1. 5
      src/ATrade/Driver/Junction/JunctionMonad.hs
  2. 3
      src/ATrade/Driver/Junction/QuoteStream.hs
  3. 48
      src/ATrade/Driver/Junction/QuoteThread.hs

5
src/ATrade/Driver/Junction/JunctionMonad.hs

@ -138,10 +138,9 @@ instance MonadPersistence JunctionM where
return def return def
instance QuoteStream JunctionM where instance QuoteStream JunctionM where
addSubscription (QuoteSubscription ticker timeframe) chan = do addSubscription (QuoteSubscription ticker tf) chan = do
qt <- asks peQuoteThread qt <- asks peQuoteThread
QT.addSubscription qt ticker timeframe chan QT.addSubscription qt ticker tf chan
return (SubscriptionId 0) -- TODO subscription Ids
removeSubscription _ = undefined removeSubscription _ = undefined
startRobot :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap -> startRobot :: LogAction IO Message -> ProgramConfiguration -> IORef Bars -> IORef TickerInfoMap ->

3
src/ATrade/Driver/Junction/QuoteStream.hs

@ -21,6 +21,9 @@ instance Hashable BarTimeframe
instance Hashable QuoteSubscription instance Hashable QuoteSubscription
newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int }
deriving (Show, Eq, Generic)
instance Hashable SubscriptionId
class (Monad m) => QuoteStream m where class (Monad m) => QuoteStream m where
addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId

48
src/ATrade/Driver/Junction/QuoteThread.hs

@ -13,6 +13,7 @@ module ATrade.Driver.Junction.QuoteThread
startQuoteThread, startQuoteThread,
stopQuoteThread, stopQuoteThread,
addSubscription, addSubscription,
removeSubscription,
DownloaderM, DownloaderM,
DownloaderEnv(..), DownloaderEnv(..),
runDownloaderM, runDownloaderM,
@ -20,7 +21,8 @@ module ATrade.Driver.Junction.QuoteThread
) where ) where
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..))
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..)) import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..),
SubscriptionId (SubscriptionId))
import ATrade.Logging (Message) import ATrade.Logging (Message)
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) import ATrade.Quotes.HistoryProvider (HistoryProvider (..))
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP)
@ -50,6 +52,7 @@ import Control.Concurrent (ThreadId, forkIO,
import Control.Concurrent.BoundedChan (BoundedChan, import Control.Concurrent.BoundedChan (BoundedChan,
newBoundedChan, newBoundedChan,
readChan, readChan,
tryWriteChan,
writeChan) writeChan)
import Control.Exception.Safe (MonadMask, import Control.Exception.Safe (MonadMask,
MonadThrow, MonadThrow,
@ -77,10 +80,12 @@ data QuoteThreadEnv =
QuoteThreadEnv QuoteThreadEnv
{ {
bars :: IORef Bars, bars :: IORef Bars,
endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), endpoints :: IORef (HM.HashMap QuoteSubscription [(SubscriptionId, BoundedChan QuoteData)]),
qsclient :: QuoteSourceClientHandle, qsclient :: QuoteSourceClientHandle,
paramsCache :: IORef TickerInfoMap, paramsCache :: IORef TickerInfoMap,
downloaderChan :: BoundedChan QuoteSubscription downloaderChan :: BoundedChan QuoteSubscription,
subscriptionIdCounter :: IORef Int,
subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription)
} }
startQuoteThread :: (MonadIO m, startQuoteThread :: (MonadIO m,
@ -100,7 +105,7 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
chan <- liftIO $ newBoundedChan 2000 chan <- liftIO $ newBoundedChan 2000
dChan <- liftIO $ newBoundedChan 2000 dChan <- liftIO $ newBoundedChan 2000
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty
tid <- liftIO . forkIO $ quoteThread env chan tid <- liftIO . forkIO $ quoteThread env chan
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan)
return $ QuoteThreadHandle tid downloaderTid env return $ QuoteThreadHandle tid downloaderTid env
@ -142,7 +147,7 @@ startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do
subs <- asks endpoints >>= (lift . readIORef) subs <- asks endpoints >>= (lift . readIORef)
case HM.lookup key subs of case HM.lookup key subs of
Just clientChannels -> do Just clientChannels -> do
lift $ mapM_ (`writeChan` qssData) clientChannels lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels
Nothing -> return () Nothing -> return ()
stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m ()
@ -151,19 +156,36 @@ stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do
killThread dtid killThread dtid
stopQuoteSourceClient (qsclient env) stopQuoteSourceClient (qsclient env)
addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m () addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m SubscriptionId
addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do
writeChan (downloaderChan env) (QuoteSubscription tid tf) cnt <- atomicModifyIORef' (subscriptionIdCounter env) (\c -> (c + 1, c))
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m tid, ())) let subscription = QuoteSubscription tid tf
let subid = SubscriptionId cnt
writeChan (downloaderChan env) subscription
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m subid tid, ()))
atomicModifyIORef' (subscriptions env) (\m -> (HM.insert subid subscription m, ()))
quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)]
return subid
where where
doAddSubscription m tickerid = doAddSubscription m subid tickerid =
let m1 = HM.alter (\case let m1 = HM.alter (\case
Just chans -> Just (chan : chans) Just chans -> Just ((subid, chan) : chans)
_ -> Just [chan]) (QuoteSubscription tickerid tf) m in _ -> Just [(subid, chan)]) (QuoteSubscription tickerid tf) m in
HM.alter (\case HM.alter (\case
Just chans -> Just (chan : chans) Just chans -> Just ((subid, chan) : chans)
_ -> Just [chan]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 _ -> Just [(subid, chan)]) (QuoteSubscription tickerid (BarTimeframe 0)) m1
removeSubscription :: (MonadIO m) => QuoteThreadHandle -> SubscriptionId -> m ()
removeSubscription (QuoteThreadHandle _ _ env) subId = liftIO $ do
subs <- readIORef (subscriptions env)
case HM.lookup subId subs of
Just sub -> atomicModifyIORef' (endpoints env) (\m -> (doRemoveSubscription m sub, ()))
Nothing -> return ()
where
doRemoveSubscription m sub =
let m1 = HM.adjust (filter (\(subId', _) -> subId' == subId)) sub m in
HM.adjust (filter (\(subId', _) -> subId' == subId)) (sub0 sub) m1
sub0 sub = let QuoteSubscription tid _ = sub in QuoteSubscription tid (BarTimeframe 0)
updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars
updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap

Loading…
Cancel
Save