diff --git a/src/Config.hs b/src/Config.hs index 846b6e2..e9cec59 100644 --- a/src/Config.hs +++ b/src/Config.hs @@ -33,7 +33,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig { transaqLogPath :: FilePath, transaqLogLevel :: Int, tradesinkDashboard :: T.Text, - mqttUri :: T.Text, + gotifyUri :: T.Text, + gotifyToken :: T.Text, allTradesSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig], quotesSubscriptions :: [SubscriptionConfig] diff --git a/src/Main.hs b/src/Main.hs index f4ffd0b..b8e6175 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -1,41 +1,41 @@ module Main (main) where -import ATrade (libatrade_gitrev, - libatrade_version) -import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum)) -import ATrade.Broker.Server (startBrokerServer, - stopBrokerServer) -import ATrade.Broker.TradeSinks.MQTTTradeSink (withMQTTTradeSink) -import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink) -import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), - fmtMessage, logWith) -import ATrade.QuoteSource.Server (startQuoteSourceServer, - stopQuoteSourceServer) -import ATrade.Types (defaultServerSecurityParams) -import Colog (LogAction, cfilter, - logTextStdout, (>$<)) -import Colog.Actions (logTextHandle) -import Config (TransaqConnectorConfig (..), - loadConfig) -import Control.Concurrent (threadDelay) -import Control.Concurrent.BoundedChan (newBoundedChan) -import Control.Exception (bracket) -import Control.Monad (forever, void) -import Control.Monad.IO.Class (MonadIO) -import qualified Data.Text as T -import Data.Version (showVersion) -import Debug.EventCounters (initEventCounters) -import HistoryProviderServer (withHistoryProviderServer) -import Network.URI (parseURI) -import Prelude hiding (log) -import System.IO (Handle, - IOMode (AppendMode), - withFile) -import System.ZMQ4 (withContext) -import TickerInfoServer (withTickerInfoServer) -import qualified TXMLConnector as Connector -import Version (transaqConnectorVersionText) +import ATrade (libatrade_gitrev, + libatrade_version) +import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum)) +import ATrade.Broker.Server (startBrokerServer, + stopBrokerServer) +import ATrade.Broker.TradeSinks.GotifyTradeSink (withGotifyTradeSink) +import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink) +import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), + fmtMessage, logWith) +import ATrade.QuoteSource.Server (startQuoteSourceServer, + stopQuoteSourceServer) +import ATrade.Types (defaultServerSecurityParams) +import Colog (LogAction, cfilter, + logTextStdout, (>$<)) +import Colog.Actions (logTextHandle) +import Config (TransaqConnectorConfig (..), + loadConfig) +import Control.Concurrent (threadDelay) +import Control.Concurrent.BoundedChan (newBoundedChan) +import Control.Exception (bracket) +import Control.Monad (forever, void) +import Control.Monad.IO.Class (MonadIO) +import qualified Data.Text as T +import Data.Version (showVersion) +import Debug.EventCounters (initEventCounters) +import HistoryProviderServer (withHistoryProviderServer) +import Network.URI (parseURI) +import Prelude hiding (log) +import System.IO (Handle, + IOMode (AppendMode), + withFile) +import System.ZMQ4 (withContext) +import TickerInfoServer (withTickerInfoServer) +import qualified TXMLConnector as Connector +import Version (transaqConnectorVersionText) mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h)) @@ -71,27 +71,22 @@ main = do defaultServerSecurityParams) stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard -> - case parseURI (T.unpack $ mqttUri cfg) of - Just uri -> do - withMQTTTradeSink uri mqttTradeSinkTopic logger $ \tsMqtt -> do - txml <- Connector.start logger cfg qssChannel tisH - bracket (startBrokerServer - [Connector.makeBrokerBackend txml (account cfg)] - ctx - (brokerEndpoint cfg) - (brokerNotificationsEndpoint cfg) - (NotificationSqnum 1) - [tsDashboard, tsMqtt] - defaultServerSecurityParams - logger) (\x -> do - stopBrokerServer x - log Info "main" "Stopping TXMLConnector" - Connector.stop txml) $ \_ -> do - withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do - forever $ threadDelay 1000000 - Nothing -> log Warning "main" "Can't parse MQTT URI" + withGotifyTradeSink (T.unpack $ gotifyUri cfg) (T.unpack $ gotifyToken cfg) logger $ \tsGotify -> do + txml <- Connector.start logger cfg qssChannel tisH + bracket (startBrokerServer + [Connector.makeBrokerBackend txml (account cfg)] + ctx + (brokerEndpoint cfg) + (brokerNotificationsEndpoint cfg) + (NotificationSqnum 1) + [tsDashboard, tsGotify] + defaultServerSecurityParams + logger) (\x -> do + stopBrokerServer x + log Info "main" "Stopping TXMLConnector" + Connector.stop txml) $ \_ -> do + withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do + forever $ threadDelay 1000000 log Info "main" "Shutting down" - where - mqttTradeSinkTopic = "/atrade/trades"