Browse Source

Got rid of STM

master
Denis Tereshkin 9 years ago
parent
commit
c94881aceb
  1. 3
      libatrade.cabal
  2. 2
      src/ATrade/Broker/Server.hs
  3. 7
      src/ATrade/QuoteSource/Client.hs
  4. 14
      src/ATrade/QuoteSource/Server.hs
  5. 14
      test/TestQuoteSourceClient.hs
  6. 9
      test/TestQuoteSourceServer.hs

3
libatrade.cabal

@ -39,8 +39,9 @@ library
, monad-loops , monad-loops
, safe , safe
, stm , stm
, text-format , deepseq
, errors , errors
, text-format
, parsec , parsec
default-language: Haskell2010 default-language: Haskell2010

2
src/ATrade/Broker/Server.hs

@ -77,7 +77,7 @@ startBrokerServer brokers c ep tradeSinkEp = do
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread" debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv BrokerServerHandle <$> forkOS (brokerServerThread state) <*> forkOS (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do notificationCallback state n = do

7
src/ATrade/QuoteSource/Client.hs

@ -6,9 +6,8 @@ module ATrade.QuoteSource.Client (
) where ) where
import ATrade.Types import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Monad import Control.Monad
import Control.Monad.Loops import Control.Monad.Loops
@ -33,7 +32,7 @@ data QuoteSourceClientHandle = QuoteSourceClientHandle {
killMVar :: MVar () killMVar :: MVar ()
} }
startQuoteSourceClient :: TBQueue Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar killMv <- newEmptyMVar
@ -58,7 +57,7 @@ startQuoteSourceClient chan tickers ctx endpoint = do
if headMay rawTick == Just "SYSTEM#HEARTBEAT" if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then writeIORef lastHeartbeat now then writeIORef lastHeartbeat now
else case deserializeTick rawTick of else case deserializeTick rawTick of
Just tick -> atomically $ writeTBQueue chan tick Just tick -> writeChan chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
debugM "QuoteSource.Client" "Heartbeat timeout") debugM "QuoteSource.Client" "Heartbeat timeout")

14
src/ATrade/QuoteSource/Server.hs

@ -8,8 +8,6 @@ module ATrade.QuoteSource.Server (
import ATrade.Types import ATrade.Types
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Exception import Control.Exception
import Control.Monad import Control.Monad
import qualified Data.Text as T import qualified Data.Text as T
@ -22,7 +20,7 @@ import System.ZMQ4
data QuoteSourceServer = QuoteSourceServerState { data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context, ctx :: Context,
outSocket :: Socket Pub, outSocket :: Socket Pub,
tickChannel :: TBQueue QuoteSourceServerData, tickChannel :: BoundedChan QuoteSourceServerData,
completionMvar :: MVar (), completionMvar :: MVar (),
serverThreadId :: ThreadId, serverThreadId :: ThreadId,
heartbeatThreadId :: ThreadId heartbeatThreadId :: ThreadId
@ -41,7 +39,7 @@ serverThread state = do
putMVar (completionMvar state) () putMVar (completionMvar state) ()
serverThread' = do serverThread' = do
qssdata <- atomically $ readTBQueue $ tickChannel state qssdata <- readChan $ tickChannel state
case qssdata of case qssdata of
QSSKill -> return () QSSKill -> return ()
QSSHeartbeat -> do QSSHeartbeat -> do
@ -51,14 +49,14 @@ serverThread state = do
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick
serverThread' serverThread'
startQuoteSourceServer :: TBQueue QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer chan c ep = do startQuoteSourceServer chan c ep = do
sock <- socket c Pub sock <- socket c Pub
bind sock $ T.unpack ep bind sock $ T.unpack ep
tid <- myThreadId tid <- myThreadId
hbTid <- forkIO $ forever $ do hbTid <- forkIO $ forever $ do
threadDelay 1000000 threadDelay 1000000
atomically $ writeTBQueue chan QSSHeartbeat writeChan chan QSSHeartbeat
mv <- newEmptyMVar mv <- newEmptyMVar
let state = QuoteSourceServerState { let state = QuoteSourceServerState {
@ -69,9 +67,9 @@ startQuoteSourceServer chan c ep = do
serverThreadId = tid, serverThreadId = tid,
heartbeatThreadId = hbTid heartbeatThreadId = hbTid
} }
stid <- forkIO $ serverThread state stid <- forkOS $ serverThread state
return $ state { serverThreadId = stid } return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO () stopQuoteSourceServer :: QuoteSourceServer -> IO ()
stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> atomically (writeTBQueue (tickChannel server) QSSKill) >> readMVar (completionMvar server) stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> (writeChan (tickChannel server) QSSKill) >> readMVar (completionMvar server)

14
test/TestQuoteSourceClient.hs

@ -16,8 +16,6 @@ import Control.Monad
import Control.Monad.Loops import Control.Monad.Loops
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Concurrent.BoundedChan import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent hiding (writeChan, readChan) import Control.Concurrent hiding (writeChan, readChan)
import Control.Exception import Control.Exception
import System.ZMQ4 import System.ZMQ4
@ -37,15 +35,15 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
chan <- atomically $ newTBQueue 1000 chan <- newBoundedChan 1000
clientChan <- atomically $ newTBQueue 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield))) bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield)))
testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do
ep <- makeEndpoint ep <- makeEndpoint
chan <- atomically $ newTBQueue 1000 chan <- newBoundedChan 1000
clientChan <- atomically $ newTBQueue 1000 clientChan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs -> bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do
let tick = Tick { let tick = Tick {
@ -54,7 +52,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
timestamp = UTCTime (fromGregorian 2016 9 27) 16000, timestamp = UTCTime (fromGregorian 2016 9 27) 16000,
value = 1000, value = 1000,
volume = 1} volume = 1}
forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick) forkIO $ forever $ writeChan chan (QSSTick tick)
recvdTick <- atomically $ readTBQueue clientChan recvdTick <- readChan clientChan
tick @=? recvdTick))) tick @=? recvdTick)))

9
test/TestQuoteSourceServer.hs

@ -15,9 +15,8 @@ import ATrade.QuoteSource.Server
import Control.Monad import Control.Monad
import Control.Monad.Loops import Control.Monad.Loops
import Control.Concurrent.MVar import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent hiding (writeChan) import Control.Concurrent hiding (writeChan)
import Control.Concurrent.BoundedChan
import Control.Exception import Control.Exception
import System.ZMQ4 import System.ZMQ4
import Data.Time.Clock import Data.Time.Clock
@ -27,12 +26,12 @@ import Data.Maybe
unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream] unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do
chan <- atomically $ newTBQueue 1000 chan <- newBoundedChan 1000
qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server" qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server"
stopQuoteSourceServer qss) stopQuoteSourceServer qss)
testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do
chan <- atomically $ newTBQueue 1000 chan <- newBoundedChan 1000
bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs -> bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs ->
withSocket ctx Sub (\s -> do withSocket ctx Sub (\s -> do
connect s "inproc://quotesource-server" connect s "inproc://quotesource-server"
@ -43,7 +42,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -
timestamp = UTCTime (fromGregorian 2016 9 27) 16000, timestamp = UTCTime (fromGregorian 2016 9 27) 16000,
value = 1000, value = 1000,
volume = 1} volume = 1}
atomically $ writeTBQueue chan (QSSTick tick) writeChan chan (QSSTick tick)
packet <- fmap BL.fromStrict <$> receiveMulti s packet <- fmap BL.fromStrict <$> receiveMulti s
case deserializeTick packet of case deserializeTick packet of
Just recvdTick -> tick @=? recvdTick Just recvdTick -> tick @=? recvdTick

Loading…
Cancel
Save