diff --git a/libatrade.cabal b/libatrade.cabal index cdd81aa..bc0b522 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -36,6 +36,7 @@ library , zeromq4-haskell , unordered-containers , containers + , monad-loops default-language: Haskell2010 executable libatrade-exe diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index cd5a0e7..35cda5d 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -11,16 +11,18 @@ import ATrade.Broker.Protocol import System.ZMQ4 import Data.List.NonEmpty import qualified Data.Map as M -import qualified Data.ByteString as B -import qualified Data.ByteString.Lazy as BL +import qualified Data.ByteString as B hiding (putStrLn) +import qualified Data.ByteString.Lazy as BL hiding (putStrLn) import qualified Data.Text as T import qualified Data.List as L import Data.Aeson +import Data.Maybe import Data.Time.Clock import Data.IORef import Control.Concurrent import Control.Exception import Control.Monad +import Control.Monad.Loops import System.Log.Logger import ATrade.Util @@ -43,10 +45,11 @@ data BrokerServerState = BrokerServerState { pendingNotifications :: M.Map PeerId [Notification], brokers :: [BrokerInterface], completionMvar :: MVar (), + killMvar :: MVar (), orderIdCounter :: OrderId } -data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) +data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) (MVar ()) startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle startBrokerServer brokers c ep = do @@ -54,6 +57,7 @@ startBrokerServer brokers c ep = do bind sock (T.unpack ep) tid <- myThreadId compMv <- newEmptyMVar + killMv <- newEmptyMVar state <- newIORef BrokerServerState { bsSocket = sock, orderMap = M.empty, @@ -62,12 +66,13 @@ startBrokerServer brokers c ep = do pendingNotifications = M.empty, brokers = brokers, completionMvar = compMv, + killMvar = killMv, orderIdCounter = 1 } mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers debugM "Broker.Server" "Forking broker server thread" - BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv + BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv <*> pure killMv notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback state n = do @@ -82,9 +87,10 @@ notificationCallback state n = do Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) +brokerServerThread :: IORef BrokerServerState -> IO () brokerServerThread state = finally brokerServerThread' cleanup where - brokerServerThread' = forever $ do + brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryTakeMVar) $ do sock <- bsSocket <$> readIORef state evs <- poll 200 [Sock sock [In] Nothing] when ((L.length . L.head) evs > 0) $ do @@ -121,6 +127,7 @@ brokerServerThread state = finally brokerServerThread' cleanup cleanup = do sock <- bsSocket <$> readIORef state close sock + mv <- completionMvar <$> readIORef state putMVar mv () @@ -161,5 +168,7 @@ brokerServerThread state = finally brokerServerThread' cleanup stopBrokerServer :: BrokerServerHandle -> IO () -stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv +stopBrokerServer (BrokerServerHandle tid compMv killMv) = do + putMVar killMv () + yield >> readMVar compMv diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index a5a7bfa..4129006 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -71,11 +71,11 @@ instance Enum DataType where | otherwise = Unknown data Tick = Tick { - security :: T.Text, - datatype :: DataType, - timestamp :: UTCTime, - value :: Decimal, - volume :: Integer + security :: !T.Text, + datatype :: !DataType, + timestamp :: !UTCTime, + value :: !Decimal, + volume :: !Integer } deriving (Show, Eq) serializeTick :: Tick -> [ByteString]