ATrade core infrastructure
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

242 lines
9.6 KiB

{-# LANGUAGE OverloadedStrings #-}
module ATrade.Broker.Server (
startBrokerServer,
stopBrokerServer,
BrokerInterface(..)
) where
import ATrade.Types
import ATrade.Broker.Protocol
import System.ZMQ4
import System.ZMQ4.ZAP
import Data.List.NonEmpty
import qualified Data.Map as M
import qualified Data.ByteString as B hiding (putStrLn)
import qualified Data.ByteString.Lazy as BL hiding (putStrLn)
import qualified Data.Text as T
import qualified Data.List as L
import Data.Aeson
import Data.Maybe
import Data.Time.Clock
import Data.IORef
import Control.Concurrent hiding (readChan, writeChan)
import Control.Concurrent.BoundedChan
import Control.Exception
import Control.Monad
import Control.Monad.Loops
import System.Log.Logger
import System.Timeout
9 years ago
import ATrade.Util
-- | An action that produces an order identifier each time it is run.
--
-- This is a type synonym on purpose: writing it as
-- @newtype OrderIdGenerator = IO OrderId@ would declare a data constructor
-- that merely happens to be called @IO@ and wraps a plain 'OrderId', instead
-- of referring to the 'IO' monad.
type OrderIdGenerator = IO OrderId
-- | Identity of a connected client: the first frame of every multipart
-- message received on the router socket, and the first frame of every reply.
type PeerId = B.ByteString
-- | Set of callbacks through which the server talks to one broker backend.
data BrokerInterface = BrokerInterface
  { accounts :: [T.Text]
    -- ^ Account identifiers handled by this broker. A submitted order is
    -- routed to the first broker whose list contains the order's account.
  , setNotificationCallback :: Maybe (Notification -> IO ()) -> IO ()
    -- ^ Install the notification callback. The server installs one
    -- ('Just') for every broker at startup.
  , submitOrder :: Order -> IO ()
    -- ^ Hand an order (already carrying its server-assigned id) to the broker.
  , cancelOrder :: OrderId -> IO Bool
    -- ^ Ask the broker to cancel an order.
    -- NOTE(review): the meaning of the 'Bool' is not visible in this module
    -- (presumably success) and the server currently ignores it.
  , stopBroker :: IO ()
    -- ^ Not called from this module; presumably shuts the backend down --
    -- confirm against the broker implementations.
  }
-- | Mutable state of a running broker server, kept in a single 'IORef' that
-- is shared by the server thread, the trade sink thread and the notification
-- callback.
data BrokerServerState = BrokerServerState
  { bsSocket :: Socket Router
    -- ^ Router socket on which client requests arrive.
  , orderToBroker :: M.Map OrderId BrokerInterface
    -- ^ Broker that accepted each order; used to route cancellations.
  , orderMap :: M.Map OrderId PeerId
    -- ^ Matches 0mq client identities with corresponding orders.
  , lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse)
    -- ^ Last successfully handled sequence number and its response, per
    -- peer, so a repeated request can be answered by resending.
  , pendingNotifications :: M.Map PeerId [Notification]
    -- ^ Notifications not yet fetched by each peer, newest first.
  , brokers :: [BrokerInterface]
    -- ^ All broker backends served by this instance.
  , completionMvar :: MVar ()
    -- ^ Filled by the server thread once it has finished and cleaned up.
  , killMvar :: MVar ()
    -- ^ Filled to request shutdown; both worker loops poll it.
  , orderIdCounter :: OrderId
    -- ^ Identifier that will be given to the next submitted order.
  , tradeSink :: BoundedChan Trade
    -- ^ Queue of trades waiting to be forwarded to the trade sink.
  }
-- | Handle produced by 'startBrokerServer' and consumed by
-- 'stopBrokerServer'.
data BrokerServerHandle
  = BrokerServerHandle
      ThreadId   -- ^ Thread serving client requests.
      ThreadId   -- ^ Thread forwarding trades to the trade sink.
      (MVar ())  -- ^ Completion signal, filled when the server thread exits.
      (MVar ())  -- ^ Kill signal, filled to request shutdown.
9 years ago
-- | Start the broker server.
--
-- Creates a router socket in the given context, applies the optional ZAP
-- domain and CURVE certificate from the security parameters, binds it to
-- @ep@, registers the notification callback with every broker and forks two
-- threads: the request-serving thread and the trade sink thread (which
-- forwards trades to @tradeSinkEp@, or does nothing if that is empty).
--
-- Returns a handle to be passed to 'stopBrokerServer'. Any exception raised
-- while configuring or binding the socket is propagated after the socket has
-- been closed.
startBrokerServer :: [BrokerInterface] -> Context -> T.Text -> T.Text -> ServerSecurityParams -> IO BrokerServerHandle
startBrokerServer brokers c ep tradeSinkEp params = do
  sock <- socket c Router
  -- Until the server thread is running nothing else will close the socket,
  -- so release it here if configuration or binding fails.
  configureAndBind sock `onException` close sock
  compMv <- newEmptyMVar
  killMv <- newEmptyMVar
  tsChan <- newBoundedChan 100
  state <- newIORef BrokerServerState {
    bsSocket = sock,
    orderMap = M.empty,
    orderToBroker = M.empty,
    lastPacket = M.empty,
    pendingNotifications = M.empty,
    brokers = brokers,
    completionMvar = compMv,
    killMvar = killMv,
    orderIdCounter = 1,
    tradeSink = tsChan
  }
  mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
  debugM "Broker.Server" "Forking broker server thread"
  BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv
  where
    -- Apply linger and the optional security settings, then bind.
    configureAndBind :: Socket Router -> IO ()
    configureAndBind sock = do
      setLinger (restrict 0) sock
      forM_ (sspDomain params) $ \domain -> setZapDomain domain sock
      forM_ (sspCertificate params) $ \cert -> do
        setCurveServer True sock
        zapApplyCertificate cert sock
      bind sock (T.unpack ep)
-- | Callback installed into every broker.
--
-- Trade notifications are additionally offered to the trade sink queue.
-- Every notification whose order is known is then prepended to the pending
-- list of the peer that submitted the order; notifications for unknown
-- orders only produce a warning.
notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do
  sink <- tradeSink <$> readIORef state
  -- The result of the non-blocking write is deliberately discarded: when the
  -- queue is full the trade is simply not forwarded to the sink.
  _ <- case n of
    TradeNotification trade -> tryWriteChan sink trade
    _ -> return False
  owners <- orderMap <$> readIORef state
  maybe
    (warningM "Broker.Server" "Notification: unknown order")
    enqueueFor
    (M.lookup (notificationOrderId n) owners)
  where
    -- Prepend the notification to the peer's queue, creating the queue when
    -- the peer has none yet.
    enqueueFor peerId = atomicMapIORef state $ \s ->
      s { pendingNotifications = M.insertWith (++) peerId [n] (pendingNotifications s) }
-- | Forward executed trades to an external trade sink over a dealer socket.
--
-- Does nothing when @tradeSinkEp@ is empty. Otherwise, until the kill signal
-- is set, it repeatedly opens a dealer socket, connects to the sink and runs
-- a session: queued trades are sent as they become available, and when the
-- queue is empty a heartbeat is sent and a reply is awaited for up to five
-- seconds. A missing reply marks the session as timed out, which ends it so
-- that the outer loop reconnects with a fresh socket.
--
-- Exceptions raised inside a session are logged. Those whose textual form
-- starts with @ZMQError@ are rethrown (terminating this thread); all others
-- are swallowed and the outer loop starts a new session.
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
  whileM_ (not <$> wasKilled) $
    handle onError $ withSocket c Dealer runSession
  where
    onError :: SomeException -> IO ()
    onError e = do
      warningM "Broker.Server" $ "Trade sink: exception: " ++ (show (e :: SomeException)) ++ "; isZMQ: " ++ show (isZMQError e)
      when (isZMQError e) $ do
        debugM "Broker.Server" "Rethrowing exception"
        throwIO e

    -- One connection lifetime: runs until killed or until the sink stops
    -- answering heartbeats.
    runSession :: Socket Dealer -> IO ()
    runSession sock = do
      debugM "Broker.Server" "Connecting trade sink socket"
      chan <- tradeSink <$> readIORef state
      connect sock $ T.unpack tradeSinkEp
      timedOut <- newIORef False
      threadDelay 1000000
      -- Keep going while we have NOT timed out. The flag starts as False, so
      -- testing it without negation would skip the loop entirely and no
      -- trade or heartbeat would ever be sent.
      whileM_ (andM [not <$> wasKilled, not <$> readIORef timedOut]) $ do
        maybeTrade <- tryReadChan chan
        case maybeTrade of
          Just trade -> sendMulti sock $ B.empty :| [encodeTrade trade]
          Nothing -> do
            threadDelay 1000000
            sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat]
            events <- poll 5000 [Sock sock [In] Nothing]
            case events of
              (ready : _) | not (L.null ready) -> void . receive $ sock -- anything will do
              _ -> do
                writeIORef timedOut True
                warningM "Broker.Server" "Trade sink timeout"

    isZMQError e = "ZMQError" `L.isPrefixOf` show e

    -- True once the kill signal has been set.
    wasKilled = fmap killMvar (readIORef state) >>= fmap isJust . tryReadMVar

    encodeTrade :: Trade -> B.ByteString
    encodeTrade = BL.toStrict . encode . convertTrade

    -- Map an internal trade onto the wire representation used by the sink.
    convertTrade trade = TradeSinkTrade {
      tsAccountId = tradeAccount trade,
      tsSecurity = tradeSecurity trade,
      tsPrice = fromRational . toRational . tradePrice $ trade,
      tsQuantity = fromInteger $ tradeQuantity trade,
      tsVolume = fromRational . toRational . tradeVolume $ trade,
      tsCurrency = tradeVolumeCurrency trade,
      tsOperation = tradeOperation trade,
      tsExecutionTime = tradeTimestamp trade,
      tsSignalId = tradeSignalId trade
    }
-- | Main loop of the broker server.
--
-- Until the kill signal is set, polls the router socket (100 ms timeout) and
-- handles one request per readable event. A request is expected to be a
-- three-frame message: peer identity, a delimiter frame, and a JSON payload.
-- A request carrying the same sequence number as the peer's previous handled
-- request is answered by resending the stored response without handling it
-- again. On exit (normal or by exception) the socket is closed and the
-- completion signal is set.
brokerServerThread :: IORef BrokerServerState -> IO ()
brokerServerThread state = finally brokerServerThread' cleanup
  where
    brokerServerThread' = whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do
      sock <- bsSocket <$> readIORef state
      events <- poll 100 [Sock sock [In] Nothing]
      unless (all L.null events) $ do
        msg <- receiveMulti sock
        case msg of
          [peerId, _, payload] ->
            case decode . BL.fromStrict $ payload of
              Just request -> do
                let sqnum = requestSqnum request
                -- Here, we should check if previous packet sequence number is the same
                -- If it is, we should resend previous response
                lastPackMap <- lastPacket <$> readIORef state
                case shouldResend sqnum peerId lastPackMap of
                  Just response -> sendMessage sock peerId response -- Resend
                  Nothing -> do
                    -- Handle incoming request, send response
                    response <- handleMessage peerId request
                    sendMessage sock peerId response
                    -- and store response in case we'll need to resend it
                    atomicMapIORef state (\s -> s { lastPacket = M.insert peerId (sqnum, response) (lastPacket s)})
              Nothing -> do
                -- If we weren't able to parse request, we should send error
                -- but shouldn't update lastPacket
                let response = ResponseError "Invalid request"
                sendMessage sock peerId response
          _ -> warningM "Broker.Server" ("Invalid packet received: " ++ show msg)

    -- Stored response for this peer, if the sequence number repeats the
    -- peer's last handled request.
    shouldResend sqnum peerId lastPackMap = case M.lookup peerId lastPackMap of
      Just (lastSqnum, response) -> if sqnum == lastSqnum
        then Just response
        else Nothing
      Nothing -> Nothing

    cleanup = do
      sock <- bsSocket <$> readIORef state
      close sock
      mv <- completionMvar <$> readIORef state
      putMVar mv ()

    -- Execute a decoded request on behalf of a peer and build the response.
    handleMessage :: PeerId -> BrokerServerRequest -> IO BrokerServerResponse
    handleMessage peerId request = do
      bros <- brokers <$> readIORef state
      case request of
        RequestSubmitOrder _ order -> do
          debugM "Broker.Server" $ "Request: submit order:" ++ show request
          case findBrokerForAccount (orderAccountId order) bros of
            Just bro -> do
              oid <- nextOrderId
              -- Register the order before submitting it so that a
              -- notification arriving immediately can already be routed.
              atomicMapIORef state (\s -> s {
                orderToBroker = M.insert oid bro (orderToBroker s),
                orderMap = M.insert oid peerId (orderMap s) })
              submitOrder bro order { orderId = oid }
              return $ ResponseOrderSubmitted oid
            Nothing -> do
              debugM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order)
              return $ ResponseError "Unknown account"
        RequestCancelOrder _ oid -> do
          m <- orderToBroker <$> readIORef state
          case M.lookup oid m of
            Just bro -> do
              -- NOTE(review): the Bool returned by the broker is ignored and
              -- the cancellation is always reported as done -- confirm that
              -- this is intended.
              _ <- cancelOrder bro oid
              return $ ResponseOrderCancelled oid
            Nothing -> return $ ResponseError "Unknown order"
        RequestNotifications _ -> do
          -- Take and clear the peer's queue in one atomic step. Reading it
          -- and resetting it separately would drop any notification that a
          -- broker thread enqueues between the two operations.
          ns <- atomicModifyIORef' state (\s ->
            let queued = pendingNotifications s
            in case M.lookup peerId queued of
                 Just taken -> (s { pendingNotifications = M.insert peerId [] queued }, taken)
                 Nothing -> (s, []))
          -- The queue is kept newest-first; deliver oldest-first.
          return $ ResponseNotifications . L.reverse $ ns

    sendMessage sock peerId resp = sendMulti sock (peerId :| [B.empty, BL.toStrict . encode $ resp])

    -- First broker that serves the given account, if any.
    findBrokerForAccount account = L.find (L.elem account . accounts)

    -- Return the current counter value and advance the counter.
    nextOrderId = atomicModifyIORef' state (\s -> ( s {orderIdCounter = 1 + orderIdCounter s}, orderIdCounter s))
-- | Stop a server started with 'startBrokerServer' and wait until the
-- request-serving thread has finished its cleanup.
--
-- Sets the kill signal, kills the trade sink thread, and then blocks on the
-- completion signal. Safe to call more than once on the same handle: the
-- kill signal is set with a non-blocking put, so a repeated call does not
-- hang on the already-full 'MVar'.
stopBrokerServer :: BrokerServerHandle -> IO ()
stopBrokerServer (BrokerServerHandle _ tstid compMv killMv) = do
  void $ tryPutMVar killMv ()
  killThread tstid
  yield
  readMVar compMv