From 03b01e84243850e28dc5301084e8e2035a2dc723 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 14 May 2023 12:44:32 +0700 Subject: [PATCH] Add MQTT-based TradeSink --- src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs | 65 +++++++++++++++++++ 1 file changed, 65 insertions(+) create mode 100644 src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs diff --git a/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs b/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs new file mode 100644 index 0000000..f94877a --- /dev/null +++ b/src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs @@ -0,0 +1,65 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} + +module ATrade.Broker.TradeSinks.MQTTTradeSink + ( + withMQTTTradeSink + ) where +import ATrade.Broker.Protocol (TradeSinkMessage (..)) +import ATrade.Types (SignalId (..), Trade (..), + toDouble) +import Control.Concurrent (forkIO, killThread, + threadDelay) +import qualified Control.Concurrent.BoundedChan as BC +import Control.Concurrent.MVar (isEmptyMVar, newEmptyMVar, + putMVar, tryReadMVar) +import Control.Exception (bracket, handle, throwIO) +import Control.Monad (void, when) +import Control.Monad.Extra (unlessM) +import Control.Monad.Loops (whileM_) +import Data.Aeson (encode) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import Data.Maybe (isJust) +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) +import qualified Data.Text.Lazy as TL +import GHC.Exception (SomeException) +import Language.Haskell.Printf +import Network.MQTT.Client (connectURI, mqttConfig, + publish) + +withMQTTTradeSink mqttBrokerUri mqttTopic f = do + killMv <- newEmptyMVar + chan <- BC.newBoundedChan 1000 + bracket (forkIO $ sinkThread mqttBrokerUri mqttTopic killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan) + where + sink = BC.writeChan + +sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ do + mqtt <- connectURI mqttConfig mqttBrokerUri + sinkThread' mqtt + where + sinkThread' mqtt = do + maybeTrade <- BC.tryReadChan chan + case maybeTrade of + Just trade -> + void $ publish mqtt mqttTopic (BL.fromStrict $ encodeTrade trade) False + Nothing -> do + threadDelay 1000000 + unlessM (isEmptyMVar killMv) $ sinkThread' mqtt + + wasKilled = isJust <$> tryReadMVar killMv + encodeTrade :: Trade -> B.ByteString + encodeTrade = encodeUtf8 . TL.toStrict . formatTrade + formatTrade trade = [t|New trade: %? %d %Q for %f (%Q/%Q)|] + (tradeOperation trade) + (tradeQuantity trade) + (tradeSecurity trade) + (toDouble $ tradePrice trade) + (strategyId . tradeSignalId $ trade) + (signalName . tradeSignalId $ trade) + +stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId +