From 61cdf6cc286ea88741ad3b38144ed2216670f9e9 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 25 Mar 2023 09:57:48 +0700 Subject: [PATCH] txmlconnector: refactor workthread in ReaderT --- src/TXMLConnector.hs | 256 ++++++++++++++++++++++++++++------------ transaq-connector.cabal | 1 + 2 files changed, 182 insertions(+), 75 deletions(-) diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 1d44703..7553ae3 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -1,16 +1,20 @@ -{-# LANGUAGE DuplicateRecordFields #-} -{-# LANGUAGE OverloadedLabels #-} -{-# LANGUAGE RecordWildCards #-} -{-# LANGUAGE TypeApplications #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE OverloadedLabels #-} +{-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE TypeApplications #-} module TXMLConnector ( start ) where -import ATrade.Logging (Message, Severity (..), +import ATrade.Logging (Message, Severity (..), log, logWith) -import Colog (LogAction) +import Colog (HasLog (getLogAction, setLogAction), + LogAction (LogAction, unLogAction)) import Config (SubscriptionConfig (SubscriptionConfig), TransaqConnectorConfig (..), transaqHost, transaqLogLevel, @@ -18,7 +22,8 @@ import Config (SubscriptionConfig (Subscriptio transaqPassword, transaqPort) import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent.STM (TVar, atomically, modifyTVar', - newTVarIO, readTVarIO, + newEmptyTMVarIO, newTVarIO, + orElse, readTMVar, readTVarIO, writeTVar) import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, readTBQueue, writeTBQueue) @@ -32,6 +37,7 @@ import Text.XML.Light.Types (Content (Elem), import Transaq (AllTradesTrade (..), CommandConnect (..), CommandDisconnect (CommandDisconnect), + CommandGetHistoryData (CommandGetHistoryData), CommandSubscribe (..), ConnectionState (Disconnected), Language (LanguageEn), @@ -48,20 +54,27 @@ import Transaq (AllTradesTrade (..), TransaqResponse (..), TransaqResponse (..), TransaqResponseC (fromXml), - state) + kCandleKindId, kPeriod, state) import TXML (LogLevel, freeCallback, initialize, sendCommand, setCallback) import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) -import ATrade.Types (BarTimeframe, DataType (BestBid, BestOffer, LastTradePrice), +import ATrade.Types (Bar, + BarTimeframe (unBarTimeframe), + DataType (BestBid, BestOffer, LastTradePrice), Tick (..), TickerId, fromDouble) +import Colog.Monad (WithLog) import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.STM.TMVar (TMVar) import Control.Monad (forM_) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader (ReaderT (runReaderT)) +import Control.Monad.Reader.Class (MonadReader, asks) import qualified Data.Map.Strict as M import Data.Time.Clock (UTCTime, getCurrentTime) +import Prelude hiding (log) import TickerInfoServer (TickerInfo (..), TickerInfoServerHandle, putTickerInfo) @@ -83,31 +96,70 @@ data ConnectionParams = data HistoryRequest = HistoryRequest { - hrTIckerId :: TickerId + hrTickerId :: TickerId , hrTimeframe :: BarTimeframe - , hrStartTime :: UTCTime - , hrEndTime :: UTCTime + , hrCount :: Int + , hrReset :: Bool } deriving (Show, Eq, Ord) -data Request = +newtype Request = Request HistoryRequest deriving (Show, Eq, Ord) +data HistoryResponse = + HistoryResponse + { + hrBars :: [Bar] + , hrMoreData :: Bool + } + deriving (Show, Eq) + +newtype Response = + Response HistoryResponse + deriving (Show, Eq) + data TXMLConnectorHandle = TXMLConnectorHandle { threadId :: ThreadId, notificationQueue :: TBQueue TransaqResponse, - requestVar :: TMVar Request, - responseVar :: TMVar Response + hRequestVar :: TMVar Request, + hResponseVar :: TMVar Response } data ConnectionStage = StageConnection | StageGetInfo | StageConnected deriving (Eq, Show, Ord) +data MainQueueData = + MainQueueTransaqData TransaqResponse + | MainQueueRequest Request + deriving (Eq, Show, Ord) + data TickKey = TickKey TickerId DataType deriving (Show, Ord, Eq) +data Env = + Env + { + qssChannel :: BoundedChan QuoteSourceServerData + , tisHandle :: TickerInfoServerHandle + , requestVar :: TMVar Request + , responseVar :: TMVar Response + , tickMap :: TVar (M.Map TickKey Tick) + , transaqQueue :: TBQueue TransaqResponse + , logger :: LogAction IO Message + , config :: TransaqConnectorConfig + , serverConnected :: TVar ConnectionStage + , candleKindMap :: TVar (M.Map Int Int) + } + +newtype App a = App { unApp :: ReaderT Env IO a } + deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env) + +instance HasLog Env Message App where + getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } + setLogAction _ env = env -- fuck it + start :: LogAction IO Message -> TransaqConnectorConfig @@ -118,45 +170,61 @@ start logger config qssChannel tisH = do logWith logger Info "TXMLConnector" "Starting" notificationQueue <- atomically $ newTBQueue 50000 tickTable <- newTVarIO M.empty - threadId <- forkIO (workThread logger config notificationQueue qssChannel tickTable tisH) - return $ TXMLConnectorHandle {..} + requestVar <- newEmptyTMVarIO + responseVar <- newEmptyTMVarIO + serverConnected <- liftIO $ newTVarIO StageConnection + candleKindMap <- newTVarIO M.empty + let env = + Env + { + qssChannel = qssChannel + , tisHandle = tisH + , requestVar = requestVar + , responseVar = responseVar + , tickMap = tickTable + , transaqQueue = notificationQueue + , logger = logger + , config = config + , serverConnected = serverConnected + , candleKindMap = candleKindMap + } + threadId <- forkIO $ (runReaderT . unApp) workThread env + return $ TXMLConnectorHandle + { + threadId = threadId + , notificationQueue = notificationQueue + , hRequestVar = requestVar + , hResponseVar = responseVar + } -workThread :: - LogAction IO Message - -> TransaqConnectorConfig - -> TBQueue TransaqResponse - -> BoundedChan QuoteSourceServerData - -> TVar (M.Map TickKey Tick) - -> TickerInfoServerHandle - -> IO () -workThread logger config queue qssChannel tickMap tisH = do - rc <- initialize (transaqLogPath config) (parseTransaqLogLevel $ transaqLogLevel config) +workThread :: App () +workThread = do + cfg <- asks config + rc <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg) case rc of Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str Right _ -> do - rc <- setCallback parseAndWrite + queue <- asks transaqQueue + logger' <- asks logger + rc <- liftIO $ setCallback (parseAndWrite queue logger') case rc of Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Just cb -> do - serverConnected <- newTVarIO StageConnection void $ forever $ do - connStatus <- readTVarIO serverConnected + connStatus <- asks serverConnected >>= (liftIO . readTVarIO) case connStatus of - StageConnection -> handleUnconnected serverConnected - StageGetInfo -> handleGetInfo serverConnected - StageConnected -> handleConnected serverConnected - freeCallback cb + StageConnection -> handleUnconnected + StageGetInfo -> handleGetInfo + StageConnected -> handleConnected + liftIO $ freeCallback cb where - log = logWith logger parseTransaqLogLevel 1 = TXML.Warning parseTransaqLogLevel 3 = TXML.Debug parseTransaqLogLevel _ = TXML.Info - parseAndWrite xml = do + parseAndWrite queue logger xml = do let parsed = mapMaybe parseContent $ parseXML xml - log Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed - log Debug "TXML.Callback" $ "parsed xml: " <> (T.pack . show) (parseXML xml) - log Debug "TXML.Callback" $ "parsed: " <> (T.pack . show) xml - mapM_ writeToQueue parsed + logWith logger Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed + mapM_ (writeToQueue queue) parsed pure True parseContent (Elem el) = parseElement el parseContent _ = Nothing @@ -171,40 +239,70 @@ workThread logger config queue qssChannel tickMap tisH = do "alltrades" -> TransaqResponseAllTrades <$> fromXml el "quotes" -> TransaqResponseQuotes <$> fromXml el _ -> Nothing - writeToQueue resp = atomically $ writeTBQueue queue resp - handleConnected serverConnected = do - item <- atomically $ readTBQueue queue + writeToQueue queue resp = atomically $ writeTBQueue queue resp + handleConnected :: App () + handleConnected = do + rqVar <- asks requestVar + queue <- asks transaqQueue + item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` + (MainQueueRequest <$> readTMVar rqVar) case item of - TransaqResponseAllTrades (ResponseAllTrades trades) -> do - let ticks = fmap allTradeToTick trades - forM_ ticks (writeChan qssChannel . QSSTick) - forM_ ticks insertToTickMap - TransaqResponseQuotations (ResponseQuotations quotations) -> do - now <- getCurrentTime - let ticks = concatMap (quotationToTicks now) quotations - forM_ ticks (writeChan qssChannel . QSSTick) - forM_ ticks insertToTickMap - _ -> pure () - handleGetInfo serverConnected = do - item <- atomically $ readTBQueue queue + MainQueueTransaqData transaqData -> do + tm <- asks tickMap + case transaqData of + TransaqResponseAllTrades (ResponseAllTrades trades) -> do + qssChan <- asks qssChannel + let ticks = fmap allTradeToTick trades + forM_ ticks (liftIO . writeChan qssChan . QSSTick) + forM_ ticks (insertToTickMap tm) + TransaqResponseQuotations (ResponseQuotations quotations) -> do + qssChan <- asks qssChannel + now <- liftIO getCurrentTime + let ticks = concatMap (quotationToTicks now) quotations + forM_ ticks (liftIO . writeChan qssChan . QSSTick) + forM_ ticks (insertToTickMap tm) + TransaqResponseCandles respCandle -> undefined + _ -> pure () + MainQueueRequest (Request request) -> do + maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO) + case maybeCk of + Just candleKindId -> do + case parseSecurityId (hrTickerId request) of + Just secId -> void $ liftIO . sendCommand $ + toXml CommandGetHistoryData + { + security = secId + , periodId = candleKindId + , count = hrCount request + , reset = hrReset request + } + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request + _ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request) + + handleGetInfo :: App () + handleGetInfo = do + queue <- asks transaqQueue + cfg <- asks config + item <- liftIO . atomically $ readTBQueue queue + conn <- asks serverConnected case item of TransaqResponseServerStatus serverStatus -> case state serverStatus of Transaq.Disconnected -> do log Warning "TXMLConnector.WorkThread" "Server disconnected" - atomically $ writeTVar serverConnected StageConnection + liftIO . atomically $ writeTVar conn StageConnection Transaq.Connected -> do log Info "TXMLConnector.WorkThread" "Server connected" - atomically $ writeTVar serverConnected StageConnected - v <- makeSubscriptions config + liftIO . atomically $ writeTVar conn StageConnected + v <- makeSubscriptions cfg case v of Left errmsg -> do log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg - void $ sendCommand $ toXml CommandDisconnect + void $ liftIO . sendCommand $ toXml CommandDisconnect Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" Transaq.Error errmsg -> do log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg - atomically $ writeTVar serverConnected StageConnection + liftIO . atomically $ writeTVar conn StageConnection TransaqResponseResult result -> log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result -- TODO: handle order response @@ -217,25 +315,30 @@ workThread logger config queue qssChannel tickMap tisH = do forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) -- TODO: Pass to qtis TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do - log Debug "TXMLConnector.WorkThread" "Incoming candle kinds:" - forM_ kinds (log Debug "TXMLConnector.WorkThread" . (T.pack . show)) - -- TODO: Pass to qtis, maybe something else? + ckMap <- asks candleKindMap + log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds + forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k))) TransaqResponseSecurities (ResponseSecurities securities) -> do - log Debug "TXMLConnector.WorkThread" "Incoming securities:" - forM_ securities (putTickerInfo tisH . securityToTickerInfo) + tisH <- asks tisHandle + let tickerInfos = securityToTickerInfo <$> securities + log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities + forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) + forM_ tickerInfos (liftIO . putTickerInfo tisH) TransaqResponseSecInfo secInfo -> log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo -- TODO: Pass to qtis _ -> pure () - handleUnconnected serverConnected = do + handleUnconnected :: App () + handleUnconnected = do + cfg <- asks config log Debug "TXMLConnector.WorkThread" "Sending connect command" - v <- sendCommand $ + v <- liftIO . sendCommand . toXml $ CommandConnect { - login = transaqLogin config, - password = transaqPassword config, - host = transaqHost config, - port = transaqPort config, + login = transaqLogin cfg, + password = transaqPassword cfg, + host = transaqHost cfg, + port = transaqPort cfg, language = LanguageEn, autopos = False, micexRegisters = True, @@ -251,9 +354,10 @@ workThread logger config queue qssChannel tickMap tisH = do case v of Left err -> do log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" - threadDelay (1000 * 1000 * 10) + liftIO $ threadDelay (1000 * 1000 * 10) Right _ -> do - atomically $ writeTVar serverConnected StageGetInfo + conn <- asks serverConnected + liftIO . atomically $ writeTVar conn StageGetInfo -- item <- atomically $ readTBQueue queue -- case item of -- TransaqResponseServerStatus status -> do @@ -277,7 +381,7 @@ workThread logger config queue qssChannel tickMap tisH = do -- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other -- threadDelay (1000 * 1000) makeSubscriptions config = - sendCommand $ toXml $ + liftIO . sendCommand . toXml $ CommandSubscribe { alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), @@ -285,7 +389,7 @@ workThread logger config queue qssChannel tickMap tisH = do quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) } subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code - insertToTickMap tick = atomically $ modifyTVar' tickMap (M.insert (TickKey (security tick) (datatype tick)) tick) + insertToTickMap tickMap tick = liftIO . atomically $ modifyTVar' tickMap (M.insert (TickKey (security tick) (datatype tick)) tick) allTradeToTick :: AllTradesTrade -> Tick allTradeToTick att = @@ -328,3 +432,5 @@ securityToTickerInfo sec = , tiTickSize = sMinStep sec } +parseSecurityId :: TickerId -> Maybe SecurityId +parseSecurityId = undefined diff --git a/transaq-connector.cabal b/transaq-connector.cabal index 4a125ed..e295263 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -46,6 +46,7 @@ executable transaq-connector , stm , extra , errors + , mtl extra-lib-dirs: lib ghc-options: -Wall -Wcompat