Browse Source

QHP: send multipart data

master
Denis Tereshkin 6 years ago
parent
commit
a19bad1108
  1. 1
      mds.cabal
  2. 27
      src/ATrade/MDS/Database.hs
  3. 30
      src/ATrade/MDS/HistoryServer.hs

1
mds.cabal

@ -36,6 +36,7 @@ library
, attoparsec , attoparsec
, binary , binary
, binary-ieee754 , binary-ieee754
, conduit
default-language: Haskell2010 default-language: Haskell2010
other-modules: ATrade.MDS.Protocol other-modules: ATrade.MDS.Protocol
default-extensions: OverloadedStrings default-extensions: OverloadedStrings

27
src/ATrade/MDS/Database.hs

@ -6,6 +6,7 @@ module ATrade.MDS.Database (
initDatabase, initDatabase,
closeDatabase, closeDatabase,
getData, getData,
getDataConduit,
putData, putData,
TimeInterval(..), TimeInterval(..),
Timeframe(..), Timeframe(..),
@ -16,11 +17,14 @@ module ATrade.MDS.Database (
import ATrade.Types import ATrade.Types
import Control.Monad import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Loops
import Data.Conduit
import Data.Maybe import Data.Maybe
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time.Clock import Data.Time.Clock
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import qualified Data.Vector as V import qualified Data.Vector as V
import Database.HDBC import Database.HDBC
import Database.HDBC.Sqlite3 import Database.HDBC.Sqlite3
import System.Log.Logger import System.Log.Logger
@ -78,6 +82,25 @@ getData db tickerId interval@(TimeInterval start end) (Timeframe tfSec) = do
} }
barFromResult _ _ = Nothing barFromResult _ _ = Nothing
getDataConduit :: (MonadIO m) => MdsHandle -> TickerId -> TimeInterval -> Timeframe -> ConduitT () Bar m ()
getDataConduit db tickerId (TimeInterval start end) (Timeframe tfSec) = do
stmt <- liftIO $ prepare db "SELECT timestamp, timeframe, open, high, low, close, volume FROM bars WHERE ticker == ? AND timeframe == ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp ASC;"
_ <- liftIO $ execute stmt [(toSql. T.unpack) tickerId, toSql tfSec, (toSql . utcTimeToPOSIXSeconds) start, (toSql . utcTimeToPOSIXSeconds) end]
whileJust_ (liftIO $ fetchRow stmt) $ \row -> case barFromResult tickerId row of
Just bar -> yield bar
Nothing -> return ()
where
barFromResult ticker [ts, _, open, high, low, close, vol] = Just Bar {
barSecurity = ticker,
barTimestamp = fromSql ts,
barOpen = fromDouble $ fromSql open,
barHigh = fromDouble $ fromSql high,
barLow = fromDouble $ fromSql low,
barClose = fromDouble $ fromSql close,
barVolume = fromSql vol
}
barFromResult _ _ = Nothing
putData :: MdsHandle -> TickerId -> TimeInterval -> Timeframe -> V.Vector Bar -> IO () putData :: MdsHandle -> TickerId -> TimeInterval -> Timeframe -> V.Vector Bar -> IO ()
putData db tickerId (TimeInterval start end) tf@(Timeframe tfSec) bars = do putData db tickerId (TimeInterval start end) tf@(Timeframe tfSec) bars = do
withTransaction db $ \db' -> do withTransaction db $ \db' -> do

30
src/ATrade/MDS/HistoryServer.hs

@ -11,15 +11,18 @@ import ATrade.MDS.Protocol
import ATrade.Types import ATrade.Types
import Control.Concurrent import Control.Concurrent
import Control.Monad import Control.Monad
import Control.Monad.IO.Class
import Data.Aeson import Data.Aeson
import Data.Binary.Get import Data.Binary.Get
import Data.Binary.Put import Data.Binary.Put
import qualified Data.ByteString as B import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.Conduit as C
import Data.Conduit.Combinators
import Data.List.NonEmpty import Data.List.NonEmpty
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import qualified Data.Vector as V import qualified Data.Vector as V
import Safe import Safe
import System.Log.Logger import System.Log.Logger
@ -56,9 +59,20 @@ serveQHP db sock = forever $ do
handleCmd peerId cmd = case cmd of handleCmd peerId cmd = case cmd of
rq -> do rq -> do
debugM "QHP" $ "Incoming command: " ++ show cmd debugM "QHP" $ "Incoming command: " ++ show cmd
qdata <- getData db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq)) let dataC = getDataConduit db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq))
let bytes = serializeBars $ V.concat $ fmap snd qdata runConduit $ dataC .| (conduitVector chunkSize) .| (sendChunks peerId)
sendMulti sock $ peerId :| [B.empty, "OK", BL.toStrict bytes] --qdata <- getData db (rqTicker rq) (TimeInterval (rqStartTime rq) (rqEndTime rq)) (Timeframe (periodSeconds $ rqPeriod rq))
--let bytes = serializeBars $ V.concat $ fmap snd qdata
--sendMulti sock $ peerId :| [B.empty, "OK", BL.toStrict bytes]
sendChunks :: (MonadIO m) => B.ByteString -> ConduitT (V.Vector Bar) Void m ()
sendChunks peerId = do
liftIO $ send sock [SendMore] peerId
liftIO $ send sock [SendMore] B.empty
liftIO $ send sock [SendMore] "OK"
awaitForever $ \vBars -> liftIO $ do
debugM "QHP" $ "Sending chunk: " ++ show (V.length vBars) ++ " bars"
send sock [SendMore] $ BL.toStrict $ serializeBars vBars
liftIO $ send sock [] B.empty
serializeBars :: V.Vector Bar -> BL.ByteString serializeBars :: V.Vector Bar -> BL.ByteString
serializeBars bars = runPut $ V.forM_ bars serializeBar' serializeBars bars = runPut $ V.forM_ bars serializeBar'
serializeBar' bar = do serializeBar' bar = do
@ -69,6 +83,8 @@ serveQHP db sock = forever $ do
putDoublele (toDouble . barClose $ bar) putDoublele (toDouble . barClose $ bar)
putWord64le (fromInteger . barVolume $ bar) putWord64le (fromInteger . barVolume $ bar)
chunkSize = 4096
serveHAP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO () serveHAP :: (Sender a, Receiver a) => MdsHandle -> Socket a -> IO ()
serveHAP db sock = forever $ do serveHAP db sock = forever $ do
rq <- receiveMulti sock rq <- receiveMulti sock

Loading…
Cancel
Save