Browse Source

Quotesource: support bar transfer

master
Denis Tereshkin 7 years ago
parent
commit
2380a05203
  1. 1
      .gitignore
  2. 2
      src/ATrade/Broker/Server.hs
  3. 57
      src/ATrade/QuoteSource/Client.hs
  4. 65
      src/ATrade/QuoteSource/Server.hs
  5. 241
      src/ATrade/Types.hs
  6. 2
      stack.yaml
  7. 33
      test/ArbitraryInstances.hs
  8. 59
      test/TestQuoteSourceClient.hs
  9. 50
      test/TestQuoteSourceServer.hs
  10. 8
      test/TestTypes.hs

1
.gitignore vendored

@ -1,2 +1,3 @@ @@ -1,2 +1,3 @@
.*
\#*.*\#

2
src/ATrade/Broker/Server.hs

@ -118,7 +118,7 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ @@ -118,7 +118,7 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> mapM_ (\x -> x trade) tradeSinks
Nothing -> threadDelay 1000000
Nothing -> threadDelay 100000
where
wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar)

57
src/ATrade/QuoteSource/Client.hs

@ -1,47 +1,52 @@ @@ -1,47 +1,52 @@
{-# LANGUAGE OverloadedStrings #-}
module ATrade.QuoteSource.Client (
QuoteData(..),
startQuoteSourceClient,
stopQuoteSourceClient
) where
import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan, writeList2Chan)
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.List.NonEmpty
import Data.Maybe
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as B8
import qualified Data.List as L
import Data.Text.Encoding
import Data.Time.Clock
import Data.IORef
import System.ZMQ4
import System.Log.Logger
import ATrade.Types
import Control.Concurrent hiding (readChan, writeChan,
writeList2Chan)
import Control.Concurrent.BoundedChan
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import qualified Data.List as L
import Data.List.NonEmpty
import Data.Maybe
import qualified Data.Text as T
import Data.Text.Encoding
import Data.Time.Clock
import System.Log.Logger
import System.ZMQ4
import Safe
import Safe
data QuoteSourceClientHandle = QuoteSourceClientHandle {
tid :: ThreadId,
tid :: ThreadId,
completionMvar :: MVar (),
killMVar :: MVar ()
killMVar :: MVar ()
}
deserializeTicks :: [BL.ByteString] -> [Tick]
data QuoteData = QDTick Tick | QDBar (BarTimeframe, Bar)
deriving (Show, Eq)
deserializeTicks :: [BL.ByteString] -> [QuoteData]
deserializeTicks (secname:raw:_) = deserializeWithName (decodeUtf8 . BL.toStrict $ secname) raw
where
deserializeWithName secNameT raw = case deserializeTickBody raw of
(rest, Just tick) -> tick { security = secNameT } : deserializeWithName secNameT rest
(rest, Just tick) -> QDTick (tick { security = secNameT }) : deserializeWithName secNameT rest
_ -> []
deserializeTicks _ = []
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient :: BoundedChan QuoteData -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar
killMv <- newEmptyMVar
@ -67,7 +72,9 @@ startQuoteSourceClient chan tickers ctx endpoint = do @@ -67,7 +72,9 @@ startQuoteSourceClient chan tickers ctx endpoint = do
prevHeartbeat <- readIORef lastHeartbeat
if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then writeIORef lastHeartbeat now
else writeList2Chan chan (deserializeTicks rawTick)
else case deserializeBar rawTick of
Just (tf, bar) -> writeChan chan $ QDBar (tf, bar)
_ -> writeList2Chan chan (deserializeTicks rawTick)
debugM "QuoteSource.Client" "Heartbeat timeout")
notTimeout ts = do

65
src/ATrade/QuoteSource/Server.hs

@ -5,36 +5,37 @@ module ATrade.QuoteSource.Server ( @@ -5,36 +5,37 @@ module ATrade.QuoteSource.Server (
QuoteSourceServerData(..)
) where
import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan)
import Control.Exception
import Control.Monad
import qualified Data.List as L
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString as B
import Data.List.NonEmpty hiding (map)
import Data.Maybe
import System.Log.Logger
import System.ZMQ4
import System.ZMQ4.ZAP
import Prelude hiding ((!!))
import Safe
import ATrade.Types
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Exception
import Control.Monad
import qualified Data.ByteString as B
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import Data.Foldable
import qualified Data.List as L
import Data.List.NonEmpty hiding (map)
import Data.Maybe
import qualified Data.Text as T
import qualified Data.Text.Encoding as E
import Prelude hiding ((!!))
import System.Log.Logger
import System.ZMQ4
import System.ZMQ4.ZAP
import Safe
data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context,
outSocket :: Socket Pub,
tickChannel :: BoundedChan QuoteSourceServerData,
completionMvar :: MVar (),
serverThreadId :: ThreadId,
ctx :: Context,
outSocket :: Socket Pub,
tickChannel :: BoundedChan QuoteSourceServerData,
completionMvar :: MVar (),
serverThreadId :: ThreadId,
heartbeatThreadId :: ThreadId
}
data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill
data QuoteSourceServerData = QSSTick Tick | QSSBar (BarTimeframe, Bar) | QSSHeartbeat | QSSKill
deriving (Show, Eq)
serverThread :: QuoteSourceServer -> IO ()
@ -50,12 +51,15 @@ serverThread state = do @@ -50,12 +51,15 @@ serverThread state = do
qssdata' <- readChan $ tickChannel state
qssdata <- readChanN 15 $ tickChannel state
let fulldata = qssdata' : qssdata
let tickGroups = L.groupBy (\x y -> security x == security y) $ mapMaybe onlyTick fulldata
let (ticks, bars) = getTicksAndBars fulldata
let tickGroups = L.groupBy (\x y -> security x == security y) $ ticks
mapM_ (\ticks -> case headMay ticks of
Just h -> sendTicks (security h) ticks
Just h -> sendTicks (security h) ticks
Nothing -> return()) tickGroups
mapM_ sendBar bars
when (QSSHeartbeat `elem` fulldata) $ send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT"
unless (QSSKill `elem` fulldata) serverThread'
@ -72,9 +76,14 @@ serverThread state = do @@ -72,9 +76,14 @@ serverThread state = do
onlyTick t = case t of
QSSTick tick -> Just tick
_ -> Nothing
_ -> Nothing
getTicksAndBars = foldl' (\(tl, bl) qss -> case qss of
QSSTick t -> (t : tl, bl)
QSSBar b -> (tl, b : bl)
_ -> (tl, bl)) ([], [])
sendTicks secName ticklist = sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializedTicks secName ticklist
sendBar (tf, bar) = sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeBar tf bar
serializedTicks secName ticklist = header : [body]
where
header = BL.fromStrict . E.encodeUtf8 $ secName

241
src/ATrade/Types.hs

@ -1,10 +1,16 @@ @@ -1,10 +1,16 @@
{-# LANGUAGE OverloadedStrings, TypeSynonymInstances, FlexibleInstances #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE TypeSynonymInstances #-}
module ATrade.Types (
TickerId,
Tick(..),
Bar(..),
serializeBar,
serializeBarBody,
deserializeBar,
BarTimeframe(..),
DataType(..),
serializeTick,
serializeTickBody,
@ -25,28 +31,28 @@ module ATrade.Types ( @@ -25,28 +31,28 @@ module ATrade.Types (
module ATrade.Price
) where
import GHC.Generics
import ATrade.Price
import Control.Monad
import Data.Aeson
import Data.Aeson.Types
import Data.Binary.Builder
import Data.Binary.Get
import Data.ByteString.Lazy as B
import Data.DateTime
import Data.Int
import Data.List as L
import Data.Maybe
import Data.Ratio
import Data.Text as T
import Data.Text.Encoding as E
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Word
import System.ZMQ4.ZAP
import GHC.Generics
import ATrade.Price
import Control.Monad
import Data.Aeson
import Data.Aeson.Types
import Data.Binary.Get
import Data.Binary.Put
import Data.ByteString.Lazy as B
import Data.DateTime
import Data.Int
import Data.List as L
import Data.Maybe
import Data.Ratio
import Data.Text as T
import Data.Text.Encoding as E
import Data.Time.Clock
import Data.Time.Clock.POSIX
import Data.Word
import System.ZMQ4.ZAP
type TickerId = T.Text
@ -89,25 +95,36 @@ instance Enum DataType where @@ -89,25 +95,36 @@ instance Enum DataType where
| otherwise = Unknown
data Tick = Tick {
security :: !T.Text,
datatype :: !DataType,
security :: !T.Text,
datatype :: !DataType,
timestamp :: !UTCTime,
value :: !Price,
volume :: !Integer
value :: !Price,
volume :: !Integer
} deriving (Show, Eq, Generic)
putPrice :: Price -> Put
putPrice price = do
let (i, f) = decompose price
putWord64le $ fromInteger . toInteger $ i
putWord32le $ (* 1000) . fromInteger . toInteger $ f
parsePrice :: Get Price
parsePrice = do
intpart <- (fromIntegral <$> getWord64le) :: Get Int64
nanopart <- (fromIntegral <$> getWord32le) :: Get Int32
return $ compose (intpart, nanopart `div` 1000)
serializeTickHeader :: Tick -> ByteString
serializeTickHeader tick = B.fromStrict . E.encodeUtf8 $ security tick
serializeTickBody :: Tick -> ByteString
serializeTickBody tick = toLazyByteString $ mconcat [
putWord32le 1,
putWord64le $ fromIntegral . toSeconds' . timestamp $ tick,
putWord32le $ fromIntegral . fracSeconds . timestamp $ tick,
putWord32le $ fromIntegral . fromEnum . datatype $ tick,
putWord64le $ fromInteger . toInteger . fst . decompose . value $ tick,
putWord32le $ (* 1000) . fromInteger . toInteger . snd . decompose . value $ tick,
putWord32le $ fromIntegral $ volume tick ]
serializeTickBody tick = runPut $ do
putWord32le 1
putWord64le $ fromIntegral . toSeconds' . timestamp $ tick
putWord32le $ fromIntegral . fracSeconds . timestamp $ tick
putWord32le $ fromIntegral . fromEnum . datatype $ tick
putPrice $ value tick
putWord32le $ fromIntegral $ volume tick
where
fractionalPart :: (RealFrac a) => a -> a
fractionalPart x = x - fromIntegral (truncate x)
@ -125,17 +142,16 @@ parseTick = do @@ -125,17 +142,16 @@ parseTick = do
tsec <- getWord64le
tusec <- getWord32le
dt <- toEnum . fromEnum <$> getWord32le
intpart <- (fromIntegral <$> getWord64le) :: Get Int64
nanopart <- (fromIntegral <$> getWord32le) :: Get Int32
price <- parsePrice
volume <- fromIntegral <$> (fromIntegral <$> getWord32le :: Get Int32)
return Tick { security = "",
datatype = dt,
timestamp = makeTimestamp tsec tusec,
value = compose (intpart, nanopart `div` 1000),
value = price,
volume = volume }
where
makeTimestamp :: Word64 -> Word32 -> UTCTime
makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec)
makeTimestamp :: Word64 -> Word32 -> UTCTime
makeTimestamp sec usec = addUTCTime (fromRational $ toInteger usec % 1000000) (fromSeconds . toInteger $ sec)
deserializeTick :: [ByteString] -> Maybe Tick
deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
@ -146,23 +162,84 @@ deserializeTick _ = Nothing @@ -146,23 +162,84 @@ deserializeTick _ = Nothing
deserializeTickBody :: ByteString -> (ByteString, Maybe Tick)
deserializeTickBody bs = case runGetOrFail parseTick bs of
Left (rest, _, _) -> (rest, Nothing)
Left (rest, _, _) -> (rest, Nothing)
Right (rest, _, tick) -> (rest, Just tick)
data Bar = Bar {
barSecurity :: !TickerId,
barSecurity :: !TickerId,
barTimestamp :: !UTCTime,
barOpen :: !Price,
barHigh :: !Price,
barLow :: !Price,
barClose :: !Price,
barVolume :: !Integer
barOpen :: !Price,
barHigh :: !Price,
barLow :: !Price,
barClose :: !Price,
barVolume :: !Integer
} deriving (Show, Eq, Generic)
-- | Stores timeframe in seconds
newtype BarTimeframe = BarTimeframe { unBarTimeframe :: Int }
deriving (Show, Eq)
serializeBar :: BarTimeframe -> Bar -> [ByteString]
serializeBar tf bar = serializeBarHeader tf bar : [serializeBarBody tf bar]
-- | Encodes bar header as tickerid:timeframe_seconds;
-- Why ';' at the end? To support correct 0mq subscriptions. When we subscribe to topic,
-- we actually subscribe by all topics which has requested subscription as a prefix.
serializeBarHeader :: BarTimeframe -> Bar -> ByteString
serializeBarHeader tf bar =
B.fromStrict . E.encodeUtf8 $ (barSecurity bar) `T.append` encodeTimeframe tf
where
encodeTimeframe tf = mconcat [ ":", (T.pack . show $ unBarTimeframe tf), ";" ]
serializeBarBody :: BarTimeframe -> Bar -> ByteString
serializeBarBody tf bar = runPut $ do
putWord32le 2
putWord32le $ fromIntegral $ unBarTimeframe tf
putWord64le $ fromIntegral . toSeconds' . barTimestamp $ bar
putWord32le $ fromIntegral . fracSeconds . barTimestamp $ bar
putPrice $ barOpen bar
putPrice $ barHigh bar
putPrice $ barLow bar
putPrice $ barClose bar
putWord32le $ fromIntegral $ barVolume bar
where
fractionalPart :: (RealFrac a) => a -> a
fractionalPart x = x - fromIntegral (truncate x)
toSeconds' = floor . utcTimeToPOSIXSeconds
fracSeconds t = (truncate $ (* 1000000000000) $ utcTimeToPOSIXSeconds t) `mod` 1000000000000 `div` 1000000
parseBar :: Get (BarTimeframe, Bar)
parseBar = do
packetType <- fromEnum <$> getWord32le
when (packetType /= 2) $ fail "Expected packettype == 2"
tf <- fromIntegral <$> getWord32le
tsec <- getWord64le
tusec <- getWord32le
open_ <- parsePrice
high_ <- parsePrice
low_ <- parsePrice
close_ <- parsePrice
volume_ <- fromIntegral <$> getWord32le
return (BarTimeframe tf, Bar { barSecurity = "",
barTimestamp = makeTimestamp tsec tusec,
barOpen = open_,
barHigh = high_,
barLow = low_,
barClose = close_,
barVolume = volume_ })
deserializeBar :: [ByteString] -> Maybe (BarTimeframe, Bar)
deserializeBar (header:rawData:_) = case runGetOrFail parseBar rawData of
Left (_, _, _) -> Nothing
Right (_, _, (tf, bar)) -> Just $ (tf, bar { barSecurity = T.takeWhile (/= ':') . E.decodeUtf8 . B.toStrict $ header })
deserializeBar _ = Nothing
data SignalId = SignalId {
strategyId :: T.Text,
signalName :: T.Text,
comment :: T.Text }
comment :: T.Text }
deriving (Show, Eq)
instance FromJSON SignalId where
@ -223,7 +300,7 @@ instance FromJSON Operation where @@ -223,7 +300,7 @@ instance FromJSON Operation where
parseJSON _ = fail "Should be string"
instance ToJSON Operation where
toJSON Buy = String "buy"
toJSON Buy = String "buy"
toJSON Sell = String "sell"
data OrderState = Unsubmitted
@ -250,26 +327,26 @@ instance FromJSON OrderState where @@ -250,26 +327,26 @@ instance FromJSON OrderState where
instance ToJSON OrderState where
toJSON os = case os of
Unsubmitted -> String "unsubmitted"
Submitted -> String "submitted"
Unsubmitted -> String "unsubmitted"
Submitted -> String "submitted"
PartiallyExecuted -> String "partially-executed"
Executed -> String "executed"
Cancelled -> String "cancelled"
Rejected -> String "rejected"
OrderError -> String "error"
Executed -> String "executed"
Cancelled -> String "cancelled"
Rejected -> String "rejected"
OrderError -> String "error"
type OrderId = Integer
data Order = Order {
orderId :: OrderId,
orderAccountId :: T.Text,
orderSecurity :: T.Text,
orderPrice :: OrderPrice,
orderQuantity :: Integer,
orderId :: OrderId,
orderAccountId :: T.Text,
orderSecurity :: T.Text,
orderPrice :: OrderPrice,
orderQuantity :: Integer,
orderExecutedQuantity :: Integer,
orderOperation :: Operation,
orderState :: OrderState,
orderSignalId :: SignalId }
orderOperation :: Operation,
orderState :: OrderState,
orderSignalId :: SignalId }
deriving (Show, Eq)
mkOrder = Order { orderId = 0,
@ -310,17 +387,17 @@ instance ToJSON Order where @@ -310,17 +387,17 @@ instance ToJSON Order where
ifMaybe name pred val = if pred val then Just (name .= val) else Nothing
data Trade = Trade {
tradeOrderId :: OrderId,
tradePrice :: Price,
tradeQuantity :: Integer,
tradeVolume :: Price,
tradeOrderId :: OrderId,
tradePrice :: Price,
tradeQuantity :: Integer,
tradeVolume :: Price,
tradeVolumeCurrency :: T.Text,
tradeOperation :: Operation,
tradeAccount :: T.Text,
tradeSecurity :: T.Text,
tradeTimestamp :: UTCTime,
tradeCommission :: Price,
tradeSignalId :: SignalId }
tradeOperation :: Operation,
tradeAccount :: T.Text,
tradeSecurity :: T.Text,
tradeTimestamp :: UTCTime,
tradeCommission :: Price,
tradeSignalId :: SignalId }
deriving (Show, Eq)
instance FromJSON Trade where
@ -352,7 +429,7 @@ instance ToJSON Trade where @@ -352,7 +429,7 @@ instance ToJSON Trade where
"signal-id" .= tradeSignalId trade]
data ServerSecurityParams = ServerSecurityParams {
sspDomain :: Maybe T.Text,
sspDomain :: Maybe T.Text,
sspCertificate :: Maybe CurveCertificate
} deriving (Show, Eq)
@ -362,8 +439,8 @@ defaultServerSecurityParams = ServerSecurityParams { @@ -362,8 +439,8 @@ defaultServerSecurityParams = ServerSecurityParams {
}
data ClientSecurityParams = ClientSecurityParams {
cspDomain :: Maybe T.Text,
cspCertificate :: Maybe CurveCertificate,
cspDomain :: Maybe T.Text,
cspCertificate :: Maybe CurveCertificate,
cspServerCertificate :: Maybe CurveCertificate
} deriving (Show, Eq)
@ -373,10 +450,10 @@ defaultClientSecurityParams = ClientSecurityParams { @@ -373,10 +450,10 @@ defaultClientSecurityParams = ClientSecurityParams {
}
data TickerInfo = TickerInfo {
tiTicker :: TickerId,
tiClass :: T.Text,
tiBase :: Maybe TickerId,
tiLotSize :: Integer,
tiTicker :: TickerId,
tiClass :: T.Text,
tiBase :: Maybe TickerId,
tiLotSize :: Integer,
tiTickSize :: Price
} deriving (Show, Eq)

2
stack.yaml

@ -40,7 +40,7 @@ packages: @@ -40,7 +40,7 @@ packages:
- '../zeromq4-haskell-zap'
# Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "hexdump-0.1"]
extra-deps: [ "datetime-0.3.1", "hexdump-0.1", "text-format-0.3.2"]
# Override default flag values for local packages and extra-deps
flags: {}

33
test/ArbitraryInstances.hs

@ -10,6 +10,7 @@ import Test.QuickCheck.Instances () @@ -10,6 +10,7 @@ import Test.QuickCheck.Instances ()
import ATrade.Types
import ATrade.Price as P
import qualified Data.Text as T
import ATrade.Broker.Protocol
import Data.Time.Clock
@ -18,22 +19,24 @@ import Data.Time.Calendar @@ -18,22 +19,24 @@ import Data.Time.Calendar
notTooBig :: (Num a, Ord a) => a -> Bool
notTooBig x = abs x < 100000000
arbitraryTickerId = arbitrary `suchThat` (T.all (/= ':'))
instance Arbitrary Tick where
arbitrary = Tick <$>
arbitrary <*>
arbitraryTickerId <*>
arbitrary <*>
arbitraryTimestamp <*>
arbitrary <*>
arbitrary
where
arbitraryTimestamp = do
y <- choose (1970, 2050)
m <- choose (1, 12)
d <- choose (1, 31)
sec <- secondsToDiffTime <$> choose (0, 86399)
arbitraryTimestamp = do
y <- choose (1970, 2050)
m <- choose (1, 12)
d <- choose (1, 31)
sec <- secondsToDiffTime <$> choose (0, 86399)
return $ UTCTime (fromGregorian y m d) sec
return $ UTCTime (fromGregorian y m d) sec
instance Arbitrary DataType where
arbitrary = toEnum <$> choose (1, 10)
@ -116,3 +119,17 @@ instance Arbitrary BrokerServerResponse where @@ -116,3 +119,17 @@ instance Arbitrary BrokerServerResponse where
instance Arbitrary P.Price where
arbitrary = P.Price <$> (arbitrary `suchThat` (\p -> abs p < 1000000000 * 10000000))
instance Arbitrary Bar where
arbitrary = Bar <$>
arbitraryTickerId <*>
arbitraryTimestamp <*>
arbitrary <*>
arbitrary <*>
arbitrary <*>
arbitrary <*>
arbitrary `suchThat` (> 0)
instance Arbitrary BarTimeframe where
arbitrary = BarTimeframe <$> (arbitrary `suchThat` (\p -> p > 0 && p < 86400 * 365))

59
test/TestQuoteSourceClient.hs

@ -4,22 +4,22 @@ module TestQuoteSourceClient ( @@ -4,22 +4,22 @@ module TestQuoteSourceClient (
unitTests
) where
import Test.Tasty
import Test.Tasty.HUnit
import ATrade.Types
import ATrade.QuoteSource.Server
import ATrade.QuoteSource.Client
import Control.Monad
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (writeChan, readChan)
import Control.Exception
import System.ZMQ4
import Data.Time.Clock
import Data.Time.Calendar
import qualified Data.Text as T
import Data.UUID as U
import Data.UUID.V4 as UV4
import Test.Tasty
import Test.Tasty.HUnit
import ATrade.QuoteSource.Client
import ATrade.QuoteSource.Server
import ATrade.Types
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Exception
import Control.Monad
import qualified Data.Text as T
import Data.Time.Calendar
import Data.Time.Clock
import Data.UUID as U
import Data.UUID.V4 as UV4
import System.ZMQ4
makeEndpoint :: IO T.Text
makeEndpoint = do
@ -27,7 +27,10 @@ makeEndpoint = do @@ -27,7 +27,10 @@ makeEndpoint = do
return $ "inproc://server" `T.append` uid
unitTests :: TestTree
unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream]
unitTests = testGroup "QuoteSource.Client" [
testStartStop
, testTickStream
, testBarStream ]
testStartStop :: TestTree
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do
@ -51,6 +54,24 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c @@ -51,6 +54,24 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
value = 1000,
volume = 1}
forkIO $ forever $ writeChan chan (QSSTick tick)
recvdTick <- readChan clientChan
tick @=? recvdTick)))
recvdData <- readChan clientChan
QDTick tick @=? recvdData)))
testBarStream :: TestTree
testBarStream = testCase "QuoteSource clients receives bars" $ withContext (\ctx -> do
ep <- makeEndpoint
chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep Nothing) stopQuoteSourceServer (\_ ->
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\_ -> do
let bar = Bar {
barSecurity = "FOOBAR",
barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000,
barOpen = fromDouble 10.0,
barHigh = fromDouble 15.0,
barLow = fromDouble 8.0,
barClose = fromDouble 11.0,
barVolume = 42 }
forkIO $ forever $ writeChan chan $ QSSBar (BarTimeframe 60, bar)
recvdData <- readChan clientChan
QDBar (BarTimeframe 60, bar) @=? recvdData)))

50
test/TestQuoteSourceServer.hs

@ -4,20 +4,23 @@ module TestQuoteSourceServer ( @@ -4,20 +4,23 @@ module TestQuoteSourceServer (
unitTests
) where
import Test.Tasty
import Test.Tasty.HUnit
import ATrade.Types
import qualified Data.ByteString.Lazy as BL
import ATrade.QuoteSource.Server
import Control.Concurrent.BoundedChan
import Control.Exception
import System.ZMQ4
import Data.Time.Clock
import Data.Time.Calendar
import Test.Tasty
import Test.Tasty.HUnit
import ATrade.QuoteSource.Server
import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Exception
import qualified Data.ByteString.Lazy as BL
import Data.Time.Calendar
import Data.Time.Clock
import System.ZMQ4
unitTests :: TestTree
unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream]
unitTests = testGroup "QuoteSource.Server" [
testStartStop
, testTickStream
, testBarStream ]
testStartStop :: TestTree
testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do
@ -42,5 +45,26 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - @@ -42,5 +45,26 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -
packet <- fmap BL.fromStrict <$> receiveMulti s
case deserializeTick packet of
Just recvdTick -> tick @=? recvdTick
Nothing -> assertFailure "Unable to deserialize tick")))
Nothing -> assertFailure "Unable to deserialize tick")))
testBarStream :: TestTree
testBarStream = testCase "QuoteSource Server sends bars" $ withContext (\ctx -> do
chan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server" Nothing) stopQuoteSourceServer (\_ ->
withSocket ctx Sub (\s -> do
connect s "inproc://quotesource-server"
subscribe s "FOOBAR"
let bar = Bar {
barSecurity = "FOOBAR",
barTimestamp = UTCTime (fromGregorian 2016 9 27) 16000,
barOpen = fromDouble 10.0,
barHigh = fromDouble 15.0,
barLow = fromDouble 8.0,
barClose = fromDouble 11.0,
barVolume = 1 }
writeChan chan (QSSBar (BarTimeframe 60, bar))
packet <- fmap BL.fromStrict <$> receiveMulti s
case deserializeBar packet of
Just (barTf, recvdBar) -> (bar @=? recvdBar) >> (barTf @=? (BarTimeframe 60))
Nothing -> assertFailure "Unable to deserialize bar")))

8
test/TestTypes.hs

@ -15,6 +15,8 @@ import ArbitraryInstances () @@ -15,6 +15,8 @@ import ArbitraryInstances ()
import Data.Aeson
import qualified Data.ByteString.Lazy as B
import Debug.Trace
properties :: TestTree
properties = testGroup "Types" [
testTickSerialization
@ -31,6 +33,7 @@ properties = testGroup "Types" [ @@ -31,6 +33,7 @@ properties = testGroup "Types" [
, testPriceAddition
, testPriceMultiplication
, testPriceSubtraction
, testBarSerialization
]
testTickSerialization :: TestTree
@ -116,3 +119,8 @@ testPriceSubtraction :: TestTree @@ -116,3 +119,8 @@ testPriceSubtraction :: TestTree
testPriceSubtraction = QC.testProperty "Price subtraction"
(\(p1, p2) -> abs (toDouble p1 - toDouble p2 - toDouble (p1 - p2)) < 0.00001)
testBarSerialization :: TestTree
testBarSerialization = QC.testProperty "Deserialize serialized bar"
(\(tf, bar) -> case deserializeBar (serializeBar tf bar) of
Just (tf', bar') -> bar == bar' && tf == tf'
Nothing -> False)

Loading…
Cancel
Save