diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index b54364a..4f659dd 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -38,26 +38,25 @@ data BrokerClientHandle = BrokerClientHandle { submitOrder :: Order -> IO (Either T.Text OrderId), cancelOrder :: OrderId -> IO (Either T.Text ()), getNotifications :: IO (Either T.Text [Notification]), - cmdVar :: MVar BrokerServerRequest, - respVar :: MVar BrokerServerResponse + cmdVar :: MVar (BrokerServerRequest, MVar BrokerServerResponse) } -brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> MVar () -> MVar () -> ClientSecurityParams -> IO () -brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finally brokerClientThread' cleanup +brokerClientThread :: B.ByteString -> Context -> T.Text -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> MVar () -> MVar () -> ClientSecurityParams -> IO () +brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally brokerClientThread' cleanup where cleanup = infoM "Broker.Client" "Quitting broker client thread" >> putMVar comp () brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ do debugM "Broker.Client" "Starting event loop" handle (\e -> do - warningM "Broker.Client" $ "Broker client: exception: " ++ (show (e :: SomeException)) ++ "; isZMQ: " ++ show (isZMQError e) + warningM "Broker.Client" $ "Broker client: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) if isZMQError e then do debugM "Broker.Client" "Rethrowing exception" throwIO e else do - putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do + return ()) $ withSocket ctx Req (\sock -> do setLinger (restrict 0) sock - debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) + case cspCertificate secParams of Just clientCert -> zapApplyCertificate clientCert sock Nothing -> return () @@ -66,11 +65,11 @@ brokerClientThread socketIdentity ctx ep cmd resp comp killMv secParams = finall Nothing -> return () connect sock $ T.unpack ep - debugM "Broker.Client" $ "Connected" + debugM "Broker.Client" "Connected" isTimeout <- newIORef False whileM_ (andM [isNothing <$> tryReadMVar killMv, (== False) <$> readIORef isTimeout]) $ do - request <- takeMVar cmd + (request, resp) <- takeMVar cmd send sock [] (BL.toStrict $ encode request) incomingMessage <- timeout 5000000 $ receive sock case incomingMessage of @@ -88,19 +87,17 @@ startBrokerClient socketIdentity ctx endpoint secParams = do idCounter <- newIORef 1 compMv <- newEmptyMVar killMv <- newEmptyMVar - cmdVar <- newEmptyMVar :: IO (MVar BrokerServerRequest) - respVar <- newEmptyMVar :: IO (MVar BrokerServerResponse) - tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar respVar compMv killMv secParams) + cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) + tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) return BrokerClientHandle { tid = tid, completionMvar = compMv, killMvar = killMv, - submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar respVar, - cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar respVar, - getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter cmdVar respVar, - cmdVar = cmdVar, - respVar = respVar + submitOrder = bcSubmitOrder (decodeUtf8 socketIdentity) idCounter cmdVar, + cancelOrder = bcCancelOrder (decodeUtf8 socketIdentity) idCounter cmdVar, + getNotifications = bcGetNotifications (decodeUtf8 socketIdentity) idCounter cmdVar, + cmdVar = cmdVar } stopBrokerClient :: BrokerClientHandle -> IO () @@ -108,30 +105,33 @@ stopBrokerClient handle = putMVar (killMvar handle) () >> yield >> killThread (t nextId cnt = atomicModifyIORef' cnt (\v -> (v + 1, v)) -bcSubmitOrder :: ClientIdentity -> IORef Int64 -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> Order -> IO (Either T.Text OrderId) -bcSubmitOrder clientIdentity idCounter cmdVar respVar order = do +bcSubmitOrder :: ClientIdentity -> IORef Int64 -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> Order -> IO (Either T.Text OrderId) +bcSubmitOrder clientIdentity idCounter cmdVar order = do + respVar <- newEmptyMVar sqnum <- nextId idCounter - putMVar cmdVar (RequestSubmitOrder sqnum clientIdentity order) + putMVar cmdVar (RequestSubmitOrder sqnum clientIdentity order, respVar) resp <- takeMVar respVar case resp of (ResponseOrderSubmitted oid) -> return $ Right oid (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error" -bcCancelOrder :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> OrderId -> IO (Either T.Text ()) -bcCancelOrder clientIdentity idCounter cmdVar respVar orderId = do +bcCancelOrder :: ClientIdentity -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> OrderId -> IO (Either T.Text ()) +bcCancelOrder clientIdentity idCounter cmdVar orderId = do + respVar <- newEmptyMVar sqnum <- nextId idCounter - putMVar cmdVar (RequestCancelOrder sqnum clientIdentity orderId) + putMVar cmdVar (RequestCancelOrder sqnum clientIdentity orderId, respVar) resp <- takeMVar respVar case resp of (ResponseOrderCancelled oid) -> return $ Right () (ResponseError msg) -> return $ Left msg _ -> return $ Left "Unknown error" -bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar BrokerServerRequest -> MVar BrokerServerResponse -> IO (Either T.Text [Notification]) -bcGetNotifications clientIdentity idCounter cmdVar respVar = do +bcGetNotifications :: ClientIdentity -> IORef RequestSqnum -> MVar (BrokerServerRequest, MVar BrokerServerResponse) -> IO (Either T.Text [Notification]) +bcGetNotifications clientIdentity idCounter cmdVar = do + respVar <- newEmptyMVar sqnum <- nextId idCounter - putMVar cmdVar (RequestNotifications sqnum clientIdentity (NotificationSqnum 0)) + putMVar cmdVar (RequestNotifications sqnum clientIdentity (NotificationSqnum 0), respVar) resp <- takeMVar respVar case resp of (ResponseNotifications ns) -> return $ Right ns