Browse Source

Quotesource client: use STM.TBQueue

master
Denis Tereshkin 9 years ago
parent
commit
1703918336
  1. 7
      src/ATrade/QuoteSource/Client.hs
  2. 6
      test/TestQuoteSourceClient.hs

7
src/ATrade/QuoteSource/Client.hs

@ -7,7 +7,8 @@ module ATrade.QuoteSource.Client (
import ATrade.Types import ATrade.Types
import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Monad import Control.Monad
import Control.Monad.Loops import Control.Monad.Loops
@ -32,7 +33,7 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle {
killMVar :: MVar () killMVar :: MVar ()
} }
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient :: TBQueue Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
@ -57,7 +58,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do
if headMay rawTick == Just "SYSTEM#HEARTBEAT" if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then writeIORef lastHeartbeat now then writeIORef lastHeartbeat now
else case deserializeTick rawTick of else case deserializeTick rawTick of
Just tick -> writeChan chan tick Just tick -> atomically $ writeTBQueue chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
debugM "QuoteSource.Client" "Heartbeat timeout") debugM "QuoteSource.Client" "Heartbeat timeout")

6
test/TestQuoteSourceClient.hs

@ -38,14 +38,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
chan <- atomically $ newTBQueue 1000 chan <- atomically $ newTBQueue 1000
clientChan <- newBoundedChan 1000 clientChan <- atomically $ newTBQueue 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield)))
testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
chan <- atomically $ newTBQueue 1000 chan <- atomically $ newTBQueue 1000
clientChan <- newBoundedChan 1000 clientChan <- atomically $ newTBQueue 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do
let tick = Tick { let tick = Tick {
@ -55,6 +55,6 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
value = 1000, value = 1000,
volume = 1} volume = 1}
forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick)
recvdTick <- readChan clientChan recvdTick <- atomically $ readTBQueue clientChan
tick @=? recvdTick))) tick @=? recvdTick)))

Loading…
Cancel
Save