{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE RankNTypes #-} {-# LANGUAGE RecordWildCards #-} module HistoryProviderServer ( startHistoryProviderServer , stopHistoryProviderServer , withHistoryProviderServer ) where import ATrade.Logging (Message, Severity (Debug, Info, Warning), log) import ATrade.Types (Bar (..), BarTimeframe (..), toDouble) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent.STM (TVar, atomically, newTVarIO, putTMVar, readTVarIO, takeTMVar, writeTVar) import Control.Concurrent.STM.TMVar (TMVar) import Control.Exception (bracket) import Control.Monad (forM_, void) import Control.Monad.Extra (whileM) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (MonadReader, asks) import Control.Monad.Trans.Reader (ReaderT (runReaderT)) import Data.Aeson (FromJSON (..), eitherDecode, withObject, (.:)) import Data.Aeson.Types as Aeson import Data.Attoparsec.Text as Attoparsec import Data.Binary.Put (putDoublele, putWord64le, runPut) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL import qualified Data.List as L import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Text as T import Data.Time (Day, UTCTime (UTCTime), fromGregorianValid) import Data.Time.Clock (diffUTCTime, getCurrentTime, secondsToDiffTime) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) import Prelude hiding (log) import System.ZMQ4 (Context, Router (Router), bind, close, receive, receiveMulti, sendMulti, socket, withSocket) import TXMLConnector (HistoryRequest (..), HistoryResponse (..), Request (..), Response (..), TXMLConnectorHandle, hrBars, makeRequest) data HistoryProviderServerHandle = HistoryProviderServerHandle { hpsThreadId :: ThreadId , hpsRun :: TVar Bool } data Period = Period1Min | Period5Min | Period15Min | Period30Min | PeriodHour | PeriodDay | PeriodWeek | PeriodMonth deriving (Eq, Show) parsePeriod :: T.Text -> Maybe Period parsePeriod "M1" = Just Period1Min parsePeriod "M5" = Just Period5Min parsePeriod "M15" = Just Period15Min parsePeriod "M30" = Just Period30Min parsePeriod "H1" = Just PeriodHour parsePeriod "D" = Just PeriodDay parsePeriod "W" = Just PeriodWeek parsePeriod "M" = Just PeriodMonth parsePeriod _ = Nothing periodToSeconds :: Period -> Int periodToSeconds Period1Min = 60 periodToSeconds Period5Min = 60 * 5 periodToSeconds Period15Min = 60 * 15 periodToSeconds Period30Min = 60 * 30 periodToSeconds PeriodHour = 60 * 60 periodToSeconds PeriodDay = 60 * 60 * 24 periodToSeconds PeriodWeek = 60 * 60 * 24 * 7 periodToSeconds PeriodMonth = 60 * 60 * 24 * 30 data QHPRequest = QHPRequest { rqTicker :: T.Text, rqStartTime :: UTCTime, rqEndTime :: UTCTime, rqPeriod :: Period, rqCompression :: Maybe T.Text } deriving (Show, Eq) instance FromJSON QHPRequest where parseJSON = withObject "Request" $ \v -> QHPRequest <$> v .: "ticker" <*> (v .: "from" >>= parseTime) <*> (v .: "to" >>= parseTime) <*> (v .: "timeframe" >>= parseTf) <*> v .:? "compression" where parseTf :: T.Text -> Aeson.Parser Period parseTf t = if | t == "M1" -> return Period1Min | t == "M5" -> return Period5Min | t == "M15" -> return Period15Min | t == "M30" -> return Period30Min | t == "H1" -> return PeriodHour | t == "D" -> return PeriodDay | t == "W" -> return PeriodWeek | t == "MN" -> return PeriodMonth | otherwise -> fail "Invalid period specified" parseTime :: T.Text -> Aeson.Parser UTCTime parseTime text = case Attoparsec.parseOnly (timeParse <* Attoparsec.endOfInput) text of Right r -> return r Left e -> fail $ "Can't parse time: " ++ T.unpack text ++ "/" ++ e timeParse :: Attoparsec.Parser UTCTime timeParse = do year <- decimal void $ char '-' month <- decimal void $ char '-' day <- decimal void $ char 'T' hour <- decimal void $ char ':' minute <- decimal void $ char ':' sec <- decimal case fromGregorianValid year month day of Just gregorianDay -> return $ UTCTime gregorianDay (secondsToDiffTime $ hour * 3600 + minute * 60 + sec) _ -> fail "Can't parse date: invalid values" data Env = Env { eRun :: TVar Bool , eContext :: Context , eEndpoint :: T.Text , eLogger :: LogAction IO Message , eTxml :: TXMLConnectorHandle } newtype App a = App { unApp :: ReaderT Env IO a } deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO) instance HasLog Env Message App where getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . eLogger $ env) } setLogAction _ env = env -- fuck it startHistoryProviderServer :: (MonadIO m) => Context -> T.Text -> TXMLConnectorHandle -> LogAction IO Message -> m HistoryProviderServerHandle startHistoryProviderServer ctx endpoint txmlH logger = do hpsRun <- liftIO . newTVarIO $ True let env = Env { eRun = hpsRun , eContext = ctx , eEndpoint = endpoint , eLogger = logger , eTxml = txmlH } hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env pure HistoryProviderServerHandle {..} stopHistoryProviderServer :: (MonadIO m) => HistoryProviderServerHandle -> m () stopHistoryProviderServer h = liftIO . atomically $ writeTVar (hpsRun h) False withHistoryProviderServer :: (MonadIO m) => Context -> T.Text -> TXMLConnectorHandle -> LogAction IO Message -> (forall a. m a -> IO a) -> (HistoryProviderServerHandle -> m ()) -> m () withHistoryProviderServer ctx endpoint txmlH logger runner action = liftIO $ bracket (startHistoryProviderServer ctx endpoint txmlH logger) stopHistoryProviderServer (runner . action) workThread :: App () workThread = do runVar <- asks eRun ctx <- asks eContext sock <- liftIO $ socket ctx Router ep <- asks eEndpoint liftIO $ bind sock $ T.unpack ep whileM $ do incomingData <- liftIO . receiveMulti $ sock log Debug "HistoryProviderServer.WorkThread" $ "Incoming data: " <> (T.pack . show) incomingData case incomingData of (sender:_:rawRq:_) -> case eitherDecode $ BL.fromStrict rawRq of Right request -> do response <- handleRequest sender request sendResponseWithDelimiter sock sender response Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request: " <> T.pack err (sender:rawRq:_) -> case eitherDecode $ BL.fromStrict rawRq of Right request -> do response <- handleRequest sender request sendResponse sock sender response Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request: " <> T.pack err _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" liftIO $ readTVarIO runVar liftIO $ close sock where handleRequest sender request = do now <- liftIO getCurrentTime let diff = now `diffUTCTime` rqStartTime request let count = truncate diff `div` periodToSeconds (rqPeriod request) log Debug "HistoryProviderServer.WorkThread" $ "Requesting bars: " <> (T.pack . show) count txml <- asks eTxml response <- liftIO . makeRequest txml $ RequestHistory HistoryRequest { hrTickerId = rqTicker request , hrTimeframe = BarTimeframe . periodToSeconds . rqPeriod $ request , hrCount = count , hrReset = True } log Info "HistoryProviderServer.WorkThread" "Got response from TXML" case response of ResponseHistory hr -> do log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr) let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs pure bs _ -> do log Warning "HistoryProviderServer.WorkThread" "Invalid response" pure [] timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response encodeResponse response = ["OK"] <> [serializeBars response] serializeBars :: [Bar] -> B.ByteString serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar' serializeBar' bar = do putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar) putDoublele (toDouble . barOpen $ bar) putDoublele (toDouble . barHigh $ bar) putDoublele (toDouble . barLow $ bar) putDoublele (toDouble . barClose $ bar) putWord64le (fromInteger . barVolume $ bar)