Browse Source

TIS support

master
Denis Tereshkin 3 years ago
parent
commit
4a4687a1a7
  1. 5
      src/Main.hs
  2. 54
      src/TXMLConnector.hs
  3. 116
      src/TickerInfoServer.hs
  4. 60
      src/Transaq.hs
  5. 1
      transaq-connector.cabal

5
src/Main.hs

@ -26,6 +26,7 @@ import Prelude hiding (log)
import System.IO (Handle, IOMode (AppendMode), import System.IO (Handle, IOMode (AppendMode),
withFile) withFile)
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
import TickerInfoServer (withTickerInfoServer)
import qualified TXMLConnector as Connector import qualified TXMLConnector as Connector
import Version (transaqConnectorVersionText) import Version (transaqConnectorVersionText)
@ -53,8 +54,8 @@ main = do
ctx ctx
(quotesourceEndpoint cfg) (quotesourceEndpoint cfg)
defaultServerSecurityParams) defaultServerSecurityParams)
stopQuoteSourceServer $ \_ -> do stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do
_ <- Connector.start logger cfg qssChannel void $ Connector.start logger cfg qssChannel tisH
forever $ threadDelay 1000000 forever $ threadDelay 1000000
log Info "main" "Shutting down" log Info "main" "Shutting down"

54
src/TXMLConnector.hs

@ -43,7 +43,7 @@ import Transaq (AllTradesTrade (..),
ResponseQuotations (ResponseQuotations), ResponseQuotations (ResponseQuotations),
ResponseQuotes (ResponseQuotes), ResponseQuotes (ResponseQuotes),
ResponseSecurities (ResponseSecurities), ResponseSecurities (ResponseSecurities),
SecurityId (..), Security (..), SecurityId (..),
TransaqCommand (toXml), TransaqCommand (toXml),
TransaqResponse (..), TransaqResponse (..),
TransaqResponse (..), TransaqResponse (..),
@ -54,13 +54,17 @@ import TXML (LogLevel, freeCallback,
setCallback) setCallback)
import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types (DataType (BestBid, BestOffer, LastTradePrice), import ATrade.Types (BarTimeframe, DataType (BestBid, BestOffer, LastTradePrice),
Tick (..), TickerId, Tick (..), TickerId,
fromDouble) fromDouble)
import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar)
import Control.Monad (forM_) import Control.Monad (forM_)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, getCurrentTime) import Data.Time.Clock (UTCTime, getCurrentTime)
import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle,
putTickerInfo)
import qualified Transaq import qualified Transaq
import qualified TXML import qualified TXML
@ -76,11 +80,26 @@ data ConnectionParams =
} }
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
data HistoryRequest =
HistoryRequest
{
hrTIckerId :: TickerId
, hrTimeframe :: BarTimeframe
, hrStartTime :: UTCTime
, hrEndTime :: UTCTime
} deriving (Show, Eq, Ord)
data Request =
Request HistoryRequest
deriving (Show, Eq, Ord)
data TXMLConnectorHandle = data TXMLConnectorHandle =
TXMLConnectorHandle TXMLConnectorHandle
{ {
threadId :: ThreadId, threadId :: ThreadId,
notificationQueue :: TBQueue TransaqResponse notificationQueue :: TBQueue TransaqResponse,
requestVar :: TMVar Request,
responseVar :: TMVar Response
} }
data ConnectionStage = StageConnection | StageGetInfo | StageConnected data ConnectionStage = StageConnection | StageGetInfo | StageConnected
@ -93,12 +112,13 @@ start ::
LogAction IO Message LogAction IO Message
-> TransaqConnectorConfig -> TransaqConnectorConfig
-> BoundedChan QuoteSourceServerData -> BoundedChan QuoteSourceServerData
-> TickerInfoServerHandle
-> IO TXMLConnectorHandle -> IO TXMLConnectorHandle
start logger config qssChannel = do start logger config qssChannel tisH = do
logWith logger Info "TXMLConnector" "Starting" logWith logger Info "TXMLConnector" "Starting"
notificationQueue <- atomically $ newTBQueue 50000 notificationQueue <- atomically $ newTBQueue 50000
tickTable <- newTVarIO M.empty tickTable <- newTVarIO M.empty
threadId <- forkIO (workThread logger config notificationQueue qssChannel tickTable) threadId <- forkIO (workThread logger config notificationQueue qssChannel tickTable tisH)
return $ TXMLConnectorHandle {..} return $ TXMLConnectorHandle {..}
workThread :: workThread ::
@ -107,8 +127,9 @@ workThread ::
-> TBQueue TransaqResponse -> TBQueue TransaqResponse
-> BoundedChan QuoteSourceServerData -> BoundedChan QuoteSourceServerData
-> TVar (M.Map TickKey Tick) -> TVar (M.Map TickKey Tick)
-> TickerInfoServerHandle
-> IO () -> IO ()
workThread logger config queue qssChannel tickMap = do workThread logger config queue qssChannel tickMap tisH = do
rc <- initialize (transaqLogPath config) (parseTransaqLogLevel $ transaqLogLevel config) rc <- initialize (transaqLogPath config) (parseTransaqLogLevel $ transaqLogLevel config)
case rc of case rc of
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
@ -201,19 +222,10 @@ workThread logger config queue qssChannel tickMap = do
-- TODO: Pass to qtis, maybe something else? -- TODO: Pass to qtis, maybe something else?
TransaqResponseSecurities (ResponseSecurities securities) -> do TransaqResponseSecurities (ResponseSecurities securities) -> do
log Debug "TXMLConnector.WorkThread" "Incoming securities:" log Debug "TXMLConnector.WorkThread" "Incoming securities:"
forM_ securities (log Debug "TXMLConnector.WorkThread" . (T.pack . show)) forM_ securities (putTickerInfo tisH . securityToTickerInfo)
-- TODO: Pass to qtis
TransaqResponseSecInfo secInfo -> TransaqResponseSecInfo secInfo ->
log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo
-- TODO: Pass to qtis -- TODO: Pass to qtis
TransaqResponseQuotations (ResponseQuotations quotations) -> do
log Debug "TXMLConnector.WorkThread" "Incoming quotations:"
forM_ quotations (log Debug "TXMLConnector.WorkThread" . (T.pack . show))
-- Pass to ticktable and quotesource server
TransaqResponseQuotes (ResponseQuotes quotes) -> do
log Debug "TXMLConnector.WorkThread" "Incoming quotes:"
forM_ quotes (log Debug "TXMLConnector.WorkThread" . (T.pack . show))
-- Pass to quotesource server
_ -> pure () _ -> pure ()
handleUnconnected serverConnected = do handleUnconnected serverConnected = do
log Debug "TXMLConnector.WorkThread" "Sending connect command" log Debug "TXMLConnector.WorkThread" "Sending connect command"
@ -306,3 +318,13 @@ quotationToTicks timestamp q =
value = fromDouble $ qOffer q, value = fromDouble $ qOffer q,
volume = fromIntegral $ qQuantity q volume = fromIntegral $ qQuantity q
}] }]
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec =
TickerInfo
{
tiTicker = sBoard sec <> "#" <> sSeccode sec
, tiLotSize = sLotSize sec
, tiTickSize = sMinStep sec
}

116
src/TickerInfoServer.hs

@ -0,0 +1,116 @@
{-# LANGUAGE RecordWildCards #-}
module TickerInfoServer
(
TickerInfoServerHandle,
startTickerInfoServer,
stopTickerInfoServer,
withTickerInfoServer,
putTickerInfo,
getTickerInfo,
TickerInfo(..)
) where
import ATrade.Logging (Message, Severity (Warning),
logWith)
import ATrade.Types (Tick, TickerId, security)
import Colog (LogAction)
import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.STM (TVar, atomically, newTVarIO,
readTVarIO)
import Control.Concurrent.STM.TVar (modifyTVar', writeTVar)
import Control.Exception (bracket)
import Control.Monad.Extra (whileM)
import Data.Aeson (FromJSON (parseJSON),
ToJSON (toJSON), decode, encode,
object, withObject)
import Data.Aeson.Types ((.:), (.=))
import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8With, encodeUtf8)
import Data.Text.Encoding.Error (lenientDecode)
import Prelude hiding (log)
import System.ZMQ4 (Context, Router (Router), connect,
receiveMulti, sendMulti,
withSocket)
data TickerInfo =
TickerInfo
{
tiTicker :: TickerId
, tiLotSize :: Int
, tiTickSize :: Double
} deriving (Show, Eq, Ord)
instance FromJSON TickerInfo where
parseJSON = withObject "TickerInfo" (\obj ->
TickerInfo <$>
obj .: "ticker" <*>
obj .: "lot_size" <*>
obj .: "tick_size")
instance ToJSON TickerInfo where
toJSON ti = object [ "ticker" .= tiTicker ti,
"lot_size" .= tiLotSize ti,
"tick_size" .= tiTickSize ti ]
newtype TickerInfoRequest =
TickerInfoRequest
{
tirTickerId :: TickerId
} deriving (Show, Eq, Ord)
instance FromJSON TickerInfoRequest where
parseJSON = withObject "TickerInfoRequest" (\obj ->
TickerInfoRequest <$>
obj .: "ticker")
instance ToJSON TickerInfoRequest where
toJSON tir = object [ "ticker" .= tirTickerId tir ]
data TickerInfoServerHandle =
TickerInfoServerHandle
{
tisThreadId :: ThreadId
, tisRun :: TVar Bool
, tisMap :: TVar (M.Map TickerId TickerInfo)
}
startTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> IO TickerInfoServerHandle
startTickerInfoServer logger ctx endpoint = do
tisRun <- newTVarIO True
tisMap <- newTVarIO M.empty
tisThreadId <- forkIO $ tisThread tisRun tisMap
pure $ TickerInfoServerHandle {..}
where
log = logWith logger
tisThread tisRun tisMap = withSocket ctx Router $ \sock -> do
connect sock (T.unpack endpoint)
whileM $ do
rq <- receiveMulti sock
case rq of
(sender:message:_) -> case decode (BL.fromStrict message) of
Just tir -> do
maybeTi <- M.lookup (tirTickerId tir) <$> readTVarIO tisMap
case maybeTi of
Just ti -> sendMulti sock (sender :| ["OK", BL.toStrict $ encode ti])
_ -> do
log Warning "TIS" $ "Requested unknown ticker: " <> tirTickerId tir
sendMulti sock (sender :| ["ERROR"])
_ -> log Warning "TIS" "Unable to parse incoming request"
_ -> log Warning "TIS" "Malformed packet"
readTVarIO tisRun
stopTickerInfoServer :: TickerInfoServerHandle -> IO ()
stopTickerInfoServer h = atomically $ writeTVar (tisRun h) False
withTickerInfoServer :: LogAction IO Message -> Context -> T.Text -> (TickerInfoServerHandle -> IO ()) -> IO ()
withTickerInfoServer logger ctx endpoint = bracket (startTickerInfoServer logger ctx endpoint) stopTickerInfoServer
getTickerInfo :: TickerId -> TickerInfoServerHandle -> IO (Maybe TickerInfo)
getTickerInfo tickerId tisH = M.lookup tickerId <$> readTVarIO (tisMap tisH)
putTickerInfo :: TickerInfoServerHandle -> TickerInfo -> IO ()
putTickerInfo tisH tickerInfo = atomically $ modifyTVar' (tisMap tisH) (M.insert (tiTicker tickerInfo) tickerInfo)

60
src/Transaq.hs

@ -32,7 +32,8 @@ module Transaq
AllTradesTrade(..), AllTradesTrade(..),
Tick(..), Tick(..),
ConnectionState(..), ConnectionState(..),
MarketInfo(..) MarketInfo(..),
Security(..)
) where ) where
import Control.Applicative ((<|>)) import Control.Applicative ((<|>))
@ -48,7 +49,6 @@ import Data.Maybe (catMaybes, fromMaybe, mapMaybe,
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time (fromGregorian) import Data.Time (fromGregorian)
import Data.Time.Clock (UTCTime (UTCTime)) import Data.Time.Clock (UTCTime (UTCTime))
import Debug.Trace
import Text.Read (readMaybe) import Text.Read (readMaybe)
import Text.XML.Light (Attr (..), CData (cdData), import Text.XML.Light (Attr (..), CData (cdData),
Element (elName), Node (..), QName (..), Element (elName), Node (..), QName (..),
@ -477,20 +477,20 @@ instance TransaqResponseC ResponseCandleKinds where
data Security = data Security =
Security Security
{ {
secId :: Int sSecId :: Int
, active :: Bool , sActive :: Bool
, seccode :: T.Text , sSeccode :: T.Text
, instrClass :: T.Text , sInstrClass :: T.Text
, board :: T.Text , sBoard :: T.Text
, market :: T.Text , sMarket :: T.Text
, currency :: T.Text , sCurrency :: T.Text
, shortName :: T.Text , sShortName :: T.Text
, decimals :: Int , sDecimals :: Int
, minStep :: Double , sMinStep :: Double
, lotSize :: Int , sLotSize :: Int
, lotDivider :: Int , sLotDivider :: Int
, pointCost :: Double , sPointCost :: Double
, secType :: T.Text , sSecType :: T.Text
} deriving (Show, Eq, Ord) } deriving (Show, Eq, Ord)
newtype ResponseSecurities = newtype ResponseSecurities =
@ -505,20 +505,20 @@ instance TransaqResponseC ResponseSecurities where
parseSecurity tag = parseSecurity tag =
if (qName . elName) tag == "security" if (qName . elName) tag == "security"
then do then do
secId <- findAttr (uname "secid") tag >>= readMaybe sSecId <- findAttr (uname "secid") tag >>= readMaybe
active <- findAttr (uname "active") tag >>= parseBool sActive <- findAttr (uname "active") tag >>= parseBool
seccode <- T.pack <$> childContent "seccode" tag sSeccode <- T.pack <$> childContent "seccode" tag
instrClass <- T.pack <$> childContent "instrclass" tag sInstrClass <- T.pack <$> childContent "instrclass" tag
board <- T.pack <$> childContent "instrclass" tag sBoard <- T.pack <$> childContent "instrclass" tag
market <- T.pack <$> childContent "market" tag sMarket <- T.pack <$> childContent "market" tag
currency <- T.pack <$> childContent "currency" tag sCurrency <- T.pack <$> childContent "currency" tag
shortName <- T.pack <$> childContent "shortname" tag sShortName <- T.pack <$> childContent "shortname" tag
decimals <- childContent "decimals" tag >>= readMaybe sDecimals <- childContent "decimals" tag >>= readMaybe
minStep <- childContent "minstep" tag >>= readMaybe sMinStep <- childContent "minstep" tag >>= readMaybe
lotSize <- childContent "lotsize" tag >>= readMaybe sLotSize <- childContent "lotsize" tag >>= readMaybe
lotDivider <- childContent "lotdivider" tag >>= readMaybe sLotDivider <- childContent "lotdivider" tag >>= readMaybe
pointCost <- childContent "point_cost" tag >>= readMaybe sPointCost <- childContent "point_cost" tag >>= readMaybe
secType <- T.pack <$> childContent "sectype" tag sSecType <- T.pack <$> childContent "sectype" tag
pure . Just $ Security {..} pure . Just $ Security {..}
else else
pure Nothing pure Nothing

1
transaq-connector.cabal

@ -19,6 +19,7 @@ executable transaq-connector
main-is: Main.hs main-is: Main.hs
other-modules: Config other-modules: Config
, Transaq , Transaq
, TickerInfoServer
, Version , Version
, TXML , TXML
, TXMLConnector , TXMLConnector

Loading…
Cancel
Save