Browse Source

QuoteSourceClient: hslogger => co-log

master
Denis Tereshkin 4 years ago
parent
commit
6c749d7b89
  1. 68
      src/ATrade/QuoteSource/Client.hs
  2. 9
      test/TestQuoteSourceClient.hs

68
src/ATrade/QuoteSource/Client.hs

@ -10,29 +10,47 @@ module ATrade.QuoteSource.Client (
quoteSourceClientSubscribe quoteSourceClientSubscribe
) where ) where
import ATrade.Types import ATrade.Types (Bar,
import Control.Concurrent hiding (readChan, writeChan, BarTimeframe (BarTimeframe),
ClientSecurityParams (cspCertificate, cspServerCertificate),
Tick (security), TickerId,
deserializeBar,
deserializeTickBody)
import Control.Concurrent (MVar, ThreadId, forkIO,
newEmptyMVar, putMVar,
readMVar, tryReadMVar, yield)
import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan,
tryReadChan, writeChan,
writeList2Chan) writeList2Chan)
import Control.Concurrent.BoundedChan import Control.Concurrent.MVar ()
import Control.Concurrent.MVar import Control.Exception (finally)
import Control.Exception import Control.Monad (unless)
import Control.Monad import Control.Monad.Loops (andM, whileJust, whileM_)
import Control.Monad.Loops
import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.IORef import Data.IORef (IORef, atomicModifyIORef',
newIORef, readIORef,
writeIORef)
import qualified Data.List as L import qualified Data.List as L
import Data.List.NonEmpty import Data.List.NonEmpty ()
import Data.Maybe import Data.Maybe (isNothing)
import qualified Data.Set as S import qualified Data.Set as S
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding import Data.Text.Encoding (decodeUtf8, encodeUtf8)
import Data.Time.Clock import Data.Time.Clock (diffUTCTime, getCurrentTime)
import System.Log.Logger import System.Log.Logger (debugM)
import System.ZMQ4 import System.ZMQ4 (Context, Event (In),
import System.ZMQ4.ZAP Poll (Sock), Sub (Sub),
connect, poll, receiveMulti,
restrict, setLinger, subscribe,
withSocket)
import System.ZMQ4.ZAP (zapApplyCertificate,
zapSetServerCertificate)
import Safe import ATrade.Logging (Message, Severity (Debug),
logWith)
import Colog (LogAction)
import Safe (headMay)
data QSSClientMessage = QSSSubscribe [(TickerId, BarTimeframe)] | QSSUnsubscribe [(TickerId, BarTimeframe)] data QSSClientMessage = QSSSubscribe [(TickerId, BarTimeframe)] | QSSUnsubscribe [(TickerId, BarTimeframe)]
@ -56,8 +74,14 @@ deserializeTicks (secname:raw:_) = deserializeWithName (decodeUtf8 . BL.toStrict
deserializeTicks _ = [] deserializeTicks _ = []
startQuoteSourceClient :: BoundedChan QuoteData -> [T.Text] -> Context -> T.Text -> ClientSecurityParams -> IO QuoteSourceClientHandle startQuoteSourceClient :: BoundedChan QuoteData -- ^ Channel that will be filled with QuoteData
startQuoteSourceClient chan tickers ctx endpoint csp = do -> [T.Text] -- ^ Tickers list that will be used for initial subscriptions
-> Context -- ^ 0MQ Context
-> T.Text -- ^ QuoteSourceServer endpoint
-> ClientSecurityParams -- ^ Client & server certificates
-> LogAction IO Message -- ^ Logger which will be used by QuoteSource.Client
-> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint csp logger = do
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
msgbox <- newBoundedChan 500 msgbox <- newBoundedChan 500
@ -67,9 +91,10 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do
tid <- forkIO $ finally (clientThread lastHeartbeat killMv msgbox subs) (cleanup compMv) tid <- forkIO $ finally (clientThread lastHeartbeat killMv msgbox subs) (cleanup compMv)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv, messageBox = msgbox, subscriptions = subs } return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv, messageBox = msgbox, subscriptions = subs }
where where
log = logWith logger
clientThread lastHeartbeat killMv msgbox subs = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do clientThread lastHeartbeat killMv msgbox subs = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do
setLinger (restrict 0) sock setLinger (restrict 0) sock
debugM "QuoteSource.Client" $ "Client security parameters: " ++ show csp log Debug "QuoteSource.Client" $ "Client security parameters: " <> (T.pack . show) csp
case (cspCertificate csp, cspServerCertificate csp) of case (cspCertificate csp, cspServerCertificate csp) of
(Just cert, Just serverCert) -> do (Just cert, Just serverCert) -> do
zapApplyCertificate cert sock zapApplyCertificate cert sock
@ -77,7 +102,7 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do
_ -> return () _ -> return ()
connect sock $ T.unpack endpoint connect sock $ T.unpack endpoint
subslist <- readIORef subs subslist <- readIORef subs
debugM "QuoteSource.Client" $ "Tickers: " ++ show subslist log Debug "QuoteSource.Client" $ "Tickers: " <> (T.pack . show) subslist
mapM_ (subscribe sock . encodeUtf8 . mkSubCode) subslist mapM_ (subscribe sock . encodeUtf8 . mkSubCode) subslist
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
@ -99,7 +124,7 @@ startQuoteSourceClient chan tickers ctx endpoint csp = do
atomicModifyIORef' subs (\x -> (foldr S.insert x tickers, ())) atomicModifyIORef' subs (\x -> (foldr S.insert x tickers, ()))
mapM_ (subscribe sock . encodeUtf8 . mkSubCode) tickers mapM_ (subscribe sock . encodeUtf8 . mkSubCode) tickers
_ -> return () _ -> return ()
debugM "QuoteSource.Client" "Heartbeat timeout") log Debug "QuoteSource.Client" "Heartbeat timeout")
notTimeout ts = do notTimeout ts = do
now <- getCurrentTime now <- getCurrentTime
@ -116,3 +141,4 @@ stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar
quoteSourceClientSubscribe :: QuoteSourceClientHandle -> [(TickerId, BarTimeframe)] -> IO () quoteSourceClientSubscribe :: QuoteSourceClientHandle -> [(TickerId, BarTimeframe)] -> IO ()
quoteSourceClientSubscribe handle tickers = writeChan (messageBox handle) (QSSSubscribe tickers) quoteSourceClientSubscribe handle tickers = writeChan (messageBox handle) (QSSSubscribe tickers)

9
test/TestQuoteSourceClient.hs

@ -7,6 +7,7 @@ module TestQuoteSourceClient (
import Test.Tasty import Test.Tasty
import Test.Tasty.HUnit import Test.Tasty.HUnit
import ATrade.Logging (emptyLogger)
import ATrade.QuoteSource.Client import ATrade.QuoteSource.Client
import ATrade.QuoteSource.Server import ATrade.QuoteSource.Server
import ATrade.Types import ATrade.Types
@ -40,7 +41,7 @@ testStartStop = testCase "QuoteSource client connects and disconnects" $ withCon
chan <- newBoundedChan 1000 chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ ->
bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (const yield))) bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (const yield)))
testTickStream :: TestTree testTickStream :: TestTree
testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do
@ -49,7 +50,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
clientChan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do
threadDelay 20000 threadDelay 20000
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (\_ -> do
let tick = Tick { let tick = Tick {
security = "FOOBAR", security = "FOOBAR",
datatype = LastTradePrice, datatype = LastTradePrice,
@ -67,7 +68,7 @@ testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx
clientChan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> do
threadDelay 20000 threadDelay 20000
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\_ -> do bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (\_ -> do
let bar = Bar { let bar = Bar {
barSecurity = "FOOBAR", barSecurity = "FOOBAR",
barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000, barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000,
@ -86,7 +87,7 @@ testDynamicSubscription = testCase "QuoteSource clients can subscribe dynamicall
chan <- newBoundedChan 1000 chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ -> bracket (startQuoteSourceServer chan ctx ep defaultServerSecurityParams) stopQuoteSourceServer (\_ ->
bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams) stopQuoteSourceClient (\client -> do bracket (startQuoteSourceClient clientChan [] ctx ep defaultClientSecurityParams emptyLogger) stopQuoteSourceClient (\client -> do
quoteSourceClientSubscribe client [("FOOBAR", BarTimeframe 60)] quoteSourceClientSubscribe client [("FOOBAR", BarTimeframe 60)]
let bar = Bar { let bar = Bar {
barSecurity = "FOOBAR", barSecurity = "FOOBAR",

Loading…
Cancel
Save