Browse Source

QuoteSourceClient: dynamic subscription

master
Denis Tereshkin 4 years ago
parent
commit
c8e3d80f84
  1. 38
      src/ATrade/QuoteSource/Client.hs
  2. 2
      src/ATrade/Types.hs
  3. 30
      test/TestQuoteSourceClient.hs

38
src/ATrade/QuoteSource/Client.hs

@ -1,9 +1,12 @@
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TupleSections #-}
module ATrade.QuoteSource.Client ( module ATrade.QuoteSource.Client (
QuoteData(..), QuoteData(..),
startQuoteSourceClient, startQuoteSourceClient,
stopQuoteSourceClient stopQuoteSourceClient,
quoteSourceClientSubscribe
) where ) where
import ATrade.Types import ATrade.Types
@ -20,6 +23,7 @@ import Data.IORef
import qualified Data.List as L import qualified Data.List as L
import Data.List.NonEmpty import Data.List.NonEmpty
import Data.Maybe import Data.Maybe
import qualified Data.Set as S
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding import Data.Text.Encoding
import Data.Time.Clock import Data.Time.Clock
@ -29,10 +33,14 @@ import System.ZMQ4.ZAP
import Safe import Safe
data QSSClientMessage = QSSSubscribe [(TickerId, BarTimeframe)] | QSSUnsubscribe [(TickerId, BarTimeframe)]
data QuoteSourceClientHandle = QuoteSourceClientHandle { data QuoteSourceClientHandle = QuoteSourceClientHandle {
tid :: ThreadId, tid :: ThreadId,
completionMvar :: MVar (), completionMvar :: MVar (),
killMVar :: MVar () killMVar :: MVar (),
messageBox :: BoundedChan QSSClientMessage,
subscriptions :: IORef (S.Set (TickerId, BarTimeframe))
} }
data QuoteData = QDTick Tick | QDBar (BarTimeframe, Bar) data QuoteData = QDTick Tick | QDBar (BarTimeframe, Bar)
@ -51,12 +59,14 @@ startQuoteSourceClient :: BoundedChan QuoteData -> [T.Text] -> Context -> T.Text
startQuoteSourceClient chan tickers ctx endpoint csp = do startQuoteSourceClient chan tickers ctx endpoint csp = do
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
msgbox <- newBoundedChan 500
subs <- newIORef $ S.fromList $ fmap (\x -> (x, BarTimeframe 0)) tickers
now <- getCurrentTime now <- getCurrentTime
lastHeartbeat <- newIORef now lastHeartbeat <- newIORef now
tid <- forkIO $ finally (clientThread lastHeartbeat killMv) (cleanup compMv) tid <- forkIO $ finally (clientThread lastHeartbeat killMv msgbox subs) (cleanup compMv)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv } return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv, messageBox = msgbox, subscriptions = subs }
where where
clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do clientThread lastHeartbeat killMv msgbox subs = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do
setLinger (restrict 0) sock setLinger (restrict 0) sock
debugM "QuoteSource.Client" $ "Client security parameters: " ++ show csp debugM "QuoteSource.Client" $ "Client security parameters: " ++ show csp
case (cspCertificate csp, cspServerCertificate csp) of case (cspCertificate csp, cspServerCertificate csp) of
@ -65,14 +75,15 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do
zapSetServerCertificate serverCert sock zapSetServerCertificate serverCert sock
_ -> return () _ -> return ()
connect sock $ T.unpack endpoint connect sock $ T.unpack endpoint
debugM "QuoteSource.Client" $ "Tickers: " ++ show tickers subslist <- readIORef subs
mapM_ (subscribe sock . encodeUtf8) tickers debugM "QuoteSource.Client" $ "Tickers: " ++ show subslist
mapM_ (subscribe sock . encodeUtf8 . mkSubCode) subslist
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
now <- getCurrentTime now <- getCurrentTime
writeIORef lastHeartbeat now writeIORef lastHeartbeat now
whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do
evs <- poll 200 [Sock sock [In] Nothing] evs <- poll 50 [Sock sock [In] Nothing]
unless (null (L.head evs)) $ do unless (null (L.head evs)) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock rawTick <- fmap BL.fromStrict <$> receiveMulti sock
now <- getCurrentTime now <- getCurrentTime
@ -82,6 +93,11 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do
else case deserializeBar rawTick of else case deserializeBar rawTick of
Just (tf, bar) -> writeChan chan $ QDBar (tf, bar) Just (tf, bar) -> writeChan chan $ QDBar (tf, bar)
_ -> writeList2Chan chan (deserializeTicks rawTick) _ -> writeList2Chan chan (deserializeTicks rawTick)
whileJust (tryReadChan msgbox) $ \case
QSSSubscribe tickers -> do
atomicModifyIORef' subs (\x -> (foldr S.insert x tickers, ()))
mapM_ (subscribe sock . encodeUtf8 . mkSubCode) tickers
_ -> return ()
debugM "QuoteSource.Client" "Heartbeat timeout") debugM "QuoteSource.Client" "Heartbeat timeout")
notTimeout ts = do notTimeout ts = do
@ -91,5 +107,11 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do
cleanup compMv = putMVar compMv () cleanup compMv = putMVar compMv ()
mkSubCode (tid, BarTimeframe tf) =
if tf == 0 then tid else tid <> ":" <> T.pack (show tf) <> ";"
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()
stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle) stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle)
quoteSourceClientSubscribe :: QuoteSourceClientHandle -> [(TickerId, BarTimeframe)] -> IO ()
quoteSourceClientSubscribe handle tickers = writeChan (messageBox handle) (QSSSubscribe tickers)

2
src/ATrade/Types.hs

@ -177,7 +177,7 @@ data Bar = Bar {
-- | Stores timeframe in seconds -- | Stores timeframe in seconds
newtype BarTimeframe = BarTimeframe { unBarTimeframe :: Int } newtype BarTimeframe = BarTimeframe { unBarTimeframe :: Int }
deriving (Show, Eq) deriving (Show, Eq, Ord)
serializeBar :: BarTimeframe -> Bar -> [ByteString] serializeBar :: BarTimeframe -> Bar -> [ByteString]
serializeBar tf bar = serializeBarHeader tf bar : [serializeBarBody tf bar] serializeBar tf bar = serializeBarHeader tf bar : [serializeBarBody tf bar]

30
test/TestQuoteSourceClient.hs

@ -31,7 +31,8 @@ unitTests :: TestTree
unitTests = testGroup "QuoteSource.Client" [ unitTests = testGroup "QuoteSource.Client" [
testStartStop testStartStop
, testTickStream , testTickStream
, testBarStream ] , testBarStream
, testDynamicSubscription ]
testStartStop :: TestTree testStartStop :: TestTree
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do
@ -46,7 +47,8 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
ep <- makeEndpoint ep <- makeEndpoint
chan <- newBoundedChan 1000 chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do
threadDelay 20000
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do
let tick = Tick { let tick = Tick {
security = "FOOBAR", security = "FOOBAR",
@ -63,7 +65,8 @@ testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx
ep <- makeEndpoint ep <- makeEndpoint
chan <- newBoundedChan 1000 chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do
threadDelay 20000
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do
let bar = Bar { let bar = Bar {
barSecurity = "FOOBAR", barSecurity = "FOOBAR",
@ -76,3 +79,24 @@ testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx
forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar) forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar)
recvdData <- readChan clientChan recvdData <- readChan clientChan
QDBar (BarTimeframe 60, bar) @=? recvdData))) QDBar (BarTimeframe 60, bar) @=? recvdData)))
testDynamicSubscription :: TestTree
testDynamicSubscription = testCase "QuoteSource clients can subscribe dynamically" $ withContext (\ctx -> do
ep <- makeEndpoint
chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ ->
bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\client -> do
quoteSourceClientSubscribe client [("FOOBAR", BarTimeframe 60)]
let bar = Bar {
barSecurity = "FOOBAR",
barTimestamp = UTCTime (fromGregorian 2021 11 18) 16000,
barOpen = fromDouble 10.0,
barHigh = fromDouble 15.0,
barLow = fromDouble 8.0,
barClose = fromDouble 11.0,
barVolume = 42 }
threadDelay 200000
forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar)
recvdData <- readChan clientChan
QDBar (BarTimeframe 60, bar) @=? recvdData)))

Loading…
Cancel
Save