diff --git a/mds.cabal b/mds.cabal index 0b2772e..1e4224f 100644 --- a/mds.cabal +++ b/mds.cabal @@ -37,6 +37,7 @@ library , binary , binary-ieee754 , conduit + , conduit-extra default-language: Haskell2010 other-modules: ATrade.MDS.Protocol default-extensions: OverloadedStrings diff --git a/src/ATrade/MDS/HistoryServer.hs b/src/ATrade/MDS/HistoryServer.hs index bb5ae5b..66bc3bc 100644 --- a/src/ATrade/MDS/HistoryServer.hs +++ b/src/ATrade/MDS/HistoryServer.hs @@ -6,27 +6,40 @@ module ATrade.MDS.HistoryServer ( startHistoryServer ) where -import ATrade.MDS.Database -import ATrade.MDS.Protocol -import ATrade.Types -import Control.Concurrent -import Control.Monad -import Control.Monad.IO.Class -import Data.Aeson -import Data.Binary.Get -import Data.Binary.Put +import ATrade.MDS.Database (MdsHandle, + TimeInterval (TimeInterval), + Timeframe (Timeframe), + getDataConduit, putData) +import ATrade.MDS.Protocol (HAPRequest (hapEndTime, hapStartTime, hapTicker, hapTimeframeSec), + QHPRequest (rqCompression, rqEndTime, rqPeriod, rqStartTime, rqTicker), + periodSeconds) +import ATrade.Types (Bar (..), fromDouble, toDouble) +import Control.Concurrent (ThreadId, forkIO) +import Control.Monad (forever) +import Control.Monad.IO.Class (MonadIO (..)) +import Data.Aeson (decode) +import Data.Binary.Get (getDoublele, getWord64le, + runGetOrFail) +import Data.Binary.Put (putDoublele, putWord64le, runPut) import qualified Data.ByteString as B import qualified Data.ByteString.Lazy as BL -import Data.Conduit as C -import Data.Conduit.Combinators -import Data.List.NonEmpty +import Data.Conduit as C (ConduitT, Void, awaitForever, + runConduit, yield, (.|)) +import Data.Conduit.Combinators (conduitVector) +import qualified Data.Conduit.Combinators as CC +import qualified Data.Conduit.Zlib as CZ +import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Text as T -import Data.Time.Clock.POSIX +import Data.Time.Clock.POSIX (posixSecondsToUTCTime, + utcTimeToPOSIXSeconds) import qualified Data.Vector as V -import Safe +import Safe (atMay, headMay) -import System.Log.Logger -import System.ZMQ4 +import System.Log.Logger (debugM) +import System.ZMQ4 (Context, Flag (SendMore), Receiver, + Router (Router), Sender, Socket, + bind, receiveMulti, send, sendMulti, + socket) data HistoryServer = HistoryServer ThreadId ThreadId @@ -59,17 +72,23 @@ serveQHP db sock = forever $ do handleCmd peerId cmd = case cmd of rq -> do debugM "QHP" $ "Incoming command: " ++ show cmd + let compressionConduit = if rqCompression rq == Just "gzip" then CZ.gzip else CC.map id let dataC = getDataConduit db (replaceWildcards $ rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) - runConduit $ dataC .| (conduitVector chunkSize) .| (sendChunks peerId) - sendChunks :: (MonadIO m) => B.ByteString -> ConduitT (V.Vector Bar) Void m () - sendChunks peerId = do + runConduit $ dataC .| conduitVector chunkSize .| serializeChunk .| compressionConduit .| sendBSChunks peerId + + serializeChunk :: (MonadIO m) => ConduitT (V.Vector Bar) B.ByteString m () + serializeChunk = awaitForever $ \vBars -> yield $ BL.toStrict $ serializeBars vBars + + sendBSChunks :: (MonadIO m) => B.ByteString -> ConduitT B.ByteString Void m () + sendBSChunks peerId = do liftIO $ send sock [SendMore] peerId liftIO $ send sock [SendMore] B.empty liftIO $ send sock [SendMore] "OK" - awaitForever $ \vBars -> liftIO $ do - debugM "QHP" $ "Sending chunk: " ++ show (V.length vBars) ++ " bars" - send sock [SendMore] $ BL.toStrict $ serializeBars vBars + awaitForever $ \bs -> liftIO $ do + debugM "QHP" $ "Sending chunk: " ++ show (B.length bs) ++ " bytes" + send sock [SendMore] bs liftIO $ send sock [] B.empty + serializeBars :: V.Vector Bar -> BL.ByteString serializeBars bars = runPut $ V.forM_ bars serializeBar' serializeBar' bar = do @@ -80,7 +99,7 @@ serveQHP db sock = forever $ do putDoublele (toDouble . barClose $ bar) putWord64le (fromInteger . barVolume $ bar) - chunkSize = 4096 + chunkSize = 16384 replaceWildcards = T.map mapWildcard mapWildcard '?' = '_' diff --git a/src/ATrade/MDS/Protocol.hs b/src/ATrade/MDS/Protocol.hs index 455aba8..8ffa8cd 100644 --- a/src/ATrade/MDS/Protocol.hs +++ b/src/ATrade/MDS/Protocol.hs @@ -54,7 +54,8 @@ data QHPRequest = rqTicker :: T.Text, rqStartTime :: UTCTime, rqEndTime :: UTCTime, - rqPeriod :: Period + rqPeriod :: Period, + rqCompression :: Maybe T.Text } deriving (Show, Eq) instance FromJSON QHPRequest where @@ -62,7 +63,8 @@ instance FromJSON QHPRequest where v .: "ticker" <*> (v .: "from" >>= parseTime) <*> (v .: "to" >>= parseTime) <*> - (v .: "timeframe" >>= parseTf) + (v .: "timeframe" >>= parseTf) <*> + v .:? "compression" where parseTf :: T.Text -> Aeson.Parser Period parseTf t = if diff --git a/stack.yaml b/stack.yaml index 9c8eb3d..a01ca12 100644 --- a/stack.yaml +++ b/stack.yaml @@ -41,7 +41,12 @@ packages: - '../zeromq4-haskell-zap' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: ["HDBC-sqlite3-2.3.3.1", "datetime-0.3.1", "th-printf-0.5.1", "text-format-0.3.2"] +extra-deps: +- HDBC-sqlite3-2.3.3.1 +- datetime-0.3.1 +- text-format-0.3.2 +- co-log-0.4.0.1@sha256:3d4c17f37693c80d1aa2c41669bc3438fac3e89dc5f479e57d79bc3ddc4dfcc5,5087 +- ansi-terminal-0.10.3@sha256:e2fbcef5f980dc234c7ad8e2fa433b0e8109132c9e643bc40ea5608cd5697797,3226 # Override default flag values for local packages and extra-deps flags: {}