diff --git a/src/Config.hs b/src/Config.hs index 0a3438d..846b6e2 100644 --- a/src/Config.hs +++ b/src/Config.hs @@ -7,10 +7,8 @@ module Config loadConfig, ) where -import ATrade.Logging (Severity) -import qualified Data.Text as T -import Dhall (FromDhall (autoWith), auto, expected, - inputFile) +import qualified Data.Text as T +import Dhall (FromDhall (autoWith), auto, expected, inputFile) import GHC.Generics data SubscriptionConfig = SubscriptionConfig T.Text T.Text @@ -34,7 +32,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig { transaqPort :: Int, transaqLogPath :: FilePath, transaqLogLevel :: Int, - tradesinks :: [T.Text], + tradesinkDashboard :: T.Text, + mqttUri :: T.Text, allTradesSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig], quotesSubscriptions :: [SubscriptionConfig] diff --git a/src/Main.hs b/src/Main.hs index eb36dd6..17b6c1e 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -1,37 +1,41 @@ module Main (main) where -import ATrade (libatrade_gitrev, - libatrade_version) -import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum)) -import ATrade.Broker.Server (startBrokerServer, - stopBrokerServer) -import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), - fmtMessage, logWith) -import ATrade.QuoteSource.Server (startQuoteSourceServer, - stopQuoteSourceServer) -import ATrade.Types (defaultServerSecurityParams) -import Colog (LogAction, cfilter, - logTextStdout, (>$<)) -import Colog.Actions (logTextHandle) -import Config (TransaqConnectorConfig (..), - loadConfig) -import Control.Concurrent (threadDelay) -import Control.Concurrent.BoundedChan (newBoundedChan) -import Control.Exception (bracket) -import Control.Monad (forever, void) -import Control.Monad.IO.Class (MonadIO) -import qualified Data.Text as T -import Data.Version (showVersion) -import Debug.EventCounters (initEventCounters) -import HistoryProviderServer (withHistoryProviderServer) -import Prelude hiding (log) -import System.IO (Handle, IOMode (AppendMode), - withFile) -import System.ZMQ4 (withContext) -import TickerInfoServer (withTickerInfoServer) -import qualified TXMLConnector as Connector -import Version (transaqConnectorVersionText) +import ATrade (libatrade_gitrev, + libatrade_version) +import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum)) +import ATrade.Broker.Server (startBrokerServer, + stopBrokerServer) +import ATrade.Broker.TradeSinks.MQTTTradeSink (withMQTTTradeSink) +import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink) +import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), + fmtMessage, logWith) +import ATrade.QuoteSource.Server (startQuoteSourceServer, + stopQuoteSourceServer) +import ATrade.Types (defaultServerSecurityParams) +import Colog (LogAction, cfilter, + logTextStdout, (>$<)) +import Colog.Actions (logTextHandle) +import Config (TransaqConnectorConfig (..), + loadConfig) +import Control.Concurrent (threadDelay) +import Control.Concurrent.BoundedChan (newBoundedChan) +import Control.Exception (bracket) +import Control.Monad (forever, void) +import Control.Monad.IO.Class (MonadIO) +import qualified Data.Text as T +import Data.Version (showVersion) +import Debug.EventCounters (initEventCounters) +import HistoryProviderServer (withHistoryProviderServer) +import Network.URI (parseURI) +import Prelude hiding (log) +import System.IO (Handle, + IOMode (AppendMode), + withFile) +import System.ZMQ4 (withContext) +import TickerInfoServer (withTickerInfoServer) +import qualified TXMLConnector as Connector +import Version (transaqConnectorVersionText) mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h)) @@ -43,7 +47,7 @@ parseLoglevel 2 = Info parseLoglevel 3 = Debug parseLoglevel _ = Trace -main :: IO () +main :: IO () main = do initEventCounters cfg <- loadConfig "transaq-connector.dhall" @@ -66,21 +70,28 @@ main = do (quotesourceEndpoint cfg) defaultServerSecurityParams) stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do - txml <- Connector.start logger cfg qssChannel tisH - bracket (startBrokerServer - [Connector.makeBrokerBackend txml (account cfg)] - ctx - (brokerEndpoint cfg) - (brokerNotificationsEndpoint cfg) - (NotificationSqnum 1) - [] - defaultServerSecurityParams - logger) (\x -> do - stopBrokerServer x - log Info "main" "Stopping TXMLConnector" - Connector.stop txml) $ \_ -> do - withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do - forever $ threadDelay 1000000 + withZMQTradeSink ctx (tradesinkDashboard cfg) $ \tsDashboard -> + case parseURI (T.unpack $ mqttUri cfg) of + Just uri -> do + withMQTTTradeSink uri mqttTradeSinkTopic $ \tsMqtt -> do + txml <- Connector.start logger cfg qssChannel tisH + bracket (startBrokerServer + [Connector.makeBrokerBackend txml (account cfg)] + ctx + (brokerEndpoint cfg) + (brokerNotificationsEndpoint cfg) + (NotificationSqnum 1) + [tsDashboard, tsMqtt] + defaultServerSecurityParams + logger) (\x -> do + stopBrokerServer x + log Info "main" "Stopping TXMLConnector" + Connector.stop txml) $ \_ -> do + withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do + forever $ threadDelay 1000000 + Nothing -> log Warning "main" "Can't parse MQTT URI" log Info "main" "Shutting down" + where + mqttTradeSinkTopic = "/atrade/trades" diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 5001cef..c4fa42e 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -357,9 +357,11 @@ workThread = do case item of MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown MainQueuePingServer -> do - maybeServerStatus<- liftIO $ sendCommand $ toXml CommandServerStatus + maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus case maybeServerStatus of - Left serverStatusRaw -> void $ liftIO $ parseAndWrite queue logger serverStatusRaw + Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of + ((TransaqResponseResult (ResponseFailure _)):_) -> liftIO $ atomically $ writeTVar serverConn StageConnection + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw Right () -> pure () MainQueueTransaqData transaqData -> do tm <- asks tickMap @@ -437,6 +439,7 @@ workThread = do log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId Just (TransaqResponseResult (ResponseFailure err)) -> do + brState <- asks brokerState log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) case maybeCb of @@ -619,7 +622,7 @@ workThread = do log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" liftIO $ threadDelay (1000 * 1000 * 10) Right _ -> do - log Warning "TXMLConnector.WorkThread" "Connected" + log Info "TXMLConnector.WorkThread" "Connected" conn <- asks serverConnected liftIO . atomically $ writeTVar conn StageGetInfo -- item <- atomically $ readTBQueue queue diff --git a/transaq-connector.cabal b/transaq-connector.cabal index 70b6a88..ea96684 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -31,7 +31,7 @@ executable transaq-connector build-depends: base >= 4.7 && < 5 , dhall , eventcounters - , libatrade + , libatrade == 0.14.0.0 , text , transformers , co-log @@ -52,6 +52,7 @@ executable transaq-connector , binary , bimap , deque + , network-uri extra-lib-dirs: lib ghc-options: -Wall -Wcompat