From 698d93a1fd2c08aa9ed3e32454d2188e1fd4b01f Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 1 Aug 2023 23:03:53 +0700 Subject: [PATCH] gotify trade sink --- libatrade.cabal | 1 + .../Broker/TradeSinks/GotifyTradeSink.hs | 81 +++++++++++++++++++ 2 files changed, 82 insertions(+) create mode 100644 src/ATrade/Broker/TradeSinks/GotifyTradeSink.hs diff --git a/libatrade.cabal b/libatrade.cabal index fe1575a..ddcf95a 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -28,6 +28,7 @@ library , ATrade.Broker.TradeSinks.MQTTTradeSink , ATrade.Broker.TradeSinks.TelegramTradeSink , ATrade.Broker.TradeSinks.ZMQTradeSink + , ATrade.Broker.TradeSinks.GotifyTradeSink , ATrade.Util , ATrade other-modules: Paths_libatrade diff --git a/src/ATrade/Broker/TradeSinks/GotifyTradeSink.hs b/src/ATrade/Broker/TradeSinks/GotifyTradeSink.hs new file mode 100644 index 0000000..b57ba13 --- /dev/null +++ b/src/ATrade/Broker/TradeSinks/GotifyTradeSink.hs @@ -0,0 +1,81 @@ +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} + +module ATrade.Broker.TradeSinks.GotifyTradeSink + ( + withGotifyTradeSink + ) where +import ATrade.Broker.Protocol (TradeSinkMessage (..)) +import ATrade.Logging (Severity (Debug, Info), + logWith) +import ATrade.Types (SignalId (..), + Trade (..), toDouble) +import Control.Concurrent (forkIO, killThread, + threadDelay) +import qualified Control.Concurrent.BoundedChan as BC +import Control.Concurrent.MVar (isEmptyMVar, + newEmptyMVar, putMVar, + tryReadMVar) +import Control.Exception (bracket, handle, + throwIO) +import Control.Monad (void, when) +import Control.Monad.Extra (whenM) +import Control.Monad.Loops (whileM_) +import Data.Aeson (encode) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import Data.Maybe (isJust) +import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) +import qualified Data.Text.Lazy as TL +import GHC.Exception (SomeException) +import Language.Haskell.Printf +import Network.HTTP.Client (defaultManagerSettings, + httpLbs, newManager, + parseRequest, + withManager) +import Network.HTTP.Client.MultipartFormData (formDataBody, partBS) + +withGotifyTradeSink server token logger f = do + killMv <- newEmptyMVar + chan <- BC.newBoundedChan 1000 + bracket (forkIO $ sinkThread server token killMv chan logger) (stopSinkThread killMv) (\_ -> f $ sink chan) + where + sink = BC.writeChan + +sinkThread server token killMv chan logger = whileM_ (not <$> wasKilled) $ do + log Info "GotifyTradeSink thread started" + manager <- newManager defaultManagerSettings + log Debug "Connected" + sinkThread' manager + log Info "Disconnected" + where + log sev = logWith logger sev "GotifyTradeSink" + sinkThread' manager = do + maybeTrade <- BC.tryReadChan chan + case maybeTrade of + Just trade -> do + request <- parseRequest $ server <> "/message?token=" <> token + requestWithData <- + formDataBody [ partBS "title" "Trade" + , partBS "message" (encodeTrade trade) + , partBS "priority" "5"] request + void $ httpLbs requestWithData manager + Nothing -> do + threadDelay 1000000 + whenM (isEmptyMVar killMv) $ sinkThread' manager + + wasKilled = isJust <$> tryReadMVar killMv + encodeTrade :: Trade -> B.ByteString + encodeTrade = encodeUtf8 . TL.toStrict . formatTrade + formatTrade trade = [t|New trade: %? %d %Q for %f (%Q/%Q)|] + (tradeOperation trade) + (tradeQuantity trade) + (tradeSecurity trade) + (toDouble $ tradePrice trade) + (strategyId . tradeSignalId $ trade) + (signalName . tradeSignalId $ trade) + +stopSinkThread killMv threadId = putMVar killMv () >> threadDelay 10000000 +