From e45d51f6a35ea347a9efafced00c3167ed413efc Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 4 Dec 2021 12:49:38 +0700 Subject: [PATCH] BrokerCLient: hslogger => co-log --- src/ATrade/Broker/Client.hs | 130 +++++++++++++++++++++++++----------- 1 file changed, 91 insertions(+), 39 deletions(-) diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 11eb416..e69c57f 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -10,30 +10,57 @@ module ATrade.Broker.Client ( NotificationCallback(..) ) where -import ATrade.Broker.Protocol -import ATrade.Types -import Control.Concurrent hiding (readChan, writeChan) -import Control.Concurrent.BoundedChan -import Control.Concurrent.MVar -import Control.Exception -import Control.Monad -import Control.Monad.Loops -import Data.Aeson +import ATrade.Broker.Protocol (BrokerServerRequest (..), + BrokerServerResponse (..), + ClientIdentity, Notification, + NotificationSqnum (NotificationSqnum), + RequestSqnum, + getNotificationSqnum, + nextSqnum) +import ATrade.Logging (Message, + Severity (Debug, Info, Warning), + logWith) +import ATrade.Types (ClientSecurityParams (cspCertificate, cspServerCertificate), + Order, OrderId) +import Colog (LogAction) +import Control.Concurrent (MVar, ThreadId, forkIO, + killThread, newEmptyMVar, + putMVar, readMVar, takeMVar, + threadDelay, tryReadMVar, + yield) +import Control.Concurrent.BoundedChan () +import Control.Concurrent.MVar () +import Control.Exception (SomeException, finally, handle, + throwIO) +import Control.Monad (forM_, when) +import Control.Monad.Loops (andM, whileM_) +import Data.Aeson (decode, encode) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL -import Data.Int -import Data.IORef +import Data.Int (Int64) +import Data.IORef (IORef, atomicModifyIORef', + atomicWriteIORef, newIORef, + readIORef, writeIORef) import qualified Data.List as L -import Data.List.NonEmpty -import Data.Maybe +import Data.List.NonEmpty () +import Data.Maybe (isNothing) import qualified Data.Text as T -import Data.Text.Encoding +import Data.Text.Encoding (decodeUtf8) import qualified Data.Text.Encoding as T -import Safe -import System.Log.Logger -import System.Timeout -import System.ZMQ4 -import System.ZMQ4.ZAP +import Safe (lastMay) +import System.Timeout (timeout) +import System.ZMQ4 (Context, Event (In), + Poll (Sock), Req (Req), + Sub (Sub), Switch (On), + connect, poll, receive, + receiveMulti, restrict, send, + setLinger, setTcpKeepAlive, + setTcpKeepAliveCount, + setTcpKeepAliveIdle, + setTcpKeepAliveInterval, + subscribe, withSocket) +import System.ZMQ4.ZAP (zapApplyCertificate, + zapSetServerCertificate) type NotificationCallback = Notification -> IO () @@ -50,17 +77,26 @@ data BrokerClientHandle = BrokerClientHandle { notificationThreadId :: ThreadId } -brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> MVar () -> ClientSecurityParams -> IO () -brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally brokerClientThread' cleanup +brokerClientThread :: B.ByteString -> + Context -> + T.Text -> + MVar (BrokerServerRequest, MVar BrokerServerResponse) -> + MVar () -> + MVar () -> + ClientSecurityParams -> + LogAction IO Message -> + IO () +brokerClientThread socketIdentity ctx ep cmd comp killMv secParams logger = finally brokerClientThread' cleanup where - cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp () + log = logWith logger + cleanup = log Info "Broker.Client" "Quitting broker client thread" >> putMVar comp () brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do - debugM "Broker.Client" "Starting event loop" + log Debug "Broker.Client" "Starting event loop" handle (\e -> do - warningM "Broker.Client" $ "Broker client: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) + log Warning "Broker.Client" $ "Broker client: exception: " <> (T.pack . show) (e :: SomeException) <> "; isZMQ: " <> (T.pack . show) (isZMQError e) if isZMQError e then do - debugM "Broker.Client" "Rethrowing exception" + log Debug "Broker.Client" "Rethrowing exception" throwIO e else do return ()) $ withSocket ctx Req (\sock -> do @@ -74,7 +110,7 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro Nothing -> return () connect sock $ T.unpack ep - debugM "Broker.Client" "Connected" + log Debug "Broker.Client" "Connected" isTimeout <- newIORef False whileM_ (andM [isNothing <$> tryReadMVar killMv, (== False) <$> readIORef isTimeout]) $ do @@ -92,8 +128,17 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro isZMQError e = "ZMQError" `L.isPrefixOf` show e -notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> ClientSecurityParams -> IO () -notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams = flip finally (return ()) $ do +notificationThread :: ClientIdentity -> + [NotificationCallback] -> + Context -> + T.Text -> + IORef RequestSqnum -> + MVar (BrokerServerRequest, MVar BrokerServerResponse) -> + MVar () -> + ClientSecurityParams -> + LogAction IO Message -> + IO () +notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secParams logger = flip finally (return ()) $ do whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub $ \sock -> do setLinger (restrict 0) sock @@ -108,7 +153,7 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa setTcpKeepAliveIdle (restrict 60) sock setTcpKeepAliveInterval (restrict 10) sock connect sock $ T.unpack ep - debugM "Broker.Client" $ "Subscribing: [" <> T.unpack clientIdentity <> "]" + log Debug "Broker.Client" $ "Subscribing: [" <> clientIdentity <> "]" subscribe sock $ T.encodeUtf8 clientIdentity initialSqnum <- requestCurrentSqnum cmdVar idCounter clientIdentity @@ -130,8 +175,8 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa when (getNotificationSqnum notif >= lastSqnum) $ do forM_ callbacks $ \c -> c notif atomicWriteIORef notifSqnumRef (nextSqnum lastSqnum) - (ResponseError msg) -> warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg - _ -> warningM "Broker.Client" "Unknown error when requesting notifications" + (ResponseError msg) -> log Warning "Broker.Client" $ "ResponseError: " <> msg + _ -> log Warning "Broker.Client" "Unknown error when requesting notifications" else do msg <- receiveMulti sock case msg of @@ -141,14 +186,16 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa if getNotificationSqnum notification /= currentSqnum then if currentSqnum > getNotificationSqnum notification - then debugM "Broker.Client" $ "Already processed notification: " <> show (getNotificationSqnum notification) - else warningM "Broker.Client" $ "Notification sqnum mismatch: " <> show currentSqnum <> " -> " <> show (getNotificationSqnum notification) + then log Debug "Broker.Client" $ "Already processed notification: " <> (T.pack . show) (getNotificationSqnum notification) + else log Warning "Broker.Client" $ + "Notification sqnum mismatch: " <> (T.pack . show) currentSqnum <> " -> " <> (T.pack . show) (getNotificationSqnum notification) else do atomicWriteIORef notifSqnumRef (nextSqnum currentSqnum) forM_ callbacks $ \c -> c notification _ -> return () _ -> return () where + log = logWith logger requestCurrentSqnum cmdVar idCounter clientIdentity = do respVar <- newEmptyMVar sqnum <- nextId idCounter @@ -157,10 +204,10 @@ notificationThread clientIdentity callbacks ctx ep idCounter cmdVar killMv secPa case resp of (ResponseCurrentSqnum sqnum) -> return sqnum (ResponseError msg) -> do - warningM "Broker.Client" $ "ResponseError: " <> T.unpack msg + log Warning "Broker.Client" $ "ResponseError: " <> msg return (NotificationSqnum 1) _ -> do - warningM "Broker.Client" "Unknown error when requesting notifications" + log Warning "Broker.Client" "Unknown error when requesting notifications" return (NotificationSqnum 1) @@ -170,15 +217,16 @@ startBrokerClient :: B.ByteString -- ^ Socket Identity -> T.Text -- ^ Notification endpoing -> [NotificationCallback] -- ^ List of notification callbacks -> ClientSecurityParams -- ^ + -> LogAction IO Message -> IO BrokerClientHandle -startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallbacks secParams = do +startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallbacks secParams logger = do idCounter <- newIORef 1 compMv <- newEmptyMVar killMv <- newEmptyMVar cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) - tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) + tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams logger) notifSqnumRef <- newIORef (NotificationSqnum 0) - notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams) + notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint idCounter cmdVar killMv secParams logger) return BrokerClientHandle { tid = tid, @@ -226,7 +274,11 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error" -bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> IORef NotificationSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) +bcGetNotifications :: ClientIdentity -> + IORef RequestSqnum -> + IORef NotificationSqnum -> + MVar (BrokerServerRequest, MVar BrokerServerResponse) -> + IO (Either T.Text [Notification]) bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do respVar <- newEmptyMVar sqnum <- nextId idCounter