diff --git a/libatrade.cabal b/libatrade.cabal index 918198f..0342a7e 100644 --- a/libatrade.cabal +++ b/libatrade.cabal @@ -1,5 +1,5 @@ name: libatrade -version: 0.4.0.0 +version: 0.5.0.0 synopsis: ATrade infrastructure core library description: Please see README.md homepage: https://github.com/asakul/libatrade.git diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index a0dc950..ba5332b 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -57,7 +57,6 @@ brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finall else do putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do setLinger (restrict 0) sock - setIdentity(restrict socketIdentity) sock debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) case cspCertificate secParams of Just clientCert -> zapApplyCertificate clientCert sock @@ -97,9 +96,9 @@ startBrokerClient socketIdentity ctx endpoint secParams = do tid = tid, completionMvar = compMv, killMvar = killMv, - submitOrder = bcSubmitOrder idCounter cmdVar respVar, - cancelOrder = bcCancelOrder idCounter cmdVar respVar, - getNotifications = bcGetNotifications idCounter cmdVar respVar, + submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar respVar, + cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar respVar, + getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter cmdVar respVar, cmdVar = cmdVar, respVar = respVar } @@ -109,30 +108,30 @@ stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (t nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v)) -bcSubmitOrder :: IORef Int64 -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> Order -> IO (Either T.Text OrderId) -bcSubmitOrder idCounter cmdVar respVar order = do +bcSubmitOrder :: ClientIdentity -> IORef Int64 -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> Order -> IO (Either T.Text OrderId) +bcSubmitOrder clientIdentity idCounter cmdVar respVar order = do sqnum <- nextId idCounter - putMVar cmdVar (RequestSubmitOrder sqnum order) + putMVar cmdVar (RequestSubmitOrder sqnum clientIdentity order) resp <- takeMVar respVar case resp of (ResponseOrderSubmitted oid) -> return $ Right oid (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error" -bcCancelOrder :: IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> OrderId -> IO (Either T.Text ()) -bcCancelOrder idCounter cmdVar respVar orderId = do +bcCancelOrder :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> OrderId -> IO (Either T.Text ()) +bcCancelOrder clientIdentity idCounter cmdVar respVar orderId = do sqnum <- nextId idCounter - putMVar cmdVar (RequestCancelOrder sqnum orderId) + putMVar cmdVar (RequestCancelOrder sqnum clientIdentity orderId) resp <- takeMVar respVar case resp of (ResponseOrderCancelled oid) -> return $ Right () (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error" -bcGetNotifications :: IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> IO (Either T.Text [Notification]) -bcGetNotifications idCounter cmdVar respVar = do +bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> IO (Either T.Text [Notification]) +bcGetNotifications clientIdentity idCounter cmdVar respVar = do sqnum <- nextId idCounter - putMVar cmdVar (RequestNotifications sqnum) + putMVar cmdVar (RequestNotifications sqnum clientIdentity) resp <- takeMVar respVar case resp of (ResponseNotifications ns) -> return $ Right ns diff --git a/src/ATrade/Broker/Protocol.hs b/src/ATrade/Broker/Protocol.hs index 98a9ef6..ebc4c55 100644 --- a/src/ATrade/Broker/Protocol.hs +++ b/src/ATrade/Broker/Protocol.hs @@ -8,13 +8,15 @@ module ATrade.Broker.Protocol ( RequestSqnum(..), requestSqnum, TradeSinkMessage(..), - mkTradeMessage + mkTradeMessage, + ClientIdentity(..) ) where import Control.Error.Util import qualified Data.HashMap.Strict as HM import qualified Data.Text as T import Data.Text.Format +import Data.Text.Encoding import Data.Aeson import Data.Aeson.Types hiding (parse) import Data.Int @@ -23,40 +25,45 @@ import Data.Time.Calendar import ATrade.Types import Text.Parsec +type ClientIdentity = T.Text type RequestSqnum = Int64 -data BrokerServerRequest = RequestSubmitOrder RequestSqnum Order - | RequestCancelOrder RequestSqnum OrderId - | RequestNotifications RequestSqnum +data BrokerServerRequest = RequestSubmitOrder RequestSqnum ClientIdentity Order + | RequestCancelOrder RequestSqnum ClientIdentity OrderId + | RequestNotifications RequestSqnum ClientIdentity deriving (Eq, Show) requestSqnum :: BrokerServerRequest -> RequestSqnum -requestSqnum (RequestSubmitOrder sqnum _) = sqnum -requestSqnum (RequestCancelOrder sqnum _) = sqnum -requestSqnum (RequestNotifications sqnum) = sqnum +requestSqnum (RequestSubmitOrder sqnum _ _) = sqnum +requestSqnum (RequestCancelOrder sqnum _ _) = sqnum +requestSqnum (RequestNotifications sqnum _) = sqnum instance FromJSON BrokerServerRequest where parseJSON = withObject "object" (\obj -> do sqnum <- obj .: "request-sqnum" - parseRequest sqnum obj) + clientIdentity <- obj .: "client-identity" + parseRequest sqnum clientIdentity obj) where - parseRequest :: RequestSqnum -> Object -> Parser BrokerServerRequest - parseRequest sqnum obj + parseRequest :: RequestSqnum -> ClientIdentity -> Object -> Parser BrokerServerRequest + parseRequest sqnum clientIdentity obj | HM.member "order" obj = do order <- obj .: "order" - RequestSubmitOrder sqnum <$> parseJSON order + RequestSubmitOrder sqnum clientIdentity <$> parseJSON order | HM.member "cancel-order" obj = do orderId <- obj .: "cancel-order" - RequestCancelOrder sqnum <$> parseJSON orderId - | HM.member "request-notifications" obj = return (RequestNotifications sqnum) - parseRequest _ _ = fail "Invalid request object" + RequestCancelOrder sqnum clientIdentity <$> parseJSON orderId + | HM.member "request-notifications" obj = return (RequestNotifications sqnum clientIdentity) + parseRequest _ _ _ = fail "Invalid request object" instance ToJSON BrokerServerRequest where - toJSON (RequestSubmitOrder sqnum order) = object ["request-sqnum" .= sqnum, + toJSON (RequestSubmitOrder sqnum clientIdentity order) = object ["request-sqnum" .= sqnum, + "client-identity" .= clientIdentity, "order" .= order ] - toJSON (RequestCancelOrder sqnum oid) = object ["request-sqnum" .= sqnum, + toJSON (RequestCancelOrder sqnum clientIdentity oid) = object ["request-sqnum" .= sqnum, + "client-identity" .= clientIdentity, "cancel-order" .= oid ] - toJSON (RequestNotifications sqnum) = object ["request-sqnum" .= sqnum, + toJSON (RequestNotifications sqnum clientIdentity) = object ["request-sqnum" .= sqnum, + "client-identity" .= clientIdentity, "request-notifications" .= ("" :: T.Text) ] data BrokerServerResponse = ResponseOrderSubmitted OrderId diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index 19bf277..6d2cac5 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -44,9 +44,9 @@ data BrokerInterface = BrokerInterface { data BrokerServerState = BrokerServerState { bsSocket :: Socket Router, orderToBroker :: M.Map OrderId BrokerInterface, - orderMap :: M.Map OrderId PeerId, -- Matches 0mq client identities with corresponding orders + orderMap :: M.Map OrderId ClientIdentity, -- Matches 0mq client identities with corresponding orders lastPacket :: M.Map PeerId (RequestSqnum, BrokerServerResponse), - pendingNotifications :: M.Map PeerId [Notification], + pendingNotifications :: M.Map ClientIdentity [Notification], brokers :: [BrokerInterface], completionMvar :: MVar (), killMvar :: MVar (), @@ -101,14 +101,14 @@ notificationCallback state n = do _ -> return False orders <- orderMap <$> readIORef state case M.lookup (notificationOrderId n) orders of - Just peerId -> addNotification peerId n + Just clientIdentity -> addNotification clientIdentity n Nothing -> warningM "Broker.Server" "Notification: unknown order" where - addNotification peerId n = atomicMapIORef state (\s -> - case M.lookup peerId . pendingNotifications $ s of - Just ns -> s { pendingNotifications = M.insert peerId (n : ns) (pendingNotifications s)} - Nothing -> s { pendingNotifications = M.insert peerId [n] (pendingNotifications s)}) + addNotification clientIdentity n = atomicMapIORef state (\s -> + case M.lookup clientIdentity . pendingNotifications $ s of + Just ns -> s { pendingNotifications = M.insert clientIdentity (n : ns) (pendingNotifications s)} + Nothing -> s { pendingNotifications = M.insert clientIdentity [n] (pendingNotifications s)}) tradeSinkHandler :: Context -> IORef BrokerServerState -> [TradeSink] -> IO () tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ @@ -173,32 +173,32 @@ brokerServerThread state = finally brokerServerThread' cleanup handleMessage peerId request = do bros <- brokers <$> readIORef state case request of - RequestSubmitOrder sqnum order -> do + RequestSubmitOrder sqnum clientIdentity order -> do debugM "Broker.Server" $ "Request: submit order:" ++ show request case findBrokerForAccount (orderAccountId order) bros of Just bro -> do oid <- nextOrderId atomicMapIORef state (\s -> s { orderToBroker = M.insert oid bro (orderToBroker s), - orderMap = M.insert oid peerId (orderMap s) }) + orderMap = M.insert oid clientIdentity (orderMap s) }) submitOrder bro order { orderId = oid } return $ ResponseOrderSubmitted oid Nothing -> do debugM "Broker.Server" $ "Unknown account: " ++ T.unpack (orderAccountId order) return $ ResponseError "Unknown account" - RequestCancelOrder sqnum oid -> do + RequestCancelOrder sqnum clientIdentity oid -> do m <- orderToBroker <$> readIORef state case M.lookup oid m of Just bro -> do cancelOrder bro oid return $ ResponseOrderCancelled oid Nothing -> return $ ResponseError "Unknown order" - RequestNotifications sqnum -> do - maybeNs <- M.lookup peerId . pendingNotifications <$> readIORef state + RequestNotifications sqnum clientIdentity -> do + maybeNs <- M.lookup clientIdentity . pendingNotifications <$> readIORef state case maybeNs of Just ns -> do - atomicMapIORef state (\s -> s { pendingNotifications = M.insert peerId [] (pendingNotifications s)}) + atomicMapIORef state (\s -> s { pendingNotifications = M.insert clientIdentity [] (pendingNotifications s)}) return $ ResponseNotifications . L.reverse $ ns Nothing -> return $ ResponseNotifications []