{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} module HistoryProviderServer ( startHistoryProviderServer , stopHistoryProviderServer , withHistoryProviderServer ) where import ATrade.Logging (Message, Severity (Debug, Info, Warning), log) import ATrade.Types (Bar (..), BarTimeframe (..), TickerId, toDouble) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.STM (TVar, atomically, newTVarIO, putTMVar, readTVarIO, takeTMVar, writeTVar) import Control.Concurrent.STM.TMVar (TMVar) import Control.Exception (bracket) import Control.Monad (forM_, void) import Control.Monad.Extra (whileM) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (MonadReader, asks) import Control.Monad.Trans.Reader (ReaderT (runReaderT)) import Data.Aeson (FromJSON (..), eitherDecode, withObject, (.:)) import qualified Data.Aeson.KeyMap as KM import Data.Aeson.Types as Aeson import Data.Attoparsec.Text as Attoparsec import Data.Binary.Put (putDoublele, putWord64le, runPut) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import qualified Data.List as L import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Text as T import Data.Text.Encoding (encodeUtf8) import Data.Time (Day, UTCTime (UTCTime), fromGregorianValid) import Data.Time.Clock (diffUTCTime, getCurrentTime, secondsToDiffTime) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) import Prelude hiding (log) import System.ZMQ4 (Context, Router (Router), bind, close, receive, receiveMulti, sendMulti, socket, withSocket) import TickerInfoServer (TickerInfoServerHandle, getAllTickers) import TXMLConnector (HistoryRequest (..), HistoryResponse (..), Request (..), Response (..), TXMLConnectorHandle, hrBars, makeRequest) data HistoryProviderServerHandle = HistoryProviderServerHandle { hpsThreadId :: ThreadId , hpsRun :: TVar Bool } data Period = Period1Min | Period5Min | Period15Min | Period30Min | PeriodHour | PeriodDay | PeriodWeek | PeriodMonth deriving (Eq, Show) parsePeriod :: T.Text -> Maybe Period parsePeriod "M1" = Just Period1Min parsePeriod "M5" = Just Period5Min parsePeriod "M15" = Just Period15Min parsePeriod "M30" = Just Period30Min parsePeriod "H1" = Just PeriodHour parsePeriod "D" = Just PeriodDay parsePeriod "W" = Just PeriodWeek parsePeriod "M" = Just PeriodMonth parsePeriod _ = Nothing periodToSeconds :: Period -> Int periodToSeconds Period1Min = 60 periodToSeconds Period5Min = 60 * 5 periodToSeconds Period15Min = 60 * 15 periodToSeconds Period30Min = 60 * 30 periodToSeconds PeriodHour = 60 * 60 periodToSeconds PeriodDay = 60 * 60 * 24 periodToSeconds PeriodWeek = 60 * 60 * 24 * 7 periodToSeconds PeriodMonth = 60 * 60 * 24 * 30 data TickerRequest = TickerRequest { rqTicker :: !T.Text, rqStartTime :: !UTCTime, rqEndTime :: !UTCTime, rqPeriod :: !Period, rqCompression :: !(Maybe T.Text) } deriving (Show, Eq) data QHPRequest = QHPTickerRequest TickerRequest | QHPAllTickersRequest deriving (Show, Eq) data QHPResponse = QHPBarsResponse ![Bar] | QHPTickersListResponse ![TickerId] instance FromJSON QHPRequest where parseJSON = withObject "Request" $ \v -> if KM.lookup "get_sec_list" v == Just (Bool True) then pure QHPAllTickersRequest else QHPTickerRequest <$> (TickerRequest <$> v .: "ticker" <*> (v .: "from" >>= parseTime) <*> (v .: "to" >>= parseTime) <*> (v .: "timeframe" >>= parseTf) <*> v .:? "compression") where parseTf :: T.Text -> Aeson.Parser Period parseTf t = if | t == "M1" -> return Period1Min | t == "M5" -> return Period5Min | t == "M15" -> return Period15Min | t == "M30" -> return Period30Min | t == "H1" -> return PeriodHour | t == "D" -> return PeriodDay | t == "W" -> return PeriodWeek | t == "MN" -> return PeriodMonth | otherwise -> fail "Invalid period specified" parseTime :: T.Text -> Aeson.Parser UTCTime parseTime text = case Attoparsec.parseOnly (timeParse <* Attoparsec.endOfInput) text of Right r -> return r Left e -> fail $ "Can't parse time: " ++ T.unpack text ++ "/" ++ e timeParse :: Attoparsec.Parser UTCTime timeParse = do year <- decimal void $ char '-' month <- decimal void $ char '-' day <- decimal void $ char 'T' hour <- decimal void $ char ':' minute <- decimal void $ char ':' sec <- decimal case fromGregorianValid year month day of Just gregorianDay -> return $ UTCTime gregorianDay (secondsToDiffTime $ hour * 3600 + minute * 60 + sec) _ -> fail "Can't parse date: invalid values" data Env = Env { eRun :: !(TVar Bool) , eContext :: !Context , eEndpoint :: !T.Text , eLogger :: !(LogAction IO Message) , eTxml :: !TXMLConnectorHandle , eTisHandle :: !TickerInfoServerHandle } newtype App a = App { unApp :: ReaderT Env IO a } deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO) instance HasLog Env Message App where getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . eLogger $ env) } setLogAction _ env = env -- fuck it startHistoryProviderServer :: (MonadIO m) => Context -> T.Text -> TXMLConnectorHandle -> TickerInfoServerHandle -> LogAction IO Message -> m HistoryProviderServerHandle startHistoryProviderServer ctx endpoint txmlH tisH logger = do hpsRun <- liftIO . newTVarIO $ True let env = Env { eRun = hpsRun , eContext = ctx , eEndpoint = endpoint , eLogger = logger , eTxml = txmlH , eTisHandle = tisH } hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env pure HistoryProviderServerHandle {..} stopHistoryProviderServer :: (MonadIO m) => HistoryProviderServerHandle -> m () stopHistoryProviderServer h = liftIO . atomically $ writeTVar (hpsRun h) False withHistoryProviderServer :: (MonadIO m) => Context -> T.Text -> TXMLConnectorHandle -> TickerInfoServerHandle -> LogAction IO Message -> (forall a. m a -> IO a) -> (HistoryProviderServerHandle -> m ()) -> m () withHistoryProviderServer ctx endpoint txmlH tisH logger runner action = liftIO $ bracket (startHistoryProviderServer ctx endpoint txmlH tisH logger) stopHistoryProviderServer (runner . action) workThread :: App () workThread = do runVar <- asks eRun ctx <- asks eContext sock <- liftIO $ socket ctx Router ep <- asks eEndpoint liftIO $ bind sock $ T.unpack ep whileM $ do incomingData <- liftIO . receiveMulti $ sock log Debug "HistoryProviderServer.WorkThread" $ "Incoming data: " <> (T.pack . show) incomingData case incomingData of (sender:_:rawRq:_) -> case eitherDecode $ BL.fromStrict rawRq of Right request -> do response <- handleRequest sender request sendResponseWithDelimiter sock sender response Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request" (sender:rawRq:_) -> case eitherDecode $ BL.fromStrict rawRq of Right request -> do response <- handleRequest sender request sendResponse sock sender response Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request" _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" liftIO $ readTVarIO runVar liftIO $ close sock where handleRequest _ (QHPTickerRequest request) = do now <- liftIO getCurrentTime let diff = now `diffUTCTime` rqStartTime request let count = truncate diff `div` periodToSeconds (rqPeriod request) log Debug "HistoryProviderServer.WorkThread" $ "Requesting bars: " <> (T.pack . show) count txml <- asks eTxml response <- liftIO . makeRequest txml $ RequestHistory HistoryRequest { hrTickerId = rqTicker request , hrTimeframe = BarTimeframe . periodToSeconds . rqPeriod $ request , hrCount = count , hrReset = True } log Info "HistoryProviderServer.WorkThread" "Got response from TXML" case response of ResponseHistory hr -> do log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr) let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs pure $ QHPBarsResponse bs _ -> do log Warning "HistoryProviderServer.WorkThread" "Invalid response" pure $ QHPBarsResponse [] handleRequest _ QHPAllTickersRequest = do log Debug "HistoryProviderServer.WorkThread" "Requesting all tickers list" tisH <- asks eTisHandle tickers <- liftIO $ getAllTickers tisH pure $ QHPTickersListResponse tickers timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response encodeResponse (QHPBarsResponse bars) = ["OK"] <> [serializeBars bars] encodeResponse (QHPTickersListResponse tickers) = ["OK"] <> [serializeTickers tickers] serializeBars :: [Bar] -> B.ByteString serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar' serializeBar' bar = do putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar) putDoublele (toDouble . barOpen $ bar) putDoublele (toDouble . barHigh $ bar) putDoublele (toDouble . barLow $ bar) putDoublele (toDouble . barClose $ bar) putWord64le (fromInteger . barVolume $ bar) serializeTickers :: [TickerId] -> B.ByteString serializeTickers tickers = encodeUtf8 $ T.intercalate "," tickers