Browse Source

MQTT & ZMQ sinks

master
Denis Tereshkin 2 years ago
parent
commit
9161b32709
  1. 22
      src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs
  2. 49
      src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs

22
src/ATrade/Broker/TradeSinks/MQTTTradeSink.hs

@ -6,6 +6,8 @@ module ATrade.Broker.TradeSinks.MQTTTradeSink @@ -6,6 +6,8 @@ module ATrade.Broker.TradeSinks.MQTTTradeSink
withMQTTTradeSink
) where
import ATrade.Broker.Protocol (TradeSinkMessage (..))
import ATrade.Logging (Severity (Debug, Info),
logWith)
import ATrade.Types (SignalId (..), Trade (..),
toDouble)
import Control.Concurrent (forkIO, killThread,
@ -15,7 +17,7 @@ import Control.Concurrent.MVar (isEmptyMVar, newEmptyMVar, @@ -15,7 +17,7 @@ import Control.Concurrent.MVar (isEmptyMVar, newEmptyMVar,
putMVar, tryReadMVar)
import Control.Exception (bracket, handle, throwIO)
import Control.Monad (void, when)
import Control.Monad.Extra (unlessM)
import Control.Monad.Extra (whenM)
import Control.Monad.Loops (whileM_)
import Data.Aeson (encode)
import qualified Data.ByteString as B
@ -28,19 +30,25 @@ import qualified Data.Text.Lazy as TL @@ -28,19 +30,25 @@ import qualified Data.Text.Lazy as TL
import GHC.Exception (SomeException)
import Language.Haskell.Printf
import Network.MQTT.Client (connectURI, mqttConfig,
publish)
normalDisconnect, publish)
withMQTTTradeSink mqttBrokerUri mqttTopic f = do
withMQTTTradeSink mqttBrokerUri mqttTopic logger f = do
killMv <- newEmptyMVar
chan <- BC.newBoundedChan 1000
bracket (forkIO $ sinkThread mqttBrokerUri mqttTopic killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan)
bracket (forkIO $ sinkThread mqttBrokerUri mqttTopic killMv chan logger) (stopSinkThread killMv) (\_ -> f $ sink chan)
where
sink = BC.writeChan
sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ do
sinkThread mqttBrokerUri mqttTopic killMv chan logger = whileM_ (not <$> wasKilled) $ do
log Info "Thread started"
mqtt <- connectURI mqttConfig mqttBrokerUri
log Debug "Connected"
sinkThread' mqtt
log Debug "Disconnecting"
normalDisconnect mqtt
log Info "Disconnected"
where
log sev = logWith logger sev "MQTTTradeSink"
sinkThread' mqtt = do
maybeTrade <- BC.tryReadChan chan
case maybeTrade of
@ -48,7 +56,7 @@ sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ d @@ -48,7 +56,7 @@ sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ d
void $ publish mqtt mqttTopic (BL.fromStrict $ encodeTrade trade) False
Nothing -> do
threadDelay 1000000
unlessM (isEmptyMVar killMv) $ sinkThread' mqtt
whenM (isEmptyMVar killMv) $ sinkThread' mqtt
wasKilled = isJust <$> tryReadMVar killMv
encodeTrade :: Trade -> B.ByteString
@ -61,5 +69,5 @@ sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ d @@ -61,5 +69,5 @@ sinkThread mqttBrokerUri mqttTopic killMv chan = whileM_ (not <$> wasKilled) $ d
(strategyId . tradeSignalId $ trade)
(signalName . tradeSignalId $ trade)
stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId
stopSinkThread killMv threadId = putMVar killMv () >> threadDelay 10000000

49
src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs

@ -1,42 +1,49 @@ @@ -1,42 +1,49 @@
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Broker.TradeSinks.ZMQTradeSink (
withZMQTradeSink
) where
import Control.Exception
import Control.Concurrent
import Control.Concurrent
import qualified Control.Concurrent.BoundedChan as BC
import Data.Aeson
import Data.IORef
import Data.Maybe
import qualified Data.Text as T
import Data.List.NonEmpty
import qualified Data.List as L
import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import Control.Monad.Loops
import Control.Monad.Extra
import System.Timeout
import System.ZMQ4
import Control.Exception
import Control.Monad.Extra
import Control.Monad.Loops
import Data.Aeson
import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import Data.IORef
import qualified Data.List as L
import Data.List.NonEmpty
import Data.Maybe
import qualified Data.Text as T
import System.Timeout
import System.ZMQ4
import ATrade.Types
import ATrade.Broker.Protocol
import ATrade.Broker.Protocol
import ATrade.Logging (Severity (..), logWith)
import ATrade.Types
withZMQTradeSink ctx tradeSinkEp f = do
withZMQTradeSink ctx tradeSinkEp logger f = do
killMv <- newEmptyMVar
chan <- BC.newBoundedChan 1000
bracket (forkIO $ sinkThread ctx tradeSinkEp killMv chan) (stopSinkThread killMv) (\_ -> f $ sink chan)
bracket (forkIO $ sinkThread ctx tradeSinkEp killMv chan logger) (stopSinkThread killMv) (\_ -> f $ sink chan)
where
sink = BC.writeChan
sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $
sinkThread ctx tradeSinkEp killMv chan logger = whileM_ (not <$> wasKilled) $ do
log Info "Start of loop"
handle (\e -> do
when (isZMQError (e :: SomeException)) $ do
throwIO e) sinkThread'
log Info "End of loop"
where
log sev = logWith logger sev "ZMQTradeSink"
sinkThread' = withSocket ctx Dealer (\sock -> do
connect sock $ T.unpack tradeSinkEp
whenM (not <$> wasKilled) $ sinkThread'' sock)
whenM (not <$> wasKilled) $ sinkThread'' sock
disconnect sock $ T.unpack tradeSinkEp
close sock)
sinkThread'' sock = do
maybeTrade <- BC.tryReadChan chan
@ -73,4 +80,4 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ @@ -73,4 +80,4 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $
}
stopSinkThread killMv threadId = putMVar killMv () >> killThread threadId

Loading…
Cancel
Save