You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
132 lines
5.0 KiB
132 lines
5.0 KiB
{-# LANGUAGE OverloadedStrings #-} |
|
|
|
module ATrade.MDS.HistoryServer ( |
|
HistoryServer, |
|
HistoryServerConfig(..), |
|
startHistoryServer |
|
) where |
|
|
|
import ATrade.MDS.Database |
|
import ATrade.MDS.Protocol |
|
import ATrade.Types |
|
import Control.Concurrent |
|
import Control.Monad |
|
import Control.Monad.IO.Class |
|
import Data.Aeson |
|
import Data.Binary.Get |
|
import Data.Binary.Put |
|
import qualified Data.ByteString as B |
|
import qualified Data.ByteString.Lazy as BL |
|
import Data.Conduit as C |
|
import Data.Conduit.Combinators |
|
import Data.List.NonEmpty |
|
import qualified Data.Text as T |
|
import Data.Time.Clock.POSIX |
|
import qualified Data.Vector as V |
|
import Safe |
|
|
|
import System.Log.Logger |
|
import System.ZMQ4 |
|
|
|
-- | Handle for a started history server. It carries the two 'ThreadId's
-- returned by 'forkIO' in 'startHistoryServer': the QHP serving thread
-- first, the HAP serving thread second.
data HistoryServer
  = HistoryServer ThreadId ThreadId
|
|
|
-- | Endpoints the history server binds its two router sockets to.
data HistoryServerConfig = HistoryServerConfig
  { hspQHPEndpoint :: T.Text -- ^ Endpoint bound by the QHP socket.
  , hspHAPEndpoint :: T.Text -- ^ Endpoint bound by the HAP socket.
  }
  deriving (Show, Eq)
|
|
|
-- | Bind the QHP and HAP router sockets and fork one serving thread for each.
--
-- The QHP socket is created, bound and handed to its thread before the HAP
-- socket is created, so an exception while setting up HAP leaves the QHP
-- thread running.
--
-- Returns a 'HistoryServer' holding the QHP thread id followed by the HAP
-- thread id.
startHistoryServer :: HistoryServerConfig -> MdsHandle -> Context -> IO HistoryServer
startHistoryServer cfg db ctx = do
  qhpTid <- launch (hspQHPEndpoint cfg) serveQHP
  hapTid <- launch (hspHAPEndpoint cfg) serveHAP
  return $ HistoryServer qhpTid hapTid
  where
    -- Create a Router socket on the shared context, bind it to the given
    -- endpoint and fork the serving loop on it.
    launch endpoint serve = do
      sock <- socket ctx Router
      bind sock (T.unpack endpoint)
      forkIO (serve db sock)
|
|
|
-- | Serve the QHP endpoint: receive multipart requests forever and stream the
-- requested bars back to the requesting peer.
--
-- Frame 0 of a request is used as the peer identity and frame 2 is decoded
-- as a JSON 'QHPRequest'. A request that lacks either frame, or whose command
-- does not decode, is logged at warning level and dropped without a reply.
--
-- NOTE(review): no exception handling is visible here. If the data conduit
-- throws after the header frames were sent, the multipart reply is left
-- unterminated and the exception escapes the 'forever' loop — confirm whether
-- callers rely on that.
serveQHP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO ()
serveQHP db sock = forever $ do
  frames <- receiveMulti sock
  let maybeCmd = (BL.fromStrict <$> frames `atMay` 2) >>= decode
  case (headMay frames, maybeCmd) of
    (Just peerId, Just cmd) -> handleCmd peerId cmd
    _ -> warningM "QHP" "Dropping malformed request: missing peer id or undecodable command"
  where
    -- Query the database for the requested range and stream it to the peer
    -- in chunks of at most 'chunkSize' bars.
    handleCmd :: B.ByteString -> QHPRequest -> IO ()
    handleCmd peerId cmd = do
      debugM "QHP" $ "Incoming command: " ++ show cmd
      let ticker = replaceWildcards $ rqTicker cmd
          interval = TimeInterval (rqStartTime cmd) (rqEndTime cmd)
          timeframe = Timeframe (periodSeconds $ rqPeriod cmd)
          dataC = getDataConduit db ticker interval timeframe
      runConduit $ dataC .| conduitVector chunkSize .| sendChunks peerId

    -- Emit a single multipart reply: peer id, empty delimiter, "OK", one
    -- frame per chunk of bars, and a final empty frame sent without
    -- 'SendMore' that closes the message.
    sendChunks :: (MonadIO m) => B.ByteString -> ConduitT (V.Vector Bar) Void m ()
    sendChunks peerId = do
      liftIO $ do
        send sock [SendMore] peerId
        send sock [SendMore] B.empty
        send sock [SendMore] "OK"
      awaitForever $ \vBars -> liftIO $ do
        debugM "QHP" $ "Sending chunk: " ++ show (V.length vBars) ++ " bars"
        send sock [SendMore] $ BL.toStrict $ serializeBars vBars
      liftIO $ send sock [] B.empty

    -- Concatenate the binary encodings of all bars in the vector.
    serializeBars :: V.Vector Bar -> BL.ByteString
    serializeBars bars = runPut $ V.forM_ bars serializeBar'

    -- Encode one bar as six little-endian 8-byte fields: POSIX timestamp in
    -- whole seconds, open, high, low, close (as doubles) and volume.
    serializeBar' :: Bar -> Put
    serializeBar' bar = do
      putWord64le (truncate . utcTimeToPOSIXSeconds . barTimestamp $ bar)
      putDoublele (toDouble . barOpen $ bar)
      putDoublele (toDouble . barHigh $ bar)
      putDoublele (toDouble . barLow $ bar)
      putDoublele (toDouble . barClose $ bar)
      putWord64le (fromInteger . barVolume $ bar)

    -- Maximum number of bars packed into one reply frame.
    chunkSize :: Int
    chunkSize = 4096

    -- Rewrite the request's wildcard characters: '?' becomes '_' and '*'
    -- becomes '%'; every other character is kept.
    -- NOTE(review): these look like SQL LIKE wildcards — confirm against
    -- 'getDataConduit'.
    replaceWildcards :: T.Text -> T.Text
    replaceWildcards = T.map mapWildcard

    mapWildcard :: Char -> Char
    mapWildcard '?' = '_'
    mapWildcard '*' = '%'
    mapWildcard x = x
|
|
|
-- | Serve the HAP endpoint: receive multipart requests forever, store the
-- bars they carry with 'putData', and acknowledge each stored request.
--
-- Frame 0 of a request is used as the peer identity, frame 2 is decoded as a
-- JSON 'HAPRequest' and frame 3 holds the binary-encoded bars. A request that
-- lacks any of these, or whose command does not decode, is logged at warning
-- level and dropped without a reply.
serveHAP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO ()
serveHAP db sock = forever $ do
  frames <- receiveMulti sock
  let maybeCmd = (BL.fromStrict <$> frames `atMay` 2) >>= decode
      mbRawData = frames `atMay` 3
  case (headMay frames, maybeCmd, mbRawData) of
    (Just peerId, Just cmd, Just rawData) ->
      handleCmd peerId cmd $ deserializeBars (hapTicker cmd) (BL.fromStrict rawData)
    _ -> warningM "HAP" "Dropping malformed request: missing peer id, undecodable command or missing payload"
  where
    -- Store the parsed bars for the request's ticker, interval and timeframe,
    -- then reply to the peer with an empty delimiter frame followed by "OK".
    handleCmd :: B.ByteString -> HAPRequest -> [Bar] -> IO ()
    handleCmd peerId rq bars = do
      debugM "HAP" $ "Incoming command: " ++ show rq
      putData
        db
        (hapTicker rq)
        (TimeInterval (hapStartTime rq) (hapEndTime rq))
        (Timeframe $ hapTimeframeSec rq)
        (V.fromList bars)
      debugM "HAP" "Data updated"
      sendMulti sock $ peerId :| [B.empty, "OK"]

    -- Decode consecutive bars from the payload until the parser fails, which
    -- includes reaching the end of input. Any trailing bytes that do not form
    -- a complete bar are ignored.
    deserializeBars tickerId input =
      case runGetOrFail parseBar input of
        Left _ -> []
        Right (rest, _, bar) -> bar : deserializeBars tickerId rest
      where
        -- One bar is six little-endian 8-byte fields: POSIX timestamp in
        -- seconds, open, high, low, close (as doubles) and volume.
        parseBar = do
          rawTimestamp <- fromIntegral <$> getWord64le
          baropen <- getDoublele
          barhigh <- getDoublele
          barlow <- getDoublele
          barclose <- getDoublele
          barvolume <- getWord64le
          return
            Bar
              { barSecurity = tickerId
              , barTimestamp = posixSecondsToUTCTime rawTimestamp
              , barOpen = fromDouble baropen
              , barHigh = fromDouble barhigh
              , barLow = fromDouble barlow
              , barClose = fromDouble barclose
              , barVolume = toInteger barvolume
              }
|
|
|