@ -18,11 +18,16 @@ import ATrade.Broker.Protocol (BrokerServerRequest (..),
@@ -18,11 +18,16 @@ import ATrade.Broker.Protocol (BrokerServerRequest (..),
RequestSqnum ,
getNotificationSqnum ,
nextSqnum , requestSqnum )
import ATrade.Logging ( Message ( Message ) ,
Severity ( Debug , Warning ) ,
logWith )
import ATrade.Logging ( Severity ( Info ) )
import ATrade.Types ( Order ( orderAccountId , orderId ) ,
OrderId ,
ServerSecurityParams ( sspCertificate , sspDomain ) ,
Trade ( tradeOrderId ) )
import ATrade.Util ( atomicMapIORef )
import Colog ( LogAction )
import Control.Concurrent ( MVar , ThreadId , forkIO ,
killThread , myThreadId ,
newEmptyMVar , putMVar ,
@ -93,8 +98,9 @@ startBrokerServer :: [BrokerBackend] ->
@@ -93,8 +98,9 @@ startBrokerServer :: [BrokerBackend] ->
T . Text ->
[ TradeSink ] ->
ServerSecurityParams ->
LogAction IO Message ->
IO BrokerServerHandle
startBrokerServer brokers c ep notificationsEp tradeSinks params = do
startBrokerServer brokers c ep notificationsEp tradeSinks params logger = do
sock <- socket c Router
notificationsSock <- socket c Pub
setLinger ( restrict 0 ) sock
@ -136,14 +142,19 @@ startBrokerServer brokers c ep notificationsEp tradeSinks params = do
@@ -136,14 +142,19 @@ startBrokerServer brokers c ep notificationsEp tradeSinks params = do
orderIdCounter = 1 ,
tradeSink = tsChan
}
mapM_ ( \ bro -> setNotificationCallback bro ( Just $ notificationCallback state ) ) brokers
mapM_ ( \ bro -> setNotificationCallback bro ( Just $ notificationCallback state logger ) ) brokers
debugM " Broker.Server " " Forking broker server thread "
BrokerServerHandle <$> forkIO ( brokerServerThread state ) <*> forkIO ( tradeSinkHandler c state tradeSinks ) <*> pure compMv <*> pure killMv
log Info " Broker.Server " " Forking broker server thread "
BrokerServerHandle <$> forkIO ( brokerServerThread state logger ) <*> forkIO ( tradeSinkHandler c state tradeSinks ) <*> pure compMv <*> pure killMv
where
log = logWith logger
notificationCallback :: IORef BrokerServerState -> BrokerBackendNotification -> IO ()
notificationCallback state n = do
debugM " Broker.Server " $ " Notification: " ++ show n
notificationCallback :: IORef BrokerServerState ->
LogAction IO Message ->
BrokerBackendNotification ->
IO ()
notificationCallback state logger n = do
log Debug " Broker.Server " $ " Notification: " <> ( T . pack . show ) n
chan <- tradeSink <$> readIORef state
case n of
BackendTradeNotification trade -> tryWriteChan chan trade
@ -157,11 +168,12 @@ notificationCallback state n = do
@@ -157,11 +168,12 @@ notificationCallback state n = do
case n of
BackendTradeNotification trade -> addNotification clientIdentity ( TradeNotification sqnum trade { tradeOrderId = localOrderId } )
BackendOrderNotification globalOrderId newstate -> addNotification clientIdentity ( OrderNotification sqnum localOrderId newstate )
Nothing -> warningM " Broker.Server " " Notification: unknown order "
Nothing -> log Warning " Broker.Server " " Notification: unknown order "
where
log = logWith logger
addNotification clientIdentity n = do
debugM " Broker.Server " $ " Sending notification to client [ " <> T . unpack clientIdentity <> " ] "
log Debug " Broker.Server " $ " Sending notification to client [ " <> clientIdentity <> " ] "
atomicMapIORef state ( \ s ->
case M . lookup clientIdentity . pendingNotifications $ s of
Just ns -> s { pendingNotifications = M . insert clientIdentity ( n : ns ) ( pendingNotifications s ) }
@ -178,12 +190,15 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $
@@ -178,12 +190,15 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $
Just trade -> mapM_ ( \ x -> x trade ) tradeSinks
Nothing -> threadDelay 100000
where
wasKilled = isJust <$> ( killMvar <$> readIORef state >>= tryReadMVar )
wasKilled = isJust <$> ( readIORef state >>= tryReadMVar . killMv ar )
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup
brokerServerThread :: IORef BrokerServerState ->
LogAction IO Message ->
IO ()
brokerServerThread state logger = finally brokerServerThread' cleanup
where
log = logWith logger
brokerServerThread' = whileM_ ( fmap killMvar ( readIORef state ) >>= fmap isNothing . tryReadMVar ) $ do
sock <- bsSocket <$> readIORef state
events <- poll 100 [ Sock sock [ In ] Nothing ]
@ -199,7 +214,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
@@ -199,7 +214,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
lastPackMap <- lastPacket <$> readIORef state
case shouldResend sqnum peerId lastPackMap of
Just response -> do
debugM " Broker.Server " $ " Resending packet for peerId: " ++ show peerId
log Debug " Broker.Server " $ " Resending packet for peerId: " <> ( T . pack . show ) peerId
sendMessage sock peerId response -- Resend
atomicMapIORef state ( \ s -> s { lastPacket = M . delete peerId ( lastPacket s ) } )
Nothing -> do
@ -213,7 +228,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
@@ -213,7 +228,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
-- but shouldn't update lastPacket
let response = ResponseError $ " Invalid request: " <> T . pack errmsg
sendMessage sock peerId response
_ -> warningM " Broker.Server " ( " Invalid packet received: " ++ show msg )
_ -> log Warning " Broker.Server " ( " Invalid packet received: " <> ( T . pack . show ) msg )
shouldResend sqnum peerId lastPackMap = case M . lookup peerId lastPackMap of
Just ( lastSqnum , response ) -> if sqnum == lastSqnum
@ -233,7 +248,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
@@ -233,7 +248,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
bros <- brokers <$> readIORef state
case request of
RequestSubmitOrder sqnum clientIdentity order -> do
debugM " Broker.Server " $ " Request: submit order: " ++ show request
log Debug " Broker.Server " $ " Request: submit order: " <> ( T . pack . show ) request
case findBrokerForAccount ( orderAccountId order ) bros of
Just bro -> do
globalOrderId <- nextOrderId
@ -245,7 +260,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
@@ -245,7 +260,7 @@ brokerServerThread state = finally brokerServerThread' cleanup
return ResponseOk
Nothing -> do
warningM " Broker.Server " $ " Unknown account: " ++ T . unpack ( orderAccountId order )
log Warning " Broker.Server " $ " Unknown account: " <> ( orderAccountId order )
return $ ResponseError " Unknown account "
RequestCancelOrder sqnum clientIdentity localOrderId -> do
m <- orderToBroker <$> readIORef state