Browse Source

Quotesource server: use STM channels

master
Denis Tereshkin 9 years ago
parent
commit
e7f99f0955
  1. 2
      libatrade.cabal
  2. 19
      src/ATrade/QuoteSource/Client.hs
  3. 12
      src/ATrade/QuoteSource/Server.hs
  4. 2
      src/ATrade/Types.hs
  5. 8
      test/TestQuoteSourceClient.hs
  6. 9
      test/TestQuoteSourceServer.hs

2
libatrade.cabal

@ -38,6 +38,7 @@ library @@ -38,6 +38,7 @@ library
, containers
, monad-loops
, safe
, stm
default-language: Haskell2010
executable libatrade-exe
@ -75,6 +76,7 @@ test-suite libatrade-test @@ -75,6 +76,7 @@ test-suite libatrade-test
, bytestring
, monad-loops
, uuid
, stm
ghc-options: -threaded -rtsopts -with-rtsopts=-N -Wincomplete-patterns
default-language: Haskell2010
other-modules: ArbitraryInstances

19
src/ATrade/QuoteSource/Client.hs

@ -13,6 +13,7 @@ import Control.Monad @@ -13,6 +13,7 @@ import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.List.NonEmpty
import Data.Maybe
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as B8
@ -27,27 +28,29 @@ import Safe @@ -27,27 +28,29 @@ import Safe
data QuoteSourceClientHandle = QuoteSourceClientHandle {
tid :: ThreadId,
completionMvar :: MVar ()
completionMvar :: MVar (),
killMVar :: MVar ()
}
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar
killMv <- newEmptyMVar
now <- getCurrentTime
lastHeartbeat <- newIORef now
tid <- forkIO $ finally (clientThread lastHeartbeat) (cleanup compMv)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv }
tid <- forkIO $ finally (clientThread lastHeartbeat killMv) (cleanup compMv)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv }
where
clientThread lastHeartbeat = forever $ withSocket ctx Sub (\sock -> do
clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do
connect sock $ T.unpack endpoint
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers
mapM_ (subscribe sock . encodeUtf8) tickers
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
now <- getCurrentTime
writeIORef lastHeartbeat now
whileM_ (notTimeout lastHeartbeat) $ do
whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
unless (null (L.head evs)) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
now <- getCurrentTime
prevHeartbeat <- readIORef lastHeartbeat
@ -66,4 +69,4 @@ startQuoteSourceClient chan tickers ctx endpoint = do @@ -66,4 +69,4 @@ startQuoteSourceClient chan tickers ctx endpoint = do
cleanup compMv = putMVar compMv ()
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()
stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)
stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle)

12
src/ATrade/QuoteSource/Server.hs

@ -8,6 +8,8 @@ module ATrade.QuoteSource.Server ( @@ -8,6 +8,8 @@ module ATrade.QuoteSource.Server (
import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Exception
import Control.Monad
import qualified Data.Text as T
@ -20,7 +22,7 @@ import System.ZMQ4 @@ -20,7 +22,7 @@ import System.ZMQ4
data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context,
outSocket :: Socket Pub,
tickChannel :: BoundedChan QuoteSourceServerData,
tickChannel :: TBQueue QuoteSourceServerData,
completionMvar :: MVar (),
serverThreadId :: ThreadId,
heartbeatThreadId :: ThreadId
@ -39,7 +41,7 @@ serverThread state = do @@ -39,7 +41,7 @@ serverThread state = do
putMVar (completionMvar state) ()
serverThread' = do
qssdata <- readChan $ tickChannel state
qssdata <- atomically $ readTBQueue $ tickChannel state
case qssdata of
QSSKill -> return ()
QSSHeartbeat -> do
@ -49,14 +51,14 @@ serverThread state = do @@ -49,14 +51,14 @@ serverThread state = do
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick
serverThread'
startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer :: TBQueue QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer chan c ep = do
sock <- socket c Pub
bind sock $ T.unpack ep
tid <- myThreadId
hbTid <- forkIO $ forever $ do
threadDelay 1000000
writeChan chan QSSHeartbeat
atomically $ writeTBQueue chan QSSHeartbeat
mv <- newEmptyMVar
let state = QuoteSourceServerState {
@ -71,5 +73,5 @@ startQuoteSourceServer chan c ep = do @@ -71,5 +73,5 @@ startQuoteSourceServer chan c ep = do
return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO ()
stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server)
stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> atomically (writeTBQueue (tickChannel server) QSSKill) >> readMVar (completionMvar server)

2
src/ATrade/Types.hs

@ -138,7 +138,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of @@ -138,7 +138,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
deserializeTick _ = Nothing
data Bar = Bar {
barSecurity :: !T.Text,
barSecurity :: !TickerId,
barTimestamp :: !UTCTime,
barOpen :: !Decimal,
barHigh :: !Decimal,

8
test/TestQuoteSourceClient.hs

@ -16,6 +16,8 @@ import Control.Monad @@ -16,6 +16,8 @@ import Control.Monad
import Control.Monad.Loops
import Control.Concurrent.MVar
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent hiding (writeChan, readChan)
import Control.Exception
import System.ZMQ4
@ -35,14 +37,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] @@ -35,14 +37,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do
ep <- makeEndpoint
chan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield)))
testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do
ep <- makeEndpoint
chan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do
@ -52,7 +54,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c @@ -52,7 +54,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
timestamp = UTCTime (fromGregorian 2016 9 27) 16000,
value = 1000,
volume = 1}
forkIO $ forever $ writeChan chan (Just tick)
forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick)
recvdTick <- readChan clientChan
tick @=? recvdTick)))

9
test/TestQuoteSourceServer.hs

@ -15,7 +15,8 @@ import ATrade.QuoteSource.Server @@ -15,7 +15,8 @@ import ATrade.QuoteSource.Server
import Control.Monad
import Control.Monad.Loops
import Control.Concurrent.MVar
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent hiding (writeChan)
import Control.Exception
import System.ZMQ4
@ -26,12 +27,12 @@ import Data.Maybe @@ -26,12 +27,12 @@ import Data.Maybe
unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do
chan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server"
stopQuoteSourceServer qss)
testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do
chan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs ->
withSocket ctx Sub (\s -> do
connect s "inproc://quotesource-server"
@ -42,7 +43,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - @@ -42,7 +43,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -
timestamp = UTCTime (fromGregorian 2016 9 27) 16000,
value = 1000,
volume = 1}
tryWriteChan chan (Just tick)
atomically $ writeTBQueue chan (QSSTick tick)
packet <- fmap BL.fromStrict <$> receiveMulti s
case deserializeTick packet of
Just recvdTick -> tick @=? recvdTick

Loading…
Cancel
Save