From 9ba81cc9cca17db9af581690571d306fb045807f Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 3 Apr 2023 21:18:53 +0700 Subject: [PATCH] history provider --- src/Config.hs | 1 + src/HistoryProviderServer.hs | 269 +++++++++++++++++++++++++++++++++++ src/Main.hs | 6 +- src/TXML.hs | 1 + src/TXMLConnector.hs | 100 ++++++++++--- src/Transaq.hs | 36 ++--- transaq-connector.cabal | 3 + 7 files changed, 379 insertions(+), 37 deletions(-) create mode 100644 src/HistoryProviderServer.hs diff --git a/src/Config.hs b/src/Config.hs index 917ac42..ce6b834 100644 --- a/src/Config.hs +++ b/src/Config.hs @@ -23,6 +23,7 @@ data TransaqConnectorConfig = TransaqConnectorConfig { brokerServerCertPath :: Maybe FilePath, brokerClientCertificateDir :: Maybe FilePath, tisEndpoint :: T.Text, + historyProviderEndpoint :: T.Text, transaqLogin :: T.Text, transaqPassword :: T.Text, transaqHost :: T.Text, diff --git a/src/HistoryProviderServer.hs b/src/HistoryProviderServer.hs new file mode 100644 index 0000000..b7be431 --- /dev/null +++ b/src/HistoryProviderServer.hs @@ -0,0 +1,269 @@ +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE RankNTypes #-} +{-# LANGUAGE RecordWildCards #-} + +module HistoryProviderServer + ( + startHistoryProviderServer + , stopHistoryProviderServer + , withHistoryProviderServer + ) where + +import ATrade.Logging (Message, + Severity (Debug, Info, Warning), + log) +import ATrade.Types (Bar (..), BarTimeframe (..), + toDouble) +import Colog (HasLog (getLogAction, setLogAction), + LogAction (LogAction, unLogAction)) +import Control.Concurrent (ThreadId, forkIO) +import Control.Concurrent.STM (TVar, atomically, newTVarIO, + putTMVar, readTVarIO, takeTMVar, + writeTVar) +import Control.Concurrent.STM.TMVar (TMVar) +import Control.Exception (bracket) +import Control.Monad (forM_, void) +import Control.Monad.Extra (whileM) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader (MonadReader, asks) +import Control.Monad.Trans.Reader (ReaderT (runReaderT)) +import Data.Aeson (FromJSON (..), eitherDecode, + withObject, (.:)) +import Data.Aeson.Types as Aeson +import Data.Attoparsec.Text as Attoparsec +import Data.Binary.Put (putDoublele, putWord64le, runPut) +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import qualified Data.List as L +import Data.List.NonEmpty (NonEmpty ((:|))) +import qualified Data.Text as T +import Data.Time (Day, UTCTime (UTCTime), + fromGregorianValid) +import Data.Time.Clock (diffUTCTime, getCurrentTime, + secondsToDiffTime) +import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) +import Prelude hiding (log) +import System.ZMQ4 (Context, Router (Router), bind, + close, receive, receiveMulti, + sendMulti, socket, withSocket) +import TXMLConnector (HistoryRequest (..), + HistoryResponse (..), + Request (..), Response (..), + TXMLConnectorHandle, hrBars, + makeRequest) + +data HistoryProviderServerHandle = + HistoryProviderServerHandle + { + hpsThreadId :: ThreadId + , hpsRun :: TVar Bool + } + +data Period = + Period1Min | + Period5Min | + Period15Min | + Period30Min | + PeriodHour | + PeriodDay | + PeriodWeek | + PeriodMonth + deriving (Eq, Show) + +parsePeriod :: T.Text -> Maybe Period +parsePeriod "M1" = Just Period1Min +parsePeriod "M5" = Just Period5Min +parsePeriod "M15" = Just Period15Min +parsePeriod "M30" = Just Period30Min +parsePeriod "H1" = Just PeriodHour +parsePeriod "D" = Just PeriodDay +parsePeriod "W" = Just PeriodWeek +parsePeriod "M" = Just PeriodMonth +parsePeriod _ = Nothing + +periodToSeconds :: Period -> Int +periodToSeconds Period1Min = 60 +periodToSeconds Period5Min = 60 * 5 +periodToSeconds Period15Min = 60 * 15 +periodToSeconds Period30Min = 60 * 30 +periodToSeconds PeriodHour = 60 * 60 +periodToSeconds PeriodDay = 60 * 60 * 24 +periodToSeconds PeriodWeek = 60 * 60 * 24 * 7 +periodToSeconds PeriodMonth = 60 * 60 * 24 * 30 + +data QHPRequest = + QHPRequest { + rqTicker :: T.Text, + rqStartTime :: UTCTime, + rqEndTime :: UTCTime, + rqPeriod :: Period, + rqCompression :: Maybe T.Text + } deriving (Show, Eq) + +instance FromJSON QHPRequest where + parseJSON = withObject "Request" $ \v -> QHPRequest <$> + v .: "ticker" <*> + (v .: "from" >>= parseTime) <*> + (v .: "to" >>= parseTime) <*> + (v .: "timeframe" >>= parseTf) <*> + v .:? "compression" + where + parseTf :: T.Text -> Aeson.Parser Period + parseTf t = if + | t == "M1" -> return Period1Min + | t == "M5" -> return Period5Min + | t == "M15" -> return Period15Min + | t == "M30" -> return Period30Min + | t == "H1" -> return PeriodHour + | t == "D" -> return PeriodDay + | t == "W" -> return PeriodWeek + | t == "MN" -> return PeriodMonth + | otherwise -> fail "Invalid period specified" + +parseTime :: T.Text -> Aeson.Parser UTCTime +parseTime text = case Attoparsec.parseOnly (timeParse <* Attoparsec.endOfInput) text of + Right r -> return r + Left e -> fail $ "Can't parse time: " ++ T.unpack text ++ "/" ++ e +timeParse :: Attoparsec.Parser UTCTime +timeParse = do + year <- decimal + void $ char '-' + month <- decimal + void $ char '-' + day <- decimal + void $ char 'T' + hour <- decimal + void $ char ':' + minute <- decimal + void $ char ':' + sec <- decimal + case fromGregorianValid year month day of + Just gregorianDay -> return $ UTCTime gregorianDay (secondsToDiffTime $ hour * 3600 + minute * 60 + sec) + _ -> fail "Can't parse date: invalid values" + +data Env = Env + { + eRun :: TVar Bool + , eContext :: Context + , eEndpoint :: T.Text + , eLogger :: LogAction IO Message + , eTxml :: TXMLConnectorHandle + } + +newtype App a = App { unApp :: ReaderT Env IO a } + deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO) + +instance HasLog Env Message App where + getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . eLogger $ env) } + setLogAction _ env = env -- fuck it + +startHistoryProviderServer :: + (MonadIO m) => + Context -> + T.Text -> + TXMLConnectorHandle -> + LogAction IO Message -> + m HistoryProviderServerHandle +startHistoryProviderServer ctx endpoint txmlH logger = do + hpsRun <- liftIO . newTVarIO $ True + let env = Env { + eRun = hpsRun + , eContext = ctx + , eEndpoint = endpoint + , eLogger = logger + , eTxml = txmlH + } + hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env + pure HistoryProviderServerHandle {..} + +stopHistoryProviderServer :: + (MonadIO m) => + HistoryProviderServerHandle -> + m () +stopHistoryProviderServer h = liftIO . atomically $ writeTVar (hpsRun h) False + +withHistoryProviderServer :: + (MonadIO m) => + Context -> + T.Text -> + TXMLConnectorHandle -> + LogAction IO Message -> + (forall a. m a -> IO a) -> + (HistoryProviderServerHandle -> m ()) -> + m () +withHistoryProviderServer ctx endpoint txmlH logger runner action = + liftIO $ bracket + (startHistoryProviderServer ctx endpoint txmlH logger) + stopHistoryProviderServer + (runner . action) + +workThread :: App () +workThread = do + runVar <- asks eRun + ctx <- asks eContext + sock <- liftIO $ socket ctx Router + ep <- asks eEndpoint + liftIO $ bind sock $ T.unpack ep + whileM $ do + incomingData <- liftIO . receiveMulti $ sock + case incomingData of + (sender:_:rawRq:_) -> + case eitherDecode $ BL.fromStrict rawRq of + Right request -> do + response <- handleRequest sender request + sendResponseWithDelimiter sock sender response + Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request: " <> T.pack err + (sender:rawRq:_) -> + case eitherDecode $ BL.fromStrict rawRq of + Right request -> do + response <- handleRequest sender request + sendResponse sock sender response + Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request: " <> T.pack err + _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" + liftIO $ readTVarIO runVar + liftIO $ close sock + where + handleRequest sender request = do + now <- liftIO getCurrentTime + let diff = now `diffUTCTime` rqStartTime request + let count = truncate diff `div` periodToSeconds (rqPeriod request) + log Debug "HistoryProviderServer.WorkThread" $ "Requesting bars: " <> (T.pack . show) count + txml <- asks eTxml + response <- liftIO . makeRequest txml $ Request HistoryRequest + { + hrTickerId = rqTicker request + , hrTimeframe = BarTimeframe . periodToSeconds . rqPeriod $ request + , hrCount = count + , hrReset = True + } + log Info "HistoryProviderServer.WorkThread" "Got response from TXML" + case response of + ResponseHistory hr -> do + log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr) + let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr + log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs + pure bs + _ -> do + log Warning "HistoryProviderServer.WorkThread" "Invalid response" + pure [] + + timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end + + sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response + sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response + + encodeResponse response = ["OK"] <> [serializeBars response] + + serializeBars :: [Bar] -> B.ByteString + serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar' + serializeBar' bar = do + putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar) + putDoublele (toDouble . barOpen $ bar) + putDoublele (toDouble . barHigh $ bar) + putDoublele (toDouble . barLow $ bar) + putDoublele (toDouble . barClose $ bar) + putWord64le (fromInteger . barVolume $ bar) + diff --git a/src/Main.hs b/src/Main.hs index d0cbeae..ac8819d 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -22,6 +22,7 @@ import Control.Monad.IO.Class (MonadIO) import qualified Data.Text as T import Data.Version (showVersion) import Debug.EventCounters (initEventCounters) +import HistoryProviderServer (withHistoryProviderServer) import Prelude hiding (log) import System.IO (Handle, IOMode (AppendMode), withFile) @@ -55,8 +56,9 @@ main = do (quotesourceEndpoint cfg) defaultServerSecurityParams) stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do - void $ Connector.start logger cfg qssChannel tisH - forever $ threadDelay 1000000 + txml <- Connector.start logger cfg qssChannel tisH + withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml logger id $ \_ -> do + forever $ threadDelay 1000000 log Info "main" "Shutting down" diff --git a/src/TXML.hs b/src/TXML.hs index 4d43c82..71ca7e4 100644 --- a/src/TXML.hs +++ b/src/TXML.hs @@ -81,6 +81,7 @@ uninitialize = c_UnInitialize >>= rawStringToResult sendCommand :: T.Text -> IO (Either T.Text ()) sendCommand cmdData = do + putStrLn $ T.unpack cmdData BS.useAsCString (encodeUtf8 cmdData) $ \fpcstr -> c_SendCommand fpcstr >>= rawStringToResult diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 7553ae3..2843751 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -8,7 +8,13 @@ module TXMLConnector ( - start + start + , Request(..) + , HistoryRequest(..) + , Response(..) + , HistoryResponse(..) + , makeRequest + , TXMLConnectorHandle ) where import ATrade.Logging (Message, Severity (..), log, @@ -22,9 +28,11 @@ import Config (SubscriptionConfig (Subscriptio transaqPassword, transaqPort) import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent.STM (TVar, atomically, modifyTVar', - newEmptyTMVarIO, newTVarIO, - orElse, readTMVar, readTVarIO, - writeTVar) + newEmptyTMVar, newEmptyTMVarIO, + newTVarIO, orElse, putTMVar, + readTMVar, readTVar, + readTVarIO, takeTMVar, + tryReadTMVar, writeTVar) import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, readTBQueue, writeTBQueue) import Control.Monad (forever, void) @@ -35,6 +43,7 @@ import Text.XML.Light.Types (Content (Elem), Element (elName), QName (qName)) import Transaq (AllTradesTrade (..), + Candle (..), CommandConnect (..), CommandDisconnect (CommandDisconnect), CommandGetHistoryData (CommandGetHistoryData), @@ -45,6 +54,8 @@ import Transaq (AllTradesTrade (..), Quotation (..), ResponseAllTrades (ResponseAllTrades), ResponseCandleKinds (ResponseCandleKinds), + ResponseCandles (..), + ResponseCandlesStatus (StatusPending), ResponseMarkets (ResponseMarkets), ResponseQuotations (ResponseQuotations), ResponseQuotes (ResponseQuotes), @@ -54,13 +65,14 @@ import Transaq (AllTradesTrade (..), TransaqResponse (..), TransaqResponse (..), TransaqResponseC (fromXml), - kCandleKindId, kPeriod, state) + kCandleKindId, kPeriod, state, + status) import TXML (LogLevel, freeCallback, initialize, sendCommand, setCallback) import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) -import ATrade.Types (Bar, +import ATrade.Types (Bar (..), BarTimeframe (unBarTimeframe), DataType (BestBid, BestOffer, LastTradePrice), Tick (..), TickerId, @@ -106,6 +118,8 @@ newtype Request = Request HistoryRequest deriving (Show, Eq, Ord) +data Response = ResponseHistory HistoryResponse + data HistoryResponse = HistoryResponse { @@ -114,17 +128,13 @@ data HistoryResponse = } deriving (Show, Eq) -newtype Response = - Response HistoryResponse - deriving (Show, Eq) - data TXMLConnectorHandle = TXMLConnectorHandle { - threadId :: ThreadId, - notificationQueue :: TBQueue TransaqResponse, - hRequestVar :: TMVar Request, - hResponseVar :: TMVar Response + threadId :: ThreadId + , notificationQueue :: TBQueue TransaqResponse + , hRequestVar :: TMVar Request + , hResponseVar :: TMVar (TMVar Response) } data ConnectionStage = StageConnection | StageGetInfo | StageConnected @@ -144,7 +154,8 @@ data Env = qssChannel :: BoundedChan QuoteSourceServerData , tisHandle :: TickerInfoServerHandle , requestVar :: TMVar Request - , responseVar :: TMVar Response + , responseVar :: TMVar (TMVar Response) + , currentCandles :: TVar [Candle] , tickMap :: TVar (M.Map TickKey Tick) , transaqQueue :: TBQueue TransaqResponse , logger :: LogAction IO Message @@ -172,6 +183,7 @@ start logger config qssChannel tisH = do tickTable <- newTVarIO M.empty requestVar <- newEmptyTMVarIO responseVar <- newEmptyTMVarIO + currentCandles <- newTVarIO [] serverConnected <- liftIO $ newTVarIO StageConnection candleKindMap <- newTVarIO M.empty let env = @@ -181,6 +193,7 @@ start logger config qssChannel tisH = do , tisHandle = tisH , requestVar = requestVar , responseVar = responseVar + , currentCandles = currentCandles , tickMap = tickTable , transaqQueue = notificationQueue , logger = logger @@ -245,7 +258,7 @@ workThread = do rqVar <- asks requestVar queue <- asks transaqQueue item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` - (MainQueueRequest <$> readTMVar rqVar) + (MainQueueRequest <$> takeTMVar rqVar) case item of MainQueueTransaqData transaqData -> do tm <- asks tickMap @@ -261,9 +274,28 @@ workThread = do let ticks = concatMap (quotationToTicks now) quotations forM_ ticks (liftIO . writeChan qssChan . QSSTick) forM_ ticks (insertToTickMap tm) - TransaqResponseCandles respCandle -> undefined + TransaqResponseCandles respCandle -> do + resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar + log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) + case resp of + Just tmvar -> if cStatus respCandle == StatusPending + then do + cur <- asks currentCandles + liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c) + else do + cur <- asks currentCandles + liftIO $ atomically $ do + candles <- readTVar cur + putTMVar tmvar $ ResponseHistory $ HistoryResponse + { + hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) + , hrMoreData = False + } + _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" _ -> pure () MainQueueRequest (Request request) -> do + cur <- asks currentCandles + liftIO $ atomically $ writeTVar cur [] maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO) case maybeCk of Just candleKindId -> do @@ -308,7 +340,7 @@ workThread = do -- TODO: handle order response TransaqResponseCandles candles -> log Debug "TXMLConnector.WorkThread" $ - "Incoming candles message: " <> (T.pack . show . length . Transaq.candles $ candles) + "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles) -- TODO: Pass to qhp TransaqResponseMarkets (ResponseMarkets markets) -> do log Debug "TXMLConnector.WorkThread" "Incoming markets:" @@ -433,4 +465,34 @@ securityToTickerInfo sec = } parseSecurityId :: TickerId -> Maybe SecurityId -parseSecurityId = undefined +parseSecurityId tickerId = case T.findIndex (== '#') tickerId of + Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId) + Nothing -> Nothing + +makeTickerId :: SecurityId -> TickerId +makeTickerId sec = board sec <> "#" <> seccode sec + +makeRequest :: TXMLConnectorHandle -> Request -> IO Response +makeRequest h request = do + resp <- atomically $ do + resp <- newEmptyTMVar + putTMVar (hResponseVar h) resp + putTMVar (hRequestVar h) request + pure resp + atomically $ do + void $ takeTMVar (hResponseVar h) + takeTMVar resp + + +candleToBar :: SecurityId -> Candle -> Bar +candleToBar sec candle = + Bar + { + barSecurity = makeTickerId sec + , barTimestamp = cTimestamp candle + , barOpen = fromDouble (cOpen candle) + , barHigh = fromDouble (cHigh candle) + , barLow = fromDouble (cLow candle) + , barClose = fromDouble (cClose candle) + , barVolume = fromIntegral $ cVolume candle + } diff --git a/src/Transaq.hs b/src/Transaq.hs index 4420a80..29c87f9 100644 --- a/src/Transaq.hs +++ b/src/Transaq.hs @@ -14,6 +14,7 @@ module Transaq CommandNewOrder(..), CommandCancelOrder(..), CommandGetSecuritiesInfo(..), + CommandGetHistoryData(..), ResponseResult(..), ResponseCandles(..), ResponseServerStatus(..), @@ -33,7 +34,10 @@ module Transaq Tick(..), ConnectionState(..), MarketInfo(..), - Security(..) + Security(..), + CandleKind(..), + ResponseCandlesStatus(..), + Candle(..) ) where import Control.Applicative ((<|>)) @@ -319,10 +323,10 @@ data Candle = Candle { cTimestamp :: UTCTime - , cOpen :: TransaqPrice - , cHigh :: TransaqPrice - , cLow :: TransaqPrice - , cClose :: TransaqPrice + , cOpen :: Double + , cHigh :: Double + , cLow :: Double + , cClose :: Double , cVolume :: Int , cOpenInterest :: Int } deriving (Show, Eq, Ord) @@ -337,10 +341,10 @@ data ResponseCandlesStatus = data ResponseCandles = ResponseCandles { - periodId :: Int - , status :: ResponseCandlesStatus - , security :: SecurityId - , candles :: [Candle] + cPeriodId :: Int + , cStatus :: ResponseCandlesStatus + , cSecurity :: SecurityId + , cCandles :: [Candle] } deriving (Show, Eq, Ord) uname :: String -> QName @@ -358,10 +362,10 @@ instance TransaqResponseC ResponseCandles where let candles = mapMaybe parseCandle . elChildren $ root return ResponseCandles { - periodId = periodId - , status = status - , security = SecurityId board seccode - , candles = candles + cPeriodId = periodId + , cStatus = status + , cSecurity = SecurityId board seccode + , cCandles = candles } where parseStatus :: Int -> Maybe ResponseCandlesStatus @@ -373,13 +377,13 @@ instance TransaqResponseC ResponseCandles where 3 -> Just StatusUnavaliable _ -> Nothing parseCandle element = do - timestamp <- findAttr (uname "open") element >>= parseTimestamp . T.pack + timestamp <- findAttr (uname "date") element >>= parseTimestamp . T.pack open <- findAttr (uname "open") element >>= readMaybe high <- findAttr (uname "high") element >>= readMaybe low <- findAttr (uname "low") element >>= readMaybe close <- findAttr (uname "close") element >>= readMaybe volume <- findAttr (uname "volume") element >>= readMaybe - openInterest <- findAttr (uname "oi") element >>= readMaybe + let openInterest = fromMaybe 0 $ findAttr (uname "oi") element >>= readMaybe return Candle { cTimestamp = timestamp @@ -509,7 +513,7 @@ instance TransaqResponseC ResponseSecurities where sActive <- findAttr (uname "active") tag >>= parseBool sSeccode <- T.pack <$> childContent "seccode" tag sInstrClass <- T.pack <$> childContent "instrclass" tag - sBoard <- T.pack <$> childContent "instrclass" tag + sBoard <- T.pack <$> childContent "board" tag sMarket <- T.pack <$> childContent "market" tag sCurrency <- T.pack <$> childContent "currency" tag sShortName <- T.pack <$> childContent "shortname" tag diff --git a/transaq-connector.cabal b/transaq-connector.cabal index e295263..f812cea 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -20,6 +20,7 @@ executable transaq-connector other-modules: Config , Transaq , TickerInfoServer + , HistoryProviderServer , Version , TXML , TXMLConnector @@ -47,6 +48,8 @@ executable transaq-connector , extra , errors , mtl + , vector + , binary extra-lib-dirs: lib ghc-options: -Wall -Wcompat