{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}

-- | Worker that drives a Transaq connection through the TXML bindings:
-- it installs a callback that parses incoming XML into 'TransaqResponse'
-- values, queues them, and runs a small state machine
-- ('ConnectionStage') that connects, subscribes and forwards ticks to the
-- quote source server channel.
module TXMLConnector
  ( start
  ) where

import ATrade.Logging (Message, Severity (..), logWith)
import Colog (LogAction)
import Config
  ( SubscriptionConfig (SubscriptionConfig)
  , TransaqConnectorConfig (..)
  , transaqHost
  , transaqLogLevel
  , transaqLogPath
  , transaqLogin
  , transaqPassword
  , transaqPort
  )
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM
  ( TVar
  , atomically
  , modifyTVar'
  , newTVarIO
  , readTVarIO
  , writeTVar
  )
import Control.Concurrent.STM.TBQueue
  ( TBQueue
  , newTBQueue
  , readTBQueue
  , writeTBQueue
  )
import Control.Exception (finally)
import Control.Monad (forM_, forever, void)
import Data.Maybe (mapMaybe)
import qualified Data.Text as T
import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types
  ( Content (Elem)
  , Element (elName)
  , QName (qName)
  )
import Transaq
  ( AllTradesTrade (..)
  , CommandConnect (..)
  , CommandDisconnect (CommandDisconnect)
  , CommandSubscribe (..)
  , ConnectionState (Disconnected)
  , Language (LanguageEn)
  , MarketInfo (..)
  , Quotation (..)
  , ResponseAllTrades (ResponseAllTrades)
  , ResponseCandleKinds (ResponseCandleKinds)
  , ResponseMarkets (ResponseMarkets)
  , ResponseQuotations (ResponseQuotations)
  , ResponseQuotes (ResponseQuotes)
  , ResponseSecurities (ResponseSecurities)
  , SecurityId (..)
  , TransaqCommand (toXml)
  , TransaqResponse (..)
  , TransaqResponseC (fromXml)
  , state
  )
import TXML (LogLevel, freeCallback, initialize, sendCommand, setCallback)

import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types
  ( DataType (BestBid, BestOffer, LastTradePrice)
  , Tick (..)
  , TickerId
  , fromDouble
  )
import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, getCurrentTime)
import qualified Transaq
import qualified TXML

-- | Connection settings record.
--
-- NOTE(review): nothing in this module constructs or reads this type;
-- confirm whether it is still needed.
data ConnectionParams = ConnectionParams
  { cpLogin :: T.Text
  , cpPassword :: T.Text
  , cpHost :: T.Text
  , cpPort :: Int
  , cpLogPath :: T.Text
  , cpLogLevel :: LogLevel
  }
  deriving (Show, Eq, Ord)

-- | Handle returned by 'start': the id of the forked worker thread and the
-- queue into which the TXML callback writes parsed responses.
data TXMLConnectorHandle = TXMLConnectorHandle
  { threadId :: ThreadId
  , notificationQueue :: TBQueue TransaqResponse
  }

-- | Stage of the worker's state machine. The worker starts in
-- 'StageConnection', moves to 'StageGetInfo' once the connect command has
-- been accepted, and to 'StageConnected' once the server reports that it
-- is connected.
data ConnectionStage
  = StageConnection
  | StageGetInfo
  | StageConnected
  deriving (Eq, Show, Ord)

-- | Key of the latest-tick table: one entry per ticker and data type.
data TickKey = TickKey TickerId DataType
  deriving (Show, Ord, Eq)

-- | Create the response queue (capacity 50000) and an empty tick table,
-- fork 'workThread' and return a handle holding the thread id and queue.
start
  :: LogAction IO Message
  -> TransaqConnectorConfig
  -> BoundedChan QuoteSourceServerData
  -> IO TXMLConnectorHandle
start logger config qssChannel = do
  logWith logger Info "TXMLConnector" "Starting"
  notificationQueue <- atomically $ newTBQueue 50000
  tickTable <- newTVarIO M.empty
  threadId <-
    forkIO (workThread logger config notificationQueue qssChannel tickTable)
  pure TXMLConnectorHandle {..}

-- | Body of the connector thread.
--
-- Initializes TXML, installs the XML callback and then loops forever over
-- the 'ConnectionStage' state machine. If initialization or callback
-- installation fails, an error is logged and the thread returns.
--
-- The callback is released with 'freeCallback' whenever the loop is left,
-- i.e. when the thread is terminated by an exception.
workThread
  :: LogAction IO Message
  -> TransaqConnectorConfig
  -> TBQueue TransaqResponse
  -> BoundedChan QuoteSourceServerData
  -> TVar (M.Map TickKey Tick)
  -> IO ()
workThread logger config queue qssChannel tickMap = do
  initResult <-
    initialize
      (transaqLogPath config)
      (parseTransaqLogLevel $ transaqLogLevel config)
  case initResult of
    Left str ->
      log Error "TXMLConnector.WorkThread" $
        "Unable to initialize TXML: " <> str
    Right _ -> do
      mbCallback <- setCallback parseAndWrite
      case mbCallback of
        Nothing ->
          log Error "TXMLConnector.WorkThread" "Unable to set callback"
        Just cb -> do
          serverConnected <- newTVarIO StageConnection
          -- 'forever' never returns normally, so the callback has to be
          -- released from an exception-safe finalizer.
          forever (runStage serverConnected) `finally` freeCallback cb
  where
    log = logWith logger

    -- Map the numeric log level from the config to a TXML log level;
    -- anything other than 1 or 3 falls back to Info.
    parseTransaqLogLevel 1 = TXML.Warning
    parseTransaqLogLevel 3 = TXML.Debug
    parseTransaqLogLevel _ = TXML.Info

    -- One iteration of the state machine: dispatch on the current stage.
    runStage serverConnected = do
      connStatus <- readTVarIO serverConnected
      case connStatus of
        StageConnection -> handleUnconnected serverConnected
        StageGetInfo -> handleGetInfo serverConnected
        StageConnected -> handleConnected serverConnected

    -- TXML callback: parse the incoming XML once, log it, push every
    -- recognised response into the queue and report success.
    parseAndWrite xml = do
      let contents = parseXML xml
          parsed = mapMaybe parseContent contents
      log Debug "TXML.Callback" $
        "Parsed entities: " <> (T.pack . show . length) parsed
      log Debug "TXML.Callback" $ "parsed xml: " <> (T.pack . show) contents
      log Debug "TXML.Callback" $ "parsed: " <> (T.pack . show) xml
      mapM_ writeToQueue parsed
      pure True

    -- Only element nodes can carry a response.
    parseContent (Elem el) = parseElement el
    parseContent _ = Nothing

    -- Select the response parser from the element's tag name; unknown tags
    -- and elements that fail to parse yield Nothing.
    parseElement el =
      case qName $ elName el of
        "candles" -> TransaqResponseCandles <$> fromXml el
        "server_status" -> TransaqResponseServerStatus <$> fromXml el
        "markets" -> TransaqResponseMarkets <$> fromXml el
        "candlekinds" -> TransaqResponseCandleKinds <$> fromXml el
        "securities" -> TransaqResponseSecurities <$> fromXml el
        "sec_info" -> TransaqResponseSecInfo <$> fromXml el
        "quotations" -> TransaqResponseQuotations <$> fromXml el
        "alltrades" -> TransaqResponseAllTrades <$> fromXml el
        "quotes" -> TransaqResponseQuotes <$> fromXml el
        _ -> Nothing

    writeToQueue resp = atomically $ writeTBQueue queue resp

    -- Connected stage: turn trades and quotations into ticks, send them to
    -- the quote source channel and record them in the tick table.
    -- A server status of Disconnected or Error sends the state machine
    -- back to StageConnection so that the main loop reconnects.
    handleConnected serverConnected = do
      item <- atomically $ readTBQueue queue
      case item of
        TransaqResponseAllTrades (ResponseAllTrades trades) -> do
          let ticks = fmap allTradeToTick trades
          forM_ ticks (writeChan qssChannel . QSSTick)
          forM_ ticks insertToTickMap
        TransaqResponseQuotations (ResponseQuotations quotations) -> do
          now <- getCurrentTime
          let ticks = concatMap (quotationToTicks now) quotations
          forM_ ticks (writeChan qssChannel . QSSTick)
          forM_ ticks insertToTickMap
        TransaqResponseServerStatus serverStatus ->
          case state serverStatus of
            Transaq.Disconnected -> do
              log Warning "TXMLConnector.WorkThread" "Server disconnected"
              atomically $ writeTVar serverConnected StageConnection
            Transaq.Error errmsg -> do
              log Warning "TXMLConnector.WorkThread" $
                "Connection error: " <> errmsg
              atomically $ writeTVar serverConnected StageConnection
            _ -> pure ()
        _ -> pure ()

    -- Handshake stage: wait for the server status, and log the reference
    -- data the server sends in the meantime.
    handleGetInfo serverConnected = do
      item <- atomically $ readTBQueue queue
      case item of
        TransaqResponseServerStatus serverStatus ->
          case state serverStatus of
            Transaq.Disconnected -> do
              log Warning "TXMLConnector.WorkThread" "Server disconnected"
              atomically $ writeTVar serverConnected StageConnection
            Transaq.Connected -> do
              log Info "TXMLConnector.WorkThread" "Server connected"
              atomically $ writeTVar serverConnected StageConnected
              subscribed <- makeSubscriptions config
              case subscribed of
                Left errmsg -> do
                  log Warning "TXMLConnector.WorkThread" $
                    "Unable to subscribe: " <> errmsg
                  -- The resulting status update is picked up by
                  -- handleConnected, which restarts the connection.
                  void $ sendCommand $ toXml CommandDisconnect
                Right _ ->
                  log Info "TXMLConnector.WorkThread" "Subscriptions done"
            Transaq.Error errmsg -> do
              log Warning "TXMLConnector.WorkThread" $
                "Connection error: " <> errmsg
              atomically $ writeTVar serverConnected StageConnection
        TransaqResponseResult result ->
          log Info "TXMLConnector.WorkThread" $
            "Incoming result: " <> (T.pack . show) result
          -- TODO: handle order response
        TransaqResponseCandles candles ->
          log Debug "TXMLConnector.WorkThread" $
            "Incoming candles message: "
              <> (T.pack . show . length . Transaq.candles $ candles)
          -- TODO: Pass to qhp
        TransaqResponseMarkets (ResponseMarkets markets) -> do
          log Debug "TXMLConnector.WorkThread" "Incoming markets:"
          forM_
            markets
            ( \m ->
                log Debug "TXMLConnector.WorkThread" $
                  (T.pack . show) (marketId m) <> "/" <> marketName m
            )
          -- TODO: Pass to qtis
        TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
          log Debug "TXMLConnector.WorkThread" "Incoming candle kinds:"
          forM_ kinds (log Debug "TXMLConnector.WorkThread" . (T.pack . show))
          -- TODO: Pass to qtis, maybe something else?
        TransaqResponseSecurities (ResponseSecurities securities) -> do
          log Debug "TXMLConnector.WorkThread" "Incoming securities:"
          forM_
            securities
            (log Debug "TXMLConnector.WorkThread" . (T.pack . show))
          -- TODO: Pass to qtis
        TransaqResponseSecInfo secInfo ->
          log Debug "TXMLConnector.WorkThread" $
            "Incoming secinfo:" <> (T.pack . show) secInfo
          -- TODO: Pass to qtis
        TransaqResponseQuotations (ResponseQuotations quotations) -> do
          log Debug "TXMLConnector.WorkThread" "Incoming quotations:"
          forM_
            quotations
            (log Debug "TXMLConnector.WorkThread" . (T.pack . show))
          -- Pass to ticktable and quotesource server
        TransaqResponseQuotes (ResponseQuotes quotes) -> do
          log Debug "TXMLConnector.WorkThread" "Incoming quotes:"
          forM_ quotes (log Debug "TXMLConnector.WorkThread" . (T.pack . show))
          -- Pass to quotesource server
        _ -> pure ()

    -- Initial stage: send the connect command. On failure log it and wait
    -- ten seconds before the main loop retries; on success move on to the
    -- handshake stage.
    handleUnconnected serverConnected = do
      log Debug "TXMLConnector.WorkThread" "Sending connect command"
      v <-
        sendCommand $
          toXml $
            CommandConnect
              { login = transaqLogin config
              , password = transaqPassword config
              , host = transaqHost config
              , port = transaqPort config
              , language = LanguageEn
              , autopos = False
              , micexRegisters = True
              , milliseconds = True
              , utcTime = True
              , proxy = ()
              , rqDelay = Nothing
              , sessionTimeout = Nothing
              , requestTimeout = Nothing
              , pushULimits = Nothing
              , pushPosEquity = Nothing
              }
      case v of
        Left err -> do
          log Warning "TXMLConnector.WorkThread" $
            "Unable to connect: [" <> err <> "]"
          threadDelay (1000 * 1000 * 10)
        Right _ -> do
          atomically $ writeTVar serverConnected StageGetInfo
          -- NOTE(review): the disabled code below looks like an earlier
          -- inline version of the handshake now done in handleGetInfo.
          -- item <- atomically $ readTBQueue queue
          -- case item of
          --   TransaqResponseServerStatus status -> do
          --     case state status of
          --       Transaq.Error errmsg -> do
          --         log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> errmsg
          --         void $ sendCommand $ toXml CommandDisconnect
          --         threadDelay (10 * 1000 * 1000)
          --       Transaq.Connected -> do
          --         atomically $ writeTVar serverConnected StageGetInfo
          --         -- v <- makeSubscriptions config
          --         -- case v of
          --         --   Left errmsg -> do
          --         --     log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
          --         --     void $ sendCommand $ toXml CommandDisconnect
          --         --   Right _ ->
          --       Transaq.Disconnected -> do
          --         log Warning "TXMLConnector.WorkThread" "Unable to connect (disconnected)"
          --         threadDelay (10 * 1000 * 1000)
          --   other -> do
          --     log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other
          --     threadDelay (1000 * 1000)

    -- Send one subscribe command covering the all-trades, quotations and
    -- quotes subscriptions listed in the given config.
    makeSubscriptions cfg =
      sendCommand $
        toXml $
          CommandSubscribe
            { alltrades =
                fmap subscriptionToSecurityId (allTradesSubscriptions cfg)
            , quotations =
                fmap subscriptionToSecurityId (quotationsSubscriptions cfg)
            , quotes =
                fmap subscriptionToSecurityId (quotesSubscriptions cfg)
            }

    subscriptionToSecurityId (SubscriptionConfig brd code) =
      SecurityId brd code

    -- Store the tick as the latest one for its (ticker, data type) key.
    insertToTickMap tick =
      atomically $
        modifyTVar'
          tickMap
          (M.insert (TickKey (security tick) (datatype tick)) tick)

-- | Convert an all-trades entry into a 'LastTradePrice' tick. The ticker id
-- is @board#seccode@; timestamp, price and quantity are taken from the
-- trade.
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
  Tick
    { security = attBoard att <> "#" <> attSecCode att
    , datatype = LastTradePrice
    , timestamp = attTimestamp att
    , value = fromDouble $ attPrice att
    , volume = fromIntegral $ attQuantity att
    }

-- | Convert a quotation into two ticks stamped with the given time: a
-- 'BestBid' tick followed by a 'BestOffer' tick. The ticker id is
-- @board#seccode@ and both ticks use the quotation's quantity as volume.
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks now q =
  [ Tick
      { security = tickerId
      , datatype = BestBid
      , timestamp = now
      , value = fromDouble $ qBid q
      , volume = fromIntegral $ qQuantity q
      }
  , Tick
      { security = tickerId
      , datatype = BestOffer
      , timestamp = now
      , value = fromDouble $ qOffer q
      , volume = fromIntegral $ qQuantity q
      }
  ]
  where
    tickerId = qBoard q <> "#" <> qSeccode q