1 changed files with 65 additions and 0 deletions
@ -0,0 +1,65 @@
@@ -0,0 +1,65 @@
|
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE QuasiQuotes #-} |
||||
|
||||
module ATrade.Broker.TradeSinks.MQTTTradeSink |
||||
( |
||||
withMQTTTradeSink |
||||
) where |
||||
import ATrade.Broker.Protocol (TradeSinkMessage (..)) |
||||
import ATrade.Types (SignalId (..), Trade (..), |
||||
toDouble) |
||||
import Control.Concurrent (forkIO, killThread, |
||||
threadDelay) |
||||
import qualified Control.Concurrent.BoundedChan as BC |
||||
import Control.Concurrent.MVar (isEmptyMVar, newEmptyMVar, |
||||
putMVar, tryReadMVar) |
||||
import Control.Exception (bracket, handle, throwIO) |
||||
import Control.Monad (void, when) |
||||
import Control.Monad.Extra (unlessM) |
||||
import Control.Monad.Loops (whileM_) |
||||
import Data.Aeson (encode) |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import qualified Data.List as L |
||||
import Data.Maybe (isJust) |
||||
import qualified Data.Text as T |
||||
import Data.Text.Encoding (encodeUtf8) |
||||
import qualified Data.Text.Lazy as TL |
||||
import GHC.Exception (SomeException) |
||||
import Language.Haskell.Printf |
||||
import Network.MQTT.Client (connectURI, mqttConfig, |
||||
publish) |
||||
|
||||
withMQTTTradeSink mqttBrokerUri mqttTopic f = do |
||||
killMv <- newEmptyMVar |
||||
chan <- BC.newBoundedChan 1000 |
||||
bracket (forkIO $ sinkThread mqttBrokerUri mqttTopic killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan) |
||||
where |
||||
sink = BC.writeChan |
||||
|
||||
sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ do |
||||
mqtt <- connectURI mqttConfig mqttBrokerUri |
||||
sinkThread' mqtt |
||||
where |
||||
sinkThread' mqtt = do |
||||
maybeTrade <- BC.tryReadChan chan |
||||
case maybeTrade of |
||||
Just trade -> |
||||
void $ publish mqtt mqttTopic (BL.fromStrict $ encodeTrade trade) False |
||||
Nothing -> do |
||||
threadDelay 1000000 |
||||
unlessM (isEmptyMVar killMv) $ sinkThread' mqtt |
||||
|
||||
wasKilled = isJust <$> tryReadMVar killMv |
||||
encodeTrade :: Trade -> B.ByteString |
||||
encodeTrade = encodeUtf8 . TL.toStrict . formatTrade |
||||
formatTrade trade = [t|New trade: %? %d %Q for %f (%Q/%Q)|] |
||||
(tradeOperation trade) |
||||
(tradeQuantity trade) |
||||
(tradeSecurity trade) |
||||
(toDouble $ tradePrice trade) |
||||
(strategyId . tradeSignalId $ trade) |
||||
(signalName . tradeSignalId $ trade) |
||||
|
||||
stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId |
||||
|
||||
Loading…
Reference in new issue