From e7f99f09555f6d0af2a145ab7e8a3063c2739abc Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 20 Oct 2016 11:05:42 +0700 Subject: [PATCH 1/3] Quotesource server: use STM channels --- libatrade.cabal | 2 ++ src/ATrade/QuoteSource/Client.hs | 19 +++++++++++-------- src/ATrade/QuoteSource/Server.hs | 12 +++++++----- src/ATrade/Types.hs | 2 +- test/TestQuoteSourceClient.hs | 8 +++++--- test/TestQuoteSourceServer.hs | 9 +++++---- 6 files changed, 31 insertions(+), 21 deletions(-) diff --git a/libatrade.cabal b/libatrade.cabal index 2bcfc08..86dbd66 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -38,6 +38,7 @@ library , containers , monad-loops , safe + , stm default-language: Haskell2010 executable libatrade-exe @@ -75,6 +76,7 @@ test-suite libatrade-test , bytestring , monad-loops , uuid + , stm ghc-options: -threaded -rtsopts -with-rtsopts=-N -Wincomplete-patterns default-language: Haskell2010 other-modules: ArbitraryInstances diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 5a89682..9314c87 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -13,6 +13,7 @@ import Control.Monad import Control.Monad.Loops import Control.Exception import Data.List.NonEmpty +import Data.Maybe import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Char8 as B8 @@ -27,27 +28,29 @@ import Safe data QuoteSourceClientHandle = QuoteSourceClientHandle { tid :: ThreadId, - completionMvar :: MVar () + completionMvar :: MVar (), + killMVar :: MVar () } startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar + killMv <- newEmptyMVar now <- getCurrentTime lastHeartbeat <- newIORef now - tid <- forkIO $ finally (clientThread lastHeartbeat) (cleanup compMv) - return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } + tid <- forkIO $ finally (clientThread lastHeartbeat killMv) (cleanup compMv) + return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv } where - clientThread lastHeartbeat = forever $ withSocket ctx Sub (\sock -> do + clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do connect sock $ T.unpack endpoint - mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers + mapM_ (subscribe sock . encodeUtf8) tickers subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" now <- getCurrentTime writeIORef lastHeartbeat now - whileM_ (notTimeout lastHeartbeat) $ do + whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do evs <- poll 200 [Sock sock [In] Nothing] - when ((L.length . L.head) evs > 0) $ do + unless (null (L.head evs)) $ do rawTick <- fmap BL.fromStrict <$> receiveMulti sock now <- getCurrentTime prevHeartbeat <- readIORef lastHeartbeat @@ -66,4 +69,4 @@ startQuoteSourceClient chan tickers ctx endpoint = do cleanup compMv = putMVar compMv () stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () -stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) +stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle) diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index dae1225..812dd58 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -8,6 +8,8 @@ module ATrade.QuoteSource.Server ( import ATrade.Types import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Exception import Control.Monad import qualified Data.Text as T @@ -20,7 +22,7 @@ import System.ZMQ4 data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, outSocket :: Socket Pub, - tickChannel :: BoundedChan QuoteSourceServerData, + tickChannel :: TBQueue QuoteSourceServerData, completionMvar :: MVar (), serverThreadId :: ThreadId, heartbeatThreadId :: ThreadId @@ -39,7 +41,7 @@ serverThread state = do putMVar (completionMvar state) () serverThread' = do - qssdata <- readChan $ tickChannel state + qssdata <- atomically $ readTBQueue $ tickChannel state case qssdata of QSSKill -> return () QSSHeartbeat -> do @@ -49,14 +51,14 @@ serverThread state = do sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick serverThread' -startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer +startQuoteSourceServer :: TBQueue QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do sock <- socket c Pub bind sock $ T.unpack ep tid <- myThreadId hbTid <- forkIO $ forever $ do threadDelay 1000000 - writeChan chan QSSHeartbeat + atomically $ writeTBQueue chan QSSHeartbeat mv <- newEmptyMVar let state = QuoteSourceServerState { @@ -71,5 +73,5 @@ startQuoteSourceServer chan c ep = do return $ state { serverThreadId = stid } stopQuoteSourceServer :: QuoteSourceServer -> IO () -stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server) +stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> atomically (writeTBQueue (tickChannel server) QSSKill) >> readMVar (completionMvar server) diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index 3033083..6297c58 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -138,7 +138,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of deserializeTick _ = Nothing data Bar = Bar { - barSecurity :: !T.Text, + barSecurity :: !TickerId, barTimestamp :: !UTCTime, barOpen :: !Decimal, barHigh :: !Decimal, diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index 3b2cdf6..5915c67 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -16,6 +16,8 @@ import Control.Monad import Control.Monad.Loops import Control.Concurrent.MVar import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Concurrent hiding (writeChan, readChan) import Control.Exception import System.ZMQ4 @@ -35,14 +37,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do ep <- makeEndpoint - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do ep <- makeEndpoint - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do @@ -52,7 +54,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} - forkIO $ forever $ writeChan chan (Just tick) + forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) recvdTick <- readChan clientChan tick @=? recvdTick))) diff --git a/test/TestQuoteSourceServer.hs b/test/TestQuoteSourceServer.hs index 2851975..c75be18 100644 --- a/test/TestQuoteSourceServer.hs +++ b/test/TestQuoteSourceServer.hs @@ -15,7 +15,8 @@ import ATrade.QuoteSource.Server import Control.Monad import Control.Monad.Loops import Control.Concurrent.MVar -import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Concurrent hiding (writeChan) import Control.Exception import System.ZMQ4 @@ -26,12 +27,12 @@ import Data.Maybe unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server" stopQuoteSourceServer qss) testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs -> withSocket ctx Sub (\s -> do connect s "inproc://quotesource-server" @@ -42,7 +43,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} - tryWriteChan chan (Just tick) + atomically $ writeTBQueue chan (QSSTick tick) packet <- fmap BL.fromStrict <$> receiveMulti s case deserializeTick packet of Just recvdTick -> tick @=? recvdTick From 1703918336a1b40c40a74f41e0db6fe342ba4163 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 20 Oct 2016 11:08:23 +0700 Subject: [PATCH 2/3] Quotesource client: use STM.TBQueue --- src/ATrade/QuoteSource/Client.hs | 7 ++++--- test/TestQuoteSourceClient.hs | 6 +++--- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 9314c87..33eb542 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -7,7 +7,8 @@ module ATrade.QuoteSource.Client ( import ATrade.Types import Control.Concurrent hiding (readChan, writeChan) -import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Concurrent.MVar import Control.Monad import Control.Monad.Loops @@ -32,7 +33,7 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle { killMVar :: MVar () } -startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle +startQuoteSourceClient :: TBQueue Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar killMv <- newEmptyMVar @@ -57,7 +58,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do if headMay rawTick == Just "SYSTEM#HEARTBEAT" then writeIORef lastHeartbeat now else case deserializeTick rawTick of - Just tick -> writeChan chan tick + Just tick -> atomically $ writeTBQueue chan tick Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" debugM "QuoteSource.Client" "Heartbeat timeout") diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index 5915c67..a7106be 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -38,14 +38,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do ep <- makeEndpoint chan <- atomically $ newTBQueue 1000 - clientChan <- newBoundedChan 1000 + clientChan <- atomically $ newTBQueue 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do ep <- makeEndpoint chan <- atomically $ newTBQueue 1000 - clientChan <- newBoundedChan 1000 + clientChan <- atomically $ newTBQueue 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do let tick = Tick { @@ -55,6 +55,6 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c value = 1000, volume = 1} forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) - recvdTick <- readChan clientChan + recvdTick <- atomically $ readTBQueue clientChan tick @=? recvdTick))) From 2abd68ba2f360f6f56a566d0c7c0f25d539922c1 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 17 Nov 2016 15:22:00 +0700 Subject: [PATCH 3/3] BrokerClient: handle timeout --- src/ATrade/Broker/Client.hs | 38 ++++++++++++++++++++++++------------- 1 file changed, 25 insertions(+), 13 deletions(-) diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 0c465f0..2cec045 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -15,9 +15,11 @@ import Control.Concurrent.BoundedChan import Control.Concurrent.MVar import Control.Exception import Control.Monad +import Control.Monad.Loops import Data.Aeson import Data.Int import Data.IORef +import Data.Maybe import Data.List.NonEmpty import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL @@ -28,6 +30,7 @@ import System.Log.Logger data BrokerClientHandle = BrokerClientHandle { tid :: ThreadId, completionMvar :: MVar (), + killMvar :: MVar (), submitOrder :: Order -> IO (Either T.Text OrderId), cancelOrder :: OrderId -> IO (Either T.Text ()), getNotifications :: IO (Either T.Text [Notification]), @@ -35,32 +38,41 @@ data BrokerClientHandle = BrokerClientHandle { respVar :: MVar BrokerServerResponse } -brokerClientThread ctx ep cmd resp comp = do - sock <- socket ctx Req - connect sock $ T.unpack ep - finally (brokerClientThread' sock) (cleanup sock) +brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> IO () +brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cleanup where - cleanup sock = close sock >> putMVar comp () - brokerClientThread' sock = do - forever $ do + cleanup = putMVar comp () + brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do + sock <- socket ctx Req + connect sock $ T.unpack ep + setReceiveTimeout (restrict 1000) sock + finally (brokerClientThread'' sock) (close sock) + brokerClientThread'' sock = whileM_ (isNothing <$> tryReadMVar killMv) $ do request <- takeMVar cmd send sock [] (BL.toStrict $ encode request) - maybeResponse <- decode . BL.fromStrict <$> receive sock - case maybeResponse of - Just response -> putMVar resp response - Nothing -> putMVar resp (ResponseError "Unable to decode response") + events <- poll 1000 [Sock sock [In] Nothing] + if (not . null) events + then do + maybeResponse <- decode . BL.fromStrict <$> receive sock + case maybeResponse of + Just response -> putMVar resp response + Nothing -> putMVar resp (ResponseError "Unable to decode response") + else + putMVar resp (ResponseError "Response timeout") startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle startBrokerClient ctx endpoint = do idCounter <- newIORef 1 compMv <- newEmptyMVar + killMv <- newEmptyMVar cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) - tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv) + tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv) return BrokerClientHandle { tid = tid, completionMvar = compMv, + killMvar = killMv, submitOrder = bcSubmitOrder idCounter cmdVar respVar, cancelOrder = bcCancelOrder idCounter cmdVar respVar, getNotifications = bcGetNotifications idCounter cmdVar respVar, @@ -69,7 +81,7 @@ startBrokerClient ctx endpoint = do } stopBrokerClient :: BrokerClientHandle -> IO () -stopBrokerClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) +stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (tid handle) >> readMVar (completionMvar handle) nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v))