From a19bad11080b3edc4fd3657f2b625c51f9a5bde3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Wed, 22 Apr 2020 18:20:56 +0700 Subject: [PATCH] QHP: send multipart data --- mds.cabal | 1 + src/ATrade/MDS/Database.hs | 27 +++++++++++++++++++++++++-- src/ATrade/MDS/HistoryServer.hs | 30 +++++++++++++++++++++++------- 3 files changed, 49 insertions(+), 9 deletions(-) diff --git a/mds.cabal b/mds.cabal index f2807f1..0b2772e 100644 --- a/mds.cabal +++ b/mds.cabal @@ -36,6 +36,7 @@ library , attoparsec , binary , binary-ieee754 + , conduit default-language: Haskell2010 other-modules: ATrade.MDS.Protocol default-extensions: OverloadedStrings diff --git a/src/ATrade/MDS/Database.hs b/src/ATrade/MDS/Database.hs index 33972f1..d7a31b4 100644 --- a/src/ATrade/MDS/Database.hs +++ b/src/ATrade/MDS/Database.hs @@ -6,6 +6,7 @@ module ATrade.MDS.Database ( initDatabase, closeDatabase, getData, + getDataConduit, putData, TimeInterval(..), Timeframe(..), @@ -16,11 +17,14 @@ module ATrade.MDS.Database ( import ATrade.Types import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Loops +import Data.Conduit import Data.Maybe -import qualified Data.Text as T +import qualified Data.Text as T import Data.Time.Clock import Data.Time.Clock.POSIX -import qualified Data.Vector as V +import qualified Data.Vector as V import Database.HDBC import Database.HDBC.Sqlite3 import System.Log.Logger @@ -78,6 +82,25 @@ getData db tickerId interval@(TimeInterval start end) (Timeframe tfSec) = do } barFromResult _ _ = Nothing +getDataConduit :: (MonadIO m) => MdsHandle -> TickerId -> TimeInterval -> Timeframe -> ConduitT () Bar m () +getDataConduit db tickerId (TimeInterval start end) (Timeframe tfSec) = do + stmt <- liftIO $ prepare db "SELECT timestamp, timeframe, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp ASC;" + _ <- liftIO $ execute stmt [(toSql. T.unpack) tickerId, toSql tfSec, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end] + whileJust_ (liftIO $ fetchRow stmt) $ \row -> case barFromResult tickerId row of + Just bar -> yield bar + Nothing -> return () + where + barFromResult ticker [ts, _, open, high, low, close, vol] = Just Bar { + barSecurity = ticker, + barTimestamp = fromSql ts, + barOpen = fromDouble $ fromSql open, + barHigh = fromDouble $ fromSql high, + barLow = fromDouble $ fromSql low, + barClose = fromDouble $ fromSql close, + barVolume = fromSql vol + } + barFromResult _ _ = Nothing + putData :: MdsHandle -> TickerId -> TimeInterval -> Timeframe -> V.Vector Bar -> IO () putData db tickerId (TimeInterval start end) tf@(Timeframe tfSec) bars = do withTransaction db $ \db' -> do diff --git a/src/ATrade/MDS/HistoryServer.hs b/src/ATrade/MDS/HistoryServer.hs index 66c1c90..f60fbfc 100644 --- a/src/ATrade/MDS/HistoryServer.hs +++ b/src/ATrade/MDS/HistoryServer.hs @@ -11,15 +11,18 @@ import ATrade.MDS.Protocol import ATrade.Types import Control.Concurrent import Control.Monad +import Control.Monad.IO.Class import Data.Aeson import Data.Binary.Get import Data.Binary.Put -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL +import Data.Conduit as C +import Data.Conduit.Combinators import Data.List.NonEmpty -import qualified Data.Text as T +import qualified Data.Text as T import Data.Time.Clock.POSIX -import qualified Data.Vector as V +import qualified Data.Vector as V import Safe import System.Log.Logger @@ -56,9 +59,20 @@ serveQHP db sock = forever $ do handleCmd peerId cmd = case cmd of rq -> do debugM "QHP" $ "Incoming command: " ++ show cmd - qdata <- getData db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) - let bytes = serializeBars $ V.concat $ fmap snd qdata - sendMulti sock $ peerId :| [B.empty, "OK", BL.toStrict bytes] + let dataC = getDataConduit db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) + runConduit $ dataC .| (conduitVector chunkSize) .| (sendChunks peerId) + --qdata <- getData db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) + --let bytes = serializeBars $ V.concat $ fmap snd qdata + --sendMulti sock $ peerId :| [B.empty, "OK", BL.toStrict bytes] + sendChunks :: (MonadIO m) => B.ByteString -> ConduitT (V.Vector Bar) Void m () + sendChunks peerId = do + liftIO $ send sock [SendMore] peerId + liftIO $ send sock [SendMore] B.empty + liftIO $ send sock [SendMore] "OK" + awaitForever $ \vBars -> liftIO $ do + debugM "QHP" $ "Sending chunk: " ++ show (V.length vBars) ++ " bars" + send sock [SendMore] $ BL.toStrict $ serializeBars vBars + liftIO $ send sock [] B.empty serializeBars :: V.Vector Bar -> BL.ByteString serializeBars bars = runPut $ V.forM_ bars serializeBar' serializeBar' bar = do @@ -69,6 +83,8 @@ serveQHP db sock = forever $ do putDoublele (toDouble . barClose $ bar) putWord64le (fromInteger . barVolume $ bar) + chunkSize = 4096 + serveHAP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () serveHAP db sock = forever $ do rq <- receiveMulti sock