From 03188f5fa4a24236286931f2c2e17cc98ec1ccd3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 25 Jan 2025 10:01:59 +0700 Subject: [PATCH] Update to new libatrade --- src/Main.hs | 2 -- src/TXMLConnector.hs | 2 ++ src/TXMLConnector/Internal.hs | 50 ++++++++++++++++++++++++++--------- transaq-connector.cabal | 5 ++-- 4 files changed, 42 insertions(+), 17 deletions(-) diff --git a/src/Main.hs b/src/Main.hs index d10517d..6589a49 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -116,10 +116,8 @@ main = do [Connector.makeBrokerBackend txml (accounts cfg), paper] ctx (brokerEndpoint cfg) - (brokerNotificationsEndpoint cfg) (NotificationSqnum startTimestamp) [tsDashboard, tsGotify] - defaultServerSecurityParams logger) (\x -> do stopBrokerServer x log Info "main" "Stopping TXMLConnector" diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index be8ff5c..0d58ab0 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -115,6 +115,7 @@ start logger' tickTable config' qssChannel' tisH = do orderMap <- newTVarIO M.empty notificationCallback <- newTVarIO Nothing orderTransactionIdMap <- newTVarIO BM.empty + initialTransactionIdMap <- newTVarIO M.empty pendingOrders <- newTVarIO (fromList []) runVar' <- newEmptyTMVarIO timerVar' <- newEmptyTMVarIO @@ -124,6 +125,7 @@ start logger' tickTable config' qssChannel' tisH = do , bsNotificationCallback = notificationCallback , bsOrderMap = orderMap , bsPendingOrders = pendingOrders + , bsInitialTransactionIdMap = initialTransactionIdMap } let env = diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs index 44757ef..6bf10a5 100644 --- a/src/TXMLConnector/Internal.hs +++ b/src/TXMLConnector/Internal.hs @@ -159,6 +159,7 @@ data BrokerState = BrokerState { bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId) + , bsInitialTransactionIdMap :: TVar (M.Map OrderId Int64) , bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) , bsOrderMap :: TVar (M.Map OrderId Order) , bsPendingOrders :: TVar (D.Deque Order) @@ -386,6 +387,7 @@ handleConnected = do runVar' <- asks runVar queue <- asks transaqQueue timerVar' <- asks timerVar + liftIO $ emitEvent "fsm_connected_loop" item <- liftIO . atomically $ (readTMVar runVar' >> pure MainQueueShutdown) `orElse` (MainQueueTransaqData <$> readTBQueue queue) `orElse` @@ -393,11 +395,21 @@ handleConnected = do (takeTMVar timerVar' >> pure MainQueuePingServer) case item of MainQueueShutdown -> pure $ Just StageShutdown - MainQueuePingServer -> pingServer - MainQueueTransaqData transaqData -> handleTransaqData transaqData - MainQueueRequest (RequestHistory request) -> processHistoryRequest request - MainQueueRequest (RequestSubmitOrder order) -> processSubmitOrderRequest order - MainQueueRequest (RequestCancelOrder oid) -> processCancelOrderRequest oid + MainQueuePingServer -> do + liftIO $ emitEvent "fsm_connected_ping" + pingServer + MainQueueTransaqData transaqData -> do + liftIO $ emitEvent "fsm_connected_transaq_data" + handleTransaqData transaqData + MainQueueRequest (RequestHistory request) -> do + liftIO $ emitEvent "fsm_connected_history_request" + processHistoryRequest request + MainQueueRequest (RequestSubmitOrder order) -> do + liftIO $ emitEvent "fsm_connected_order_request" + processSubmitOrderRequest order + MainQueueRequest (RequestCancelOrder oid) -> do + liftIO $ emitEvent "fsm_connected_order_cancel" + processCancelOrderRequest oid where requestTimeoutValue = 10 @@ -432,7 +444,7 @@ handleConnected = do pure Nothing processSubmitOrderRequest order = do - log Debug "TXMLConnector.WorkThread" $ "Incoming request: submit order " <> (T.pack . show) order + log Info "TXMLConnector.WorkThread" $ "Incoming request: submit order " <> (T.pack . show) order case mkNewOrderCommand order of Just cmd -> do v <- sendCommand . toXml $ cmd @@ -445,31 +457,39 @@ handleConnected = do liftIO $ atomically $ do modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId')) + modifyTVar' (bsInitialTransactionIdMap brState) (M.insert (orderId order) transactionId') resp <- readTMVar respVar putTMVar resp ResponseOrderSubmitted maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) case maybeCb of Just cb -> do let notif = BackendOrderNotification (orderId order) Submitted + log Debug "TXMLConnector.WorkThread" $ "Passing notification" <> (T.pack . show) notif liftIO $ cb notif - _ -> pure () + _ -> log Warning "TXMLConnector.WorkThread" "No backend notification callback" log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId' ((TransaqResponseResult (ResponseFailure err)):_) -> do brState <- asks brokerState - log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err + log Warning "TXMLConnector.WorkThread" $ "Order submission failure: " <> err maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) case maybeCb of Just cb -> do let notif = BackendOrderNotification (orderId order) Rejected liftIO $ cb notif - _ -> pure () + _ -> log Warning "TXMLConnector.WorkThread" "No callback" + respVar <- asks responseVar + liftIO $ atomically $ do + resp <- readTMVar respVar + putTMVar resp ResponseOrderSubmitted _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" pure Nothing Right _ -> do log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" pure Nothing - _ -> pure Nothing + _ -> do + log Warning "TXMLConnector.WorkThread" "Unable to make command" + pure Nothing processCancelOrderRequest oid = do log Debug "TXMLConnector.WorkThread" $ "Incoming request: cancel order " <> (T.pack . show) oid @@ -479,9 +499,13 @@ handleConnected = do transactionMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) case BM.lookup oid transactionMap of Just (TransactionId transactionId') -> sendCancelOrder transactionId' - Just (ExchangeOrderId eoid) -> sendCancelOrder eoid + Just (ExchangeOrderId eoid) -> do + initialTransactionIdMap <- liftIO $ readTVarIO (bsInitialTransactionIdMap brState) + case M.lookup oid initialTransactionIdMap of + Just trId -> sendCancelOrder trId + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to obtain transaction id for order " <> (T.pack . show) oid _ -> do - log Debug "TXMLConnector.WorkThread" $ "Unable to locate transaction ID for order: " <> (T.pack . show) oid + log Warning "TXMLConnector.WorkThread" $ "Unable to locate transaction ID for order: " <> (T.pack . show) oid liftIO . atomically $ putTMVar resp ResponseOrderCancelled pure Nothing @@ -491,7 +515,7 @@ handleConnected = do v <- sendCommand . toXml $ CommandCancelOrder (toInteger transactionId') case v of Left result -> do - log Debug "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result + log Info "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result liftIO . atomically $ putTMVar resp ResponseOrderCancelled _ -> liftIO . atomically $ putTMVar resp ResponseOrderCancelled diff --git a/transaq-connector.cabal b/transaq-connector.cabal index 6e4e1c3..c09eb1e 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -31,6 +31,7 @@ executable transaq-connector , FSM , PaperBroker , Commissions + , Eventloop default-extensions: OverloadedStrings , MultiWayIf , MultiParamTypeClasses @@ -38,7 +39,7 @@ executable transaq-connector build-depends: base >= 4.7 && < 5 , dhall , eventcounters - , libatrade == 0.15.0.0 + , libatrade == 0.16.0.0 , text , transformers , co-log @@ -113,7 +114,7 @@ test-suite transaq-connector-test , tasty-hunit , dhall , eventcounters - , libatrade == 0.15.0.0 + , libatrade == 0.16.0.0 , text , transformers , co-log