From c94881aceb5ca6db28653f3374d0a44e91fbec26 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 10 Dec 2016 19:34:02 +0700 Subject: [PATCH] Got rid of STM --- libatrade.cabal | 3 ++- src/ATrade/Broker/Server.hs | 2 +- src/ATrade/QuoteSource/Client.hs | 7 +++---- src/ATrade/QuoteSource/Server.hs | 14 ++++++-------- test/TestQuoteSourceClient.hs | 14 ++++++-------- test/TestQuoteSourceServer.hs | 9 ++++----- 6 files changed, 22 insertions(+), 27 deletions(-) diff --git a/libatrade.cabal b/libatrade.cabal index 1d95dca..885a29a 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -39,8 +39,9 @@ library , monad-loops , safe , stm - , text-format + , deepseq , errors + , text-format , parsec default-language: Haskell2010 diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 6bfbee3..c3c6d2f 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -77,7 +77,7 @@ startBrokerServer brokers c ep tradeSinkEp = do mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers debugM "Broker.Server" "Forking broker server thread" - BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv + BrokerServerHandle <$> forkOS (brokerServerThread state) <*> forkOS (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback state n = do diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 33eb542..76e5d34 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -6,9 +6,8 @@ module ATrade.QuoteSource.Client ( ) where import ATrade.Types +import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan, writeChan) -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import Control.Concurrent.MVar import Control.Monad import Control.Monad.Loops @@ -33,7 +32,7 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle { killMVar :: MVar () } -startQuoteSourceClient :: TBQueue Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle +startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar killMv <- newEmptyMVar @@ -58,7 +57,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do if headMay rawTick == Just "SYSTEM#HEARTBEAT" then writeIORef lastHeartbeat now else case deserializeTick rawTick of - Just tick -> atomically $ writeTBQueue chan tick + Just tick -> writeChan chan tick Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" debugM "QuoteSource.Client" "Heartbeat timeout") diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index 812dd58..eced023 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -8,8 +8,6 @@ module ATrade.QuoteSource.Server ( import ATrade.Types import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan, writeChan) -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import Control.Exception import Control.Monad import qualified Data.Text as T @@ -22,7 +20,7 @@ import System.ZMQ4 data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, outSocket :: Socket Pub, - tickChannel :: TBQueue QuoteSourceServerData, + tickChannel :: BoundedChan QuoteSourceServerData, completionMvar :: MVar (), serverThreadId :: ThreadId, heartbeatThreadId :: ThreadId @@ -41,7 +39,7 @@ serverThread state = do putMVar (completionMvar state) () serverThread' = do - qssdata <- atomically $ readTBQueue $ tickChannel state + qssdata <- readChan $ tickChannel state case qssdata of QSSKill -> return () QSSHeartbeat -> do @@ -51,14 +49,14 @@ serverThread state = do sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick serverThread' -startQuoteSourceServer :: TBQueue QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer +startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do sock <- socket c Pub bind sock $ T.unpack ep tid <- myThreadId hbTid <- forkIO $ forever $ do threadDelay 1000000 - atomically $ writeTBQueue chan QSSHeartbeat + writeChan chan QSSHeartbeat mv <- newEmptyMVar let state = QuoteSourceServerState { @@ -69,9 +67,9 @@ startQuoteSourceServer chan c ep = do serverThreadId = tid, heartbeatThreadId = hbTid } - stid <- forkIO $ serverThread state + stid <- forkOS $ serverThread state return $ state { serverThreadId = stid } stopQuoteSourceServer :: QuoteSourceServer -> IO () -stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> atomically (writeTBQueue (tickChannel server) QSSKill) >> readMVar (completionMvar server) +stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> (writeChan (tickChannel server) QSSKill) >> readMVar (completionMvar server) diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index a7106be..e369384 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -16,8 +16,6 @@ import Control.Monad import Control.Monad.Loops import Control.Concurrent.MVar import Control.Concurrent.BoundedChan -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import Control.Concurrent hiding (writeChan, readChan) import Control.Exception import System.ZMQ4 @@ -37,15 +35,15 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do ep <- makeEndpoint - chan <- atomically $ newTBQueue 1000 - clientChan <- atomically $ newTBQueue 1000 + chan <- newBoundedChan 1000 + clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do ep <- makeEndpoint - chan <- atomically $ newTBQueue 1000 - clientChan <- atomically $ newTBQueue 1000 + chan <- newBoundedChan 1000 + clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do let tick = Tick { @@ -54,7 +52,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} - forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) - recvdTick <- atomically $ readTBQueue clientChan + forkIO $ forever $ writeChan chan (QSSTick tick) + recvdTick <- readChan clientChan tick @=? recvdTick))) diff --git a/test/TestQuoteSourceServer.hs b/test/TestQuoteSourceServer.hs index c75be18..0cf43a2 100644 --- a/test/TestQuoteSourceServer.hs +++ b/test/TestQuoteSourceServer.hs @@ -15,9 +15,8 @@ import ATrade.QuoteSource.Server import Control.Monad import Control.Monad.Loops import Control.Concurrent.MVar -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import Control.Concurrent hiding (writeChan) +import Control.Concurrent.BoundedChan import Control.Exception import System.ZMQ4 import Data.Time.Clock @@ -27,12 +26,12 @@ import Data.Maybe unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do - chan <- atomically $ newTBQueue 1000 + chan <- newBoundedChan 1000 qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server" stopQuoteSourceServer qss) testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do - chan <- atomically $ newTBQueue 1000 + chan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs -> withSocket ctx Sub (\s -> do connect s "inproc://quotesource-server" @@ -43,7 +42,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} - atomically $ writeTBQueue chan (QSSTick tick) + writeChan chan (QSSTick tick) packet <- fmap BL.fromStrict <$> receiveMulti s case deserializeTick packet of Just recvdTick -> tick @=? recvdTick