Browse Source

PaperBroker: fix leak

master
Denis Tereshkin 9 years ago
parent
commit
c2985a8431
  1. 44
      src/Broker/PaperBroker.hs

44
src/Broker/PaperBroker.hs

@ -10,7 +10,6 @@ module Broker.PaperBroker ( @@ -10,7 +10,6 @@ module Broker.PaperBroker (
import Control.DeepSeq
import Data.Hashable
import Data.Bits
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import ATrade.Types
@ -33,7 +32,6 @@ instance Hashable TickMapKey where @@ -33,7 +32,6 @@ instance Hashable TickMapKey where
data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId,
tickChannel :: TBQueue Tick,
tickMap :: M.HashMap TickMapKey Tick,
orders :: M.HashMap OrderId Order,
cash :: ! Decimal,
@ -42,16 +40,15 @@ data PaperBrokerState = PaperBrokerState { @@ -42,16 +40,15 @@ data PaperBrokerState = PaperBrokerState {
mkPaperBroker :: TBQueue Tick -> Decimal -> [T.Text] -> IO BrokerInterface
mkPaperBroker tickChan startCash accounts = do
state <- newIORef PaperBrokerState {
state <- atomically $ newTVar PaperBrokerState {
pbTid = Nothing,
tickChannel = tickChan,
tickMap = M.empty,
orders = M.empty,
cash = startCash,
notificationCallback = Nothing }
tid <- forkIO $ brokerThread state
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()) )
tid <- forkIO $ brokerThread tickChan state
atomically $ modifyTVar' state (\s -> s { pbTid = Just tid })
return BrokerInterface {
accounts = accounts,
@ -60,23 +57,20 @@ mkPaperBroker tickChan startCash accounts = do @@ -60,23 +57,20 @@ mkPaperBroker tickChan startCash accounts = do
cancelOrder = pbCancelOrder state,
stopBroker = pbDestroyBroker state }
brokerThread :: IORef PaperBrokerState -> IO ()
brokerThread state = do
chan <- tickChannel <$> readIORef state
forever $ do
tick <- atomically $ readTBQueue chan
atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()) )
brokerThread :: TBQueue Tick -> TVar PaperBrokerState -> IO ()
brokerThread chan state = forever $ atomically $ do
tick <- readTBQueue chan
modifyTVar' state (\s -> s { tickMap = M.insert (makeKey tick) tick $! tickMap s })
where
makeKey !tick = TickMapKey (security $! tick) (datatype tick)
pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO()
pbSetNotificationCallback state callback = modifyIORef state (\s -> s { notificationCallback = callback } )
pbSetNotificationCallback :: TVar PaperBrokerState -> Maybe (Notification -> IO ()) -> IO()
pbSetNotificationCallback state callback = atomically $ modifyTVar' state (\s -> s { notificationCallback = callback } )
pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder :: TVar PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do
infoM "PaperBroker" $ "Submitted order: " ++ show order
curState <- readIORef state
case orderPrice order of
Market -> executeMarketOrder state order
Limit price -> submitLimitOrder state order
@ -85,14 +79,14 @@ pbSubmitOrder state order = do @@ -85,14 +79,14 @@ pbSubmitOrder state order = do
where
executeMarketOrder state order = do
tm <- tickMap <$> readIORef state
tm <- atomically $ tickMap <$> readTVar state
case M.lookup key tm of
Nothing -> let newOrder = order { orderState = OrderError } in
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()) )
atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s })
Just tick -> let newOrder = order { orderState = Executed }
tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ()) )
atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume})
debugM "PaperBroker" $ "Executed: " ++ show newOrder
ts <- getCurrentTime
maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts
@ -108,7 +102,7 @@ pbSubmitOrder state order = do @@ -108,7 +102,7 @@ pbSubmitOrder state order = do
key = TickMapKey (orderSecurity order) (orderDatatype order)
maybeCall proj state arg = do
cb <- proj <$> readIORef state
cb <- atomically $ proj <$> readTVar state
case cb of
Just callback -> callback arg
Nothing -> return ()
@ -127,16 +121,16 @@ pbSubmitOrder state order = do @@ -127,16 +121,16 @@ pbSubmitOrder state order = do
tradeSignalId = orderSignalId order }
pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder :: TVar PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state order = undefined
pbDestroyBroker :: IORef PaperBrokerState -> IO ()
pbDestroyBroker :: TVar PaperBrokerState -> IO ()
pbDestroyBroker state = do
maybeTid <- pbTid <$> readIORef state
maybeTid <- atomically $ pbTid <$> readTVar state
case maybeTid of
Just tid -> killThread tid
Nothing -> return ()
pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order)
pbGetOrder state oid = M.lookup oid . orders <$> readIORef state
pbGetOrder :: TVar PaperBrokerState -> OrderId -> IO (Maybe Order)
pbGetOrder state oid = atomically $ M.lookup oid . orders <$> readTVar state

Loading…
Cancel
Save