Browse Source

Protocol update: add client-identity to broker requests

master
Denis Tereshkin 8 years ago
parent
commit
cef63b0b3c
  1. 2
      libatrade.cabal
  2. 25
      src/ATrade/Broker/Client.hs
  3. 41
      src/ATrade/Broker/Protocol.hs
  4. 26
      src/ATrade/Broker/Server.hs

2
libatrade.cabal

@ -1,5 +1,5 @@
name: libatrade name: libatrade
version: 0.4.0.0 version: 0.5.0.0
synopsis: ATrade infrastructure core library synopsis: ATrade infrastructure core library
description: Please see README.md description: Please see README.md
homepage: https://github.com/asakul/libatrade.git homepage: https://github.com/asakul/libatrade.git

25
src/ATrade/Broker/Client.hs

@ -57,7 +57,6 @@ brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finall
else do else do
putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do
setLinger (restrict 0) sock setLinger (restrict 0) sock
setIdentity(restrict socketIdentity) sock
debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep)
case cspCertificate secParams of case cspCertificate secParams of
Just clientCert -> zapApplyCertificate clientCert sock Just clientCert -> zapApplyCertificate clientCert sock
@ -97,9 +96,9 @@ startBrokerClient socketIdentity ctx endpoint secParams = do
tid = tid, tid = tid,
completionMvar = compMv, completionMvar = compMv,
killMvar = killMv, killMvar = killMv,
submitOrder = bcSubmitOrder idCounter cmdVar respVar, submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar respVar,
cancelOrder = bcCancelOrder idCounter cmdVar respVar, cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar respVar,
getNotifications = bcGetNotifications idCounter cmdVar respVar, getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter cmdVar respVar,
cmdVar = cmdVar, cmdVar = cmdVar,
respVar = respVar respVar = respVar
} }
@ -109,30 +108,30 @@ stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (t
nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v)) nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v))
bcSubmitOrder :: IORef Int64 -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> Order -> IO (Either T.Text OrderId) bcSubmitOrder :: ClientIdentity -> IORef Int64 -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> Order -> IO (Either T.Text OrderId)
bcSubmitOrder idCounter cmdVar respVar order = do bcSubmitOrder clientIdentity idCounter cmdVar respVar order = do
sqnum <- nextId idCounter sqnum <- nextId idCounter
putMVar cmdVar (RequestSubmitOrder sqnum order) putMVar cmdVar (RequestSubmitOrder sqnum clientIdentity order)
resp <- takeMVar respVar resp <- takeMVar respVar
case resp of case resp of
(ResponseOrderSubmitted oid) -> return $ Right oid (ResponseOrderSubmitted oid) -> return $ Right oid
(ResponseError msg) -> return $ Left msg (ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error" _ -> return $ Left "Unknown error"
bcCancelOrder :: IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> OrderId -> IO (Either T.Text ()) bcCancelOrder :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> OrderId -> IO (Either T.Text ())
bcCancelOrder idCounter cmdVar respVar orderId = do bcCancelOrder clientIdentity idCounter cmdVar respVar orderId = do
sqnum <- nextId idCounter sqnum <- nextId idCounter
putMVar cmdVar (RequestCancelOrder sqnum orderId) putMVar cmdVar (RequestCancelOrder sqnum clientIdentity orderId)
resp <- takeMVar respVar resp <- takeMVar respVar
case resp of case resp of
(ResponseOrderCancelled oid) -> return $ Right () (ResponseOrderCancelled oid) -> return $ Right ()
(ResponseError msg) -> return $ Left msg (ResponseError msg) -> return $ Left msg
_ -> return $ Left "Unknown error" _ -> return $ Left "Unknown error"
bcGetNotifications :: IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> IO (Either T.Text [Notification]) bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> IO (Either T.Text [Notification])
bcGetNotifications idCounter cmdVar respVar = do bcGetNotifications clientIdentity idCounter cmdVar respVar = do
sqnum <- nextId idCounter sqnum <- nextId idCounter
putMVar cmdVar (RequestNotifications sqnum) putMVar cmdVar (RequestNotifications sqnum clientIdentity)
resp <- takeMVar respVar resp <- takeMVar respVar
case resp of case resp of
(ResponseNotifications ns) -> return $ Right ns (ResponseNotifications ns) -> return $ Right ns

41
src/ATrade/Broker/Protocol.hs

@ -8,13 +8,15 @@ module ATrade.Broker.Protocol (
RequestSqnum(..), RequestSqnum(..),
requestSqnum, requestSqnum,
TradeSinkMessage(..), TradeSinkMessage(..),
mkTradeMessage mkTradeMessage,
ClientIdentity(..)
) where ) where
import Control.Error.Util import Control.Error.Util
import qualified Data.HashMap.Strict as HM import qualified Data.HashMap.Strict as HM
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Format import Data.Text.Format
import Data.Text.Encoding
import Data.Aeson import Data.Aeson
import Data.Aeson.Types hiding (parse) import Data.Aeson.Types hiding (parse)
import Data.Int import Data.Int
@ -23,40 +25,45 @@ import Data.Time.Calendar
import ATrade.Types import ATrade.Types
import Text.Parsec import Text.Parsec
type ClientIdentity = T.Text
type RequestSqnum = Int64 type RequestSqnum = Int64
data BrokerServerRequest = RequestSubmitOrder RequestSqnum Order data BrokerServerRequest = RequestSubmitOrder RequestSqnum ClientIdentity Order
| RequestCancelOrder RequestSqnum OrderId | RequestCancelOrder RequestSqnum ClientIdentity OrderId
| RequestNotifications RequestSqnum | RequestNotifications RequestSqnum ClientIdentity
deriving (Eq, Show) deriving (Eq, Show)
requestSqnum :: BrokerServerRequest -> RequestSqnum requestSqnum :: BrokerServerRequest -> RequestSqnum
requestSqnum (RequestSubmitOrder sqnum _) = sqnum requestSqnum (RequestSubmitOrder sqnum _ _) = sqnum
requestSqnum (RequestCancelOrder sqnum _) = sqnum requestSqnum (RequestCancelOrder sqnum _ _) = sqnum
requestSqnum (RequestNotifications sqnum) = sqnum requestSqnum (RequestNotifications sqnum _) = sqnum
instance FromJSON BrokerServerRequest where instance FromJSON BrokerServerRequest where
parseJSON = withObject "object" (\obj -> do parseJSON = withObject "object" (\obj -> do
sqnum <- obj .: "request-sqnum" sqnum <- obj .: "request-sqnum"
parseRequest sqnum obj) clientIdentity <- obj .: "client-identity"
parseRequest sqnum clientIdentity obj)
where where
parseRequest :: RequestSqnum -> Object -> Parser BrokerServerRequest parseRequest :: RequestSqnum -> ClientIdentity -> Object -> Parser BrokerServerRequest
parseRequest sqnum obj parseRequest sqnum clientIdentity obj
| HM.member "order" obj = do | HM.member "order" obj = do
order <- obj .: "order" order <- obj .: "order"
RequestSubmitOrder sqnum <$> parseJSON order RequestSubmitOrder sqnum clientIdentity <$> parseJSON order
| HM.member "cancel-order" obj = do | HM.member "cancel-order" obj = do
orderId <- obj .: "cancel-order" orderId <- obj .: "cancel-order"
RequestCancelOrder sqnum <$> parseJSON orderId RequestCancelOrder sqnum clientIdentity <$> parseJSON orderId
| HM.member "request-notifications" obj = return (RequestNotifications sqnum) | HM.member "request-notifications" obj = return (RequestNotifications sqnum clientIdentity)
parseRequest _ _ = fail "Invalid request object" parseRequest _ _ _ = fail "Invalid request object"
instance ToJSON BrokerServerRequest where instance ToJSON BrokerServerRequest where
toJSON (RequestSubmitOrder sqnum order) = object ["request-sqnum" .= sqnum, toJSON (RequestSubmitOrder sqnum clientIdentity order) = object ["request-sqnum" .= sqnum,
"client-identity" .= clientIdentity,
"order" .= order ] "order" .= order ]
toJSON (RequestCancelOrder sqnum oid) = object ["request-sqnum" .= sqnum, toJSON (RequestCancelOrder sqnum clientIdentity oid) = object ["request-sqnum" .= sqnum,
"client-identity" .= clientIdentity,
"cancel-order" .= oid ] "cancel-order" .= oid ]
toJSON (RequestNotifications sqnum) = object ["request-sqnum" .= sqnum, toJSON (RequestNotifications sqnum clientIdentity) = object ["request-sqnum" .= sqnum,
"client-identity" .= clientIdentity,
"request-notifications" .= ("" :: T.Text) ] "request-notifications" .= ("" :: T.Text) ]
data BrokerServerResponse = ResponseOrderSubmitted OrderId data BrokerServerResponse = ResponseOrderSubmitted OrderId

26
src/ATrade/Broker/Server.hs

@ -44,9 +44,9 @@ data BrokerInterface = BrokerInterface {
data BrokerServerState = BrokerServerState { data BrokerServerState = BrokerServerState {
bsSocket :: Socket Router, bsSocket :: Socket Router,
orderToBroker :: M.Map OrderId BrokerInterface, orderToBroker :: M.Map OrderId BrokerInterface,
orderMap :: M.Map OrderId PeerId, -- Matches 0mq client identities with corresponding orders orderMap :: M.Map OrderId ClientIdentity, -- Matches 0mq client identities with corresponding orders
lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse), lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse),
pendingNotifications :: M.Map PeerId [Notification], pendingNotifications :: M.Map ClientIdentity [Notification],
brokers :: [BrokerInterface], brokers :: [BrokerInterface],
completionMvar :: MVar (), completionMvar :: MVar (),
killMvar :: MVar (), killMvar :: MVar (),
@ -101,14 +101,14 @@ notificationCallback state n = do
_ -> return False _ -> return False
orders <- orderMap <$> readIORef state orders <- orderMap <$> readIORef state
case M.lookup (notificationOrderId n) orders of case M.lookup (notificationOrderId n) orders of
Just peerId -> addNotification peerId n Just clientIdentity -> addNotification clientIdentity n
Nothing -> warningM "Broker.Server" "Notification: unknown order" Nothing -> warningM "Broker.Server" "Notification: unknown order"
where where
addNotification peerId n = atomicMapIORef state (\s -> addNotification clientIdentity n = atomicMapIORef state (\s ->
case M.lookup peerId . pendingNotifications $ s of case M.lookup clientIdentity . pendingNotifications $ s of
Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} Just ns -> s { pendingNotifications = M.insert clientIdentity (n : ns) (pendingNotifications s)}
Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) Nothing -> s { pendingNotifications = M.insert clientIdentity [n] (pendingNotifications s)})
tradeSinkHandler :: Context -> IORef BrokerServerState -> [TradeSink] -> IO () tradeSinkHandler :: Context -> IORef BrokerServerState -> [TradeSink] -> IO ()
tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $
@ -173,32 +173,32 @@ brokerServerThread state = finally brokerServerThread' cleanup
handleMessage peerId request = do handleMessage peerId request = do
bros <- brokers <$> readIORef state bros <- brokers <$> readIORef state
case request of case request of
RequestSubmitOrder sqnum order -> do RequestSubmitOrder sqnum clientIdentity order -> do
debugM "Broker.Server" $ "Request: submit order:" ++ show request debugM "Broker.Server" $ "Request: submit order:" ++ show request
case findBrokerForAccount (orderAccountId order) bros of case findBrokerForAccount (orderAccountId order) bros of
Just bro -> do Just bro -> do
oid <- nextOrderId oid <- nextOrderId
atomicMapIORef state (\s -> s { atomicMapIORef state (\s -> s {
orderToBroker = M.insert oid bro (orderToBroker s), orderToBroker = M.insert oid bro (orderToBroker s),
orderMap = M.insert oid peerId (orderMap s) }) orderMap = M.insert oid clientIdentity (orderMap s) })
submitOrder bro order { orderId = oid } submitOrder bro order { orderId = oid }
return $ ResponseOrderSubmitted oid return $ ResponseOrderSubmitted oid
Nothing -> do Nothing -> do
debugM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order) debugM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order)
return $ ResponseError "Unknown account" return $ ResponseError "Unknown account"
RequestCancelOrder sqnum oid -> do RequestCancelOrder sqnum clientIdentity oid -> do
m <- orderToBroker <$> readIORef state m <- orderToBroker <$> readIORef state
case M.lookup oid m of case M.lookup oid m of
Just bro -> do Just bro -> do
cancelOrder bro oid cancelOrder bro oid
return $ ResponseOrderCancelled oid return $ ResponseOrderCancelled oid
Nothing -> return $ ResponseError "Unknown order" Nothing -> return $ ResponseError "Unknown order"
RequestNotifications sqnum -> do RequestNotifications sqnum clientIdentity -> do
maybeNs <- M.lookup peerId . pendingNotifications <$> readIORef state maybeNs <- M.lookup clientIdentity . pendingNotifications <$> readIORef state
case maybeNs of case maybeNs of
Just ns -> do Just ns -> do
atomicMapIORef state (\s -> s { pendingNotifications = M.insert peerId [] (pendingNotifications s)}) atomicMapIORef state (\s -> s { pendingNotifications = M.insert clientIdentity [] (pendingNotifications s)})
return $ ResponseNotifications . L.reverse $ ns return $ ResponseNotifications . L.reverse $ ns
Nothing -> return $ ResponseNotifications [] Nothing -> return $ ResponseNotifications []

Loading…
Cancel
Save