From c8e3d80f844cada67cf6c2c4d7a793f464c31ada Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 18 Nov 2021 22:47:09 +0700 Subject: [PATCH] QuoteSourceClient: dynamic subscription --- src/ATrade/QuoteSource/Client.hs | 38 +++++++++++++++++++++++++------- src/ATrade/Types.hs | 2 +- test/TestQuoteSourceClient.hs | 30 ++++++++++++++++++++++--- 3 files changed, 58 insertions(+), 12 deletions(-) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 0d33227..460482e 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -1,9 +1,12 @@ +{-# LANGUAGE LambdaCase #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TupleSections #-} module ATrade.QuoteSource.Client ( QuoteData(..), startQuoteSourceClient, - stopQuoteSourceClient + stopQuoteSourceClient, + quoteSourceClientSubscribe ) where import ATrade.Types @@ -20,6 +23,7 @@ import Data.IORef import qualified Data.List as L import Data.List.NonEmpty import Data.Maybe +import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding import Data.Time.Clock @@ -29,10 +33,14 @@ import System.ZMQ4.ZAP import Safe +data QSSClientMessage = QSSSubscribe [(TickerId, BarTimeframe)] | QSSUnsubscribe [(TickerId, BarTimeframe)] + data QuoteSourceClientHandle = QuoteSourceClientHandle { tid :: ThreadId, completionMvar :: MVar (), - killMVar :: MVar () + killMVar :: MVar (), + messageBox :: BoundedChan QSSClientMessage, + subscriptions :: IORef (S.Set (TickerId, BarTimeframe)) } data QuoteData = QDTick Tick | QDBar (BarTimeframe, Bar) @@ -51,12 +59,14 @@ startQuoteSourceClient :: BoundedChan QuoteData -> [T.Text] -> Context -> T.Text startQuoteSourceClient chan tickers ctx endpoint csp = do compMv <- newEmptyMVar killMv <- newEmptyMVar + msgbox <- newBoundedChan 500 + subs <- newIORef $ S.fromList $ fmap (\x -> (x, BarTimeframe 0)) tickers now <- getCurrentTime lastHeartbeat <- newIORef now - tid <- forkIO $ finally (clientThread lastHeartbeat killMv) (cleanup compMv) - return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv } + tid <- forkIO $ finally (clientThread lastHeartbeat killMv msgbox subs) (cleanup compMv) + return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv, messageBox = msgbox, subscriptions = subs } where - clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do + clientThread lastHeartbeat killMv msgbox subs = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do setLinger (restrict 0) sock debugM "QuoteSource.Client" $ "Client security parameters: " ++ show csp case (cspCertificate csp, cspServerCertificate csp) of @@ -65,14 +75,15 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do zapSetServerCertificate serverCert sock _ -> return () connect sock $ T.unpack endpoint - debugM "QuoteSource.Client" $ "Tickers: " ++ show tickers - mapM_ (subscribe sock . encodeUtf8) tickers + subslist <- readIORef subs + debugM "QuoteSource.Client" $ "Tickers: " ++ show subslist + mapM_ (subscribe sock . encodeUtf8 . mkSubCode) subslist subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" now <- getCurrentTime writeIORef lastHeartbeat now whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do - evs <- poll 200 [Sock sock [In] Nothing] + evs <- poll 50 [Sock sock [In] Nothing] unless (null (L.head evs)) $ do rawTick <- fmap BL.fromStrict <$> receiveMulti sock now <- getCurrentTime @@ -82,6 +93,11 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do else case deserializeBar rawTick of Just (tf, bar) -> writeChan chan $ QDBar (tf, bar) _ -> writeList2Chan chan (deserializeTicks rawTick) + whileJust (tryReadChan msgbox) $ \case + QSSSubscribe tickers -> do + atomicModifyIORef' subs (\x -> (foldr S.insert x tickers, ())) + mapM_ (subscribe sock . encodeUtf8 . mkSubCode) tickers + _ -> return () debugM "QuoteSource.Client" "Heartbeat timeout") notTimeout ts = do @@ -91,5 +107,11 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do cleanup compMv = putMVar compMv () + mkSubCode (tid, BarTimeframe tf) = + if tf == 0 then tid else tid <> ":" <> T.pack (show tf) <> ";" + stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle) + +quoteSourceClientSubscribe :: QuoteSourceClientHandle -> [(TickerId, BarTimeframe)] -> IO () +quoteSourceClientSubscribe handle tickers = writeChan (messageBox handle) (QSSSubscribe tickers) diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index eb5017e..0118ace 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -177,7 +177,7 @@ data Bar = Bar { -- | Stores timeframe in seconds newtype BarTimeframe = BarTimeframe { unBarTimeframe :: Int } - deriving (Show, Eq) + deriving (Show, Eq, Ord) serializeBar :: BarTimeframe -> Bar -> [ByteString] serializeBar tf bar = serializeBarHeader tf bar : [serializeBarBody tf bar] diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index cb484eb..7dc8601 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -31,7 +31,8 @@ unitTests :: TestTree unitTests = testGroup "QuoteSource.Client" [ testStartStop , testTickStream - , testBarStream ] + , testBarStream + , testDynamicSubscription ] testStartStop :: TestTree testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do @@ -46,7 +47,8 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c ep <- makeEndpoint chan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000 - bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> + bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do + threadDelay 20000 bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do let tick = Tick { security = "FOOBAR", @@ -63,7 +65,8 @@ testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx ep <- makeEndpoint chan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000 - bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> + bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do + threadDelay 20000 bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do let bar = Bar { barSecurity = "FOOBAR", @@ -76,3 +79,24 @@ testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar) recvdData <- readChan clientChan QDBar (BarTimeframe 60, bar) @=? recvdData))) + +testDynamicSubscription :: TestTree +testDynamicSubscription = testCase "QuoteSource clients can subscribe dynamically" $ withContext (\ctx -> do + ep <- makeEndpoint + chan <- newBoundedChan 1000 + clientChan <- newBoundedChan 1000 + bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> + bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\client -> do + quoteSourceClientSubscribe client [("FOOBAR", BarTimeframe 60)] + let bar = Bar { + barSecurity = "FOOBAR", + barTimestamp = UTCTime (fromGregorian 2021 11 18) 16000, + barOpen = fromDouble 10.0, + barHigh = fromDouble 15.0, + barLow = fromDouble 8.0, + barClose = fromDouble 11.0, + barVolume = 42 } + threadDelay 200000 + forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar) + recvdData <- readChan clientChan + QDBar (BarTimeframe 60, bar) @=? recvdData)))