From 9c95b7291ab838f9d0b87be580a71cb2f5d8f68b Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 4 Dec 2021 13:14:06 +0700 Subject: [PATCH] BrokerServer: hslogger => co-log --- src/ATrade/Broker/Server.hs | 47 ++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 16 deletions(-) diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 6b9d6a4..03f3e03 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -18,11 +18,16 @@ import ATrade.Broker.Protocol (BrokerServerRequest (..), RequestSqnum, getNotificationSqnum, nextSqnum, requestSqnum) +import ATrade.Logging (Message (Message), + Severity (Debug, Warning), + logWith) +import ATrade.Logging (Severity (Info)) import ATrade.Types (Order (orderAccountId, orderId), OrderId, ServerSecurityParams (sspCertificate, sspDomain), Trade (tradeOrderId)) import ATrade.Util (atomicMapIORef) +import Colog (LogAction) import Control.Concurrent (MVar, ThreadId, forkIO, killThread, myThreadId, newEmptyMVar, putMVar, @@ -93,8 +98,9 @@ startBrokerServer :: [BrokerBackend] -> T.Text -> [TradeSink] -> ServerSecurityParams -> + LogAction IO Message -> IO BrokerServerHandle -startBrokerServer brokers c ep notificationsEp tradeSinks params = do +startBrokerServer brokers c ep notificationsEp tradeSinks params logger = do sock <- socket c Router notificationsSock <- socket c Pub setLinger (restrict 0) sock @@ -136,14 +142,19 @@ startBrokerServer brokers c ep notificationsEp tradeSinks params = do orderIdCounter = 1, tradeSink = tsChan } - mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers + mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state logger)) brokers - debugM "Broker.Server" "Forking broker server thread" - BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinks) <*> pure compMv <*> pure killMv + log Info "Broker.Server" "Forking broker server thread" + BrokerServerHandle <$> forkIO (brokerServerThread state logger) <*> forkIO (tradeSinkHandler c state tradeSinks) <*> pure compMv <*> pure killMv + where + log = logWith logger -notificationCallback :: IORef BrokerServerState -> BrokerBackendNotification -> IO () -notificationCallback state n = do - debugM "Broker.Server" $ "Notification: " ++ show n +notificationCallback :: IORef BrokerServerState -> + LogAction IO Message -> + BrokerBackendNotification -> + IO () +notificationCallback state logger n = do + log Debug "Broker.Server" $ "Notification: " <> (T.pack . show) n chan <- tradeSink <$> readIORef state case n of BackendTradeNotification trade -> tryWriteChan chan trade @@ -157,11 +168,12 @@ notificationCallback state n = do case n of BackendTradeNotification trade -> addNotification clientIdentity (TradeNotification sqnum trade { tradeOrderId = localOrderId }) BackendOrderNotification globalOrderId newstate -> addNotification clientIdentity (OrderNotification sqnum localOrderId newstate) - Nothing -> warningM "Broker.Server" "Notification: unknown order" + Nothing -> log Warning "Broker.Server" "Notification: unknown order" where + log = logWith logger addNotification clientIdentity n = do - debugM "Broker.Server" $ "Sending notification to client [" <> T.unpack clientIdentity <> "]" + log Debug "Broker.Server" $ "Sending notification to client [" <> clientIdentity <> "]" atomicMapIORef state (\s -> case M.lookup clientIdentity . pendingNotifications $ s of Just ns -> s { pendingNotifications = M.insert clientIdentity (n : ns) (pendingNotifications s)} @@ -178,12 +190,15 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ Just trade -> mapM_ (\x -> x trade) tradeSinks Nothing -> threadDelay 100000 where - wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar) + wasKilled = isJust <$> (readIORef state >>= tryReadMVar . killMvar) -brokerServerThread :: IORef BrokerServerState -> IO () -brokerServerThread state = finally brokerServerThread' cleanup +brokerServerThread :: IORef BrokerServerState -> + LogAction IO Message -> + IO () +brokerServerThread state logger = finally brokerServerThread' cleanup where + log = logWith logger brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do sock <- bsSocket <$> readIORef state events <- poll 100 [Sock sock [In] Nothing] @@ -199,7 +214,7 @@ brokerServerThread state = finally brokerServerThread' cleanup lastPackMap <- lastPacket <$> readIORef state case shouldResend sqnum peerId lastPackMap of Just response -> do - debugM "Broker.Server" $ "Resending packet for peerId: " ++ show peerId + log Debug "Broker.Server" $ "Resending packet for peerId: " <> (T.pack . show) peerId sendMessage sock peerId response -- Resend atomicMapIORef state (\s -> s { lastPacket = M.delete peerId (lastPacket s)}) Nothing -> do @@ -213,7 +228,7 @@ brokerServerThread state = finally brokerServerThread' cleanup -- but shouldn't update lastPacket let response = ResponseError $ "Invalid request: " <> T.pack errmsg sendMessage sock peerId response - _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg) + _ -> log Warning "Broker.Server" ("Invalid packet received: " <> (T.pack . show) msg) shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of Just (lastSqnum, response) -> if sqnum == lastSqnum @@ -233,7 +248,7 @@ brokerServerThread state = finally brokerServerThread' cleanup bros <- brokers <$> readIORef state case request of RequestSubmitOrder sqnum clientIdentity order -> do - debugM "Broker.Server" $ "Request: submit order:" ++ show request + log Debug "Broker.Server" $ "Request: submit order:" <> (T.pack . show) request case findBrokerForAccount (orderAccountId order) bros of Just bro -> do globalOrderId <- nextOrderId @@ -245,7 +260,7 @@ brokerServerThread state = finally brokerServerThread' cleanup return ResponseOk Nothing -> do - warningM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order) + log Warning "Broker.Server" $ "Unknown account: " <> (orderAccountId order) return $ ResponseError "Unknown account" RequestCancelOrder sqnum clientIdentity localOrderId -> do m <- orderToBroker <$> readIORef state