Browse Source

Tick batching

master
Denis Tereshkin 9 years ago
parent
commit
d4a03dfa8e
  1. 16
      src/ATrade/QuoteSource/Client.hs
  2. 49
      src/ATrade/QuoteSource/Server.hs
  3. 41
      src/ATrade/Types.hs
  4. 1
      stack.yaml
  5. 14
      test/TestTypes.hs

16
src/ATrade/QuoteSource/Client.hs

@ -7,7 +7,7 @@ module ATrade.QuoteSource.Client (
import ATrade.Types import ATrade.Types
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent hiding (readChan, writeChan, writeList2Chan)
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Monad import Control.Monad
import Control.Monad.Loops import Control.Monad.Loops
@ -32,6 +32,15 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle {
killMVar :: MVar () killMVar :: MVar ()
} }
deserializeTicks :: [BL.ByteString] -> [Tick]
deserializeTicks (secname:raw:_) = deserializeWithName (decodeUtf8 . BL.toStrict $ secname) raw
where
deserializeWithName secNameT raw = case deserializeTickBody raw of
(rest, Just tick) -> tick { security = secNameT } : deserializeWithName secNameT rest
_ -> []
deserializeTicks _ = []
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar compMv <- newEmptyMVar
@ -43,6 +52,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do
where where
clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do
connect sock $ T.unpack endpoint connect sock $ T.unpack endpoint
debugM "QuoteSource.Client" $ "Tickers: " ++ show tickers
mapM_ (subscribe sock . encodeUtf8) tickers mapM_ (subscribe sock . encodeUtf8) tickers
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
@ -56,9 +66,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do
prevHeartbeat <- readIORef lastHeartbeat prevHeartbeat <- readIORef lastHeartbeat
if headMay rawTick == Just "SYSTEM#HEARTBEAT" if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then writeIORef lastHeartbeat now then writeIORef lastHeartbeat now
else case deserializeTick rawTick of else writeList2Chan chan (deserializeTicks rawTick)
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
debugM "QuoteSource.Client" "Heartbeat timeout") debugM "QuoteSource.Client" "Heartbeat timeout")
notTimeout ts = do notTimeout ts = do

49
src/ATrade/QuoteSource/Server.hs

@ -12,13 +12,18 @@ import Control.Exception
import Control.Monad import Control.Monad
import qualified Data.List as L import qualified Data.List as L
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString as B
import Data.List.NonEmpty hiding (map) import Data.List.NonEmpty hiding (map)
import Data.Maybe
import System.Log.Logger import System.Log.Logger
import System.ZMQ4 import System.ZMQ4
import Prelude hiding ((!!)) import Prelude hiding ((!!))
import Safe
data QuoteSourceServer = QuoteSourceServerState { data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context, ctx :: Context,
outSocket :: Socket Pub, outSocket :: Socket Pub,
@ -41,18 +46,38 @@ serverThread state = do
putMVar (completionMvar state) () putMVar (completionMvar state) ()
serverThread' = do serverThread' = do
qssdata <- readChan $ tickChannel state qssdata' <- readChan $ tickChannel state
case qssdata of qssdata <- readChanN 15 $ tickChannel state
QSSKill -> return () let fulldata = qssdata' : qssdata
QSSHeartbeat -> do let tickGroups = L.groupBy (\x y -> security x == security y) $ mapMaybe onlyTick fulldata
send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT"
serverThread' mapM_ (\ticks -> case headMay ticks of
QSSTick tick -> do Just h -> sendTicks (security h) ticks
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick Nothing -> return()) tickGroups
{-let t = map BL.toStrict $ serializeTick tick-}
{-sendDirect (outSocket state) [SendMore] (L.head t)-} when (QSSHeartbeat `elem` fulldata) $ send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT"
{-sendDirect (outSocket state) [] (t L.!! 1)-}
serverThread' unless (QSSKill `elem` fulldata) serverThread'
readChanN n chan
| n <= 0 = return []
| otherwise = do
x <- tryReadChan chan
case x of
Nothing -> return []
Just v -> do
rest <- readChanN (n - 1) chan
return $ v : rest
onlyTick t = case t of
QSSTick tick -> Just tick
_ -> Nothing
sendTicks secName ticklist = sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializedTicks secName ticklist
serializedTicks secName ticklist = header : [body]
where
header = BL.fromStrict . E.encodeUtf8 $ secName
body = BL.concat $ map serializeTickBody ticklist
startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer chan c ep = do startQuoteSourceServer chan c ep = do

41
src/ATrade/Types.hs

@ -6,7 +6,9 @@ module ATrade.Types (
Bar(..), Bar(..),
DataType(..), DataType(..),
serializeTick, serializeTick,
serializeTickBody,
deserializeTick, deserializeTick,
deserializeTickBody,
SignalId(..), SignalId(..),
OrderPrice(..), OrderPrice(..),
Operation(..), Operation(..),
@ -32,6 +34,7 @@ import Data.Ratio
import Data.Text as T import Data.Text as T
import Data.Text.Encoding as E import Data.Text.Encoding as E
import Data.Time.Clock import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Word import Data.Word
type TickerId = T.Text type TickerId = T.Text
@ -82,11 +85,11 @@ data Tick = Tick {
volume :: !Integer volume :: !Integer
} deriving (Show, Eq) } deriving (Show, Eq)
serializeTick :: Tick -> [ByteString] serializeTickHeader :: Tick -> ByteString
serializeTick tick = header : [rawdata] serializeTickHeader tick = B.fromStrict . E.encodeUtf8 $ security tick
where
header = B.fromStrict . E.encodeUtf8 $ security tick serializeTickBody :: Tick -> ByteString
rawdata = toLazyByteString $ mconcat [ serializeTickBody tick = toLazyByteString $ mconcat [
putWord32le 1, putWord32le 1,
putWord64le $ fromIntegral . toSeconds' . timestamp $ tick, putWord64le $ fromIntegral . toSeconds' . timestamp $ tick,
putWord32le $ fromIntegral . fracSeconds . timestamp $ tick, putWord32le $ fromIntegral . fracSeconds . timestamp $ tick,
@ -94,20 +97,16 @@ serializeTick tick = header : [rawdata]
putWord64le $ truncate . value $ tick, putWord64le $ truncate . value $ tick,
putWord32le $ truncate . (*. 1000000000) . fractionalPart $ value tick, putWord32le $ truncate . (*. 1000000000) . fractionalPart $ value tick,
putWord32le $ fromIntegral $ volume tick ] putWord32le $ fromIntegral $ volume tick ]
floorPart :: (RealFrac a) => a -> a where
floorPart x = x - fromIntegral (floor x)
fractionalPart :: (RealFrac a) => a -> a fractionalPart :: (RealFrac a) => a -> a
fractionalPart x = x - fromIntegral (truncate x) fractionalPart x = x - fromIntegral (truncate x)
toSeconds' t = floor $ diffUTCTime t epoch toSeconds' = floor . utcTimeToPOSIXSeconds
fracSeconds t = (truncate $ (* 1000000000000) $ diffUTCTime t epoch) `mod` 1000000000000 `div` 1000000 fracSeconds t = (truncate $ (* 1000000000000) $ utcTimeToPOSIXSeconds t) `mod` 1000000000000 `div` 1000000
epoch = fromGregorian 1970 1 1 0 0 0
deserializeTick :: [ByteString] -> Maybe Tick serializeTick :: Tick -> [ByteString]
deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of serializeTick tick = serializeTickHeader tick : [serializeTickBody tick]
Left (_, _, _) -> Nothing
Right (_, _, tick) -> Just $ tick { security = E.decodeUtf8 . B.toStrict $ header }
where
parseTick :: Get Tick parseTick :: Get Tick
parseTick = do parseTick = do
packetType <- fromEnum <$> getWord32le packetType <- fromEnum <$> getWord32le
@ -123,7 +122,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
timestamp = makeTimestamp tsec tusec, timestamp = makeTimestamp tsec tusec,
value = makeValue intpart nanopart, value = makeValue intpart nanopart,
volume = volume } volume = volume }
where
makeTimestamp :: Word64 -> Word32 -> UTCTime makeTimestamp :: Word64 -> Word32 -> UTCTime
makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec) makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec)
@ -135,8 +134,18 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
convertedIntPart = realFracToDecimal 10 (fromIntegral intpart) convertedIntPart = realFracToDecimal 10 (fromIntegral intpart)
r = toInteger nanopart % 1000000000 r = toInteger nanopart % 1000000000
deserializeTick :: [ByteString] -> Maybe Tick
deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
Left (_, _, _) -> Nothing
Right (_, _, tick) -> Just $ tick { security = E.decodeUtf8 . B.toStrict $ header }
deserializeTick _ = Nothing deserializeTick _ = Nothing
deserializeTickBody :: ByteString -> (ByteString, Maybe Tick)
deserializeTickBody bs = case runGetOrFail parseTick bs of
Left (rest, _, _) -> (rest, Nothing)
Right (rest, _, tick) -> (rest, Just tick)
data Bar = Bar { data Bar = Bar {
barSecurity :: !TickerId, barSecurity :: !TickerId,
barTimestamp :: !UTCTime, barTimestamp :: !UTCTime,

1
stack.yaml

@ -37,7 +37,6 @@ resolver: lts-7.7
# will not be run. This is useful for tweaking upstream packages. # will not be run. This is useful for tweaking upstream packages.
packages: packages:
- '.' - '.'
- '../zeromq-haskell'
# Dependency packages to be pulled from upstream that are not in the resolver # Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3) # (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "hexdump-0.1"] extra-deps: [ "datetime-0.3.1", "hexdump-0.1"]

14
test/TestTypes.hs

@ -21,9 +21,11 @@ import Data.Text
import Data.Time.Calendar import Data.Time.Calendar
import Data.Time.Clock import Data.Time.Clock
import Data.Tuple.Select import Data.Tuple.Select
import qualified Data.ByteString.Lazy as B
properties = testGroup "Types" [ properties = testGroup "Types" [
testTickSerialization testTickSerialization
, testTickBodySerialization
, testSignalIdSerialization , testSignalIdSerialization
, testOrderPriceSerialization , testOrderPriceSerialization
, testOperationSerialization , testOperationSerialization
@ -37,6 +39,18 @@ testTickSerialization = QC.testProperty "Deserialize serialized tick"
Just t -> tick == t Just t -> tick == t
Nothing -> False) Nothing -> False)
-- Adjust arbitrary instances of ticks, because body doesn't store security name
testTickBodySerialization = QC.testProperty "Deserialize serialized bunch of tick" $
QC.forAll (arbitrary >>= (\t -> return t { security = "" })) (\tick1 ->
QC.forAll (arbitrary >>= (\t -> return t { security = "" })) (\tick2 ->
case deserializeTickBody (serialized tick1 tick2) of
(rest, Just t1) -> case deserializeTickBody rest of
(_, Just t2) -> tick1 == t1 && tick2 == t2
_ -> False
_ -> False))
where
serialized t1 t2 = serializeTickBody t1 `B.append` serializeTickBody t2
testSignalIdSerialization = QC.testProperty "Deserialize serialized SignalId" testSignalIdSerialization = QC.testProperty "Deserialize serialized SignalId"
(\sid -> case (decode . encode $ sid :: Maybe SignalId) of (\sid -> case (decode . encode $ sid :: Maybe SignalId) of
Just s -> s == sid Just s -> s == sid

Loading…
Cancel
Save