|
|
|
|
@ -32,13 +32,14 @@ import System.ZMQ4
@@ -32,13 +32,14 @@ import System.ZMQ4
|
|
|
|
|
import System.ZMQ4.ZAP |
|
|
|
|
|
|
|
|
|
data BrokerClientHandle = BrokerClientHandle { |
|
|
|
|
tid :: ThreadId, |
|
|
|
|
completionMvar :: MVar (), |
|
|
|
|
killMvar :: MVar (), |
|
|
|
|
submitOrder :: Order -> IO (Either T.Text OrderId), |
|
|
|
|
cancelOrder :: OrderId -> IO (Either T.Text ()), |
|
|
|
|
getNotifications :: IO (Either T.Text [Notification]), |
|
|
|
|
cmdVar :: MVar (BrokerServerRequest, MVar BrokerServerResponse) |
|
|
|
|
tid :: ThreadId, |
|
|
|
|
completionMvar :: MVar (), |
|
|
|
|
killMvar :: MVar (), |
|
|
|
|
submitOrder :: Order -> IO (Either T.Text OrderId), |
|
|
|
|
cancelOrder :: OrderId -> IO (Either T.Text ()), |
|
|
|
|
getNotifications :: IO (Either T.Text [Notification]), |
|
|
|
|
cmdVar :: MVar (BrokerServerRequest, MVar BrokerServerResponse), |
|
|
|
|
lastKnownNotificationRef :: IORef NotificationSqnum |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> MVar () -> ClientSecurityParams -> IO () |
|
|
|
|
@ -89,6 +90,7 @@ startBrokerClient socketIdentity ctx endpoint secParams = do
@@ -89,6 +90,7 @@ startBrokerClient socketIdentity ctx endpoint secParams = do
|
|
|
|
|
killMv <- newEmptyMVar |
|
|
|
|
cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) |
|
|
|
|
tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) |
|
|
|
|
notifSqnumRef <- newIORef (NotificationSqnum 0) |
|
|
|
|
|
|
|
|
|
return BrokerClientHandle { |
|
|
|
|
tid = tid, |
|
|
|
|
@ -96,8 +98,9 @@ startBrokerClient socketIdentity ctx endpoint secParams = do
@@ -96,8 +98,9 @@ startBrokerClient socketIdentity ctx endpoint secParams = do
|
|
|
|
|
killMvar = killMv, |
|
|
|
|
submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar, |
|
|
|
|
cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar, |
|
|
|
|
getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter cmdVar, |
|
|
|
|
cmdVar = cmdVar |
|
|
|
|
getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter notifSqnumRef cmdVar, |
|
|
|
|
cmdVar = cmdVar, |
|
|
|
|
lastKnownNotificationRef = notifSqnumRef |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
stopBrokerClient :: BrokerClientHandle -> IO () |
|
|
|
|
@ -127,11 +130,12 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do
@@ -127,11 +130,12 @@ bcCancelOrder clientIdentity idCounter cmdVar orderId = do
|
|
|
|
|
(ResponseError msg) -> return $ Left msg |
|
|
|
|
_ -> return $ Left "Unknown error" |
|
|
|
|
|
|
|
|
|
bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) |
|
|
|
|
bcGetNotifications clientIdentity idCounter cmdVar = do |
|
|
|
|
bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> IORef NotificationSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) |
|
|
|
|
bcGetNotifications clientIdentity idCounter notifSqnumRef cmdVar = do |
|
|
|
|
respVar <- newEmptyMVar |
|
|
|
|
sqnum <- nextId idCounter |
|
|
|
|
putMVar cmdVar (RequestNotifications sqnum clientIdentity (NotificationSqnum 0), respVar) |
|
|
|
|
notifSqnum <- nextSqnum <$> readIORef notifSqnumRef |
|
|
|
|
putMVar cmdVar (RequestNotifications sqnum clientIdentity notifSqnum, respVar) |
|
|
|
|
resp <- takeMVar respVar |
|
|
|
|
case resp of |
|
|
|
|
(ResponseNotifications ns) -> return $ Right ns |
|
|
|
|
|