Browse Source

gotify trade sink

master
Denis Tereshkin 2 years ago
parent
commit
7b2366d664
  1. 3
      src/Config.hs
  2. 107
      src/Main.hs

3
src/Config.hs

@ -33,7 +33,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig {
transaqLogPath :: FilePath, transaqLogPath :: FilePath,
transaqLogLevel :: Int, transaqLogLevel :: Int,
tradesinkDashboard :: T.Text, tradesinkDashboard :: T.Text,
mqttUri :: T.Text, gotifyUri :: T.Text,
gotifyToken :: T.Text,
allTradesSubscriptions :: [SubscriptionConfig], allTradesSubscriptions :: [SubscriptionConfig],
quotationsSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig],
quotesSubscriptions :: [SubscriptionConfig] quotesSubscriptions :: [SubscriptionConfig]

107
src/Main.hs

@ -1,41 +1,41 @@
module Main (main) where module Main (main) where
import ATrade (libatrade_gitrev, import ATrade (libatrade_gitrev,
libatrade_version) libatrade_version)
import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum)) import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum))
import ATrade.Broker.Server (startBrokerServer, import ATrade.Broker.Server (startBrokerServer,
stopBrokerServer) stopBrokerServer)
import ATrade.Broker.TradeSinks.MQTTTradeSink (withMQTTTradeSink) import ATrade.Broker.TradeSinks.GotifyTradeSink (withGotifyTradeSink)
import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink) import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink)
import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning),
fmtMessage, logWith) fmtMessage, logWith)
import ATrade.QuoteSource.Server (startQuoteSourceServer, import ATrade.QuoteSource.Server (startQuoteSourceServer,
stopQuoteSourceServer) stopQuoteSourceServer)
import ATrade.Types (defaultServerSecurityParams) import ATrade.Types (defaultServerSecurityParams)
import Colog (LogAction, cfilter, import Colog (LogAction, cfilter,
logTextStdout, (>$<)) logTextStdout, (>$<))
import Colog.Actions (logTextHandle) import Colog.Actions (logTextHandle)
import Config (TransaqConnectorConfig (..), import Config (TransaqConnectorConfig (..),
loadConfig) loadConfig)
import Control.Concurrent (threadDelay) import Control.Concurrent (threadDelay)
import Control.Concurrent.BoundedChan (newBoundedChan) import Control.Concurrent.BoundedChan (newBoundedChan)
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Monad (forever, void) import Control.Monad (forever, void)
import Control.Monad.IO.Class (MonadIO) import Control.Monad.IO.Class (MonadIO)
import qualified Data.Text as T import qualified Data.Text as T
import Data.Version (showVersion) import Data.Version (showVersion)
import Debug.EventCounters (initEventCounters) import Debug.EventCounters (initEventCounters)
import HistoryProviderServer (withHistoryProviderServer) import HistoryProviderServer (withHistoryProviderServer)
import Network.URI (parseURI) import Network.URI (parseURI)
import Prelude hiding (log) import Prelude hiding (log)
import System.IO (Handle, import System.IO (Handle,
IOMode (AppendMode), IOMode (AppendMode),
withFile) withFile)
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
import TickerInfoServer (withTickerInfoServer) import TickerInfoServer (withTickerInfoServer)
import qualified TXMLConnector as Connector import qualified TXMLConnector as Connector
import Version (transaqConnectorVersionText) import Version (transaqConnectorVersionText)
mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message
mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h)) mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h))
@ -71,27 +71,22 @@ main = do
defaultServerSecurityParams) defaultServerSecurityParams)
stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do
withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard -> withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard ->
case parseURI (T.unpack $ mqttUri cfg) of withGotifyTradeSink (T.unpack $ gotifyUri cfg) (T.unpack $ gotifyToken cfg) logger $ \tsGotify -> do
Just uri -> do txml <- Connector.start logger cfg qssChannel tisH
withMQTTTradeSink uri mqttTradeSinkTopic logger $ \tsMqtt -> do bracket (startBrokerServer
txml <- Connector.start logger cfg qssChannel tisH [Connector.makeBrokerBackend txml (account cfg)]
bracket (startBrokerServer ctx
[Connector.makeBrokerBackend txml (account cfg)] (brokerEndpoint cfg)
ctx (brokerNotificationsEndpoint cfg)
(brokerEndpoint cfg) (NotificationSqnum 1)
(brokerNotificationsEndpoint cfg) [tsDashboard, tsGotify]
(NotificationSqnum 1) defaultServerSecurityParams
[tsDashboard, tsMqtt] logger) (\x -> do
defaultServerSecurityParams stopBrokerServer x
logger) (\x -> do log Info "main" "Stopping TXMLConnector"
stopBrokerServer x Connector.stop txml) $ \_ -> do
log Info "main" "Stopping TXMLConnector" withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do
Connector.stop txml) $ \_ -> do forever $ threadDelay 1000000
withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do
forever $ threadDelay 1000000
Nothing -> log Warning "main" "Can't parse MQTT URI"
log Info "main" "Shutting down" log Info "main" "Shutting down"
where
mqttTradeSinkTopic = "/atrade/trades"

Loading…
Cancel
Save