From 4e7ad93f151c2ecae9ba3973383af9cb8dc7251b Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 10 Oct 2016 17:15:01 +0700 Subject: [PATCH] Quotesource: heartbeats --- libatrade.cabal | 1 + src/ATrade/QuoteSource/Client.hs | 31 +++++++++++++++++++++++-------- src/ATrade/QuoteSource/Server.hs | 24 ++++++++++++++++-------- 3 files changed, 40 insertions(+), 16 deletions(-) diff --git a/libatrade.cabal b/libatrade.cabal index bc0b522..2bcfc08 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -37,6 +37,7 @@ library , unordered-containers , containers , monad-loops + , safe default-language: Haskell2010 executable libatrade-exe diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index e15c2cc..a2d01f8 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -14,11 +14,16 @@ import Control.Exception import Data.List.NonEmpty import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString.Char8 as B8 import qualified Data.List as L import Data.Text.Encoding +import Data.Time.Clock +import Data.IORef import System.ZMQ4 import System.Log.Logger +import Safe + data QuoteSourceClientHandle = QuoteSourceClientHandle { tid :: ThreadId, completionMvar :: MVar () @@ -27,21 +32,31 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle { startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar + now <- getCurrentTime + lastHeartbeat <- newIORef now tid <- forkIO $ do - sock <- socket ctx Sub - connect sock $ T.unpack endpoint - mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers - finally (clientThread sock) (cleanup compMv sock) + sock <- createAndConnectSocket + finally (clientThread sock lastHeartbeat) (cleanup compMv sock) return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } where - clientThread sock = forever $ do + clientThread sock lastHeartbeat = forever $ do evs <- poll 200 [Sock sock [In] Nothing] when ((L.length . L.head) evs > 0) $ do rawTick <- fmap BL.fromStrict <$> receiveMulti sock - case deserializeTick rawTick of - Just tick -> writeChan chan tick - Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" + if headMay rawTick == Just "SYSTEM#HEARTBEAT" + then do + now <- getCurrentTime + writeIORef lastHeartbeat now + else case deserializeTick rawTick of + Just tick -> writeChan chan tick + Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" cleanup compMv sock = close sock >> putMVar compMv () + createAndConnectSocket = do + sock <- socket ctx Sub + connect sock $ T.unpack endpoint + mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers + subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" + return sock stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index c93b6bc..bbbcdcc 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -1,7 +1,8 @@ module ATrade.QuoteSource.Server ( startQuoteSourceServer, - stopQuoteSourceServer + stopQuoteSourceServer, + QuoteSourceServerData(..) ) where import ATrade.Types @@ -10,6 +11,7 @@ import Control.Concurrent hiding (readChan, writeChan) import Control.Exception import Control.Monad import qualified Data.Text as T +import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Lazy as BL import Data.List.NonEmpty hiding (map) import System.Log.Logger @@ -18,11 +20,14 @@ import System.ZMQ4 data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, outSocket :: Socket Pub, - tickChannel :: BoundedChan (Maybe Tick), + tickChannel :: BoundedChan QuoteSourceServerData, completionMvar :: MVar (), serverThreadId :: ThreadId } +data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill + deriving (Show, Eq) + serverThread :: QuoteSourceServer -> IO () serverThread state = do finally serverThread' cleanup @@ -33,14 +38,17 @@ serverThread state = do putMVar (completionMvar state) () serverThread' = do - maybeTick <- readChan $ tickChannel state - case maybeTick of - Nothing -> return () - Just tick -> do + qssdata <- readChan $ tickChannel state + case qssdata of + QSSKill -> return () + QSSHeartbeat -> do + send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT" + serverThread' + QSSTick tick -> do sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick serverThread' -startQuoteSourceServer :: BoundedChan (Maybe Tick) -> Context -> T.Text -> IO QuoteSourceServer +startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do sock <- socket c Pub bind sock $ T.unpack ep @@ -57,5 +65,5 @@ startQuoteSourceServer chan c ep = do return $ state { serverThreadId = stid } stopQuoteSourceServer :: QuoteSourceServer -> IO () -stopQuoteSourceServer server = writeChan (tickChannel server) Nothing >> readMVar (completionMvar server) +stopQuoteSourceServer server = writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server)