Browse Source

Merge branch 'stm'

master
Denis Tereshkin 9 years ago
parent
commit
319caf43b2
  1. 2
      libatrade.cabal
  2. 38
      src/ATrade/Broker/Client.hs
  3. 26
      src/ATrade/QuoteSource/Client.hs
  4. 12
      src/ATrade/QuoteSource/Server.hs
  5. 2
      src/ATrade/Types.hs
  6. 14
      test/TestQuoteSourceClient.hs
  7. 9
      test/TestQuoteSourceServer.hs

2
libatrade.cabal

@ -38,6 +38,7 @@ library @@ -38,6 +38,7 @@ library
, containers
, monad-loops
, safe
, stm
default-language: Haskell2010
executable libatrade-exe
@ -75,6 +76,7 @@ test-suite libatrade-test @@ -75,6 +76,7 @@ test-suite libatrade-test
, bytestring
, monad-loops
, uuid
, stm
ghc-options: -threaded -rtsopts -with-rtsopts=-N -Wincomplete-patterns
default-language: Haskell2010
other-modules: ArbitraryInstances

38
src/ATrade/Broker/Client.hs

@ -15,9 +15,11 @@ import Control.Concurrent.BoundedChan @@ -15,9 +15,11 @@ import Control.Concurrent.BoundedChan
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import Data.Aeson
import Data.Int
import Data.IORef
import Data.Maybe
import Data.List.NonEmpty
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
@ -28,6 +30,7 @@ import System.Log.Logger @@ -28,6 +30,7 @@ import System.Log.Logger
data BrokerClientHandle = BrokerClientHandle {
tid :: ThreadId,
completionMvar :: MVar (),
killMvar :: MVar (),
submitOrder :: Order -> IO (Either T.Text OrderId),
cancelOrder :: OrderId -> IO (Either T.Text ()),
getNotifications :: IO (Either T.Text [Notification]),
@ -35,32 +38,41 @@ data BrokerClientHandle = BrokerClientHandle { @@ -35,32 +38,41 @@ data BrokerClientHandle = BrokerClientHandle {
respVar :: MVar BrokerServerResponse
}
brokerClientThread ctx ep cmd resp comp = do
sock <- socket ctx Req
connect sock $ T.unpack ep
finally (brokerClientThread' sock) (cleanup sock)
brokerClientThread :: Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> IO ()
brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cleanup
where
cleanup sock = close sock >> putMVar comp ()
brokerClientThread' sock = do
forever $ do
cleanup = putMVar comp ()
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do
sock <- socket ctx Req
connect sock $ T.unpack ep
setReceiveTimeout (restrict 1000) sock
finally (brokerClientThread'' sock) (close sock)
brokerClientThread'' sock = whileM_ (isNothing <$> tryReadMVar killMv) $ do
request <- takeMVar cmd
send sock [] (BL.toStrict $ encode request)
maybeResponse <- decode . BL.fromStrict <$> receive sock
case maybeResponse of
Just response -> putMVar resp response
Nothing -> putMVar resp (ResponseError "Unable to decode response")
events <- poll 1000 [Sock sock [In] Nothing]
if (not . null) events
then do
maybeResponse <- decode . BL.fromStrict <$> receive sock
case maybeResponse of
Just response -> putMVar resp response
Nothing -> putMVar resp (ResponseError "Unable to decode response")
else
putMVar resp (ResponseError "Response timeout")
startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle
startBrokerClient ctx endpoint = do
idCounter <- newIORef 1
compMv <- newEmptyMVar
killMv <- newEmptyMVar
cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest)
respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv)
tid <- forkIO (brokerClientThread ctx endpoint cmdVar respVar compMv killMv)
return BrokerClientHandle {
tid = tid,
completionMvar = compMv,
killMvar = killMv,
submitOrder = bcSubmitOrder idCounter cmdVar respVar,
cancelOrder = bcCancelOrder idCounter cmdVar respVar,
getNotifications = bcGetNotifications idCounter cmdVar respVar,
@ -69,7 +81,7 @@ startBrokerClient ctx endpoint = do @@ -69,7 +81,7 @@ startBrokerClient ctx endpoint = do
}
stopBrokerClient :: BrokerClientHandle -> IO ()
stopBrokerClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)
stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (tid handle) >> readMVar (completionMvar handle)
nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v))

26
src/ATrade/QuoteSource/Client.hs

@ -7,12 +7,14 @@ module ATrade.QuoteSource.Client ( @@ -7,12 +7,14 @@ module ATrade.QuoteSource.Client (
import ATrade.Types
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.MVar
import Control.Monad
import Control.Monad.Loops
import Control.Exception
import Data.List.NonEmpty
import Data.Maybe
import qualified Data.Text as T
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Char8 as B8
@ -27,34 +29,36 @@ import Safe @@ -27,34 +29,36 @@ import Safe
data QuoteSourceClientHandle = QuoteSourceClientHandle {
tid :: ThreadId,
completionMvar :: MVar ()
completionMvar :: MVar (),
killMVar :: MVar ()
}
startQuoteSourceClient :: BoundedChan Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient :: TBQueue Tick -> [T.Text] -> Context -> T.Text -> IO QuoteSourceClientHandle
startQuoteSourceClient chan tickers ctx endpoint = do
compMv <- newEmptyMVar
killMv <- newEmptyMVar
now <- getCurrentTime
lastHeartbeat <- newIORef now
tid <- forkIO $ finally (clientThread lastHeartbeat) (cleanup compMv)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv }
tid <- forkIO $ finally (clientThread lastHeartbeat killMv) (cleanup compMv)
return QuoteSourceClientHandle { tid = tid, completionMvar = compMv, killMVar = killMv }
where
clientThread lastHeartbeat = forever $ withSocket ctx Sub (\sock -> do
clientThread lastHeartbeat killMv = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Sub (\sock -> do
connect sock $ T.unpack endpoint
mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers
mapM_ (subscribe sock . encodeUtf8) tickers
subscribe sock $ B8.pack "SYSTEM#HEARTBEAT"
now <- getCurrentTime
writeIORef lastHeartbeat now
whileM_ (notTimeout lastHeartbeat) $ do
whileM_ (andM [notTimeout lastHeartbeat, isNothing <$> tryReadMVar killMv]) $ do
evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do
unless (null (L.head evs)) $ do
rawTick <- fmap BL.fromStrict <$> receiveMulti sock
now <- getCurrentTime
prevHeartbeat <- readIORef lastHeartbeat
if headMay rawTick == Just "SYSTEM#HEARTBEAT"
then writeIORef lastHeartbeat now
else case deserializeTick rawTick of
Just tick -> writeChan chan tick
Just tick -> atomically $ writeTBQueue chan tick
Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick"
debugM "QuoteSource.Client" "Heartbeat timeout")
@ -66,4 +70,4 @@ startQuoteSourceClient chan tickers ctx endpoint = do @@ -66,4 +70,4 @@ startQuoteSourceClient chan tickers ctx endpoint = do
cleanup compMv = putMVar compMv ()
stopQuoteSourceClient :: QuoteSourceClientHandle -> IO ()
stopQuoteSourceClient handle = yield >> killThread (tid handle) >> readMVar (completionMvar handle)
stopQuoteSourceClient handle = yield >> putMVar (killMVar handle) () >> readMVar (completionMvar handle)

12
src/ATrade/QuoteSource/Server.hs

@ -8,6 +8,8 @@ module ATrade.QuoteSource.Server ( @@ -8,6 +8,8 @@ module ATrade.QuoteSource.Server (
import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Exception
import Control.Monad
import qualified Data.Text as T
@ -20,7 +22,7 @@ import System.ZMQ4 @@ -20,7 +22,7 @@ import System.ZMQ4
data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context,
outSocket :: Socket Pub,
tickChannel :: BoundedChan QuoteSourceServerData,
tickChannel :: TBQueue QuoteSourceServerData,
completionMvar :: MVar (),
serverThreadId :: ThreadId,
heartbeatThreadId :: ThreadId
@ -39,7 +41,7 @@ serverThread state = do @@ -39,7 +41,7 @@ serverThread state = do
putMVar (completionMvar state) ()
serverThread' = do
qssdata <- readChan $ tickChannel state
qssdata <- atomically $ readTBQueue $ tickChannel state
case qssdata of
QSSKill -> return ()
QSSHeartbeat -> do
@ -49,14 +51,14 @@ serverThread state = do @@ -49,14 +51,14 @@ serverThread state = do
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick
serverThread'
startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer :: TBQueue QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
startQuoteSourceServer chan c ep = do
sock <- socket c Pub
bind sock $ T.unpack ep
tid <- myThreadId
hbTid <- forkIO $ forever $ do
threadDelay 1000000
writeChan chan QSSHeartbeat
atomically $ writeTBQueue chan QSSHeartbeat
mv <- newEmptyMVar
let state = QuoteSourceServerState {
@ -71,5 +73,5 @@ startQuoteSourceServer chan c ep = do @@ -71,5 +73,5 @@ startQuoteSourceServer chan c ep = do
return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO ()
stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server)
stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> atomically (writeTBQueue (tickChannel server) QSSKill) >> readMVar (completionMvar server)

2
src/ATrade/Types.hs

@ -138,7 +138,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of @@ -138,7 +138,7 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of
deserializeTick _ = Nothing
data Bar = Bar {
barSecurity :: !T.Text,
barSecurity :: !TickerId,
barTimestamp :: !UTCTime,
barOpen :: !Decimal,
barHigh :: !Decimal,

14
test/TestQuoteSourceClient.hs

@ -16,6 +16,8 @@ import Control.Monad @@ -16,6 +16,8 @@ import Control.Monad
import Control.Monad.Loops
import Control.Concurrent.MVar
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent hiding (writeChan, readChan)
import Control.Exception
import System.ZMQ4
@ -35,15 +37,15 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream] @@ -35,15 +37,15 @@ unitTests = testGroup "QuoteSource.Client" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource client connects and disconnects" $ withContext (\ctx -> do
ep <- makeEndpoint
chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
clientChan <- atomically $ newTBQueue 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan [] ctx ep) stopQuoteSourceClient (const yield)))
testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\ctx -> do
ep <- makeEndpoint
chan <- newBoundedChan 1000
clientChan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
clientChan <- atomically $ newTBQueue 1000
bracket (startQuoteSourceServer chan ctx ep) stopQuoteSourceServer (\qs ->
bracket (startQuoteSourceClient clientChan ["FOOBAR"] ctx ep) stopQuoteSourceClient (\qc -> do
let tick = Tick {
@ -52,7 +54,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c @@ -52,7 +54,7 @@ testTickStream = testCase "QuoteSource clients receives ticks" $ withContext (\c
timestamp = UTCTime (fromGregorian 2016 9 27) 16000,
value = 1000,
volume = 1}
forkIO $ forever $ writeChan chan (Just tick)
recvdTick <- readChan clientChan
forkIO $ forever $ atomically $ writeTBQueue chan (QSSTick tick)
recvdTick <- atomically $ readTBQueue clientChan
tick @=? recvdTick)))

9
test/TestQuoteSourceServer.hs

@ -15,7 +15,8 @@ import ATrade.QuoteSource.Server @@ -15,7 +15,8 @@ import ATrade.QuoteSource.Server
import Control.Monad
import Control.Monad.Loops
import Control.Concurrent.MVar
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent hiding (writeChan)
import Control.Exception
import System.ZMQ4
@ -26,12 +27,12 @@ import Data.Maybe @@ -26,12 +27,12 @@ import Data.Maybe
unitTests = testGroup "QuoteSource.Server" [testStartStop, testTickStream]
testStartStop = testCase "QuoteSource Server starts and stops" $ withContext (\ctx -> do
chan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
qss <- startQuoteSourceServer chan ctx "inproc://quotesource-server"
stopQuoteSourceServer qss)
testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -> do
chan <- newBoundedChan 1000
chan <- atomically $ newTBQueue 1000
bracket (startQuoteSourceServer chan ctx "inproc://quotesource-server") stopQuoteSourceServer (\qs ->
withSocket ctx Sub (\s -> do
connect s "inproc://quotesource-server"
@ -42,7 +43,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx - @@ -42,7 +43,7 @@ testTickStream = testCase "QuoteSource Server sends ticks" $ withContext (\ctx -
timestamp = UTCTime (fromGregorian 2016 9 27) 16000,
value = 1000,
volume = 1}
tryWriteChan chan (Just tick)
atomically $ writeTBQueue chan (QSSTick tick)
packet <- fmap BL.fromStrict <$> receiveMulti s
case deserializeTick packet of
Just recvdTick -> tick @=? recvdTick

Loading…
Cancel
Save