From 9161b32709a28856db696ea95246d301d0c37a82 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 1 Aug 2023 22:27:22 +0700 Subject: [PATCH] MQTT & ZMQ sinks --- src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs | 22 ++++++--- src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs | 49 +++++++++++-------- 2 files changed, 43 insertions(+), 28 deletions(-) diff --git a/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs b/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs index f94877a..6b9207b 100644 --- a/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs +++ b/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs @@ -6,6 +6,8 @@ module ATrade.Broker.TradeSinks.MQTTTradeSink withMQTTTradeSink ) where import ATrade.Broker.Protocol (TradeSinkMessage (..)) +import ATrade.Logging (Severity (Debug, Info), + logWith) import ATrade.Types (SignalId (..), Trade (..), toDouble) import Control.Concurrent (forkIO, killThread, @@ -15,7 +17,7 @@ import Control.Concurrent.MVar (isEmptyMVar, newEmptyMVar, putMVar, tryReadMVar) import Control.Exception (bracket, handle, throwIO) import Control.Monad (void, when) -import Control.Monad.Extra (unlessM) +import Control.Monad.Extra (whenM) import Control.Monad.Loops (whileM_) import Data.Aeson (encode) import qualified Data.ByteString as B @@ -28,19 +30,25 @@ import qualified Data.Text.Lazy as TL import GHC.Exception (SomeException) import Language.Haskell.Printf import Network.MQTT.Client (connectURI, mqttConfig, - publish) + normalDisconnect, publish) -withMQTTTradeSink mqttBrokerUri mqttTopic f = do +withMQTTTradeSink mqttBrokerUri mqttTopic logger f = do killMv <- newEmptyMVar chan <- BC.newBoundedChan 1000 - bracket (forkIO $ sinkThread mqttBrokerUri mqttTopic killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan) + bracket (forkIO $ sinkThread mqttBrokerUri mqttTopic killMv chan logger) (stopSinkThread killMv) (\_ -> f $ sink chan) where sink = BC.writeChan -sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ do +sinkThread mqttBrokerUri mqttTopic killMv chan logger = whileM_ (not <$> wasKilled) $ do + log Info "Thread started" mqtt <- connectURI mqttConfig mqttBrokerUri + log Debug "Connected" sinkThread' mqtt + log Debug "Disconnecting" + normalDisconnect mqtt + log Info "Disconnected" where + log sev = logWith logger sev "MQTTTradeSink" sinkThread' mqtt = do maybeTrade <- BC.tryReadChan chan case maybeTrade of @@ -48,7 +56,7 @@ sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ d void $ publish mqtt mqttTopic (BL.fromStrict $ encodeTrade trade) False Nothing -> do threadDelay 1000000 - unlessM (isEmptyMVar killMv) $ sinkThread' mqtt + whenM (isEmptyMVar killMv) $ sinkThread' mqtt wasKilled = isJust <$> tryReadMVar killMv encodeTrade :: Trade -> B.ByteString @@ -61,5 +69,5 @@ sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ d (strategyId . tradeSignalId $ trade) (signalName . tradeSignalId $ trade) -stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId +stopSinkThread killMv threadId = putMVar killMv () >> threadDelay 10000000 diff --git a/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs b/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs index 2c0c94a..9383187 100644 --- a/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs +++ b/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs @@ -1,42 +1,49 @@ +{-# LANGUAGE OverloadedStrings #-} module ATrade.Broker.TradeSinks.ZMQTradeSink ( withZMQTradeSink ) where -import Control.Exception -import Control.Concurrent +import Control.Concurrent import qualified Control.Concurrent.BoundedChan as BC -import Data.Aeson -import Data.IORef -import Data.Maybe -import qualified Data.Text as T -import Data.List.NonEmpty -import qualified Data.List as L -import qualified Data.ByteString as B hiding (putStrLn) -import qualified Data.ByteString.Lazy as BL hiding (putStrLn) -import Control.Monad.Loops -import Control.Monad.Extra -import System.Timeout -import System.ZMQ4 +import Control.Exception +import Control.Monad.Extra +import Control.Monad.Loops +import Data.Aeson +import qualified Data.ByteString as B hiding (putStrLn) +import qualified Data.ByteString.Lazy as BL hiding (putStrLn) +import Data.IORef +import qualified Data.List as L +import Data.List.NonEmpty +import Data.Maybe +import qualified Data.Text as T +import System.Timeout +import System.ZMQ4 -import ATrade.Types -import ATrade.Broker.Protocol +import ATrade.Broker.Protocol +import ATrade.Logging (Severity (..), logWith) +import ATrade.Types -withZMQTradeSink ctx tradeSinkEp f = do +withZMQTradeSink ctx tradeSinkEp logger f = do killMv <- newEmptyMVar chan <- BC.newBoundedChan 1000 - bracket (forkIO $ sinkThread ctx tradeSinkEp killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan) + bracket (forkIO $ sinkThread ctx tradeSinkEp killMv chan logger) (stopSinkThread killMv) (\_ -> f $ sink chan) where sink = BC.writeChan -sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ +sinkThread ctx tradeSinkEp killMv chan logger = whileM_ (not <$> wasKilled) $ do + log Info "Start of loop" handle (\e -> do when (isZMQError (e :: SomeException)) $ do throwIO e) sinkThread' + log Info "End of loop" where + log sev = logWith logger sev "ZMQTradeSink" sinkThread' = withSocket ctx Dealer (\sock -> do connect sock $ T.unpack tradeSinkEp - whenM (not <$> wasKilled) $ sinkThread'' sock) + whenM (not <$> wasKilled) $ sinkThread'' sock + disconnect sock $ T.unpack tradeSinkEp + close sock) sinkThread'' sock = do maybeTrade <- BC.tryReadChan chan @@ -73,4 +80,4 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ } stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId - +