From aa13290f16aabc94de237c7c1ed0af4987d23f8d Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 22 Sep 2016 08:57:04 +0700 Subject: [PATCH] Basic quotesource server --- quik-connector.cabal | 1 + src/Data/ATrade.hs | 25 +++++++++++++++++++++++- src/QuoteSource/Server.hs | 41 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 66 insertions(+), 1 deletion(-) create mode 100644 src/QuoteSource/Server.hs diff --git a/quik-connector.cabal b/quik-connector.cabal index 848cd5b..1726cfa 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -36,6 +36,7 @@ library , datetime , BoundedChan , hslogger + , zeromq4-haskell default-language: Haskell2010 extra-libraries: "user32" other-modules: System.Win32.XlParser diff --git a/src/Data/ATrade.hs b/src/Data/ATrade.hs index 2263628..71d4010 100644 --- a/src/Data/ATrade.hs +++ b/src/Data/ATrade.hs @@ -1,11 +1,18 @@ module Data.ATrade ( Tick(..), - DataType(..) + DataType(..), + serializeTick ) where import Data.Decimal import Data.Time.Clock +import Data.DateTime +import Data.ByteString.Lazy as B +import Data.Text as T +import Data.Text.Encoding as E +import Data.List as L +import Data.Binary.Builder data DataType = Unknown | Price @@ -53,3 +60,19 @@ data Tick = Tick { volume :: Integer } deriving (Show, Eq) +serializeTick :: Tick -> [ByteString] +serializeTick tick = do + header : [rawdata] + where + header = B.fromChunks [ E.encodeUtf8 . T.pack $ security tick ] + rawdata = toLazyByteString $ mconcat [ + putWord32le $ fromIntegral 1, + putWord64le $ fromIntegral . toSeconds . timestamp $ tick, + putWord32le $ fromIntegral . truncate . (* 1000000) . fractionalPart . utctDayTime . timestamp $ tick, + putWord32le $ fromIntegral . fromEnum . datatype $ tick, + putWord64le $ truncate . value $ tick, + putWord32le $ truncate . (* 1000000000) . fractionalPart $ value tick, + putWord32le $ fromIntegral $ volume tick ] + fractionalPart :: (RealFrac a) => a -> a + fractionalPart x = x - (fromIntegral (floor x)) + diff --git a/src/QuoteSource/Server.hs b/src/QuoteSource/Server.hs new file mode 100644 index 0000000..e47f986 --- /dev/null +++ b/src/QuoteSource/Server.hs @@ -0,0 +1,41 @@ + +module QuoteSource.Server ( +) where + +import System.ZMQ4 +import Control.Concurrent.BoundedChan +import Data.ATrade +import Control.Concurrent +import Control.Monad + +data QuoteSourceServer = QuoteSourceServerState { + ctx :: Context, + outSocket :: Socket Pub, + tickChannel :: BoundedChan Tick, + serverThread :: ThreadId +} + +serverThread :: QuoteSourceServer -> IO () +serverThread state = do + finally serverThread' cleanup + where + cleanup = close $ outSocket state + + serverThread = forever $ do + tick <- readChan $ tickChannel state + sendMulti (outSocket state) serializeTick tick + +startQuoteSourceServer :: BoundedChan Tick -> Context -> String -> IO QuoteSourceServer +startQuoteSourceServer chan c ep = do + sock <- socket c Pub + bind sock ep + tid <- myThreadId + let state = QuoteSourceServerState { + ctx = c, + outSocket = sock, + tickChannel = chan, + serverThread = tid + } + stid <- forkIO $ serverThread state + return $ state { serverThread = stid } +