diff --git a/libatrade.cabal b/libatrade.cabal index 2bcfc08..86dbd66 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -38,6 +38,7 @@ library , containers , monad-loops , safe + , stm default-language: Haskell2010 executable libatrade-exe @@ -75,6 +76,7 @@ test-suite libatrade-test , bytestring , monad-loops , uuid + , stm ghc-options: -threaded -rtsopts -with-rtsopts=-N -Wincomplete-patterns default-language: Haskell2010 other-modules: ArbitraryInstances diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 5a89682..9314c87 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -13,6 +13,7 @@ import Control.Monad import Control.Monad.Loops import Control.Exception import Data.List.NonEmpty +import Data.Maybe import qualified Data.Text as T import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Char8 as B8 @@ -27,27 +28,29 @@ import Safe data QuoteSourceClientHandle = QuoteSourceClientHandle { tid :: ThreadId, - completionMvar :: MVar () + completionMvar :: MVar (), + killMVar :: MVar () } startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient chan tickers ctx endpoint = do compMv <- newEmptyMVar + killMv <- newEmptyMVar now <- getCurrentTime lastHeartbeat <- newIORef now - tid <- forkIO $ finally (clientThread lastHeartbeat) (cleanup compMv) - return QuoteSourceClientHandle { tid = tid, completionMvar = compMv } + tid <- forkIO $ finally (clientThread lastHeartbeat killMv) (cleanup compMv) + return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv } where - clientThread lastHeartbeat = forever $ withSocket ctx Sub (\sock -> do + clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do connect sock $ T.unpack endpoint - mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers + mapM_ (subscribe sock . encodeUtf8) tickers subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" now <- getCurrentTime writeIORef lastHeartbeat now - whileM_ (notTimeout lastHeartbeat) $ do + whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do evs <- poll 200 [Sock sock [In] Nothing] - when ((L.length . L.head) evs > 0) $ do + unless (null (L.head evs)) $ do rawTick <- fmap BL.fromStrict <$> receiveMulti sock now <- getCurrentTime prevHeartbeat <- readIORef lastHeartbeat @@ -66,4 +69,4 @@ startQuoteSourceClient chan tickers ctx endpoint = do cleanup compMv = putMVar compMv () stopQuoteSourceClient :: QuoteSourceClientHandle -> IO () -stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle) +stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle) diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index dae1225..812dd58 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -8,6 +8,8 @@ module ATrade.QuoteSource.Server ( import ATrade.Types import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Exception import Control.Monad import qualified Data.Text as T @@ -20,7 +22,7 @@ import System.ZMQ4 data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, outSocket :: Socket Pub, - tickChannel :: BoundedChan QuoteSourceServerData, + tickChannel :: TBQueue QuoteSourceServerData, completionMvar :: MVar (), serverThreadId :: ThreadId, heartbeatThreadId :: ThreadId @@ -39,7 +41,7 @@ serverThread state = do putMVar (completionMvar state) () serverThread' = do - qssdata <- readChan $ tickChannel state + qssdata <- atomically $ readTBQueue $ tickChannel state case qssdata of QSSKill -> return () QSSHeartbeat -> do @@ -49,14 +51,14 @@ serverThread state = do sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick serverThread' -startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer +startQuoteSourceServer :: TBQueue QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer chan c ep = do sock <- socket c Pub bind sock $ T.unpack ep tid <- myThreadId hbTid <- forkIO $ forever $ do threadDelay 1000000 - writeChan chan QSSHeartbeat + atomically $ writeTBQueue chan QSSHeartbeat mv <- newEmptyMVar let state = QuoteSourceServerState { @@ -71,5 +73,5 @@ startQuoteSourceServer chan c ep = do return $ state { serverThreadId = stid } stopQuoteSourceServer :: QuoteSourceServer -> IO () -stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server) +stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> atomically (writeTBQueue (tickChannel server) QSSKill) >> readMVar (completionMvar server) diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index 3033083..6297c58 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -138,7 +138,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of deserializeTick _ = Nothing data Bar = Bar { - barSecurity :: !T.Text, + barSecurity :: !TickerId, barTimestamp :: !UTCTime, barOpen :: !Decimal, barHigh :: !Decimal, diff --git a/test/TestQuoteSourceClient.hs b/test/TestQuoteSourceClient.hs index 3b2cdf6..5915c67 100644 --- a/test/TestQuoteSourceClient.hs +++ b/test/TestQuoteSourceClient.hs @@ -16,6 +16,8 @@ import Control.Monad import Control.Monad.Loops import Control.Concurrent.MVar import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Concurrent hiding (writeChan, readChan) import Control.Exception import System.ZMQ4 @@ -35,14 +37,14 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do ep <- makeEndpoint - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do ep <- makeEndpoint - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 clientChan <- newBoundedChan 1000 bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do @@ -52,7 +54,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} - forkIO $ forever $ writeChan chan (Just tick) + forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) recvdTick <- readChan clientChan tick @=? recvdTick))) diff --git a/test/TestQuoteSourceServer.hs b/test/TestQuoteSourceServer.hs index 2851975..c75be18 100644 --- a/test/TestQuoteSourceServer.hs +++ b/test/TestQuoteSourceServer.hs @@ -15,7 +15,8 @@ import ATrade.QuoteSource.Server import Control.Monad import Control.Monad.Loops import Control.Concurrent.MVar -import Control.Concurrent.BoundedChan +import Control.Concurrent.STM +import Control.Concurrent.STM.TBQueue import Control.Concurrent hiding (writeChan) import Control.Exception import System.ZMQ4 @@ -26,12 +27,12 @@ import Data.Maybe unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server" stopQuoteSourceServer qss) testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do - chan <- newBoundedChan 1000 + chan <- atomically $ newTBQueue 1000 bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs -> withSocket ctx Sub (\s -> do connect s "inproc://quotesource-server" @@ -42,7 +43,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - timestamp = UTCTime (fromGregorian 2016 9 27) 16000, value = 1000, volume = 1} - tryWriteChan chan (Just tick) + atomically $ writeTBQueue chan (QSSTick tick) packet <- fmap BL.fromStrict <$> receiveMulti s case deserializeTick packet of Just recvdTick -> tick @=? recvdTick