From 6c749d7b89eaee278f18d01a880ec695b16cb172 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 4 Dec 2021 13:24:34 +0700 Subject: [PATCH] QuoteSourceClient: hslogger => co-log --- src/ATrade/QuoteSource/Client.hs | 68 ++++++++++++++++++++++---------- test/TestQuoteSourceClient.hs | 9 +++-- 2 files changed, 52 insertions(+), 25 deletions(-) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index b8444a0..0b46898 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -10,29 +10,47 @@ module ATrade.QuoteSource.Client ( quoteSourceClientSubscribe ) where -import ATrade.Types -import Control.Concurrent hiding (readChan, writeChan, +import ATrade.Types (Bar, + BarTimeframe (BarTimeframe), + ClientSecurityParams (cspCertificate, cspServerCertificate), + Tick (security), TickerId, + deserializeBar, + deserializeTickBody) +import Control.Concurrent (MVar, ThreadId, forkIO, + newEmptyMVar, putMVar, + readMVar, tryReadMVar, yield) +import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, + tryReadChan, writeChan, writeList2Chan) -import Control.Concurrent.BoundedChan -import Control.Concurrent.MVar -import Control.Exception -import Control.Monad -import Control.Monad.Loops +import Control.Concurrent.MVar () +import Control.Exception (finally) +import Control.Monad (unless) +import Control.Monad.Loops (andM, whileJust, whileM_) import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Lazy as BL -import Data.IORef +import Data.IORef (IORef, atomicModifyIORef', + newIORef, readIORef, + writeIORef) import qualified Data.List as L -import Data.List.NonEmpty -import Data.Maybe +import Data.List.NonEmpty () +import Data.Maybe (isNothing) import qualified Data.Set as S import qualified Data.Text as T -import Data.Text.Encoding -import Data.Time.Clock -import System.Log.Logger -import System.ZMQ4 -import System.ZMQ4.ZAP +import Data.Text.Encoding (decodeUtf8, encodeUtf8) +import Data.Time.Clock (diffUTCTime, getCurrentTime) +import System.Log.Logger (debugM) +import System.ZMQ4 (Context, Event (In), + Poll (Sock), Sub (Sub), + connect, poll, receiveMulti, + restrict, setLinger, subscribe, + withSocket) +import System.ZMQ4.ZAP (zapApplyCertificate, + zapSetServerCertificate) -import Safe +import ATrade.Logging (Message, Severity (Debug), + logWith) +import Colog (LogAction) +import Safe (headMay) data QSSClientMessage = QSSSubscribe [(TickerId, BarTimeframe)] | QSSUnsubscribe [(TickerId, BarTimeframe)] @@ -56,8 +74,14 @@ deserializeTicks (secname:raw:_) = deserializeWithName (decodeUtf8 . BL.toStrict deserializeTicks _ = [] -startQuoteSourceClient :: BoundedChan QuoteData -> [T.Text] -> Context -> T.Text -> ClientSecurityParams -> IO QuoteSourceClientHandle -startQuoteSourceClient chan tickers ctx endpoint csp = do +startQuoteSourceClient :: BoundedChan QuoteData -- ^ Channel that will be filled with QuoteData + -> [T.Text] -- ^ Tickers list that will be used for initial subscriptions + -> Context -- ^ 0MQ Context + -> T.Text -- ^ QuoteSourceServer endpoint + -> ClientSecurityParams -- ^ Client & server certificates + -> LogAction IO Message -- ^ Logger which will be used by QuoteSource.Client + -> IO QuoteSourceClientHandle +startQuoteSourceClient chan tickers ctx endpoint csp logger = do compMv <- newEmptyMVar killMv <- newEmptyMVar msgbox <- newBoundedChan 500 @@ -67,9 +91,10 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do tid <- forkIO $ finally (clientThread lastHeartbeat killMv msgbox subs) (cleanup compMv) return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv, messageBox = msgbox, subscriptions = subs } where + log = logWith logger clientThread lastHeartbeat killMv msgbox subs = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do setLinger (restrict 0) sock - debugM "QuoteSource.Client" $ "Client security parameters: " ++ show csp + log Debug "QuoteSource.Client" $ "Client security parameters: " <> (T.pack . show) csp case (cspCertificate csp, cspServerCertificate csp) of (Just cert, Just serverCert) -> do zapApplyCertificate cert sock @@ -77,7 +102,7 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do _ -> return () connect sock $ T.unpack endpoint subslist <- readIORef subs - debugM "QuoteSource.Client" $ "Tickers: " ++ show subslist + log Debug "QuoteSource.Client" $ "Tickers: " <> (T.pack . show) subslist mapM_ (subscribe sock . encodeUtf8 . mkSubCode) subslist subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" @@ -99,7 +124,7 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do atomicModifyIORef' subs (\x -> (foldr S.insert x tickers, ())) mapM_ (subscribe sock . encodeUtf8 . mkSubCode) tickers _ -> return () - debugM "QuoteSource.Client" "Heartbeat timeout") + log Debug "QuoteSource.Client" "Heartbeat timeout") notTimeout ts = do now <- getCurrentTime @@ -116,3 +141,4 @@ stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar quoteSourceClientSubscribe :: QuoteSourceClientHandle -> [(TickerId, BarTimeframe)] -> IO () quoteSourceClientSubscribe handle tickers = writeChan (messageBox handle) (QSSSubscribe tickers) + diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index 7dc8601..a19b4a9 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -7,6 +7,7 @@ module TestQuoteSourceClient ( import Test.Tasty import Test.Tasty.HUnit +import ATrade.Logging (emptyLogger) import ATrade.QuoteSource.Client import ATrade.QuoteSource.Server import ATrade.Types @@ -40,7 +41,7 @@ testStartStop = testCase "QuoteSource client connects and disconnects" $ withCon chan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> - bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (const yield))) + bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (const yield))) testTickStream :: TestTree testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do @@ -49,7 +50,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do threadDelay 20000 - bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do + bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (\_ -> do let tick = Tick { security = "FOOBAR", datatype = LastTradePrice, @@ -67,7 +68,7 @@ testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do threadDelay 20000 - bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do + bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (\_ -> do let bar = Bar { barSecurity = "FOOBAR", barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000, @@ -86,7 +87,7 @@ testDynamicSubscription = testCase "QuoteSource clients can subscribe dynamicall chan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> - bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\client -> do + bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (\client -> do quoteSourceClientSubscribe client [("FOOBAR", BarTimeframe 60)] let bar = Bar { barSecurity = "FOOBAR",