From 2380a05203e667874d6c89836058c024108064ec Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 30 May 2019 16:22:30 +0700 Subject: [PATCH] Quotesource: support bar transfer --- .gitignore | 1 + src/ATrade/Broker/Server.hs | 2 +- src/ATrade/QuoteSource/Client.hs | 59 ++++---- src/ATrade/QuoteSource/Server.hs | 69 +++++---- src/ATrade/Types.hs | 243 ++++++++++++++++++++----------- stack.yaml | 2 +- test/ArbitraryInstances.hs | 33 ++++- test/TestQuoteSourceClient.hs | 61 +++++--- test/TestQuoteSourceServer.hs | 52 +++++-- test/TestTypes.hs | 8 + 10 files changed, 347 insertions(+), 183 deletions(-) diff --git a/.gitignore b/.gitignore index 67635a9..a1c80c6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ .* +\#*.*\# diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 6209292..995bf75 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -118,7 +118,7 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ maybeTrade <- tryReadChan chan case maybeTrade of Just trade -> mapM_ (\x -> x trade) tradeSinks - Nothing -> threadDelay 1000000 + Nothing -> threadDelay 100000 where wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index f5bba8a..8a50d94 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -1,47 +1,52 @@ {-# LANGUAGE OverloadedStrings #-} module ATrade.QuoteSource.Client ( + QuoteData(..), startQuoteSourceClient, stopQuoteSourceClient ) where -import ATrade.Types -import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (readChan, writeChan, writeList2Chan) -import Control.Concurrent.MVar -import Control.Monad -import Control.Monad.Loops -import Control.Exception -import Data.List.NonEmpty -import Data.Maybe -import qualified Data.Text as T -import qualified Data.ByteString.Lazy as BL -import qualified Data.ByteString.Char8 as B8 -import qualified Data.List as L -import Data.Text.Encoding -import Data.Time.Clock -import Data.IORef -import System.ZMQ4 -import System.Log.Logger +import ATrade.Types +import Control.Concurrent hiding (readChan, writeChan, + writeList2Chan) +import Control.Concurrent.BoundedChan +import Control.Concurrent.MVar +import Control.Exception +import Control.Monad +import Control.Monad.Loops +import qualified Data.ByteString.Char8 as B8 +import qualified Data.ByteString.Lazy as BL +import Data.IORef +import qualified Data.List as L +import Data.List.NonEmpty +import Data.Maybe +import qualified Data.Text as T +import Data.Text.Encoding +import Data.Time.Clock +import System.Log.Logger +import System.ZMQ4 -import Safe +import Safe data QuoteSourceClientHandle = QuoteSourceClientHandle { - tid :: ThreadId, + tid :: ThreadId, completionMvar :: MVar (), - killMVar :: MVar () + killMVar :: MVar () } -deserializeTicks :: [BL.ByteString] -> [Tick] +data QuoteData = QDTick Tick | QDBar (BarTimeframe, Bar) + deriving (Show, Eq) + +deserializeTicks :: [BL.ByteString] -> [QuoteData] deserializeTicks (secname:raw:_) = deserializeWithName (decodeUtf8 . BL.toStrict $ secname) raw where deserializeWithName secNameT raw = case deserializeTickBody raw of - (rest, Just tick) -> tick { security = secNameT } : deserializeWithName secNameT rest + (rest, Just tick) -> QDTick (tick { security = secNameT }) : deserializeWithName secNameT rest _ -> [] deserializeTicks _ = [] -startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle +startQuoteSourceClient :: BoundedChan QuoteData -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar killMv <- newEmptyMVar @@ -60,14 +65,16 @@ startQuoteSourceClient chan tickers ctx endpoint = do now <- getCurrentTime writeIORef lastHeartbeat now whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do - evs <- poll 200 [Sock sock [In] Nothing] + evs <- poll 200 [Sock sock [In] Nothing] unless (null (L.head evs)) $ do rawTick <- fmap BL.fromStrict <$> receiveMulti sock now <- getCurrentTime prevHeartbeat <- readIORef lastHeartbeat if headMay rawTick == Just "SYSTEM#HEARTBEAT" then writeIORef lastHeartbeat now - else writeList2Chan chan (deserializeTicks rawTick) + else case deserializeBar rawTick of + Just (tf, bar) -> writeChan chan $ QDBar (tf, bar) + _ -> writeList2Chan chan (deserializeTicks rawTick) debugM "QuoteSource.Client" "Heartbeat timeout") notTimeout ts = do diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index 4a3e6ff..5b04093 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -5,36 +5,37 @@ module ATrade.QuoteSource.Server ( QuoteSourceServerData(..) ) where -import ATrade.Types -import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (readChan, writeChan) -import Control.Exception -import Control.Monad -import qualified Data.List as L -import qualified Data.Text as T -import qualified Data.Text.Encoding as E -import qualified Data.ByteString.Char8 as B8 -import qualified Data.ByteString.Lazy as BL -import qualified Data.ByteString as B -import Data.List.NonEmpty hiding (map) -import Data.Maybe -import System.Log.Logger -import System.ZMQ4 -import System.ZMQ4.ZAP -import Prelude hiding ((!!)) - -import Safe +import ATrade.Types +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan +import Control.Exception +import Control.Monad +import qualified Data.ByteString as B +import qualified Data.ByteString.Char8 as B8 +import qualified Data.ByteString.Lazy as BL +import Data.Foldable +import qualified Data.List as L +import Data.List.NonEmpty hiding (map) +import Data.Maybe +import qualified Data.Text as T +import qualified Data.Text.Encoding as E +import Prelude hiding ((!!)) +import System.Log.Logger +import System.ZMQ4 +import System.ZMQ4.ZAP + +import Safe data QuoteSourceServer = QuoteSourceServerState { - ctx :: Context, - outSocket :: Socket Pub, - tickChannel :: BoundedChan QuoteSourceServerData, - completionMvar :: MVar (), - serverThreadId :: ThreadId, + ctx :: Context, + outSocket :: Socket Pub, + tickChannel :: BoundedChan QuoteSourceServerData, + completionMvar :: MVar (), + serverThreadId :: ThreadId, heartbeatThreadId :: ThreadId } -data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill +data QuoteSourceServerData = QSSTick Tick | QSSBar (BarTimeframe, Bar) | QSSHeartbeat | QSSKill deriving (Show, Eq) serverThread :: QuoteSourceServer -> IO () @@ -50,16 +51,19 @@ serverThread state = do qssdata' <- readChan $ tickChannel state qssdata <- readChanN 15 $ tickChannel state let fulldata = qssdata' : qssdata - let tickGroups = L.groupBy (\x y -> security x == security y) $ mapMaybe onlyTick fulldata + let (ticks, bars) = getTicksAndBars fulldata + let tickGroups = L.groupBy (\x y -> security x == security y) $ ticks mapM_ (\ticks -> case headMay ticks of - Just h -> sendTicks (security h) ticks + Just h -> sendTicks (security h) ticks Nothing -> return()) tickGroups + mapM_ sendBar bars + when (QSSHeartbeat `elem` fulldata) $ send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT" unless (QSSKill `elem` fulldata) serverThread' - + readChanN n chan | n <= 0 = return [] | otherwise = do @@ -72,9 +76,14 @@ serverThread state = do onlyTick t = case t of QSSTick tick -> Just tick - _ -> Nothing + _ -> Nothing + getTicksAndBars = foldl' (\(tl, bl) qss -> case qss of + QSSTick t -> (t : tl, bl) + QSSBar b -> (tl, b : bl) + _ -> (tl, bl)) ([], []) sendTicks secName ticklist = sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializedTicks secName ticklist + sendBar (tf, bar) = sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeBar tf bar serializedTicks secName ticklist = header : [body] where header = BL.fromStrict . E.encodeUtf8 $ secName @@ -92,7 +101,7 @@ startQuoteSourceServer chan c ep socketDomainIdMb = do hbTid <- forkIO $ forever $ do threadDelay 1000000 writeChan chan QSSHeartbeat - + mv <- newEmptyMVar let state = QuoteSourceServerState { ctx = c, diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index b29ded3..90cd8ef 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -1,10 +1,16 @@ -{-# LANGUAGE OverloadedStrings, TypeSynonymInstances, FlexibleInstances #-} -{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE TypeSynonymInstances #-} module ATrade.Types ( TickerId, Tick(..), Bar(..), + serializeBar, + serializeBarBody, + deserializeBar, + BarTimeframe(..), DataType(..), serializeTick, serializeTickBody, @@ -25,28 +31,28 @@ module ATrade.Types ( module ATrade.Price ) where -import GHC.Generics - -import ATrade.Price - -import Control.Monad -import Data.Aeson -import Data.Aeson.Types -import Data.Binary.Builder -import Data.Binary.Get -import Data.ByteString.Lazy as B -import Data.DateTime -import Data.Int -import Data.List as L -import Data.Maybe -import Data.Ratio -import Data.Text as T -import Data.Text.Encoding as E -import Data.Time.Clock -import Data.Time.Clock.POSIX -import Data.Word - -import System.ZMQ4.ZAP +import GHC.Generics + +import ATrade.Price + +import Control.Monad +import Data.Aeson +import Data.Aeson.Types +import Data.Binary.Get +import Data.Binary.Put +import Data.ByteString.Lazy as B +import Data.DateTime +import Data.Int +import Data.List as L +import Data.Maybe +import Data.Ratio +import Data.Text as T +import Data.Text.Encoding as E +import Data.Time.Clock +import Data.Time.Clock.POSIX +import Data.Word + +import System.ZMQ4.ZAP type TickerId = T.Text @@ -89,25 +95,36 @@ instance Enum DataType where | otherwise = Unknown data Tick = Tick { - security :: !T.Text, - datatype :: !DataType, + security :: !T.Text, + datatype :: !DataType, timestamp :: !UTCTime, - value :: !Price, - volume :: !Integer + value :: !Price, + volume :: !Integer } deriving (Show, Eq, Generic) +putPrice :: Price -> Put +putPrice price = do + let (i, f) = decompose price + putWord64le $ fromInteger . toInteger $ i + putWord32le $ (* 1000) . fromInteger . toInteger $ f + +parsePrice :: Get Price +parsePrice = do + intpart <- (fromIntegral <$> getWord64le) :: Get Int64 + nanopart <- (fromIntegral <$> getWord32le) :: Get Int32 + return $ compose (intpart, nanopart `div` 1000) + serializeTickHeader :: Tick -> ByteString serializeTickHeader tick = B.fromStrict . E.encodeUtf8 $ security tick serializeTickBody :: Tick -> ByteString -serializeTickBody tick = toLazyByteString $ mconcat [ - putWord32le 1, - putWord64le $ fromIntegral . toSeconds' . timestamp $ tick, - putWord32le $ fromIntegral . fracSeconds . timestamp $ tick, - putWord32le $ fromIntegral . fromEnum . datatype $ tick, - putWord64le $ fromInteger . toInteger . fst . decompose . value $ tick, - putWord32le $ (* 1000) . fromInteger . toInteger . snd . decompose . value $ tick, - putWord32le $ fromIntegral $ volume tick ] +serializeTickBody tick = runPut $ do + putWord32le 1 + putWord64le $ fromIntegral . toSeconds' . timestamp $ tick + putWord32le $ fromIntegral . fracSeconds . timestamp $ tick + putWord32le $ fromIntegral . fromEnum . datatype $ tick + putPrice $ value tick + putWord32le $ fromIntegral $ volume tick where fractionalPart :: (RealFrac a) => a -> a fractionalPart x = x - fromIntegral (truncate x) @@ -125,17 +142,16 @@ parseTick = do tsec <- getWord64le tusec <- getWord32le dt <- toEnum . fromEnum <$> getWord32le - intpart <- (fromIntegral <$> getWord64le) :: Get Int64 - nanopart <- (fromIntegral <$> getWord32le) :: Get Int32 + price <- parsePrice volume <- fromIntegral <$> (fromIntegral <$> getWord32le :: Get Int32) return Tick { security = "", datatype = dt, timestamp = makeTimestamp tsec tusec, - value = compose (intpart, nanopart `div` 1000), + value = price, volume = volume } - where - makeTimestamp :: Word64 -> Word32 -> UTCTime - makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec) + +makeTimestamp :: Word64 -> Word32 -> UTCTime +makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec) deserializeTick :: [ByteString] -> Maybe Tick deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of @@ -146,23 +162,84 @@ deserializeTick _ = Nothing deserializeTickBody :: ByteString -> (ByteString, Maybe Tick) deserializeTickBody bs = case runGetOrFail parseTick bs of - Left (rest, _, _) -> (rest, Nothing) + Left (rest, _, _) -> (rest, Nothing) Right (rest, _, tick) -> (rest, Just tick) data Bar = Bar { - barSecurity :: !TickerId, + barSecurity :: !TickerId, barTimestamp :: !UTCTime, - barOpen :: !Price, - barHigh :: !Price, - barLow :: !Price, - barClose :: !Price, - barVolume :: !Integer + barOpen :: !Price, + barHigh :: !Price, + barLow :: !Price, + barClose :: !Price, + barVolume :: !Integer } deriving (Show, Eq, Generic) +-- | Stores timeframe in seconds +newtype BarTimeframe = BarTimeframe { unBarTimeframe :: Int } + deriving (Show, Eq) + +serializeBar :: BarTimeframe -> Bar -> [ByteString] +serializeBar tf bar = serializeBarHeader tf bar : [serializeBarBody tf bar] + +-- | Encodes bar header as tickerid:timeframe_seconds; +-- Why ';' at the end? To support correct 0mq subscriptions. When we subscribe to topic, +-- we actually subscribe by all topics which has requested subscription as a prefix. +serializeBarHeader :: BarTimeframe -> Bar -> ByteString +serializeBarHeader tf bar = + B.fromStrict . E.encodeUtf8 $ (barSecurity bar) `T.append` encodeTimeframe tf + where + encodeTimeframe tf = mconcat [ ":", (T.pack . show $ unBarTimeframe tf), ";" ] + +serializeBarBody :: BarTimeframe -> Bar -> ByteString +serializeBarBody tf bar = runPut $ do + putWord32le 2 + putWord32le $ fromIntegral $ unBarTimeframe tf + putWord64le $ fromIntegral . toSeconds' . barTimestamp $ bar + putWord32le $ fromIntegral . fracSeconds . barTimestamp $ bar + putPrice $ barOpen bar + putPrice $ barHigh bar + putPrice $ barLow bar + putPrice $ barClose bar + putWord32le $ fromIntegral $ barVolume bar + where + fractionalPart :: (RealFrac a) => a -> a + fractionalPart x = x - fromIntegral (truncate x) + toSeconds' = floor . utcTimeToPOSIXSeconds + fracSeconds t = (truncate $ (* 1000000000000) $ utcTimeToPOSIXSeconds t) `mod` 1000000000000 `div` 1000000 + +parseBar :: Get (BarTimeframe, Bar) +parseBar = do + packetType <- fromEnum <$> getWord32le + when (packetType /= 2) $ fail "Expected packettype == 2" + tf <- fromIntegral <$> getWord32le + tsec <- getWord64le + tusec <- getWord32le + open_ <- parsePrice + high_ <- parsePrice + low_ <- parsePrice + close_ <- parsePrice + volume_ <- fromIntegral <$> getWord32le + return (BarTimeframe tf, Bar { barSecurity = "", + barTimestamp = makeTimestamp tsec tusec, + barOpen = open_, + barHigh = high_, + barLow = low_, + barClose = close_, + barVolume = volume_ }) + +deserializeBar :: [ByteString] -> Maybe (BarTimeframe, Bar) +deserializeBar (header:rawData:_) = case runGetOrFail parseBar rawData of + Left (_, _, _) -> Nothing + Right (_, _, (tf, bar)) -> Just $ (tf, bar { barSecurity = T.takeWhile (/= ':') . E.decodeUtf8 . B.toStrict $ header }) + +deserializeBar _ = Nothing + + data SignalId = SignalId { strategyId :: T.Text, signalName :: T.Text, - comment :: T.Text } + comment :: T.Text } deriving (Show, Eq) instance FromJSON SignalId where @@ -195,7 +272,7 @@ instance FromJSON OrderPrice where execPrice <- v .: "execution" case execPrice of (String s) -> if s /= "market" - then (fail "If string, then should be 'market'") + then (fail "If string, then should be 'market'") else return $ StopMarket trprice (Number n) -> return $ Stop trprice (fromScientific n) _ -> fail "Should be either number or 'market'" @@ -223,7 +300,7 @@ instance FromJSON Operation where parseJSON _ = fail "Should be string" instance ToJSON Operation where - toJSON Buy = String "buy" + toJSON Buy = String "buy" toJSON Sell = String "sell" data OrderState = Unsubmitted @@ -250,26 +327,26 @@ instance FromJSON OrderState where instance ToJSON OrderState where toJSON os = case os of - Unsubmitted -> String "unsubmitted" - Submitted -> String "submitted" + Unsubmitted -> String "unsubmitted" + Submitted -> String "submitted" PartiallyExecuted -> String "partially-executed" - Executed -> String "executed" - Cancelled -> String "cancelled" - Rejected -> String "rejected" - OrderError -> String "error" + Executed -> String "executed" + Cancelled -> String "cancelled" + Rejected -> String "rejected" + OrderError -> String "error" type OrderId = Integer data Order = Order { - orderId :: OrderId, - orderAccountId :: T.Text, - orderSecurity :: T.Text, - orderPrice :: OrderPrice, - orderQuantity :: Integer, + orderId :: OrderId, + orderAccountId :: T.Text, + orderSecurity :: T.Text, + orderPrice :: OrderPrice, + orderQuantity :: Integer, orderExecutedQuantity :: Integer, - orderOperation :: Operation, - orderState :: OrderState, - orderSignalId :: SignalId } + orderOperation :: Operation, + orderState :: OrderState, + orderSignalId :: SignalId } deriving (Show, Eq) mkOrder = Order { orderId = 0, @@ -310,17 +387,17 @@ instance ToJSON Order where ifMaybe name pred val = if pred val then Just (name .= val) else Nothing data Trade = Trade { - tradeOrderId :: OrderId, - tradePrice :: Price, - tradeQuantity :: Integer, - tradeVolume :: Price, + tradeOrderId :: OrderId, + tradePrice :: Price, + tradeQuantity :: Integer, + tradeVolume :: Price, tradeVolumeCurrency :: T.Text, - tradeOperation :: Operation, - tradeAccount :: T.Text, - tradeSecurity :: T.Text, - tradeTimestamp :: UTCTime, - tradeCommission :: Price, - tradeSignalId :: SignalId } + tradeOperation :: Operation, + tradeAccount :: T.Text, + tradeSecurity :: T.Text, + tradeTimestamp :: UTCTime, + tradeCommission :: Price, + tradeSignalId :: SignalId } deriving (Show, Eq) instance FromJSON Trade where @@ -352,7 +429,7 @@ instance ToJSON Trade where "signal-id" .= tradeSignalId trade] data ServerSecurityParams = ServerSecurityParams { - sspDomain :: Maybe T.Text, + sspDomain :: Maybe T.Text, sspCertificate :: Maybe CurveCertificate } deriving (Show, Eq) @@ -362,8 +439,8 @@ defaultServerSecurityParams = ServerSecurityParams { } data ClientSecurityParams = ClientSecurityParams { - cspDomain :: Maybe T.Text, - cspCertificate :: Maybe CurveCertificate, + cspDomain :: Maybe T.Text, + cspCertificate :: Maybe CurveCertificate, cspServerCertificate :: Maybe CurveCertificate } deriving (Show, Eq) @@ -373,10 +450,10 @@ defaultClientSecurityParams = ClientSecurityParams { } data TickerInfo = TickerInfo { - tiTicker :: TickerId, - tiClass :: T.Text, - tiBase :: Maybe TickerId, - tiLotSize :: Integer, + tiTicker :: TickerId, + tiClass :: T.Text, + tiBase :: Maybe TickerId, + tiLotSize :: Integer, tiTickSize :: Price } deriving (Show, Eq) diff --git a/stack.yaml b/stack.yaml index 6a1a42e..fa86d9b 100644 --- a/stack.yaml +++ b/stack.yaml @@ -40,7 +40,7 @@ packages: - '../zeromq4-haskell-zap' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: [ "datetime-0.3.1", "hexdump-0.1"] +extra-deps: [ "datetime-0.3.1", "hexdump-0.1", "text-format-0.3.2"] # Override default flag values for local packages and extra-deps flags: {} diff --git a/test/ArbitraryInstances.hs b/test/ArbitraryInstances.hs index 8e26580..932230b 100644 --- a/test/ArbitraryInstances.hs +++ b/test/ArbitraryInstances.hs @@ -10,6 +10,7 @@ import Test.QuickCheck.Instances () import ATrade.Types import ATrade.Price as P +import qualified Data.Text as T import ATrade.Broker.Protocol import Data.Time.Clock @@ -18,22 +19,24 @@ import Data.Time.Calendar notTooBig :: (Num a, Ord a) => a -> Bool notTooBig x = abs x < 100000000 +arbitraryTickerId = arbitrary `suchThat` (T.all (/= ':')) + instance Arbitrary Tick where arbitrary = Tick <$> - arbitrary <*> + arbitraryTickerId <*> arbitrary <*> arbitraryTimestamp <*> arbitrary <*> arbitrary - where - arbitraryTimestamp = do - y <- choose (1970, 2050) - m <- choose (1, 12) - d <- choose (1, 31) - sec <- secondsToDiffTime <$> choose (0, 86399) +arbitraryTimestamp = do + y <- choose (1970, 2050) + m <- choose (1, 12) + d <- choose (1, 31) + + sec <- secondsToDiffTime <$> choose (0, 86399) - return $ UTCTime (fromGregorian y m d) sec + return $ UTCTime (fromGregorian y m d) sec instance Arbitrary DataType where arbitrary = toEnum <$> choose (1, 10) @@ -116,3 +119,17 @@ instance Arbitrary BrokerServerResponse where instance Arbitrary P.Price where arbitrary = P.Price <$> (arbitrary `suchThat` (\p -> abs p < 1000000000 * 10000000)) +instance Arbitrary Bar where + arbitrary = Bar <$> + arbitraryTickerId <*> + arbitraryTimestamp <*> + arbitrary <*> + arbitrary <*> + arbitrary <*> + arbitrary <*> + arbitrary `suchThat` (> 0) + +instance Arbitrary BarTimeframe where + arbitrary = BarTimeframe <$> (arbitrary `suchThat` (\p -> p > 0 && p < 86400 * 365)) + + diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index 3c6a88a..4debb3d 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -4,22 +4,22 @@ module TestQuoteSourceClient ( unitTests ) where -import Test.Tasty -import Test.Tasty.HUnit - -import ATrade.Types -import ATrade.QuoteSource.Server -import ATrade.QuoteSource.Client -import Control.Monad -import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (writeChan, readChan) -import Control.Exception -import System.ZMQ4 -import Data.Time.Clock -import Data.Time.Calendar -import qualified Data.Text as T -import Data.UUID as U -import Data.UUID.V4 as UV4 +import Test.Tasty +import Test.Tasty.HUnit + +import ATrade.QuoteSource.Client +import ATrade.QuoteSource.Server +import ATrade.Types +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan +import Control.Exception +import Control.Monad +import qualified Data.Text as T +import Data.Time.Calendar +import Data.Time.Clock +import Data.UUID as U +import Data.UUID.V4 as UV4 +import System.ZMQ4 makeEndpoint :: IO T.Text makeEndpoint = do @@ -27,7 +27,10 @@ makeEndpoint = do return $ "inproc://server" `T.append` uid unitTests :: TestTree -unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] +unitTests = testGroup "QuoteSource.Client" [ + testStartStop + , testTickStream + , testBarStream ] testStartStop :: TestTree testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do @@ -51,6 +54,24 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c value = 1000, volume = 1} forkIO $ forever $ writeChan chan (QSSTick tick) - recvdTick <- readChan clientChan - tick @=? recvdTick))) - + recvdData <- readChan clientChan + QDTick tick @=? recvdData))) + +testBarStream :: TestTree +testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx -> do + ep <- makeEndpoint + chan <- newBoundedChan 1000 + clientChan <- newBoundedChan 1000 + bracket (startQuoteSourceServer chan ctx ep Nothing) stopQuoteSourceServer (\_ -> + bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\_ -> do + let bar = Bar { + barSecurity = "FOOBAR", + barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000, + barOpen = fromDouble 10.0, + barHigh = fromDouble 15.0, + barLow = fromDouble 8.0, + barClose = fromDouble 11.0, + barVolume = 42 } + forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar) + recvdData <- readChan clientChan + QDBar (BarTimeframe 60, bar) @=? recvdData))) diff --git a/test/TestQuoteSourceServer.hs b/test/TestQuoteSourceServer.hs index 507e64e..fba8636 100644 --- a/test/TestQuoteSourceServer.hs +++ b/test/TestQuoteSourceServer.hs @@ -4,20 +4,23 @@ module TestQuoteSourceServer ( unitTests ) where -import Test.Tasty -import Test.Tasty.HUnit - -import ATrade.Types -import qualified Data.ByteString.Lazy as BL -import ATrade.QuoteSource.Server -import Control.Concurrent.BoundedChan -import Control.Exception -import System.ZMQ4 -import Data.Time.Clock -import Data.Time.Calendar +import Test.Tasty +import Test.Tasty.HUnit + +import ATrade.QuoteSource.Server +import ATrade.Types +import Control.Concurrent.BoundedChan +import Control.Exception +import qualified Data.ByteString.Lazy as BL +import Data.Time.Calendar +import Data.Time.Clock +import System.ZMQ4 unitTests :: TestTree -unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] +unitTests = testGroup "QuoteSource.Server" [ + testStartStop + , testTickStream + , testBarStream ] testStartStop :: TestTree testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do @@ -42,5 +45,26 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - packet <- fmap BL.fromStrict <$> receiveMulti s case deserializeTick packet of Just recvdTick -> tick @=? recvdTick - Nothing -> assertFailure "Unable to deserialize tick"))) - + Nothing -> assertFailure "Unable to deserialize tick"))) + + +testBarStream :: TestTree +testBarStream = testCase "QuoteSource Server sends bars" $ withContext (\ctx -> do + chan <- newBoundedChan 1000 + bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server" Nothing) stopQuoteSourceServer (\_ -> + withSocket ctx Sub (\s -> do + connect s "inproc://quotesource-server" + subscribe s "FOOBAR" + let bar = Bar { + barSecurity = "FOOBAR", + barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000, + barOpen = fromDouble 10.0, + barHigh = fromDouble 15.0, + barLow = fromDouble 8.0, + barClose = fromDouble 11.0, + barVolume = 1 } + writeChan chan (QSSBar (BarTimeframe 60, bar)) + packet <- fmap BL.fromStrict <$> receiveMulti s + case deserializeBar packet of + Just (barTf, recvdBar) -> (bar @=? recvdBar) >> (barTf @=? (BarTimeframe 60)) + Nothing -> assertFailure "Unable to deserialize bar"))) diff --git a/test/TestTypes.hs b/test/TestTypes.hs index 3fe9f9c..1129ce9 100644 --- a/test/TestTypes.hs +++ b/test/TestTypes.hs @@ -15,6 +15,8 @@ import ArbitraryInstances () import Data.Aeson import qualified Data.ByteString.Lazy as B +import Debug.Trace + properties :: TestTree properties = testGroup "Types" [ testTickSerialization @@ -31,6 +33,7 @@ properties = testGroup "Types" [ , testPriceAddition , testPriceMultiplication , testPriceSubtraction + , testBarSerialization ] testTickSerialization :: TestTree @@ -116,3 +119,8 @@ testPriceSubtraction :: TestTree testPriceSubtraction = QC.testProperty "Price subtraction" (\(p1, p2) -> abs (toDouble p1 - toDouble p2 - toDouble (p1 - p2)) < 0.00001) +testBarSerialization :: TestTree +testBarSerialization = QC.testProperty "Deserialize serialized bar" + (\(tf, bar) -> case deserializeBar (serializeBar tf bar) of + Just (tf', bar') -> bar == bar' && tf == tf' + Nothing -> False)