From 4a4687a1a7f0fafad7fcfe92d231285c753b4d32 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 19 Mar 2023 23:05:13 +0700 Subject: [PATCH] TIS support --- src/Main.hs | 5 +- src/TXMLConnector.hs | 54 +++++++++++++------ src/TickerInfoServer.hs | 116 ++++++++++++++++++++++++++++++++++++++++ src/Transaq.hs | 60 ++++++++++----------- transaq-connector.cabal | 1 + 5 files changed, 188 insertions(+), 48 deletions(-) create mode 100644 src/TickerInfoServer.hs diff --git a/src/Main.hs b/src/Main.hs index aed8fe2..d0cbeae 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -26,6 +26,7 @@ import Prelude hiding (log) import System.IO (Handle, IOMode (AppendMode), withFile) import System.ZMQ4 (withContext) +import TickerInfoServer (withTickerInfoServer) import qualified TXMLConnector as Connector import Version (transaqConnectorVersionText) @@ -53,8 +54,8 @@ main = do ctx (quotesourceEndpoint cfg) defaultServerSecurityParams) - stopQuoteSourceServer $ \_ -> do - _ <- Connector.start logger cfg qssChannel + stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do + void $ Connector.start logger cfg qssChannel tisH forever $ threadDelay 1000000 log Info "main" "Shutting down" diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 16f23bc..1d44703 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -43,7 +43,7 @@ import Transaq (AllTradesTrade (..), ResponseQuotations (ResponseQuotations), ResponseQuotes (ResponseQuotes), ResponseSecurities (ResponseSecurities), - SecurityId (..), + Security (..), SecurityId (..), TransaqCommand (toXml), TransaqResponse (..), TransaqResponse (..), @@ -54,13 +54,17 @@ import TXML (LogLevel, freeCallback, setCallback) import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) -import ATrade.Types (DataType (BestBid, BestOffer, LastTradePrice), +import ATrade.Types (BarTimeframe, DataType (BestBid, BestOffer, LastTradePrice), Tick (..), TickerId, fromDouble) import Control.Concurrent.BoundedChan (BoundedChan, writeChan) +import Control.Concurrent.STM.TMVar (TMVar) import Control.Monad (forM_) import qualified Data.Map.Strict as M import Data.Time.Clock (UTCTime, getCurrentTime) +import TickerInfoServer (TickerInfo (..), + TickerInfoServerHandle, + putTickerInfo) import qualified Transaq import qualified TXML @@ -76,11 +80,26 @@ data ConnectionParams = } deriving (Show, Eq, Ord) +data HistoryRequest = + HistoryRequest + { + hrTIckerId :: TickerId + , hrTimeframe :: BarTimeframe + , hrStartTime :: UTCTime + , hrEndTime :: UTCTime + } deriving (Show, Eq, Ord) + +data Request = + Request HistoryRequest + deriving (Show, Eq, Ord) + data TXMLConnectorHandle = TXMLConnectorHandle { threadId :: ThreadId, - notificationQueue :: TBQueue TransaqResponse + notificationQueue :: TBQueue TransaqResponse, + requestVar :: TMVar Request, + responseVar :: TMVar Response } data ConnectionStage = StageConnection | StageGetInfo | StageConnected @@ -93,12 +112,13 @@ start :: LogAction IO Message -> TransaqConnectorConfig -> BoundedChan QuoteSourceServerData + -> TickerInfoServerHandle -> IO TXMLConnectorHandle -start logger config qssChannel = do +start logger config qssChannel tisH = do logWith logger Info "TXMLConnector" "Starting" notificationQueue <- atomically $ newTBQueue 50000 tickTable <- newTVarIO M.empty - threadId <- forkIO (workThread logger config notificationQueue qssChannel tickTable) + threadId <- forkIO (workThread logger config notificationQueue qssChannel tickTable tisH) return $ TXMLConnectorHandle {..} workThread :: @@ -107,8 +127,9 @@ workThread :: -> TBQueue TransaqResponse -> BoundedChan QuoteSourceServerData -> TVar (M.Map TickKey Tick) + -> TickerInfoServerHandle -> IO () -workThread logger config queue qssChannel tickMap = do +workThread logger config queue qssChannel tickMap tisH = do rc <- initialize (transaqLogPath config) (parseTransaqLogLevel $ transaqLogLevel config) case rc of Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str @@ -201,19 +222,10 @@ workThread logger config queue qssChannel tickMap = do -- TODO: Pass to qtis, maybe something else? TransaqResponseSecurities (ResponseSecurities securities) -> do log Debug "TXMLConnector.WorkThread" "Incoming securities:" - forM_ securities (log Debug "TXMLConnector.WorkThread" . (T.pack . show)) - -- TODO: Pass to qtis + forM_ securities (putTickerInfo tisH . securityToTickerInfo) TransaqResponseSecInfo secInfo -> log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo -- TODO: Pass to qtis - TransaqResponseQuotations (ResponseQuotations quotations) -> do - log Debug "TXMLConnector.WorkThread" "Incoming quotations:" - forM_ quotations (log Debug "TXMLConnector.WorkThread" . (T.pack . show)) - -- Pass to ticktable and quotesource server - TransaqResponseQuotes (ResponseQuotes quotes) -> do - log Debug "TXMLConnector.WorkThread" "Incoming quotes:" - forM_ quotes (log Debug "TXMLConnector.WorkThread" . (T.pack . show)) - -- Pass to quotesource server _ -> pure () handleUnconnected serverConnected = do log Debug "TXMLConnector.WorkThread" "Sending connect command" @@ -306,3 +318,13 @@ quotationToTicks timestamp q = value = fromDouble $ qOffer q, volume = fromIntegral $ qQuantity q }] + +securityToTickerInfo :: Security -> TickerInfo +securityToTickerInfo sec = + TickerInfo + { + tiTicker = sBoard sec <> "#" <> sSeccode sec + , tiLotSize = sLotSize sec + , tiTickSize = sMinStep sec + } + diff --git a/src/TickerInfoServer.hs b/src/TickerInfoServer.hs new file mode 100644 index 0000000..7612de3 --- /dev/null +++ b/src/TickerInfoServer.hs @@ -0,0 +1,116 @@ +{-# LANGUAGE RecordWildCards #-} + +module TickerInfoServer + ( + TickerInfoServerHandle, + startTickerInfoServer, + stopTickerInfoServer, + withTickerInfoServer, + putTickerInfo, + getTickerInfo, + TickerInfo(..) + ) where +import ATrade.Logging (Message, Severity (Warning), + logWith) +import ATrade.Types (Tick, TickerId, security) +import Colog (LogAction) +import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent.STM (TVar, atomically, newTVarIO, + readTVarIO) +import Control.Concurrent.STM.TVar (modifyTVar', writeTVar) +import Control.Exception (bracket) +import Control.Monad.Extra (whileM) +import Data.Aeson (FromJSON (parseJSON), + ToJSON (toJSON), decode, encode, + object, withObject) +import Data.Aeson.Types ((.:), (.=)) +import qualified Data.ByteString.Lazy as BL +import Data.List.NonEmpty (NonEmpty ((:|))) +import qualified Data.Map.Strict as M +import qualified Data.Text as T +import Data.Text.Encoding (decodeUtf8With, encodeUtf8) +import Data.Text.Encoding.Error (lenientDecode) +import Prelude hiding (log) +import System.ZMQ4 (Context, Router (Router), connect, + receiveMulti, sendMulti, + withSocket) + +data TickerInfo = + TickerInfo + { + tiTicker :: TickerId + , tiLotSize :: Int + , tiTickSize :: Double + } deriving (Show, Eq, Ord) + +instance FromJSON TickerInfo where + parseJSON = withObject "TickerInfo" (\obj -> + TickerInfo <$> + obj .: "ticker" <*> + obj .: "lot_size" <*> + obj .: "tick_size") + +instance ToJSON TickerInfo where + toJSON ti = object [ "ticker" .= tiTicker ti, + "lot_size" .= tiLotSize ti, + "tick_size" .= tiTickSize ti ] + +newtype TickerInfoRequest = + TickerInfoRequest + { + tirTickerId :: TickerId + } deriving (Show, Eq, Ord) + +instance FromJSON TickerInfoRequest where + parseJSON = withObject "TickerInfoRequest" (\obj -> + TickerInfoRequest <$> + obj .: "ticker") + +instance ToJSON TickerInfoRequest where + toJSON tir = object [ "ticker" .= tirTickerId tir ] + +data TickerInfoServerHandle = + TickerInfoServerHandle + { + tisThreadId :: ThreadId + , tisRun :: TVar Bool + , tisMap :: TVar (M.Map TickerId TickerInfo) + } + +startTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> IO TickerInfoServerHandle +startTickerInfoServer logger ctx endpoint = do + tisRun <- newTVarIO True + tisMap <- newTVarIO M.empty + tisThreadId <- forkIO $ tisThread tisRun tisMap + pure $ TickerInfoServerHandle {..} + where + log = logWith logger + tisThread tisRun tisMap = withSocket ctx Router $ \sock -> do + connect sock (T.unpack endpoint) + whileM $ do + rq <- receiveMulti sock + case rq of + (sender:message:_) -> case decode (BL.fromStrict message) of + Just tir -> do + maybeTi <- M.lookup (tirTickerId tir) <$> readTVarIO tisMap + case maybeTi of + Just ti -> sendMulti sock (sender :| ["OK", BL.toStrict $ encode ti]) + _ -> do + log Warning "TIS" $ "Requested unknown ticker: " <> tirTickerId tir + sendMulti sock (sender :| ["ERROR"]) + _ -> log Warning "TIS" "Unable to parse incoming request" + _ -> log Warning "TIS" "Malformed packet" + readTVarIO tisRun + +stopTickerInfoServer :: TickerInfoServerHandle -> IO () +stopTickerInfoServer h = atomically $ writeTVar (tisRun h) False + +withTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> (TickerInfoServerHandle -> IO ()) -> IO () +withTickerInfoServer logger ctx endpoint = bracket (startTickerInfoServer logger ctx endpoint) stopTickerInfoServer + +getTickerInfo :: TickerId -> TickerInfoServerHandle -> IO (Maybe TickerInfo) +getTickerInfo tickerId tisH = M.lookup tickerId <$> readTVarIO (tisMap tisH) + +putTickerInfo :: TickerInfoServerHandle -> TickerInfo -> IO () +putTickerInfo tisH tickerInfo = atomically $ modifyTVar' (tisMap tisH) (M.insert (tiTicker tickerInfo) tickerInfo) + diff --git a/src/Transaq.hs b/src/Transaq.hs index 7de1938..4420a80 100644 --- a/src/Transaq.hs +++ b/src/Transaq.hs @@ -32,7 +32,8 @@ module Transaq AllTradesTrade(..), Tick(..), ConnectionState(..), - MarketInfo(..) + MarketInfo(..), + Security(..) ) where import Control.Applicative ((<|>)) @@ -48,7 +49,6 @@ import Data.Maybe (catMaybes, fromMaybe, mapMaybe, import qualified Data.Text as T import Data.Time (fromGregorian) import Data.Time.Clock (UTCTime (UTCTime)) -import Debug.Trace import Text.Read (readMaybe) import Text.XML.Light (Attr (..), CData (cdData), Element (elName), Node (..), QName (..), @@ -477,20 +477,20 @@ instance TransaqResponseC ResponseCandleKinds where data Security = Security { - secId :: Int - , active :: Bool - , seccode :: T.Text - , instrClass :: T.Text - , board :: T.Text - , market :: T.Text - , currency :: T.Text - , shortName :: T.Text - , decimals :: Int - , minStep :: Double - , lotSize :: Int - , lotDivider :: Int - , pointCost :: Double - , secType :: T.Text + sSecId :: Int + , sActive :: Bool + , sSeccode :: T.Text + , sInstrClass :: T.Text + , sBoard :: T.Text + , sMarket :: T.Text + , sCurrency :: T.Text + , sShortName :: T.Text + , sDecimals :: Int + , sMinStep :: Double + , sLotSize :: Int + , sLotDivider :: Int + , sPointCost :: Double + , sSecType :: T.Text } deriving (Show, Eq, Ord) newtype ResponseSecurities = @@ -505,20 +505,20 @@ instance TransaqResponseC ResponseSecurities where parseSecurity tag = if (qName . elName) tag == "security" then do - secId <- findAttr (uname "secid") tag >>= readMaybe - active <- findAttr (uname "active") tag >>= parseBool - seccode <- T.pack <$> childContent "seccode" tag - instrClass <- T.pack <$> childContent "instrclass" tag - board <- T.pack <$> childContent "instrclass" tag - market <- T.pack <$> childContent "market" tag - currency <- T.pack <$> childContent "currency" tag - shortName <- T.pack <$> childContent "shortname" tag - decimals <- childContent "decimals" tag >>= readMaybe - minStep <- childContent "minstep" tag >>= readMaybe - lotSize <- childContent "lotsize" tag >>= readMaybe - lotDivider <- childContent "lotdivider" tag >>= readMaybe - pointCost <- childContent "point_cost" tag >>= readMaybe - secType <- T.pack <$> childContent "sectype" tag + sSecId <- findAttr (uname "secid") tag >>= readMaybe + sActive <- findAttr (uname "active") tag >>= parseBool + sSeccode <- T.pack <$> childContent "seccode" tag + sInstrClass <- T.pack <$> childContent "instrclass" tag + sBoard <- T.pack <$> childContent "instrclass" tag + sMarket <- T.pack <$> childContent "market" tag + sCurrency <- T.pack <$> childContent "currency" tag + sShortName <- T.pack <$> childContent "shortname" tag + sDecimals <- childContent "decimals" tag >>= readMaybe + sMinStep <- childContent "minstep" tag >>= readMaybe + sLotSize <- childContent "lotsize" tag >>= readMaybe + sLotDivider <- childContent "lotdivider" tag >>= readMaybe + sPointCost <- childContent "point_cost" tag >>= readMaybe + sSecType <- T.pack <$> childContent "sectype" tag pure . Just $ Security {..} else pure Nothing diff --git a/transaq-connector.cabal b/transaq-connector.cabal index 5a70638..4a125ed 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -19,6 +19,7 @@ executable transaq-connector main-is: Main.hs other-modules: Config , Transaq + , TickerInfoServer , Version , TXML , TXMLConnector