Browse Source

Tradesinks

master
Denis Tereshkin 3 years ago
parent
commit
bb61c8d00d
  1. 9
      src/Config.hs
  2. 105
      src/Main.hs
  3. 9
      src/TXMLConnector.hs
  4. 3
      transaq-connector.cabal

9
src/Config.hs

@ -7,10 +7,8 @@ module Config
loadConfig, loadConfig,
) where ) where
import ATrade.Logging (Severity) import qualified Data.Text as T
import qualified Data.Text as T import Dhall (FromDhall (autoWith), auto, expected, inputFile)
import Dhall (FromDhall (autoWith), auto, expected,
inputFile)
import GHC.Generics import GHC.Generics
data SubscriptionConfig = SubscriptionConfig T.Text T.Text data SubscriptionConfig = SubscriptionConfig T.Text T.Text
@ -34,7 +32,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig {
transaqPort :: Int, transaqPort :: Int,
transaqLogPath :: FilePath, transaqLogPath :: FilePath,
transaqLogLevel :: Int, transaqLogLevel :: Int,
tradesinks :: [T.Text], tradesinkDashboard :: T.Text,
mqttUri :: T.Text,
allTradesSubscriptions :: [SubscriptionConfig], allTradesSubscriptions :: [SubscriptionConfig],
quotationsSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig],
quotesSubscriptions :: [SubscriptionConfig] quotesSubscriptions :: [SubscriptionConfig]

105
src/Main.hs

@ -1,37 +1,41 @@
module Main (main) where module Main (main) where
import ATrade (libatrade_gitrev, import ATrade (libatrade_gitrev,
libatrade_version) libatrade_version)
import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum)) import ATrade.Broker.Protocol (NotificationSqnum (NotificationSqnum))
import ATrade.Broker.Server (startBrokerServer, import ATrade.Broker.Server (startBrokerServer,
stopBrokerServer) stopBrokerServer)
import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning), import ATrade.Broker.TradeSinks.MQTTTradeSink (withMQTTTradeSink)
fmtMessage, logWith) import ATrade.Broker.TradeSinks.ZMQTradeSink (withZMQTradeSink)
import ATrade.QuoteSource.Server (startQuoteSourceServer, import ATrade.Logging (Message (..), Severity (Debug, Error, Info, Trace, Warning),
stopQuoteSourceServer) fmtMessage, logWith)
import ATrade.Types (defaultServerSecurityParams) import ATrade.QuoteSource.Server (startQuoteSourceServer,
import Colog (LogAction, cfilter, stopQuoteSourceServer)
logTextStdout, (>$<)) import ATrade.Types (defaultServerSecurityParams)
import Colog.Actions (logTextHandle) import Colog (LogAction, cfilter,
import Config (TransaqConnectorConfig (..), logTextStdout, (>$<))
loadConfig) import Colog.Actions (logTextHandle)
import Control.Concurrent (threadDelay) import Config (TransaqConnectorConfig (..),
import Control.Concurrent.BoundedChan (newBoundedChan) loadConfig)
import Control.Exception (bracket) import Control.Concurrent (threadDelay)
import Control.Monad (forever, void) import Control.Concurrent.BoundedChan (newBoundedChan)
import Control.Monad.IO.Class (MonadIO) import Control.Exception (bracket)
import qualified Data.Text as T import Control.Monad (forever, void)
import Data.Version (showVersion) import Control.Monad.IO.Class (MonadIO)
import Debug.EventCounters (initEventCounters) import qualified Data.Text as T
import HistoryProviderServer (withHistoryProviderServer) import Data.Version (showVersion)
import Prelude hiding (log) import Debug.EventCounters (initEventCounters)
import System.IO (Handle, IOMode (AppendMode), import HistoryProviderServer (withHistoryProviderServer)
withFile) import Network.URI (parseURI)
import System.ZMQ4 (withContext) import Prelude hiding (log)
import TickerInfoServer (withTickerInfoServer) import System.IO (Handle,
import qualified TXMLConnector as Connector IOMode (AppendMode),
import Version (transaqConnectorVersionText) withFile)
import System.ZMQ4 (withContext)
import TickerInfoServer (withTickerInfoServer)
import qualified TXMLConnector as Connector
import Version (transaqConnectorVersionText)
mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message
mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h)) mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h))
@ -43,7 +47,7 @@ parseLoglevel 2 = Info
parseLoglevel 3 = Debug parseLoglevel 3 = Debug
parseLoglevel _ = Trace parseLoglevel _ = Trace
main :: IO () main :: IO ()
main = do main = do
initEventCounters initEventCounters
cfg <- loadConfig "transaq-connector.dhall" cfg <- loadConfig "transaq-connector.dhall"
@ -66,21 +70,28 @@ main = do
(quotesourceEndpoint cfg) (quotesourceEndpoint cfg)
defaultServerSecurityParams) defaultServerSecurityParams)
stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do
txml <- Connector.start logger cfg qssChannel tisH withZMQTradeSink ctx (tradesinkDashboard cfg) $ \tsDashboard ->
bracket (startBrokerServer case parseURI (T.unpack $ mqttUri cfg) of
[Connector.makeBrokerBackend txml (account cfg)] Just uri -> do
ctx withMQTTTradeSink uri mqttTradeSinkTopic $ \tsMqtt -> do
(brokerEndpoint cfg) txml <- Connector.start logger cfg qssChannel tisH
(brokerNotificationsEndpoint cfg) bracket (startBrokerServer
(NotificationSqnum 1) [Connector.makeBrokerBackend txml (account cfg)]
[] ctx
defaultServerSecurityParams (brokerEndpoint cfg)
logger) (\x -> do (brokerNotificationsEndpoint cfg)
stopBrokerServer x (NotificationSqnum 1)
log Info "main" "Stopping TXMLConnector" [tsDashboard, tsMqtt]
Connector.stop txml) $ \_ -> do defaultServerSecurityParams
withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do logger) (\x -> do
forever $ threadDelay 1000000 stopBrokerServer x
log Info "main" "Stopping TXMLConnector"
Connector.stop txml) $ \_ -> do
withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do
forever $ threadDelay 1000000
Nothing -> log Warning "main" "Can't parse MQTT URI"
log Info "main" "Shutting down" log Info "main" "Shutting down"
where
mqttTradeSinkTopic = "/atrade/trades"

9
src/TXMLConnector.hs

@ -357,9 +357,11 @@ workThread = do
case item of case item of
MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown
MainQueuePingServer -> do MainQueuePingServer -> do
maybeServerStatus<- liftIO $ sendCommand $ toXml CommandServerStatus maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus
case maybeServerStatus of case maybeServerStatus of
Left serverStatusRaw -> void $ liftIO $ parseAndWrite queue logger serverStatusRaw Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> liftIO $ atomically $ writeTVar serverConn StageConnection
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw
Right () -> pure () Right () -> pure ()
MainQueueTransaqData transaqData -> do MainQueueTransaqData transaqData -> do
tm <- asks tickMap tm <- asks tickMap
@ -437,6 +439,7 @@ workThread = do
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
Just (TransaqResponseResult (ResponseFailure err)) -> do Just (TransaqResponseResult (ResponseFailure err)) -> do
brState <- asks brokerState
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of case maybeCb of
@ -619,7 +622,7 @@ workThread = do
log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]"
liftIO $ threadDelay (1000 * 1000 * 10) liftIO $ threadDelay (1000 * 1000 * 10)
Right _ -> do Right _ -> do
log Warning "TXMLConnector.WorkThread" "Connected" log Info "TXMLConnector.WorkThread" "Connected"
conn <- asks serverConnected conn <- asks serverConnected
liftIO . atomically $ writeTVar conn StageGetInfo liftIO . atomically $ writeTVar conn StageGetInfo
-- item <- atomically $ readTBQueue queue -- item <- atomically $ readTBQueue queue

3
transaq-connector.cabal

@ -31,7 +31,7 @@ executable transaq-connector
build-depends: base >= 4.7 && < 5 build-depends: base >= 4.7 && < 5
, dhall , dhall
, eventcounters , eventcounters
, libatrade , libatrade == 0.14.0.0
, text , text
, transformers , transformers
, co-log , co-log
@ -52,6 +52,7 @@ executable transaq-connector
, binary , binary
, bimap , bimap
, deque , deque
, network-uri
extra-lib-dirs: lib extra-lib-dirs: lib
ghc-options: -Wall ghc-options: -Wall
-Wcompat -Wcompat

Loading…
Cancel
Save