Browse Source

HP,TIS: support get_sec_list command

master
Denis Tereshkin 3 years ago
parent
commit
3f0d0cc834
  1. 56
      src/HistoryProviderServer.hs
  2. 7
      src/Main.hs
  3. 4
      src/TickerInfoServer.hs

56
src/HistoryProviderServer.hs

@ -16,7 +16,7 @@ import ATrade.Logging (Message,
Severity (Debug, Info, Warning), Severity (Debug, Info, Warning),
log) log)
import ATrade.Types (Bar (..), BarTimeframe (..), import ATrade.Types (Bar (..), BarTimeframe (..),
toDouble) TickerId, toDouble)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction)) LogAction (LogAction, unLogAction))
import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent (ThreadId, forkIO)
@ -32,6 +32,7 @@ import Control.Monad.Reader (MonadReader, asks)
import Control.Monad.Trans.Reader (ReaderT (runReaderT)) import Control.Monad.Trans.Reader (ReaderT (runReaderT))
import Data.Aeson (FromJSON (..), eitherDecode, import Data.Aeson (FromJSON (..), eitherDecode,
withObject, (.:)) withObject, (.:))
import qualified Data.Aeson.KeyMap as KM
import Data.Aeson.Types as Aeson import Data.Aeson.Types as Aeson
import Data.Attoparsec.Text as Attoparsec import Data.Attoparsec.Text as Attoparsec
import Data.Binary.Put (putDoublele, putWord64le, runPut) import Data.Binary.Put (putDoublele, putWord64le, runPut)
@ -40,6 +41,7 @@ import qualified Data.ByteString.Lazy as BL
import qualified Data.List as L import qualified Data.List as L
import Data.List.NonEmpty (NonEmpty ((:|))) import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Time (Day, UTCTime (UTCTime), import Data.Time (Day, UTCTime (UTCTime),
fromGregorianValid) fromGregorianValid)
import Data.Time.Clock (diffUTCTime, getCurrentTime, import Data.Time.Clock (diffUTCTime, getCurrentTime,
@ -49,6 +51,8 @@ import Prelude hiding (log)
import System.ZMQ4 (Context, Router (Router), bind, import System.ZMQ4 (Context, Router (Router), bind,
close, receive, receiveMulti, close, receive, receiveMulti,
sendMulti, socket, withSocket) sendMulti, socket, withSocket)
import TickerInfoServer (TickerInfoServerHandle,
getAllTickers)
import TXMLConnector (HistoryRequest (..), import TXMLConnector (HistoryRequest (..),
HistoryResponse (..), HistoryResponse (..),
Request (..), Response (..), Request (..), Response (..),
@ -94,8 +98,8 @@ periodToSeconds PeriodDay = 60 * 60 * 24
periodToSeconds PeriodWeek = 60 * 60 * 24 * 7 periodToSeconds PeriodWeek = 60 * 60 * 24 * 7
periodToSeconds PeriodMonth = 60 * 60 * 24 * 30 periodToSeconds PeriodMonth = 60 * 60 * 24 * 30
data QHPRequest = data TickerRequest =
QHPRequest { TickerRequest {
rqTicker :: T.Text, rqTicker :: T.Text,
rqStartTime :: UTCTime, rqStartTime :: UTCTime,
rqEndTime :: UTCTime, rqEndTime :: UTCTime,
@ -103,13 +107,27 @@ data QHPRequest =
rqCompression :: Maybe T.Text rqCompression :: Maybe T.Text
} deriving (Show, Eq) } deriving (Show, Eq)
data QHPRequest =
QHPTickerRequest TickerRequest
| QHPAllTickersRequest
deriving (Show, Eq)
data QHPResponse =
QHPBarsResponse [Bar]
| QHPTickersListResponse [TickerId]
instance FromJSON QHPRequest where instance FromJSON QHPRequest where
parseJSON = withObject "Request" $ \v -> QHPRequest <$> parseJSON = withObject "Request" $ \v ->
if KM.lookup "get_sec_list" v == Just (Bool True)
then
pure QHPAllTickersRequest
else
QHPTickerRequest <$> (TickerRequest <$>
v .: "ticker" <*> v .: "ticker" <*>
(v .: "from" >>= parseTime) <*> (v .: "from" >>= parseTime) <*>
(v .: "to" >>= parseTime) <*> (v .: "to" >>= parseTime) <*>
(v .: "timeframe" >>= parseTf) <*> (v .: "timeframe" >>= parseTf) <*>
v .:? "compression" v .:? "compression")
where where
parseTf :: T.Text -> Aeson.Parser Period parseTf :: T.Text -> Aeson.Parser Period
parseTf t = if parseTf t = if
@ -151,6 +169,7 @@ data Env = Env
, eEndpoint :: T.Text , eEndpoint :: T.Text
, eLogger :: LogAction IO Message , eLogger :: LogAction IO Message
, eTxml :: TXMLConnectorHandle , eTxml :: TXMLConnectorHandle
, eTisHandle :: TickerInfoServerHandle
} }
newtype App a = App { unApp :: ReaderT Env IO a } newtype App a = App { unApp :: ReaderT Env IO a }
@ -165,9 +184,10 @@ startHistoryProviderServer ::
Context -> Context ->
T.Text -> T.Text ->
TXMLConnectorHandle -> TXMLConnectorHandle ->
TickerInfoServerHandle ->
LogAction IO Message -> LogAction IO Message ->
m HistoryProviderServerHandle m HistoryProviderServerHandle
startHistoryProviderServer ctx endpoint txmlH logger = do startHistoryProviderServer ctx endpoint txmlH tisH logger = do
hpsRun <- liftIO . newTVarIO $ True hpsRun <- liftIO . newTVarIO $ True
let env = Env { let env = Env {
eRun = hpsRun eRun = hpsRun
@ -175,6 +195,7 @@ startHistoryProviderServer ctx endpoint txmlH logger = do
, eEndpoint = endpoint , eEndpoint = endpoint
, eLogger = logger , eLogger = logger
, eTxml = txmlH , eTxml = txmlH
, eTisHandle = tisH
} }
hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env
pure HistoryProviderServerHandle {..} pure HistoryProviderServerHandle {..}
@ -190,13 +211,14 @@ withHistoryProviderServer ::
Context -> Context ->
T.Text -> T.Text ->
TXMLConnectorHandle -> TXMLConnectorHandle ->
TickerInfoServerHandle ->
LogAction IO Message -> LogAction IO Message ->
(forall a. m a -> IO a) -> (forall a. m a -> IO a) ->
(HistoryProviderServerHandle -> m ()) -> (HistoryProviderServerHandle -> m ()) ->
m () m ()
withHistoryProviderServer ctx endpoint txmlH logger runner action = withHistoryProviderServer ctx endpoint txmlH tisH logger runner action =
liftIO $ bracket liftIO $ bracket
(startHistoryProviderServer ctx endpoint txmlH logger) (startHistoryProviderServer ctx endpoint txmlH tisH logger)
stopHistoryProviderServer stopHistoryProviderServer
(runner . action) (runner . action)
@ -227,7 +249,7 @@ workThread = do
liftIO $ readTVarIO runVar liftIO $ readTVarIO runVar
liftIO $ close sock liftIO $ close sock
where where
handleRequest sender request = do handleRequest _ (QHPTickerRequest request) = do
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
let diff = now `diffUTCTime` rqStartTime request let diff = now `diffUTCTime` rqStartTime request
let count = truncate diff `div` periodToSeconds (rqPeriod request) let count = truncate diff `div` periodToSeconds (rqPeriod request)
@ -246,17 +268,24 @@ workThread = do
log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr) log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr)
let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr
log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs
pure bs pure $ QHPBarsResponse bs
_ -> do _ -> do
log Warning "HistoryProviderServer.WorkThread" "Invalid response" log Warning "HistoryProviderServer.WorkThread" "Invalid response"
pure [] pure $ QHPBarsResponse []
handleRequest _ QHPAllTickersRequest = do
log Debug "HistoryProviderServer.WorkThread" "Requesting all tickers list"
tisH <- asks eTisHandle
tickers <- liftIO $ getAllTickers tisH
pure $ QHPTickersListResponse tickers
timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end
sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response
sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response
encodeResponse response = ["OK"] <> [serializeBars response] encodeResponse (QHPBarsResponse bars) = ["OK"] <> [serializeBars bars]
encodeResponse (QHPTickersListResponse tickers) = ["OK"] <> [serializeTickers tickers]
serializeBars :: [Bar] -> B.ByteString serializeBars :: [Bar] -> B.ByteString
serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar' serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar'
@ -268,3 +297,6 @@ workThread = do
putDoublele (toDouble . barClose $ bar) putDoublele (toDouble . barClose $ bar)
putWord64le (fromInteger . barVolume $ bar) putWord64le (fromInteger . barVolume $ bar)
serializeTickers :: [TickerId] -> B.ByteString
serializeTickers tickers = encodeUtf8 $ T.intercalate "," tickers

7
src/Main.hs

@ -75,8 +75,11 @@ main = do
(NotificationSqnum 1) (NotificationSqnum 1)
[] []
defaultServerSecurityParams defaultServerSecurityParams
logger) (\x -> log Info "main" "Stopping" >> stopBrokerServer x) $ \_ -> do logger) (\x -> do
withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml logger id $ \_ -> do stopBrokerServer x
log Info "main" "Stopping TXMLConnector"
Connector.stop txml) $ \_ -> do
withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do
forever $ threadDelay 1000000 forever $ threadDelay 1000000
log Info "main" "Shutting down" log Info "main" "Shutting down"

4
src/TickerInfoServer.hs

@ -8,6 +8,7 @@ module TickerInfoServer
withTickerInfoServer, withTickerInfoServer,
putTickerInfo, putTickerInfo,
getTickerInfo, getTickerInfo,
getAllTickers,
TickerInfo(..) TickerInfo(..)
) where ) where
import ATrade.Logging (Message, import ATrade.Logging (Message,
@ -118,3 +119,6 @@ getTickerInfo tickerId tisH = M.lookup tickerId <$> readTVarIO (tisMap tisH)
putTickerInfo :: TickerInfoServerHandle -> TickerInfo -> IO () putTickerInfo :: TickerInfoServerHandle -> TickerInfo -> IO ()
putTickerInfo tisH tickerInfo = atomically $ modifyTVar' (tisMap tisH) (M.insert (tiTicker tickerInfo) tickerInfo) putTickerInfo tisH tickerInfo = atomically $ modifyTVar' (tisMap tisH) (M.insert (tiTicker tickerInfo) tickerInfo)
getAllTickers :: TickerInfoServerHandle -> IO [TickerId]
getAllTickers tisH = M.keys <$> readTVarIO (tisMap tisH)

Loading…
Cancel
Save