From 3f0d0cc8342fe4b9c60c61a6495ee6714263bb39 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 29 Apr 2023 18:54:52 +0700 Subject: [PATCH] HP,TIS: support get_sec_list command --- src/HistoryProviderServer.hs | 74 ++++++++++++++++++++++++++---------- src/Main.hs | 7 +++- src/TickerInfoServer.hs | 4 ++ 3 files changed, 62 insertions(+), 23 deletions(-) diff --git a/src/HistoryProviderServer.hs b/src/HistoryProviderServer.hs index f693932..bb5ee1d 100644 --- a/src/HistoryProviderServer.hs +++ b/src/HistoryProviderServer.hs @@ -16,7 +16,7 @@ import ATrade.Logging (Message, Severity (Debug, Info, Warning), log) import ATrade.Types (Bar (..), BarTimeframe (..), - toDouble) + TickerId, toDouble) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Control.Concurrent (ThreadId, forkIO) @@ -32,6 +32,7 @@ import Control.Monad.Reader (MonadReader, asks) import Control.Monad.Trans.Reader (ReaderT (runReaderT)) import Data.Aeson (FromJSON (..), eitherDecode, withObject, (.:)) +import qualified Data.Aeson.KeyMap as KM import Data.Aeson.Types as Aeson import Data.Attoparsec.Text as Attoparsec import Data.Binary.Put (putDoublele, putWord64le, runPut) @@ -40,6 +41,7 @@ import qualified Data.ByteString.Lazy as BL import qualified Data.List as L import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Text as T +import Data.Text.Encoding (encodeUtf8) import Data.Time (Day, UTCTime (UTCTime), fromGregorianValid) import Data.Time.Clock (diffUTCTime, getCurrentTime, @@ -49,6 +51,8 @@ import Prelude hiding (log) import System.ZMQ4 (Context, Router (Router), bind, close, receive, receiveMulti, sendMulti, socket, withSocket) +import TickerInfoServer (TickerInfoServerHandle, + getAllTickers) import TXMLConnector (HistoryRequest (..), HistoryResponse (..), Request (..), Response (..), @@ -94,8 +98,8 @@ periodToSeconds PeriodDay = 60 * 60 * 24 periodToSeconds PeriodWeek = 60 * 60 * 24 * 7 periodToSeconds PeriodMonth = 60 * 60 * 24 * 30 -data QHPRequest = - QHPRequest { +data TickerRequest = + TickerRequest { rqTicker :: T.Text, rqStartTime :: UTCTime, rqEndTime :: UTCTime, @@ -103,13 +107,27 @@ data QHPRequest = rqCompression :: Maybe T.Text } deriving (Show, Eq) +data QHPRequest = + QHPTickerRequest TickerRequest + | QHPAllTickersRequest + deriving (Show, Eq) + +data QHPResponse = + QHPBarsResponse [Bar] + | QHPTickersListResponse [TickerId] + instance FromJSON QHPRequest where - parseJSON = withObject "Request" $ \v -> QHPRequest <$> - v .: "ticker" <*> - (v .: "from" >>= parseTime) <*> - (v .: "to" >>= parseTime) <*> - (v .: "timeframe" >>= parseTf) <*> - v .:? "compression" + parseJSON = withObject "Request" $ \v -> + if KM.lookup "get_sec_list" v == Just (Bool True) + then + pure QHPAllTickersRequest + else + QHPTickerRequest <$> (TickerRequest <$> + v .: "ticker" <*> + (v .: "from" >>= parseTime) <*> + (v .: "to" >>= parseTime) <*> + (v .: "timeframe" >>= parseTf) <*> + v .:? "compression") where parseTf :: T.Text -> Aeson.Parser Period parseTf t = if @@ -146,11 +164,12 @@ timeParse = do data Env = Env { - eRun :: TVar Bool - , eContext :: Context - , eEndpoint :: T.Text - , eLogger :: LogAction IO Message - , eTxml :: TXMLConnectorHandle + eRun :: TVar Bool + , eContext :: Context + , eEndpoint :: T.Text + , eLogger :: LogAction IO Message + , eTxml :: TXMLConnectorHandle + , eTisHandle :: TickerInfoServerHandle } newtype App a = App { unApp :: ReaderT Env IO a } @@ -165,9 +184,10 @@ startHistoryProviderServer :: Context -> T.Text -> TXMLConnectorHandle -> + TickerInfoServerHandle -> LogAction IO Message -> m HistoryProviderServerHandle -startHistoryProviderServer ctx endpoint txmlH logger = do +startHistoryProviderServer ctx endpoint txmlH tisH logger = do hpsRun <- liftIO . newTVarIO $ True let env = Env { eRun = hpsRun @@ -175,6 +195,7 @@ startHistoryProviderServer ctx endpoint txmlH logger = do , eEndpoint = endpoint , eLogger = logger , eTxml = txmlH + , eTisHandle = tisH } hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env pure HistoryProviderServerHandle {..} @@ -190,13 +211,14 @@ withHistoryProviderServer :: Context -> T.Text -> TXMLConnectorHandle -> + TickerInfoServerHandle -> LogAction IO Message -> (forall a. m a -> IO a) -> (HistoryProviderServerHandle -> m ()) -> m () -withHistoryProviderServer ctx endpoint txmlH logger runner action = +withHistoryProviderServer ctx endpoint txmlH tisH logger runner action = liftIO $ bracket - (startHistoryProviderServer ctx endpoint txmlH logger) + (startHistoryProviderServer ctx endpoint txmlH tisH logger) stopHistoryProviderServer (runner . action) @@ -227,7 +249,7 @@ workThread = do liftIO $ readTVarIO runVar liftIO $ close sock where - handleRequest sender request = do + handleRequest _ (QHPTickerRequest request) = do now <- liftIO getCurrentTime let diff = now `diffUTCTime` rqStartTime request let count = truncate diff `div` periodToSeconds (rqPeriod request) @@ -246,17 +268,24 @@ workThread = do log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr) let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs - pure bs + pure $ QHPBarsResponse bs _ -> do log Warning "HistoryProviderServer.WorkThread" "Invalid response" - pure [] + pure $ QHPBarsResponse [] + + handleRequest _ QHPAllTickersRequest = do + log Debug "HistoryProviderServer.WorkThread" "Requesting all tickers list" + tisH <- asks eTisHandle + tickers <- liftIO $ getAllTickers tisH + pure $ QHPTickersListResponse tickers timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response - encodeResponse response = ["OK"] <> [serializeBars response] + encodeResponse (QHPBarsResponse bars) = ["OK"] <> [serializeBars bars] + encodeResponse (QHPTickersListResponse tickers) = ["OK"] <> [serializeTickers tickers] serializeBars :: [Bar] -> B.ByteString serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar' @@ -268,3 +297,6 @@ workThread = do putDoublele (toDouble . barClose $ bar) putWord64le (fromInteger . barVolume $ bar) + serializeTickers :: [TickerId] -> B.ByteString + serializeTickers tickers = encodeUtf8 $ T.intercalate "," tickers + diff --git a/src/Main.hs b/src/Main.hs index b50909c..eb36dd6 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -75,8 +75,11 @@ main = do (NotificationSqnum 1) [] defaultServerSecurityParams - logger) (\x -> log Info "main" "Stopping" >> stopBrokerServer x) $ \_ -> do - withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml logger id $ \_ -> do + logger) (\x -> do + stopBrokerServer x + log Info "main" "Stopping TXMLConnector" + Connector.stop txml) $ \_ -> do + withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do forever $ threadDelay 1000000 log Info "main" "Shutting down" diff --git a/src/TickerInfoServer.hs b/src/TickerInfoServer.hs index de60353..37d9640 100644 --- a/src/TickerInfoServer.hs +++ b/src/TickerInfoServer.hs @@ -8,6 +8,7 @@ module TickerInfoServer withTickerInfoServer, putTickerInfo, getTickerInfo, + getAllTickers, TickerInfo(..) ) where import ATrade.Logging (Message, @@ -118,3 +119,6 @@ getTickerInfo tickerId tisH = M.lookup tickerId <$> readTVarIO (tisMap tisH) putTickerInfo :: TickerInfoServerHandle -> TickerInfo -> IO () putTickerInfo tisH tickerInfo = atomically $ modifyTVar' (tisMap tisH) (M.insert (tiTicker tickerInfo) tickerInfo) +getAllTickers :: TickerInfoServerHandle -> IO [TickerId] +getAllTickers tisH = M.keys <$> readTVarIO (tisMap tisH) +