diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index aebc7ee..ee1aac0 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -10,7 +10,6 @@ module Broker.PaperBroker ( import Control.DeepSeq import Data.Hashable import Data.Bits -import Control.Concurrent.BoundedChan import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue import ATrade.Types @@ -33,7 +32,6 @@ instance Hashable TickMapKey where data PaperBrokerState = PaperBrokerState { pbTid :: Maybe ThreadId, - tickChannel :: TBQueue Tick, tickMap :: M.HashMap TickMapKey Tick, orders :: M.HashMap OrderId Order, cash :: ! Decimal, @@ -42,16 +40,15 @@ data PaperBrokerState = PaperBrokerState { mkPaperBroker :: TBQueue Tick -> Decimal -> [T.Text] -> IO BrokerInterface mkPaperBroker tickChan startCash accounts = do - state <- newIORef PaperBrokerState { + state <- atomically $ newTVar PaperBrokerState { pbTid = Nothing, - tickChannel = tickChan, tickMap = M.empty, orders = M.empty, cash = startCash, notificationCallback = Nothing } - tid <- forkIO $ brokerThread state - atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()) ) + tid <- forkIO $ brokerThread tickChan state + atomically $ modifyTVar' state (\s -> s { pbTid = Just tid }) return BrokerInterface { accounts = accounts, @@ -60,23 +57,20 @@ mkPaperBroker tickChan startCash accounts = do cancelOrder = pbCancelOrder state, stopBroker = pbDestroyBroker state } -brokerThread :: IORef PaperBrokerState -> IO () -brokerThread state = do - chan <- tickChannel <$> readIORef state - forever $ do - tick <- atomically $ readTBQueue chan - atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()) ) +brokerThread :: TBQueue Tick -> TVar PaperBrokerState -> IO () +brokerThread chan state = forever $ atomically $ do + tick <- readTBQueue chan + modifyTVar' state (\s -> s { tickMap = M.insert (makeKey tick) tick $! tickMap s }) where makeKey !tick = TickMapKey (security $! tick) (datatype tick) -pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO() -pbSetNotificationCallback state callback = modifyIORef state (\s -> s { notificationCallback = callback } ) +pbSetNotificationCallback :: TVar PaperBrokerState -> Maybe (Notification -> IO ()) -> IO() +pbSetNotificationCallback state callback = atomically $ modifyTVar' state (\s -> s { notificationCallback = callback } ) -pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO () +pbSubmitOrder :: TVar PaperBrokerState -> Order -> IO () pbSubmitOrder state order = do infoM "PaperBroker" $ "Submitted order: " ++ show order - curState <- readIORef state case orderPrice order of Market -> executeMarketOrder state order Limit price -> submitLimitOrder state order @@ -85,14 +79,14 @@ pbSubmitOrder state order = do where executeMarketOrder state order = do - tm <- tickMap <$> readIORef state + tm <- atomically $ tickMap <$> readTVar state case M.lookup key tm of Nothing -> let newOrder = order { orderState = OrderError } in - atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()) ) + atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s }) Just tick -> let newOrder = order { orderState = Executed } tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do - atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ()) ) + atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}) debugM "PaperBroker" $ "Executed: " ++ show newOrder ts <- getCurrentTime maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts @@ -108,7 +102,7 @@ pbSubmitOrder state order = do key = TickMapKey (orderSecurity order) (orderDatatype order) maybeCall proj state arg = do - cb <- proj <$> readIORef state + cb <- atomically $ proj <$> readTVar state case cb of Just callback -> callback arg Nothing -> return () @@ -127,16 +121,16 @@ pbSubmitOrder state order = do tradeSignalId = orderSignalId order } -pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool +pbCancelOrder :: TVar PaperBrokerState -> OrderId -> IO Bool pbCancelOrder state order = undefined -pbDestroyBroker :: IORef PaperBrokerState -> IO () +pbDestroyBroker :: TVar PaperBrokerState -> IO () pbDestroyBroker state = do - maybeTid <- pbTid <$> readIORef state + maybeTid <- atomically $ pbTid <$> readTVar state case maybeTid of Just tid -> killThread tid Nothing -> return () -pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order) -pbGetOrder state oid = M.lookup oid . orders <$> readIORef state +pbGetOrder :: TVar PaperBrokerState -> OrderId -> IO (Maybe Order) +pbGetOrder state oid = atomically $ M.lookup oid . orders <$> readTVar state