Browse Source

QuoteSource client: reconnect on heartbeat loss

master
Denis Tereshkin 9 years ago
parent
commit
b4123f8b10
  1. 40
      src/ATrade/QuoteSource/Client.hs

40
src/ATrade/QuoteSource/Client.hs

@ -10,6 +10,7 @@ import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Monad import Control.Monad
import Control.Monad.Loops
import Control.Exception import Control.Exception
import Data.List.NonEmpty import Data.List.NonEmpty
import qualified Data.Text as T import qualified Data.Text as T
@ -34,29 +35,32 @@ startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar compMv <- newEmptyMVar
now <- getCurrentTime now <- getCurrentTime
lastHeartbeat <- newIORef now lastHeartbeat <- newIORef now
tid <- forkIO $ do tid <- forkIO $ finally (clientThread lastHeartbeat) (cleanup compMv)
sock <- createAndConnectSocket
finally (clientThread sock lastHeartbeat) (cleanup compMv sock)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } return QuoteSourceClientHandle { tid = tid, completionMvar = compMv }
where where
clientThread sock lastHeartbeat = forever $ do clientThread lastHeartbeat = forever $ withSocket ctx Sub (\sock -> do
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then do
now <- getCurrentTime
writeIORef lastHeartbeat now
else case deserializeTick rawTick of
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
cleanup compMv sock = close sock >> putMVar compMv ()
createAndConnectSocket = do
sock <- socket ctx Sub
connect sock $ T.unpack endpoint connect sock $ T.unpack endpoint
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
return sock
whileM_ (notTimeout lastHeartbeat) $ do
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
now <- getCurrentTime
prevHeartbeat <- readIORef lastHeartbeat
if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then writeIORef lastHeartbeat now
else case deserializeTick rawTick of
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick")
notTimeout ts = do
now <- getCurrentTime
heartbeatTs <- readIORef ts
return $ diffUTCTime now heartbeatTs < 10
cleanup compMv = putMVar compMv ()
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()
stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)

Loading…
Cancel
Save