7 changed files with 379 additions and 37 deletions
@ -0,0 +1,269 @@
@@ -0,0 +1,269 @@
|
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE MultiWayIf #-} |
||||
{-# LANGUAGE RankNTypes #-} |
||||
{-# LANGUAGE RecordWildCards #-} |
||||
|
||||
module HistoryProviderServer |
||||
( |
||||
startHistoryProviderServer |
||||
, stopHistoryProviderServer |
||||
, withHistoryProviderServer |
||||
) where |
||||
|
||||
import ATrade.Logging (Message, |
||||
Severity (Debug, Info, Warning), |
||||
log) |
||||
import ATrade.Types (Bar (..), BarTimeframe (..), |
||||
toDouble) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction (LogAction, unLogAction)) |
||||
import Control.Concurrent (ThreadId, forkIO) |
||||
import Control.Concurrent.STM (TVar, atomically, newTVarIO, |
||||
putTMVar, readTVarIO, takeTMVar, |
||||
writeTVar) |
||||
import Control.Concurrent.STM.TMVar (TMVar) |
||||
import Control.Exception (bracket) |
||||
import Control.Monad (forM_, void) |
||||
import Control.Monad.Extra (whileM) |
||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||
import Control.Monad.Reader (MonadReader, asks) |
||||
import Control.Monad.Trans.Reader (ReaderT (runReaderT)) |
||||
import Data.Aeson (FromJSON (..), eitherDecode, |
||||
withObject, (.:)) |
||||
import Data.Aeson.Types as Aeson |
||||
import Data.Attoparsec.Text as Attoparsec |
||||
import Data.Binary.Put (putDoublele, putWord64le, runPut) |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import qualified Data.List as L |
||||
import Data.List.NonEmpty (NonEmpty ((:|))) |
||||
import qualified Data.Text as T |
||||
import Data.Time (Day, UTCTime (UTCTime), |
||||
fromGregorianValid) |
||||
import Data.Time.Clock (diffUTCTime, getCurrentTime, |
||||
secondsToDiffTime) |
||||
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) |
||||
import Prelude hiding (log) |
||||
import System.ZMQ4 (Context, Router (Router), bind, |
||||
close, receive, receiveMulti, |
||||
sendMulti, socket, withSocket) |
||||
import TXMLConnector (HistoryRequest (..), |
||||
HistoryResponse (..), |
||||
Request (..), Response (..), |
||||
TXMLConnectorHandle, hrBars, |
||||
makeRequest) |
||||
|
||||
data HistoryProviderServerHandle = |
||||
HistoryProviderServerHandle |
||||
{ |
||||
hpsThreadId :: ThreadId |
||||
, hpsRun :: TVar Bool |
||||
} |
||||
|
||||
data Period = |
||||
Period1Min | |
||||
Period5Min | |
||||
Period15Min | |
||||
Period30Min | |
||||
PeriodHour | |
||||
PeriodDay | |
||||
PeriodWeek | |
||||
PeriodMonth |
||||
deriving (Eq, Show) |
||||
|
||||
parsePeriod :: T.Text -> Maybe Period |
||||
parsePeriod "M1" = Just Period1Min |
||||
parsePeriod "M5" = Just Period5Min |
||||
parsePeriod "M15" = Just Period15Min |
||||
parsePeriod "M30" = Just Period30Min |
||||
parsePeriod "H1" = Just PeriodHour |
||||
parsePeriod "D" = Just PeriodDay |
||||
parsePeriod "W" = Just PeriodWeek |
||||
parsePeriod "M" = Just PeriodMonth |
||||
parsePeriod _ = Nothing |
||||
|
||||
periodToSeconds :: Period -> Int |
||||
periodToSeconds Period1Min = 60 |
||||
periodToSeconds Period5Min = 60 * 5 |
||||
periodToSeconds Period15Min = 60 * 15 |
||||
periodToSeconds Period30Min = 60 * 30 |
||||
periodToSeconds PeriodHour = 60 * 60 |
||||
periodToSeconds PeriodDay = 60 * 60 * 24 |
||||
periodToSeconds PeriodWeek = 60 * 60 * 24 * 7 |
||||
periodToSeconds PeriodMonth = 60 * 60 * 24 * 30 |
||||
|
||||
data QHPRequest = |
||||
QHPRequest { |
||||
rqTicker :: T.Text, |
||||
rqStartTime :: UTCTime, |
||||
rqEndTime :: UTCTime, |
||||
rqPeriod :: Period, |
||||
rqCompression :: Maybe T.Text |
||||
} deriving (Show, Eq) |
||||
|
||||
instance FromJSON QHPRequest where |
||||
parseJSON = withObject "Request" $ \v -> QHPRequest <$> |
||||
v .: "ticker" <*> |
||||
(v .: "from" >>= parseTime) <*> |
||||
(v .: "to" >>= parseTime) <*> |
||||
(v .: "timeframe" >>= parseTf) <*> |
||||
v .:? "compression" |
||||
where |
||||
parseTf :: T.Text -> Aeson.Parser Period |
||||
parseTf t = if |
||||
| t == "M1" -> return Period1Min |
||||
| t == "M5" -> return Period5Min |
||||
| t == "M15" -> return Period15Min |
||||
| t == "M30" -> return Period30Min |
||||
| t == "H1" -> return PeriodHour |
||||
| t == "D" -> return PeriodDay |
||||
| t == "W" -> return PeriodWeek |
||||
| t == "MN" -> return PeriodMonth |
||||
| otherwise -> fail "Invalid period specified" |
||||
|
||||
parseTime :: T.Text -> Aeson.Parser UTCTime |
||||
parseTime text = case Attoparsec.parseOnly (timeParse <* Attoparsec.endOfInput) text of |
||||
Right r -> return r |
||||
Left e -> fail $ "Can't parse time: " ++ T.unpack text ++ "/" ++ e |
||||
timeParse :: Attoparsec.Parser UTCTime |
||||
timeParse = do |
||||
year <- decimal |
||||
void $ char '-' |
||||
month <- decimal |
||||
void $ char '-' |
||||
day <- decimal |
||||
void $ char 'T' |
||||
hour <- decimal |
||||
void $ char ':' |
||||
minute <- decimal |
||||
void $ char ':' |
||||
sec <- decimal |
||||
case fromGregorianValid year month day of |
||||
Just gregorianDay -> return $ UTCTime gregorianDay (secondsToDiffTime $ hour * 3600 + minute * 60 + sec) |
||||
_ -> fail "Can't parse date: invalid values" |
||||
|
||||
data Env = Env |
||||
{ |
||||
eRun :: TVar Bool |
||||
, eContext :: Context |
||||
, eEndpoint :: T.Text |
||||
, eLogger :: LogAction IO Message |
||||
, eTxml :: TXMLConnectorHandle |
||||
} |
||||
|
||||
newtype App a = App { unApp :: ReaderT Env IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader Env, MonadIO) |
||||
|
||||
instance HasLog Env Message App where |
||||
getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . eLogger $ env) } |
||||
setLogAction _ env = env -- fuck it |
||||
|
||||
startHistoryProviderServer :: |
||||
(MonadIO m) => |
||||
Context -> |
||||
T.Text -> |
||||
TXMLConnectorHandle -> |
||||
LogAction IO Message -> |
||||
m HistoryProviderServerHandle |
||||
startHistoryProviderServer ctx endpoint txmlH logger = do |
||||
hpsRun <- liftIO . newTVarIO $ True |
||||
let env = Env { |
||||
eRun = hpsRun |
||||
, eContext = ctx |
||||
, eEndpoint = endpoint |
||||
, eLogger = logger |
||||
, eTxml = txmlH |
||||
} |
||||
hpsThreadId <- liftIO . forkIO $ (runReaderT . unApp) workThread env |
||||
pure HistoryProviderServerHandle {..} |
||||
|
||||
stopHistoryProviderServer :: |
||||
(MonadIO m) => |
||||
HistoryProviderServerHandle -> |
||||
m () |
||||
stopHistoryProviderServer h = liftIO . atomically $ writeTVar (hpsRun h) False |
||||
|
||||
withHistoryProviderServer :: |
||||
(MonadIO m) => |
||||
Context -> |
||||
T.Text -> |
||||
TXMLConnectorHandle -> |
||||
LogAction IO Message -> |
||||
(forall a. m a -> IO a) -> |
||||
(HistoryProviderServerHandle -> m ()) -> |
||||
m () |
||||
withHistoryProviderServer ctx endpoint txmlH logger runner action = |
||||
liftIO $ bracket |
||||
(startHistoryProviderServer ctx endpoint txmlH logger) |
||||
stopHistoryProviderServer |
||||
(runner . action) |
||||
|
||||
workThread :: App () |
||||
workThread = do |
||||
runVar <- asks eRun |
||||
ctx <- asks eContext |
||||
sock <- liftIO $ socket ctx Router |
||||
ep <- asks eEndpoint |
||||
liftIO $ bind sock $ T.unpack ep |
||||
whileM $ do |
||||
incomingData <- liftIO . receiveMulti $ sock |
||||
case incomingData of |
||||
(sender:_:rawRq:_) -> |
||||
case eitherDecode $ BL.fromStrict rawRq of |
||||
Right request -> do |
||||
response <- handleRequest sender request |
||||
sendResponseWithDelimiter sock sender response |
||||
Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request: " <> T.pack err |
||||
(sender:rawRq:_) -> |
||||
case eitherDecode $ BL.fromStrict rawRq of |
||||
Right request -> do |
||||
response <- handleRequest sender request |
||||
sendResponse sock sender response |
||||
Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request: " <> T.pack err |
||||
_ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" |
||||
liftIO $ readTVarIO runVar |
||||
liftIO $ close sock |
||||
where |
||||
handleRequest sender request = do |
||||
now <- liftIO getCurrentTime |
||||
let diff = now `diffUTCTime` rqStartTime request |
||||
let count = truncate diff `div` periodToSeconds (rqPeriod request) |
||||
log Debug "HistoryProviderServer.WorkThread" $ "Requesting bars: " <> (T.pack . show) count |
||||
txml <- asks eTxml |
||||
response <- liftIO . makeRequest txml $ Request HistoryRequest |
||||
{ |
||||
hrTickerId = rqTicker request |
||||
, hrTimeframe = BarTimeframe . periodToSeconds . rqPeriod $ request |
||||
, hrCount = count |
||||
, hrReset = True |
||||
} |
||||
log Info "HistoryProviderServer.WorkThread" "Got response from TXML" |
||||
case response of |
||||
ResponseHistory hr -> do |
||||
log Info "HistoryProviderServer.WorkThread" $ "Bars1: " <> (T.pack . show . length) (hrBars hr) |
||||
let bs = L.filter (timestampBetween (rqStartTime request) (rqEndTime request)) $ hrBars hr |
||||
log Info "HistoryProviderServer.WorkThread" $ "Bars: " <> (T.pack . show . length) bs |
||||
pure bs |
||||
_ -> do |
||||
log Warning "HistoryProviderServer.WorkThread" "Invalid response" |
||||
pure [] |
||||
|
||||
timestampBetween start end bar = let ts = barTimestamp bar in start <= ts && ts <= end |
||||
|
||||
sendResponse sock receiver response = liftIO $ sendMulti sock $ receiver :| encodeResponse response |
||||
sendResponseWithDelimiter sock receiver response = liftIO $ sendMulti sock $ receiver :| [""] <> encodeResponse response |
||||
|
||||
encodeResponse response = ["OK"] <> [serializeBars response] |
||||
|
||||
serializeBars :: [Bar] -> B.ByteString |
||||
serializeBars bars = BL.toStrict $ runPut $ forM_ bars serializeBar' |
||||
serializeBar' bar = do |
||||
putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar) |
||||
putDoublele (toDouble . barOpen $ bar) |
||||
putDoublele (toDouble . barHigh $ bar) |
||||
putDoublele (toDouble . barLow $ bar) |
||||
putDoublele (toDouble . barClose $ bar) |
||||
putWord64le (fromInteger . barVolume $ bar) |
||||
|
||||
Loading…
Reference in new issue