@ -12,12 +12,14 @@ import Data.Hashable
@@ -12,12 +12,14 @@ import Data.Hashable
import Data.Bits
import ATrade.Types
import Data.IORef
import qualified Data.HashMap.Strict as M
import qualified Data.List as L
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import ATrade.Broker.Protocol
import ATrade.Broker.Server
import Data.Time.Clock
import Data.Decimal
import Data.Maybe
import Control.Monad
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding ( readChan )
@ -31,10 +33,11 @@ instance Hashable TickMapKey where
@@ -31,10 +33,11 @@ instance Hashable TickMapKey where
data PaperBrokerState = PaperBrokerState {
pbTid :: Maybe ThreadId ,
tickMap :: M . Hash Map TickMapKey Tick ,
orders :: M . Hash Map OrderId Order ,
tickMap :: M . Map TickMapKey Tick ,
orders :: M . Map OrderId Order ,
cash :: ! Decimal ,
notificationCallback :: Maybe ( Notification -> IO () )
notificationCallback :: Maybe ( Notification -> IO () ) ,
pendingOrders :: [ Order ]
}
mkPaperBroker :: BoundedChan Tick -> Decimal -> [ T . Text ] -> IO BrokerInterface
@ -44,7 +47,8 @@ mkPaperBroker tickChan startCash accounts = do
@@ -44,7 +47,8 @@ mkPaperBroker tickChan startCash accounts = do
tickMap = M . empty ,
orders = M . empty ,
cash = startCash ,
notificationCallback = Nothing }
notificationCallback = Nothing ,
pendingOrders = [] }
tid <- forkIO $ brokerThread tickChan state
atomicModifyIORef' state ( \ s -> ( s { pbTid = Just tid } , () ) )
@ -60,19 +64,72 @@ brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO ()
@@ -60,19 +64,72 @@ brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread chan state = forever $ do
tick <- readChan chan
atomicModifyIORef' state ( \ s -> ( s { tickMap = M . insert ( makeKey tick ) tick $! tickMap s } , () ) )
executePendingOrders tick state
where
makeKey ! tick = TickMapKey ( security $! tick ) ( datatype tick )
executePendingOrders tick state = do
po <- pendingOrders <$> readIORef state
executedIds <- catMaybes <$> mapM execute po
atomicModifyIORef' state ( \ s -> ( s { pendingOrders = L . filter ( \ order -> orderId order ` L . notElem ` executedIds ) ( pendingOrders s ) } , () ) )
where
execute order =
case orderPrice order of
Market -> do
executeAtTick state order tick
return $ Just $ orderId order
Limit price -> executeLimitAt price order
_ -> return Nothing
executeLimitAt price order = case orderOperation order of
Buy -> if ( datatype tick == Price && price > value tick ) || ( datatype tick == BestOffer && price > value tick )
then do
executeAtTick state order $ tick { value = price }
return $ Just $ orderId order
else return Nothing
Sell -> if ( datatype tick == Price && price < value tick ) || ( datatype tick == BestBid && price < value tick )
then do
executeAtTick state order $ tick { value = price }
return $ Just $ orderId order
else return Nothing
pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe ( Notification -> IO () ) -> IO ()
pbSetNotificationCallback state callback = atomicModifyIORef' state ( \ s -> ( s { notificationCallback = callback } , () ) )
mkTrade :: Tick -> Order -> UTCTime -> Trade
mkTrade tick order timestamp = Trade {
tradeOrderId = orderId order ,
tradePrice = value tick ,
tradeQuantity = orderQuantity order ,
tradeVolume = realFracToDecimal 10 ( fromIntegral $ orderQuantity order ) * value tick ,
tradeVolumeCurrency = " TEST " ,
tradeOperation = orderOperation order ,
tradeAccount = orderAccountId order ,
tradeSecurity = orderSecurity order ,
tradeTimestamp = timestamp ,
tradeSignalId = orderSignalId order }
maybeCall proj state arg = do
cb <- proj <$> readIORef state
case cb of
Just callback -> callback arg
Nothing -> return ()
executeAtTick state order tick = do
let newOrder = order { orderState = Executed }
let tradeVolume = realFracToDecimal 10 ( fromIntegral $ orderQuantity order ) * value tick
atomicModifyIORef' state ( \ s -> ( s { orders = M . insert ( orderId order ) newOrder $ orders s , cash = cash s - tradeVolume } , () ) )
debugM " PaperBroker " $ " Executed: " ++ show newOrder
ts <- getCurrentTime
maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts
maybeCall notificationCallback state $ OrderNotification ( orderId order ) Executed
pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do
infoM " PaperBroker " $ " Submitted order: " ++ show order
case orderPrice order of
Market -> executeMarketOrder state order
Limit price -> submitLimitOrder state order
Limit price -> submitLimitOrder price state order
Stop price trigger -> submitStopOrder state order
StopMarket trigger -> submitStopMarketOrder state order
@ -83,15 +140,22 @@ pbSubmitOrder state order = do
@@ -83,15 +140,22 @@ pbSubmitOrder state order = do
Nothing -> let newOrder = order { orderState = OrderError } in
atomicModifyIORef' state ( \ s -> ( s { orders = M . insert ( orderId order ) newOrder $ orders s } , () ) )
Just tick -> let newOrder = order { orderState = Executed }
tradeVolume = ( realFracToDecimal 10 ( fromIntegral $ orderQuantity order ) * value tick ) in do
atomicModifyIORef' state ( \ s -> ( s { orders = M . insert ( orderId order ) newOrder $ orders s , cash = cash s - tradeVolume } , () ) )
debugM " PaperBroker " $ " Executed: " ++ show newOrder
ts <- getCurrentTime
maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts
maybeCall notificationCallback state $ OrderNotification ( orderId order ) Executed
Just tick -> executeAtTick state order tick
submitLimitOrder price state order = do
tm <- tickMap <$> readIORef state
case M . lookup key tm of
Nothing -> do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state ( \ s -> ( s { orders = M . insert ( orderId order ) newOrder $ orders s } , () ) )
maybeCall notificationCallback state $ OrderNotification ( orderId order ) Submitted
Just tick ->
if ( ( orderOperation order == Buy ) && ( value tick < price ) ) || ( ( orderOperation order == Sell ) && ( value tick > price ) )
then executeAtTick state order tick
else do
let newOrder = order { orderState = Submitted }
atomicModifyIORef' state ( \ s -> ( s { orders = M . insert ( orderId order ) newOrder $ orders s , pendingOrders = newOrder : pendingOrders s } , () ) )
maybeCall notificationCallback state $ OrderNotification ( orderId order ) Submitted
submitLimitOrder state order = warningM " PaperBroker " $ " Not implemented: Submitted order: " ++ show order
submitStopOrder state order = warningM " PaperBroker " $ " Not implemented: Submitted order: " ++ show order
submitStopMarketOrder state order = warningM " PaperBroker " $ " Not implemented: Submitted order: " ++ show order
@ -100,28 +164,13 @@ pbSubmitOrder state order = do
@@ -100,28 +164,13 @@ pbSubmitOrder state order = do
Sell -> BestBid
key = TickMapKey ( orderSecurity order ) ( orderDatatype order )
maybeCall proj state arg = do
cb <- proj <$> readIORef state
case cb of
Just callback -> callback arg
Nothing -> return ()
mkTrade :: Tick -> Order -> UTCTime -> Trade
mkTrade tick order timestamp = Trade {
tradeOrderId = orderId order ,
tradePrice = value tick ,
tradeQuantity = orderQuantity order ,
tradeVolume = realFracToDecimal 10 ( fromIntegral $ orderQuantity order ) * value tick ,
tradeVolumeCurrency = " TEST " ,
tradeOperation = orderOperation order ,
tradeAccount = orderAccountId order ,
tradeSecurity = orderSecurity order ,
tradeTimestamp = timestamp ,
tradeSignalId = orderSignalId order }
pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state order = undefined
pbCancelOrder state oid = do
atomicModifyIORef' state ( \ s -> ( s { pendingOrders = L . filter ( \ o -> orderId o /= oid ) ( pendingOrders s ) ,
orders = M . adjustWithKey ( \ k v -> v { orderState = Cancelled } ) oid ( orders s ) } , () ) )
maybeCall notificationCallback state $ OrderNotification oid Cancelled
return True
pbDestroyBroker :: IORef PaperBrokerState -> IO ()
pbDestroyBroker state = do