{-# LANGUAGE DuplicateRecordFields      #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}
{-# LANGUAGE OverloadedLabels           #-}
{-# LANGUAGE RecordWildCards            #-}
{-# LANGUAGE TypeApplications           #-}

-- | Worker thread that drives the TXML library: it connects to the server,
-- forwards incoming trades/quotations as ticks to the quote source channel,
-- publishes ticker infos, and serves history (candle) requests issued through
-- 'makeRequest'.
module TXMLConnector
  ( start
  , Request(..)
  , HistoryRequest(..)
  , Response(..)
  , HistoryResponse(..)
  , makeRequest
  , TXMLConnectorHandle
  ) where

import           ATrade.Logging                 (Message, Severity (..), log,
                                                 logWith)
import           Colog                          (HasLog (getLogAction, setLogAction),
                                                 LogAction (LogAction, unLogAction))
import           Config                         (SubscriptionConfig (SubscriptionConfig),
                                                 TransaqConnectorConfig (..),
                                                 transaqHost, transaqLogLevel,
                                                 transaqLogPath, transaqLogin,
                                                 transaqPassword, transaqPort)
import           Control.Concurrent             (ThreadId, forkIO, threadDelay)
import           Control.Concurrent.STM         (TVar, atomically, modifyTVar',
                                                 newEmptyTMVar,
                                                 newEmptyTMVarIO, newTVarIO,
                                                 orElse, putTMVar, readTMVar,
                                                 readTVar, readTVarIO,
                                                 takeTMVar, tryPutTMVar,
                                                 tryReadTMVar, writeTVar)
import           Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue,
                                                 readTBQueue, writeTBQueue)
import           Control.Monad                  (forM_, forever, void, when)
import           Data.Maybe                     (mapMaybe)
import qualified Data.Text                      as T
import           Text.XML.Light.Input           (parseXML)
import           Text.XML.Light.Types           (Content (Elem),
                                                 Element (elName),
                                                 QName (qName))
import           Transaq                        (AllTradesTrade (..),
                                                 Candle (..),
                                                 CommandConnect (..),
                                                 CommandDisconnect (CommandDisconnect),
                                                 CommandGetHistoryData (CommandGetHistoryData),
                                                 CommandSubscribe (..),
                                                 ConnectionState (Disconnected),
                                                 Language (LanguageEn),
                                                 MarketInfo (..),
                                                 Quotation (..),
                                                 ResponseAllTrades (ResponseAllTrades),
                                                 ResponseCandleKinds (ResponseCandleKinds),
                                                 ResponseCandles (..),
                                                 ResponseCandlesStatus (StatusPending),
                                                 ResponseMarkets (ResponseMarkets),
                                                 ResponseQuotations (ResponseQuotations),
                                                 ResponseQuotes (ResponseQuotes),
                                                 ResponseSecurities (ResponseSecurities),
                                                 Security (..), SecurityId (..),
                                                 TransaqCommand (toXml),
                                                 TransaqResponse (..),
                                                 TransaqResponseC (fromXml),
                                                 kCandleKindId, kPeriod, state,
                                                 status)
import           TXML                           (LogLevel, freeCallback,
                                                 initialize, sendCommand,
                                                 setCallback)

import           ATrade.QuoteSource.Server      (QuoteSourceServerData (..))
import           ATrade.Types                   (Bar (..),
                                                 BarTimeframe (unBarTimeframe),
                                                 DataType (BestBid, BestOffer, LastTradePrice),
                                                 Tick (..), TickerId,
                                                 fromDouble)
import           Colog.Monad                    (WithLog)
import           Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import           Control.Concurrent.STM.TMVar   (TMVar)
import           Control.Monad.IO.Class         (MonadIO (liftIO))
import           Control.Monad.Reader           (ReaderT (runReaderT))
import           Control.Monad.Reader.Class     (MonadReader, asks)
import qualified Data.Map.Strict                as M
import           Data.Time.Clock                (NominalDiffTime, UTCTime,
                                                 diffUTCTime, getCurrentTime)
import           Prelude                        hiding (log)
import           TickerInfoServer               (TickerInfo (..),
                                                 TickerInfoServerHandle,
                                                 putTickerInfo)
import qualified Transaq
import qualified TXML

-- | Connection settings bundle. Not referenced elsewhere in this module.
data ConnectionParams =
  ConnectionParams
  { cpLogin    :: T.Text
  , cpPassword :: T.Text
  , cpHost     :: T.Text
  , cpPort     :: Int
  , cpLogPath  :: T.Text
  , cpLogLevel :: LogLevel
  } deriving (Show, Eq, Ord)

-- | Parameters of a history (candles) request.
data HistoryRequest =
  HistoryRequest
  { hrTickerId  :: TickerId      -- ^ Ticker in @board#seccode@ form (see 'parseSecurityId')
  , hrTimeframe :: BarTimeframe  -- ^ Looked up in the candle kind map received from the server
  , hrCount     :: Int           -- ^ Passed as @count@ of the history command
  , hrReset     :: Bool          -- ^ Passed as @reset@ of the history command
  } deriving (Show, Eq, Ord)

-- | A request that can be submitted with 'makeRequest'.
newtype Request =
  Request HistoryRequest
  deriving (Show, Eq, Ord)

-- | Outcome of a request submitted with 'makeRequest'.
data Response =
    ResponseHistory HistoryResponse
  | ResponseTimeout

-- | Bars collected for a 'HistoryRequest'.
data HistoryResponse =
  HistoryResponse
  { hrBars     :: [Bar]
  , hrMoreData :: Bool  -- ^ Always set to 'False' by this module
  } deriving (Show, Eq)

-- | Handle returned by 'start'; shares its request/response variables with
-- the worker thread's 'Env'.
data TXMLConnectorHandle =
  TXMLConnectorHandle
  { threadId          :: ThreadId
  , notificationQueue :: TBQueue TransaqResponse
  , hRequestVar       :: TMVar Request
  , hResponseVar      :: TMVar (TMVar Response)
  , hRequestTimestamp :: TVar UTCTime
  }

-- | State of the worker loop; selects which handler 'workThread' runs next.
data ConnectionStage = StageConnection | StageGetInfo | StageConnected
  deriving (Eq, Show, Ord)

-- | Item picked up by 'handleConnected': either server data or a client request.
data MainQueueData =
    MainQueueTransaqData TransaqResponse
  | MainQueueRequest Request
  deriving (Eq, Show, Ord)

-- | Key of the last-tick table: one entry per ticker and data type.
data TickKey = TickKey TickerId DataType
  deriving (Show, Ord, Eq)

-- | Environment of the worker thread.
data Env =
  Env
  { qssChannel       :: BoundedChan QuoteSourceServerData
  , tisHandle        :: TickerInfoServerHandle
  , requestVar       :: TMVar Request
  , responseVar      :: TMVar (TMVar Response)
  , requestTimestamp :: TVar UTCTime
  , currentCandles   :: TVar [Candle]
  , tickMap          :: TVar (M.Map TickKey Tick)
  , transaqQueue     :: TBQueue TransaqResponse
  , logger           :: LogAction IO Message
  , config           :: TransaqConnectorConfig
  , serverConnected  :: TVar ConnectionStage
  , candleKindMap    :: TVar (M.Map Int Int)
  }

-- | Monad of the worker thread: a reader over 'Env' on top of 'IO'.
newtype App a = App { unApp :: ReaderT Env IO a }
  deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env)

instance HasLog Env Message App where
  -- Lift the IO logger stored in the environment into 'App'.
  getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) }
  -- Replacing the log action is intentionally unsupported: the environment
  -- stores an IO action, so the new 'App' action is ignored.
  setLogAction _ env = env

-- | Create the shared state, fork the worker thread and return a handle that
-- can be used with 'makeRequest'.
start
  :: LogAction IO Message
  -> TransaqConnectorConfig
  -> BoundedChan QuoteSourceServerData
  -> TickerInfoServerHandle
  -> IO TXMLConnectorHandle
start logger config qssChannel tisH = do
  logWith logger Info "TXMLConnector" "Starting"
  queue <- atomically $ newTBQueue 50000
  tickTable <- newTVarIO M.empty
  rqVar <- newEmptyTMVarIO
  respVar <- newEmptyTMVarIO
  candlesVar <- newTVarIO []
  stageVar <- newTVarIO StageConnection
  kindsVar <- newTVarIO M.empty
  tsVar <- getCurrentTime >>= newTVarIO
  let env =
        Env
        { qssChannel = qssChannel
        , tisHandle = tisH
        , requestVar = rqVar
        , responseVar = respVar
        , requestTimestamp = tsVar
        , currentCandles = candlesVar
        , tickMap = tickTable
        , transaqQueue = queue
        , logger = logger
        , config = config
        , serverConnected = stageVar
        , candleKindMap = kindsVar
        }
  tid <- forkIO $ (runReaderT . unApp) workThread env
  return $ TXMLConnectorHandle
    { threadId = tid
    , notificationQueue = queue
    , hRequestVar = rqVar
    , hResponseVar = respVar
      -- Must be the same TVar as in Env: makeRequest writes it and
      -- checkRequestTimeout reads it. Leaving the field unset makes
      -- makeRequest fail at run time with a missing-field error.
    , hRequestTimestamp = tsVar
    }

-- | Body of the worker thread: initialise TXML, install the callback that
-- feeds parsed responses into the queue, then loop forever dispatching on the
-- current 'ConnectionStage'.
workThread :: App ()
workThread = do
  cfg <- asks config
  initResult <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg)
  case initResult of
    Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML: " <> str
    Right _ -> do
      queue <- asks transaqQueue
      logger' <- asks logger
      maybeCallback <- liftIO $ setCallback (parseAndWrite queue logger')
      case maybeCallback of
        Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
        Just cb -> do
          void $ forever $ do
            connStatus <- asks serverConnected >>= (liftIO . readTVarIO)
            case connStatus of
              StageConnection -> handleUnconnected
              StageGetInfo    -> handleGetInfo
              StageConnected  -> handleConnected
          -- NOTE(review): 'forever' never returns normally, so this line is
          -- only reached if the loop is restructured to terminate.
          liftIO $ freeCallback cb
  where
    -- Map the numeric log level from the config to a TXML log level;
    -- anything other than 1 or 3 falls back to Info.
    parseTransaqLogLevel 1 = TXML.Warning
    parseTransaqLogLevel 3 = TXML.Debug
    parseTransaqLogLevel _ = TXML.Info

    -- Callback body: parse the XML, enqueue every recognised response and
    -- always report True.
    parseAndWrite queue logger xml = do
      let parsed = mapMaybe parseContent $ parseXML xml
      logWith logger Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed
      mapM_ (writeToQueue queue) parsed
      pure True

    parseContent (Elem el) = parseElement el
    parseContent _         = Nothing

    -- Dispatch on the root element name; unknown elements are dropped.
    parseElement el = case qName $ elName el of
      "candles"       -> TransaqResponseCandles <$> fromXml el
      "server_status" -> TransaqResponseServerStatus <$> fromXml el
      "markets"       -> TransaqResponseMarkets <$> fromXml el
      "candlekinds"   -> TransaqResponseCandleKinds <$> fromXml el
      "securities"    -> TransaqResponseSecurities <$> fromXml el
      "sec_info"      -> TransaqResponseSecInfo <$> fromXml el
      "quotations"    -> TransaqResponseQuotations <$> fromXml el
      "alltrades"     -> TransaqResponseAllTrades <$> fromXml el
      "quotes"        -> TransaqResponseQuotes <$> fromXml el
      _               -> Nothing

    writeToQueue queue resp = atomically $ writeTBQueue queue resp

-- | One iteration of the connected stage: wait for either server data or a
-- client request, process it, then check whether the pending request timed out.
--
-- NOTE(review): the timeout check only runs after an item has been processed,
-- so a timeout is detected only when there is traffic on the queue.
handleConnected :: App ()
handleConnected = do
  rqVar <- asks requestVar
  queue <- asks transaqQueue
  item <- liftIO . atomically $
    (MainQueueTransaqData <$> readTBQueue queue) `orElse`
    (MainQueueRequest <$> takeTMVar rqVar)
  case item of
    MainQueueTransaqData transaqData   -> handleTransaqData transaqData
    MainQueueRequest (Request request) -> handleHistoryRequest request
  checkRequestTimeout

-- | Process a server message received while connected. Trades and quotations
-- become ticks, candles feed the pending history request, everything else is
-- ignored.
handleTransaqData :: TransaqResponse -> App ()
handleTransaqData transaqData =
  case transaqData of
    TransaqResponseAllTrades (ResponseAllTrades trades) ->
      publishTicks (fmap allTradeToTick trades)
    TransaqResponseQuotations (ResponseQuotations quotations) -> do
      now <- liftIO getCurrentTime
      publishTicks (concatMap (quotationToTicks now) quotations)
    TransaqResponseCandles respCandle -> handleCandles respCandle
    _ -> pure ()

-- | Send every tick to the quote source channel, then record it in the
-- last-tick table.
publishTicks :: Foldable t => t Tick -> App ()
publishTicks ticks = do
  qssChan <- asks qssChannel
  tm <- asks tickMap
  forM_ ticks (liftIO . writeChan qssChan . QSSTick)
  forM_ ticks (insertToTickMap tm)

-- | Accumulate a chunk of candles for the pending request. While the status is
-- 'StatusPending' the chunk is prepended to the buffer; on any other status the
-- buffered candles plus this chunk are delivered as a 'ResponseHistory'.
handleCandles :: ResponseCandles -> App ()
handleCandles respCandle = do
  resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
  log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
  case resp of
    Nothing -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
    Just tmvar -> do
      cur <- asks currentCandles
      liftIO . atomically $
        if cStatus respCandle == StatusPending
          then modifyTVar' cur (cCandles respCandle <>)
          else do
            candles <- readTVar cur
            -- tryPutTMVar: if the slot already holds a response (e.g. a
            -- timeout), drop this one instead of blocking the worker thread.
            void . tryPutTMVar tmvar . ResponseHistory $ HistoryResponse
              { hrBars = candleToBar (cSecurity respCandle) <$> (cCandles respCandle <> candles)
              , hrMoreData = False
              }

-- | Start serving a history request: reset the candle buffer and send the
-- history command. If the timeframe or ticker cannot be resolved, only a
-- warning is logged and no response is produced here.
handleHistoryRequest :: HistoryRequest -> App ()
handleHistoryRequest request = do
  cur <- asks currentCandles
  liftIO $ atomically $ writeTVar cur []
  kinds <- asks candleKindMap >>= liftIO . readTVarIO
  case M.lookup (unBarTimeframe . hrTimeframe $ request) kinds of
    Nothing ->
      log Warning "TXMLConnector.WorkThread" $
        "Invalid candlekind requested: " <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
    Just candleKindId ->
      case parseSecurityId (hrTickerId request) of
        Nothing ->
          log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
        Just secId ->
          void $ liftIO . sendCommand $
            toXml CommandGetHistoryData
            { security = secId
            , periodId = candleKindId
            , count = hrCount request
            , reset = hrReset request
            }

-- | How long a request may stay unanswered before 'ResponseTimeout' is sent.
-- Named differently from the @requestTimeout@ field of 'CommandConnect' to
-- avoid an ambiguous reference.
requestTimeoutSeconds :: NominalDiffTime
requestTimeoutSeconds = 10

-- | Deliver 'ResponseTimeout' to the pending request, if there is one and it
-- is older than 'requestTimeoutSeconds'.
checkRequestTimeout :: App ()
checkRequestTimeout = do
  now <- liftIO getCurrentTime
  ts <- asks requestTimestamp >>= liftIO . readTVarIO
  when (now `diffUTCTime` ts >= requestTimeoutSeconds) $ do
    resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
    case resp of
      Just tmvar -> do
        -- Non-blocking put: the slot may already hold a response that the
        -- client has not taken yet; blocking here would stall the worker.
        delivered <- liftIO . atomically $ tryPutTMVar tmvar ResponseTimeout
        when delivered $ log Warning "TXMLConnector.WorkThread" "Request timeout"
      Nothing -> pure ()

-- | One iteration of the info-gathering stage: take the next server message
-- and either switch stage (on server status) or record/log the reference data.
handleGetInfo :: App ()
handleGetInfo = do
  queue <- asks transaqQueue
  cfg <- asks config
  item <- liftIO . atomically $ readTBQueue queue
  conn <- asks serverConnected
  case item of
    TransaqResponseServerStatus serverStatus ->
      case state serverStatus of
        Transaq.Disconnected -> do
          log Warning "TXMLConnector.WorkThread" "Server disconnected"
          liftIO . atomically $ writeTVar conn StageConnection
        Transaq.Connected -> do
          log Info "TXMLConnector.WorkThread" "Server connected"
          liftIO . atomically $ writeTVar conn StageConnected
          v <- makeSubscriptions cfg
          case v of
            Left errmsg -> do
              log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
              void $ liftIO . sendCommand $ toXml CommandDisconnect
            Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done"
        Transaq.Error errmsg -> do
          log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg
          liftIO . atomically $ writeTVar conn StageConnection
    TransaqResponseResult result ->
      log Info "TXMLConnector.WorkThread" $ "Incoming result: " <> (T.pack . show) result
    -- TODO: handle order response
    TransaqResponseCandles candles ->
      log Debug "TXMLConnector.WorkThread" $
        "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
    -- TODO: Pass to qhp
    TransaqResponseMarkets (ResponseMarkets markets) -> do
      log Debug "TXMLConnector.WorkThread" "Incoming markets:"
      forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
    -- TODO: Pass to qtis
    TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
      ckMap <- asks candleKindMap
      log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
      -- Insert all kinds in a single transaction (period -> candle kind id).
      liftIO . atomically $
        forM_ kinds (\k -> modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
    TransaqResponseSecurities (ResponseSecurities securities) -> do
      tisH <- asks tisHandle
      let tickerInfos = securityToTickerInfo <$> securities
      log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
      forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
      forM_ tickerInfos (liftIO . putTickerInfo tisH)
    TransaqResponseSecInfo secInfo ->
      log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo: " <> (T.pack . show) secInfo
    -- TODO: Pass to qtis
    _ -> pure ()

-- | One iteration of the unconnected stage: send the connect command. On
-- failure wait ten seconds (the loop will retry); on success move on to
-- 'StageGetInfo', where the server status is awaited.
handleUnconnected :: App ()
handleUnconnected = do
  cfg <- asks config
  log Debug "TXMLConnector.WorkThread" "Sending connect command"
  v <- liftIO . sendCommand . toXml $
    CommandConnect
    { login = transaqLogin cfg
    , password = transaqPassword cfg
    , host = transaqHost cfg
    , port = transaqPort cfg
    , language = LanguageEn
    , autopos = False
    , micexRegisters = True
    , milliseconds = True
    , utcTime = True
    , proxy = ()
    , rqDelay = Nothing
    , sessionTimeout = Nothing
    , requestTimeout = Nothing
    , pushULimits = Nothing
    , pushPosEquity = Nothing
    }
  case v of
    Left err -> do
      log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]"
      liftIO $ threadDelay (1000 * 1000 * 10)
    Right _ -> do
      conn <- asks serverConnected
      liftIO . atomically $ writeTVar conn StageGetInfo
      -- Disabled code kept from the original source (an inline wait for the
      -- server status at this point):
      --
      -- item <- atomically $ readTBQueue queue
      -- case item of
      --   TransaqResponseServerStatus status -> do
      --     case state status of
      --       Transaq.Error errmsg -> do
      --         log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> errmsg
      --         void $ sendCommand $ toXml CommandDisconnect
      --         threadDelay (10 * 1000 * 1000)
      --       Transaq.Connected -> do
      --         atomically $ writeTVar serverConnected StageGetInfo
      --         -- v <- makeSubscriptions config
      --         -- case v of
      --         --   Left errmsg -> do
      --         --     log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
      --         --     void $ sendCommand $ toXml CommandDisconnect
      --         --   Right _ ->
      --       Transaq.Disconnected -> do
      --         log Warning "TXMLConnector.WorkThread" "Unable to connect (disconnected)"
      --         threadDelay (10 * 1000 * 1000)
      --   other -> do
      --     log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other
      --     threadDelay (1000 * 1000)

-- | Send a subscribe command for all trades/quotations/quotes subscriptions
-- listed in the config. The type is left inferred because the result type of
-- 'sendCommand' is not visible in this module.
makeSubscriptions config =
  liftIO . sendCommand . toXml $
    CommandSubscribe
    { alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config)
    , quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config)
    , quotes = fmap subscriptionToSecurityId (quotesSubscriptions config)
    }

-- | Convert a configured subscription to the security id used in commands.
subscriptionToSecurityId :: SubscriptionConfig -> SecurityId
subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code

-- | Store a tick as the latest value for its (ticker, data type) pair.
insertToTickMap :: MonadIO m => TVar (M.Map TickKey Tick) -> Tick -> m ()
insertToTickMap tickMap tick =
  liftIO . atomically $
    modifyTVar' tickMap (M.insert (TickKey (security tick) (datatype tick)) tick)

-- | Convert a trade to a 'LastTradePrice' tick; the ticker is @board#seccode@.
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
  Tick
  { security = attBoard att <> "#" <> attSecCode att
  , datatype = LastTradePrice
  , timestamp = attTimestamp att
  , value = fromDouble $ attPrice att
  , volume = fromIntegral $ attQuantity att
  }

-- | Convert a quotation to a 'BestBid' and a 'BestOffer' tick, both stamped
-- with the supplied time and carrying the quotation's quantity as volume.
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks timestamp q =
  [ mkTick BestBid (qBid q)
  , mkTick BestOffer (qOffer q)
  ]
  where
    tickerId = qBoard q <> "#" <> qSeccode q
    mkTick dt price =
      Tick
      { security = tickerId
      , datatype = dt
      , timestamp = timestamp
      , value = fromDouble price
      , volume = fromIntegral $ qQuantity q
      }

-- | Build the ticker info published to the ticker info server.
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec =
  TickerInfo
  { tiTicker = sBoard sec <> "#" <> sSeccode sec
  , tiLotSize = sLotSize sec
  , tiTickSize = sMinStep sec
  }

-- | Split a ticker id at its first @#@ into board and security code.
-- Returns 'Nothing' when the id contains no @#@.
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId tickerId = splitAtHash <$> T.findIndex (== '#') tickerId
  where
    splitAtHash ix = SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId)

-- | Inverse of 'parseSecurityId': join board and security code with @#@.
makeTickerId :: SecurityId -> TickerId
makeTickerId sec = board sec <> "#" <> seccode sec

-- | Submit a request to the worker thread and block until a response arrives.
--
-- The first transaction publishes a fresh response slot and the request (it
-- waits while another request still occupies either variable); the second one
-- waits for the response and clears the published slot.
makeRequest :: TXMLConnectorHandle -> Request -> IO Response
makeRequest h request = do
  now <- getCurrentTime
  resp <- atomically $ do
    slot <- newEmptyTMVar
    writeTVar (hRequestTimestamp h) now
    putTMVar (hResponseVar h) slot
    putTMVar (hRequestVar h) request
    pure slot
  atomically $ do
    void $ takeTMVar (hResponseVar h)
    takeTMVar resp

-- | Convert a candle of the given security to a bar.
candleToBar :: SecurityId -> Candle -> Bar
candleToBar sec candle =
  Bar
  { barSecurity = makeTickerId sec
  , barTimestamp = cTimestamp candle
  , barOpen = fromDouble (cOpen candle)
  , barHigh = fromDouble (cHigh candle)
  , barLow = fromDouble (cLow candle)
  , barClose = fromDouble (cClose candle)
  , barVolume = fromIntegral $ cVolume candle
  }