From b4123f8b10438d7eb0975bbfdbaef986419810f3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 11 Oct 2016 08:09:50 +0700 Subject: [PATCH] QuoteSource client: reconnect on heartbeat loss --- src/ATrade/QuoteSource/Client.hs | 40 ++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 18 deletions(-) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index a2d01f8..ffd5b8a 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -10,6 +10,7 @@ import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent.BoundedChan import Control.Concurrent.MVar import Control.Monad +import Control.Monad.Loops import Control.Exception import Data.List.NonEmpty import qualified Data.Text as T @@ -34,29 +35,32 @@ startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar now <- getCurrentTime lastHeartbeat <- newIORef now - tid <- forkIO $ do - sock <- createAndConnectSocket - finally (clientThread sock lastHeartbeat) (cleanup compMv sock) + tid <- forkIO $ finally (clientThread lastHeartbeat) (cleanup compMv) return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } where - clientThread sock lastHeartbeat = forever $ do - evs <- poll 200 [Sock sock [In] Nothing] - when ((L.length . L.head) evs > 0) $ do - rawTick <- fmap BL.fromStrict <$> receiveMulti sock - if headMay rawTick == Just "SYSTEM#HEARTBEAT" - then do - now <- getCurrentTime - writeIORef lastHeartbeat now - else case deserializeTick rawTick of - Just tick -> writeChan chan tick - Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" - cleanup compMv sock = close sock >> putMVar compMv () - createAndConnectSocket = do - sock <- socket ctx Sub + clientThread lastHeartbeat = forever $ withSocket ctx Sub (\sock -> do connect sock $ T.unpack endpoint mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" - return sock + + whileM_ (notTimeout lastHeartbeat) $ do + evs <- poll 200 [Sock sock [In] Nothing] + when ((L.length . L.head) evs > 0) $ do + rawTick <- fmap BL.fromStrict <$> receiveMulti sock + now <- getCurrentTime + prevHeartbeat <- readIORef lastHeartbeat + if headMay rawTick == Just "SYSTEM#HEARTBEAT" + then writeIORef lastHeartbeat now + else case deserializeTick rawTick of + Just tick -> writeChan chan tick + Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick") + + notTimeout ts = do + now <- getCurrentTime + heartbeatTs <- readIORef ts + return $ diffUTCTime now heartbeatTs < 10 + + cleanup compMv = putMVar compMv () stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)