diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 76e5d34..e690d20 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -7,7 +7,7 @@ module ATrade.QuoteSource.Client ( import ATrade.Types import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent hiding (readChan, writeChan, writeList2Chan) import Control.Concurrent.MVar import Control.Monad import Control.Monad.Loops @@ -32,6 +32,15 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle { killMVar :: MVar () } +deserializeTicks :: [BL.ByteString] -> [Tick] +deserializeTicks (secname:raw:_) = deserializeWithName (decodeUtf8 . BL.toStrict $ secname) raw + where + deserializeWithName secNameT raw = case deserializeTickBody raw of + (rest, Just tick) -> tick { security = secNameT } : deserializeWithName secNameT rest + _ -> [] + +deserializeTicks _ = [] + startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar @@ -43,6 +52,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do where clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do connect sock $ T.unpack endpoint + debugM "QuoteSource.Client" $ "Tickers: " ++ show tickers mapM_ (subscribe sock . encodeUtf8) tickers subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" @@ -56,9 +66,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do prevHeartbeat <- readIORef lastHeartbeat if headMay rawTick == Just "SYSTEM#HEARTBEAT" then writeIORef lastHeartbeat now - else case deserializeTick rawTick of - Just tick -> writeChan chan tick - Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" + else writeList2Chan chan (deserializeTicks rawTick) debugM "QuoteSource.Client" "Heartbeat timeout") notTimeout ts = do diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index 341f528..3d9afbb 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -12,13 +12,18 @@ import Control.Exception import Control.Monad import qualified Data.List as L import qualified Data.Text as T +import qualified Data.Text.Encoding as E import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString as B import Data.List.NonEmpty hiding (map) +import Data.Maybe import System.Log.Logger import System.ZMQ4 import Prelude hiding ((!!)) +import Safe + data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, outSocket :: Socket Pub, @@ -41,18 +46,38 @@ serverThread state = do putMVar (completionMvar state) () serverThread' = do - qssdata <- readChan $ tickChannel state - case qssdata of - QSSKill -> return () - QSSHeartbeat -> do - send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT" - serverThread' - QSSTick tick -> do - sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick - {-let t = map BL.toStrict $ serializeTick tick-} - {-sendDirect (outSocket state) [SendMore] (L.head t)-} - {-sendDirect (outSocket state) [] (t L.!! 1)-} - serverThread' + qssdata' <- readChan $ tickChannel state + qssdata <- readChanN 15 $ tickChannel state + let fulldata = qssdata' : qssdata + let tickGroups = L.groupBy (\x y -> security x == security y) $ mapMaybe onlyTick fulldata + + mapM_ (\ticks -> case headMay ticks of + Just h -> sendTicks (security h) ticks + Nothing -> return()) tickGroups + + when (QSSHeartbeat `elem` fulldata) $ send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT" + + unless (QSSKill `elem` fulldata) serverThread' + + readChanN n chan + | n <= 0 = return [] + | otherwise = do + x <- tryReadChan chan + case x of + Nothing -> return [] + Just v -> do + rest <- readChanN (n - 1) chan + return $ v : rest + + onlyTick t = case t of + QSSTick tick -> Just tick + _ -> Nothing + + sendTicks secName ticklist = sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializedTicks secName ticklist + serializedTicks secName ticklist = header : [body] + where + header = BL.fromStrict . E.encodeUtf8 $ secName + body = BL.concat $ map serializeTickBody ticklist startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index 222fcfd..5277905 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -6,7 +6,9 @@ module ATrade.Types ( Bar(..), DataType(..), serializeTick, + serializeTickBody, deserializeTick, + deserializeTickBody, SignalId(..), OrderPrice(..), Operation(..), @@ -32,6 +34,7 @@ import Data.Ratio import Data.Text as T import Data.Text.Encoding as E import Data.Time.Clock +import Data.Time.Clock.POSIX import Data.Word type TickerId = T.Text @@ -82,11 +85,11 @@ data Tick = Tick { volume :: !Integer } deriving (Show, Eq) -serializeTick :: Tick -> [ByteString] -serializeTick tick = header : [rawdata] - where - header = B.fromStrict . E.encodeUtf8 $ security tick - rawdata = toLazyByteString $ mconcat [ +serializeTickHeader :: Tick -> ByteString +serializeTickHeader tick = B.fromStrict . E.encodeUtf8 $ security tick + +serializeTickBody :: Tick -> ByteString +serializeTickBody tick = toLazyByteString $ mconcat [ putWord32le 1, putWord64le $ fromIntegral . toSeconds' . timestamp $ tick, putWord32le $ fromIntegral . fracSeconds . timestamp $ tick, @@ -94,36 +97,32 @@ serializeTick tick = header : [rawdata] putWord64le $ truncate . value $ tick, putWord32le $ truncate . (*. 1000000000) . fractionalPart $ value tick, putWord32le $ fromIntegral $ volume tick ] - floorPart :: (RealFrac a) => a -> a - floorPart x = x - fromIntegral (floor x) + where fractionalPart :: (RealFrac a) => a -> a fractionalPart x = x - fromIntegral (truncate x) - toSeconds' t = floor $ diffUTCTime t epoch - fracSeconds t = (truncate $ (* 1000000000000) $ diffUTCTime t epoch) `mod` 1000000000000 `div` 1000000 - epoch = fromGregorian 1970 1 1 0 0 0 + toSeconds' = floor . utcTimeToPOSIXSeconds + fracSeconds t = (truncate $ (* 1000000000000) $ utcTimeToPOSIXSeconds t) `mod` 1000000000000 `div` 1000000 -deserializeTick :: [ByteString] -> Maybe Tick -deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of - Left (_, _, _) -> Nothing - Right (_, _, tick) -> Just $ tick { security = E.decodeUtf8 . B.toStrict $ header } +serializeTick :: Tick -> [ByteString] +serializeTick tick = serializeTickHeader tick : [serializeTickBody tick] + +parseTick :: Get Tick +parseTick = do + packetType <- fromEnum <$> getWord32le + when (packetType /= 1) $ fail "Expected packettype == 1" + tsec <- getWord64le + tusec <- getWord32le + dt <- toEnum . fromEnum <$> getWord32le + intpart <- (fromIntegral <$> getWord64le) :: Get Int64 + nanopart <- (fromIntegral <$> getWord32le) :: Get Int32 + volume <- fromIntegral <$> (fromIntegral <$> getWord32le :: Get Int32) + return Tick { security = "", + datatype = dt, + timestamp = makeTimestamp tsec tusec, + value = makeValue intpart nanopart, + volume = volume } where - parseTick :: Get Tick - parseTick = do - packetType <- fromEnum <$> getWord32le - when (packetType /= 1) $ fail "Expected packettype == 1" - tsec <- getWord64le - tusec <- getWord32le - dt <- toEnum . fromEnum <$> getWord32le - intpart <- (fromIntegral <$> getWord64le) :: Get Int64 - nanopart <- (fromIntegral <$> getWord32le) :: Get Int32 - volume <- fromIntegral <$> (fromIntegral <$> getWord32le :: Get Int32) - return Tick { security = "", - datatype = dt, - timestamp = makeTimestamp tsec tusec, - value = makeValue intpart nanopart, - volume = volume } - makeTimestamp :: Word64 -> Word32 -> UTCTime makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec) @@ -135,8 +134,18 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of convertedIntPart = realFracToDecimal 10 (fromIntegral intpart) r = toInteger nanopart % 1000000000 +deserializeTick :: [ByteString] -> Maybe Tick +deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of + Left (_, _, _) -> Nothing + Right (_, _, tick) -> Just $ tick { security = E.decodeUtf8 . B.toStrict $ header } + deserializeTick _ = Nothing +deserializeTickBody :: ByteString -> (ByteString, Maybe Tick) +deserializeTickBody bs = case runGetOrFail parseTick bs of + Left (rest, _, _) -> (rest, Nothing) + Right (rest, _, tick) -> (rest, Just tick) + data Bar = Bar { barSecurity :: !TickerId, barTimestamp :: !UTCTime, diff --git a/stack.yaml b/stack.yaml index 24eff89..10d69fd 100644 --- a/stack.yaml +++ b/stack.yaml @@ -37,7 +37,6 @@ resolver: lts-7.7 # will not be run. This is useful for tweaking upstream packages. packages: - '.' -- '../zeromq-haskell' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) extra-deps: [ "datetime-0.3.1", "hexdump-0.1"] diff --git a/test/TestTypes.hs b/test/TestTypes.hs index 3e74b50..b8cbc15 100644 --- a/test/TestTypes.hs +++ b/test/TestTypes.hs @@ -21,9 +21,11 @@ import Data.Text import Data.Time.Calendar import Data.Time.Clock import Data.Tuple.Select +import qualified Data.ByteString.Lazy as B properties = testGroup "Types" [ testTickSerialization + , testTickBodySerialization , testSignalIdSerialization , testOrderPriceSerialization , testOperationSerialization @@ -37,6 +39,18 @@ testTickSerialization = QC.testProperty "Deserialize serialized tick" Just t -> tick == t Nothing -> False) +-- Adjust arbitrary instances of ticks, because body doesn't store security name +testTickBodySerialization = QC.testProperty "Deserialize serialized bunch of tick" $ + QC.forAll (arbitrary >>= (\t -> return t { security = "" })) (\tick1 -> + QC.forAll (arbitrary >>= (\t -> return t { security = "" })) (\tick2 -> + case deserializeTickBody (serialized tick1 tick2) of + (rest, Just t1) -> case deserializeTickBody rest of + (_, Just t2) -> tick1 == t1 && tick2 == t2 + _ -> False + _ -> False)) + where + serialized t1 t2 = serializeTickBody t1 `B.append` serializeTickBody t2 + testSignalIdSerialization = QC.testProperty "Deserialize serialized SignalId" (\sid -> case (decode . encode $ sid :: Maybe SignalId) of Just s -> s == sid