Browse Source

Compression support

master
Denis Tereshkin 4 years ago
parent
commit
38fba0239a
  1. 1
      mds.cabal
  2. 65
      src/ATrade/MDS/HistoryServer.hs
  3. 6
      src/ATrade/MDS/Protocol.hs
  4. 7
      stack.yaml

1
mds.cabal

@ -37,6 +37,7 @@ library @@ -37,6 +37,7 @@ library
, binary
, binary-ieee754
, conduit
, conduit-extra
default-language: Haskell2010
other-modules: ATrade.MDS.Protocol
default-extensions: OverloadedStrings

65
src/ATrade/MDS/HistoryServer.hs

@ -6,27 +6,40 @@ module ATrade.MDS.HistoryServer ( @@ -6,27 +6,40 @@ module ATrade.MDS.HistoryServer (
startHistoryServer
) where
import ATrade.MDS.Database
import ATrade.MDS.Protocol
import ATrade.Types
import Control.Concurrent
import Control.Monad
import Control.Monad.IO.Class
import Data.Aeson
import Data.Binary.Get
import Data.Binary.Put
import ATrade.MDS.Database (MdsHandle,
TimeInterval (TimeInterval),
Timeframe (Timeframe),
getDataConduit, putData)
import ATrade.MDS.Protocol (HAPRequest (hapEndTime, hapStartTime, hapTicker, hapTimeframeSec),
QHPRequest (rqCompression, rqEndTime, rqPeriod, rqStartTime, rqTicker),
periodSeconds)
import ATrade.Types (Bar (..), fromDouble, toDouble)
import Control.Concurrent (ThreadId, forkIO)
import Control.Monad (forever)
import Control.Monad.IO.Class (MonadIO (..))
import Data.Aeson (decode)
import Data.Binary.Get (getDoublele, getWord64le,
runGetOrFail)
import Data.Binary.Put (putDoublele, putWord64le, runPut)
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.Conduit as C
import Data.Conduit.Combinators
import Data.List.NonEmpty
import Data.Conduit as C (ConduitT, Void, awaitForever,
runConduit, yield, (.|))
import Data.Conduit.Combinators (conduitVector)
import qualified Data.Conduit.Combinators as CC
import qualified Data.Conduit.Zlib as CZ
import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Text as T
import Data.Time.Clock.POSIX
import Data.Time.Clock.POSIX (posixSecondsToUTCTime,
utcTimeToPOSIXSeconds)
import qualified Data.Vector as V
import Safe
import Safe (atMay, headMay)
import System.Log.Logger
import System.ZMQ4
import System.Log.Logger (debugM)
import System.ZMQ4 (Context, Flag (SendMore), Receiver,
Router (Router), Sender, Socket,
bind, receiveMulti, send, sendMulti,
socket)
data HistoryServer = HistoryServer ThreadId ThreadId
@ -59,17 +72,23 @@ serveQHP db sock = forever $ do @@ -59,17 +72,23 @@ serveQHP db sock = forever $ do
handleCmd peerId cmd = case cmd of
rq -> do
debugM "QHP" $ "Incoming command: " ++ show cmd
let compressionConduit = if rqCompression rq == Just "gzip" then CZ.gzip else CC.map id
let dataC = getDataConduit db (replaceWildcards $ rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq))
runConduit $ dataC .| (conduitVector chunkSize) .| (sendChunks peerId)
sendChunks :: (MonadIO m) => B.ByteString -> ConduitT (V.Vector Bar) Void m ()
sendChunks peerId = do
runConduit $ dataC .| conduitVector chunkSize .| serializeChunk .| compressionConduit .| sendBSChunks peerId
serializeChunk :: (MonadIO m) => ConduitT (V.Vector Bar) B.ByteString m ()
serializeChunk = awaitForever $ \vBars -> yield $ BL.toStrict $ serializeBars vBars
sendBSChunks :: (MonadIO m) => B.ByteString -> ConduitT B.ByteString Void m ()
sendBSChunks peerId = do
liftIO $ send sock [SendMore] peerId
liftIO $ send sock [SendMore] B.empty
liftIO $ send sock [SendMore] "OK"
awaitForever $ \vBars -> liftIO $ do
debugM "QHP" $ "Sending chunk: " ++ show (V.length vBars) ++ " bars"
send sock [SendMore] $ BL.toStrict $ serializeBars vBars
awaitForever $ \bs -> liftIO $ do
debugM "QHP" $ "Sending chunk: " ++ show (B.length bs) ++ " bytes"
send sock [SendMore] bs
liftIO $ send sock [] B.empty
serializeBars :: V.Vector Bar -> BL.ByteString
serializeBars bars = runPut $ V.forM_ bars serializeBar'
serializeBar' bar = do
@ -80,7 +99,7 @@ serveQHP db sock = forever $ do @@ -80,7 +99,7 @@ serveQHP db sock = forever $ do
putDoublele (toDouble . barClose $ bar)
putWord64le (fromInteger . barVolume $ bar)
chunkSize = 4096
chunkSize = 16384
replaceWildcards = T.map mapWildcard
mapWildcard '?' = '_'

6
src/ATrade/MDS/Protocol.hs

@ -54,7 +54,8 @@ data QHPRequest = @@ -54,7 +54,8 @@ data QHPRequest =
rqTicker :: T.Text,
rqStartTime :: UTCTime,
rqEndTime :: UTCTime,
rqPeriod :: Period
rqPeriod :: Period,
rqCompression :: Maybe T.Text
} deriving (Show, Eq)
instance FromJSON QHPRequest where
@ -62,7 +63,8 @@ instance FromJSON QHPRequest where @@ -62,7 +63,8 @@ instance FromJSON QHPRequest where
v .: "ticker" <*>
(v .: "from" >>= parseTime) <*>
(v .: "to" >>= parseTime) <*>
(v .: "timeframe" >>= parseTf)
(v .: "timeframe" >>= parseTf) <*>
v .:? "compression"
where
parseTf :: T.Text -> Aeson.Parser Period
parseTf t = if

7
stack.yaml

@ -41,7 +41,12 @@ packages: @@ -41,7 +41,12 @@ packages:
- '../zeromq4-haskell-zap'
# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps: ["HDBC-sqlite3-2.3.3.1", "datetime-0.3.1", "th-printf-0.5.1", "text-format-0.3.2"]
extra-deps:
- HDBC-sqlite3-2.3.3.1
- datetime-0.3.1
- text-format-0.3.2
- co-log-0.4.0.1@sha256:3d4c17f37693c80d1aa2c41669bc3438fac3e89dc5f479e57d79bc3ddc4dfcc5,5087
- ansi-terminal-0.10.3@sha256:e2fbcef5f980dc234c7ad8e2fa433b0e8109132c9e643bc40ea5608cd5697797,3226
# Override default flag values for local packages and extra-deps
flags: {}

Loading…
Cancel
Save