From 1703918336a1b40c40a74f41e0db6fe342ba4163 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 20 Oct 2016 11:08:23 +0700 Subject: [PATCH] Quotesource client: use STM.TBQueue --- src/ATrade/QuoteSource/Client.hs | 7 ++++--- test/TestQuoteSourceClient.hs | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 9314c87..33eb542 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -7,7 +7,8 @@ module ATrade.QuoteSource.Client ( import ATrade.Types import Control.Concurrent hiding (readChan, writeChan) -import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Concurrent.MVar import Control.Monad import Control.Monad.Loops @@ -32,7 +33,7 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle { killMVar :: MVar () } -startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle +startQuoteSourceClient :: TBQueue Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar killMv <- newEmptyMVar @@ -57,7 +58,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do if headMay rawTick == Just "SYSTEM#HEARTBEAT" then writeIORef lastHeartbeat now else case deserializeTick rawTick of - Just tick -> writeChan chan tick + Just tick -> atomically $ writeTBQueue chan tick Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" debugM "QuoteSource.Client" "Heartbeat timeout") diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index 5915c67..a7106be 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -38,14 +38,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do ep <- makeEndpoint chan <- atomically $ newTBQueue 1000 - clientChan <- newBoundedChan 1000 + clientChan <- atomically $ newTBQueue 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do ep <- makeEndpoint chan <- atomically $ newTBQueue 1000 - clientChan <- newBoundedChan 1000 + clientChan <- atomically $ newTBQueue 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do let tick = Tick { @@ -55,6 +55,6 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c value = 1000, volume = 1} forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) - recvdTick <- readChan clientChan + recvdTick <- atomically $ readTBQueue clientChan tick @=? recvdTick)))