Browse Source

Quotesource: heartbeats

master
Denis Tereshkin 9 years ago
parent
commit
4e7ad93f15
  1. 1
      libatrade.cabal
  2. 31
      src/ATrade/QuoteSource/Client.hs
  3. 24
      src/ATrade/QuoteSource/Server.hs

1
libatrade.cabal

@ -37,6 +37,7 @@ library @@ -37,6 +37,7 @@ library
, unordered-containers
, containers
, monad-loops
, safe
default-language: Haskell2010
executable libatrade-exe

31
src/ATrade/QuoteSource/Client.hs

@ -14,11 +14,16 @@ import Control.Exception @@ -14,11 +14,16 @@ import Control.Exception
import Data.List.NonEmpty
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as B8
import qualified Data.List as L
import Data.Text.Encoding
import Data.Time.Clock
import Data.IORef
import System.ZMQ4
import System.Log.Logger
import Safe
data QuoteSourceClientHandle = QuoteSourceClientHandle {
tid :: ThreadId,
completionMvar :: MVar ()
@ -27,21 +32,31 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle { @@ -27,21 +32,31 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle {
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar
now <- getCurrentTime
lastHeartbeat <- newIORef now
tid <- forkIO $ do
sock <- socket ctx Sub
connect sock $ T.unpack endpoint
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers
finally (clientThread sock) (cleanup compMv sock)
sock <- createAndConnectSocket
finally (clientThread sock lastHeartbeat) (cleanup compMv sock)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv }
where
clientThread sock = forever $ do
clientThread sock lastHeartbeat = forever $ do
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
case deserializeTick rawTick of
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then do
now <- getCurrentTime
writeIORef lastHeartbeat now
else case deserializeTick rawTick of
Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
cleanup compMv sock = close sock >> putMVar compMv ()
createAndConnectSocket = do
sock <- socket ctx Sub
connect sock $ T.unpack endpoint
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
return sock
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()
stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)

24
src/ATrade/QuoteSource/Server.hs

@ -1,7 +1,8 @@ @@ -1,7 +1,8 @@
module ATrade.QuoteSource.Server (
startQuoteSourceServer,
stopQuoteSourceServer
stopQuoteSourceServer,
QuoteSourceServerData(..)
) where
import ATrade.Types
@ -10,6 +11,7 @@ import Control.Concurrent hiding (readChan, writeChan) @@ -10,6 +11,7 @@ import Control.Concurrent hiding (readChan, writeChan)
import Control.Exception
import Control.Monad
import qualified Data.Text as T
import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty hiding (map)
import System.Log.Logger
@ -18,11 +20,14 @@ import System.ZMQ4 @@ -18,11 +20,14 @@ import System.ZMQ4
data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context,
outSocket :: Socket Pub,
tickChannel :: BoundedChan (Maybe Tick),
tickChannel :: BoundedChan QuoteSourceServerData,
completionMvar :: MVar (),
serverThreadId :: ThreadId
}
data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill
deriving (Show, Eq)
serverThread :: QuoteSourceServer -> IO ()
serverThread state = do
finally serverThread' cleanup
@ -33,14 +38,17 @@ serverThread state = do @@ -33,14 +38,17 @@ serverThread state = do
putMVar (completionMvar state) ()
serverThread' = do
maybeTick <- readChan $ tickChannel state
case maybeTick of
Nothing -> return ()
Just tick -> do
qssdata <- readChan $ tickChannel state
case qssdata of
QSSKill -> return ()
QSSHeartbeat -> do
send (outSocket state) [] $ B8.pack "SYSTEM#HEARTBEAT"
serverThread'
QSSTick tick -> do
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick
serverThread'
startQuoteSourceServer :: BoundedChan (Maybe Tick) -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer chan c ep = do
sock <- socket c Pub
bind sock $ T.unpack ep
@ -57,5 +65,5 @@ startQuoteSourceServer chan c ep = do @@ -57,5 +65,5 @@ startQuoteSourceServer chan c ep = do
return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO ()
stopQuoteSourceServer server = writeChan (tickChannel server) Nothing >> readMVar (completionMvar server)
stopQuoteSourceServer server = writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server)

Loading…
Cancel
Save