Browse Source

Trying to get rid of leaks

master
Denis Tereshkin 9 years ago
parent
commit
30dcdca1c3
  1. 1
      libatrade.cabal
  2. 21
      src/ATrade/Broker/Server.hs
  3. 10
      src/ATrade/Types.hs

1
libatrade.cabal

@ -36,6 +36,7 @@ library
, zeromq4-haskell , zeromq4-haskell
, unordered-containers , unordered-containers
, containers , containers
, monad-loops
default-language: Haskell2010 default-language: Haskell2010
executable libatrade-exe executable libatrade-exe

21
src/ATrade/Broker/Server.hs

@ -11,16 +11,18 @@ import ATrade.Broker.Protocol
import System.ZMQ4 import System.ZMQ4
import Data.List.NonEmpty import Data.List.NonEmpty
import qualified Data.Map as M import qualified Data.Map as M
import qualified Data.ByteString as B import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.List as L import qualified Data.List as L
import Data.Aeson import Data.Aeson
import Data.Maybe
import Data.Time.Clock import Data.Time.Clock
import Data.IORef import Data.IORef
import Control.Concurrent import Control.Concurrent
import Control.Exception import Control.Exception
import Control.Monad import Control.Monad
import Control.Monad.Loops
import System.Log.Logger import System.Log.Logger
import ATrade.Util import ATrade.Util
@ -43,10 +45,11 @@ data BrokerServerState = BrokerServerState {
pendingNotifications :: M.Map PeerId [Notification], pendingNotifications :: M.Map PeerId [Notification],
brokers :: [BrokerInterface], brokers :: [BrokerInterface],
completionMvar :: MVar (), completionMvar :: MVar (),
killMvar :: MVar (),
orderIdCounter :: OrderId orderIdCounter :: OrderId
} }
data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) data BrokerServerHandle = BrokerServerHandle ThreadId (MVar ()) (MVar ())
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> IO BrokerServerHandle
startBrokerServer brokers c ep = do startBrokerServer brokers c ep = do
@ -54,6 +57,7 @@ startBrokerServer brokers c ep = do
bind sock (T.unpack ep) bind sock (T.unpack ep)
tid <- myThreadId tid <- myThreadId
compMv <- newEmptyMVar compMv <- newEmptyMVar
killMv <- newEmptyMVar
state <- newIORef BrokerServerState { state <- newIORef BrokerServerState {
bsSocket = sock, bsSocket = sock,
orderMap = M.empty, orderMap = M.empty,
@ -62,12 +66,13 @@ startBrokerServer brokers c ep = do
pendingNotifications = M.empty, pendingNotifications = M.empty,
brokers = brokers, brokers = brokers,
completionMvar = compMv, completionMvar = compMv,
killMvar = killMv,
orderIdCounter = 1 orderIdCounter = 1
} }
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread" debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv BrokerServerHandle <$> forkIO (brokerServerThread state) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do notificationCallback state n = do
@ -82,9 +87,10 @@ notificationCallback state n = do
Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)}
Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)})
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup brokerServerThread state = finally brokerServerThread' cleanup
where where
brokerServerThread' = forever $ do brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryTakeMVar) $ do
sock <- bsSocket <$> readIORef state sock <- bsSocket <$> readIORef state
evs <- poll 200 [Sock sock [In] Nothing] evs <- poll 200 [Sock sock [In] Nothing]
when ((L.length . L.head) evs > 0) $ do when ((L.length . L.head) evs > 0) $ do
@ -121,6 +127,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
cleanup = do cleanup = do
sock <- bsSocket <$> readIORef state sock <- bsSocket <$> readIORef state
close sock close sock
mv <- completionMvar <$> readIORef state mv <- completionMvar <$> readIORef state
putMVar mv () putMVar mv ()
@ -161,5 +168,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
stopBrokerServer :: BrokerServerHandle -> IO () stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle tid compMv) = yield >> killThread tid >> readMVar compMv stopBrokerServer (BrokerServerHandle tid compMv killMv) = do
putMVar killMv ()
yield >> readMVar compMv

10
src/ATrade/Types.hs

@ -71,11 +71,11 @@ instance Enum DataType where
| otherwise = Unknown | otherwise = Unknown
data Tick = Tick { data Tick = Tick {
security :: T.Text, security :: !T.Text,
datatype :: DataType, datatype :: !DataType,
timestamp :: UTCTime, timestamp :: !UTCTime,
value :: Decimal, value :: !Decimal,
volume :: Integer volume :: !Integer
} deriving (Show, Eq) } deriving (Show, Eq)
serializeTick :: Tick -> [ByteString] serializeTick :: Tick -> [ByteString]

Loading…
Cancel
Save