|
|
|
@ -159,6 +159,7 @@ data BrokerState = |
|
|
|
BrokerState |
|
|
|
BrokerState |
|
|
|
{ |
|
|
|
{ |
|
|
|
bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId) |
|
|
|
bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId) |
|
|
|
|
|
|
|
, bsInitialTransactionIdMap :: TVar (M.Map OrderId Int64) |
|
|
|
, bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) |
|
|
|
, bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) |
|
|
|
, bsOrderMap :: TVar (M.Map OrderId Order) |
|
|
|
, bsOrderMap :: TVar (M.Map OrderId Order) |
|
|
|
, bsPendingOrders :: TVar (D.Deque Order) |
|
|
|
, bsPendingOrders :: TVar (D.Deque Order) |
|
|
|
@ -386,6 +387,7 @@ handleConnected = do |
|
|
|
runVar' <- asks runVar |
|
|
|
runVar' <- asks runVar |
|
|
|
queue <- asks transaqQueue |
|
|
|
queue <- asks transaqQueue |
|
|
|
timerVar' <- asks timerVar |
|
|
|
timerVar' <- asks timerVar |
|
|
|
|
|
|
|
liftIO $ emitEvent "fsm_connected_loop" |
|
|
|
item <- liftIO . atomically $ |
|
|
|
item <- liftIO . atomically $ |
|
|
|
(readTMVar runVar' >> pure MainQueueShutdown) `orElse` |
|
|
|
(readTMVar runVar' >> pure MainQueueShutdown) `orElse` |
|
|
|
(MainQueueTransaqData <$> readTBQueue queue) `orElse` |
|
|
|
(MainQueueTransaqData <$> readTBQueue queue) `orElse` |
|
|
|
@ -393,11 +395,21 @@ handleConnected = do |
|
|
|
(takeTMVar timerVar' >> pure MainQueuePingServer) |
|
|
|
(takeTMVar timerVar' >> pure MainQueuePingServer) |
|
|
|
case item of |
|
|
|
case item of |
|
|
|
MainQueueShutdown -> pure $ Just StageShutdown |
|
|
|
MainQueueShutdown -> pure $ Just StageShutdown |
|
|
|
MainQueuePingServer -> pingServer |
|
|
|
MainQueuePingServer -> do |
|
|
|
MainQueueTransaqData transaqData -> handleTransaqData transaqData |
|
|
|
liftIO $ emitEvent "fsm_connected_ping" |
|
|
|
MainQueueRequest (RequestHistory request) -> processHistoryRequest request |
|
|
|
pingServer |
|
|
|
MainQueueRequest (RequestSubmitOrder order) -> processSubmitOrderRequest order |
|
|
|
MainQueueTransaqData transaqData -> do |
|
|
|
MainQueueRequest (RequestCancelOrder oid) -> processCancelOrderRequest oid |
|
|
|
liftIO $ emitEvent "fsm_connected_transaq_data" |
|
|
|
|
|
|
|
handleTransaqData transaqData |
|
|
|
|
|
|
|
MainQueueRequest (RequestHistory request) -> do |
|
|
|
|
|
|
|
liftIO $ emitEvent "fsm_connected_history_request" |
|
|
|
|
|
|
|
processHistoryRequest request |
|
|
|
|
|
|
|
MainQueueRequest (RequestSubmitOrder order) -> do |
|
|
|
|
|
|
|
liftIO $ emitEvent "fsm_connected_order_request" |
|
|
|
|
|
|
|
processSubmitOrderRequest order |
|
|
|
|
|
|
|
MainQueueRequest (RequestCancelOrder oid) -> do |
|
|
|
|
|
|
|
liftIO $ emitEvent "fsm_connected_order_cancel" |
|
|
|
|
|
|
|
processCancelOrderRequest oid |
|
|
|
|
|
|
|
|
|
|
|
where |
|
|
|
where |
|
|
|
requestTimeoutValue = 10 |
|
|
|
requestTimeoutValue = 10 |
|
|
|
@ -432,7 +444,7 @@ handleConnected = do |
|
|
|
pure Nothing |
|
|
|
pure Nothing |
|
|
|
|
|
|
|
|
|
|
|
processSubmitOrderRequest order = do |
|
|
|
processSubmitOrderRequest order = do |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Incoming request: submit order " <> (T.pack . show) order |
|
|
|
log Info "TXMLConnector.WorkThread" $ "Incoming request: submit order " <> (T.pack . show) order |
|
|
|
case mkNewOrderCommand order of |
|
|
|
case mkNewOrderCommand order of |
|
|
|
Just cmd -> do |
|
|
|
Just cmd -> do |
|
|
|
v <- sendCommand . toXml $ cmd |
|
|
|
v <- sendCommand . toXml $ cmd |
|
|
|
@ -445,31 +457,39 @@ handleConnected = do |
|
|
|
liftIO $ atomically $ do |
|
|
|
liftIO $ atomically $ do |
|
|
|
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) |
|
|
|
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) |
|
|
|
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId')) |
|
|
|
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId')) |
|
|
|
|
|
|
|
modifyTVar' (bsInitialTransactionIdMap brState) (M.insert (orderId order) transactionId') |
|
|
|
resp <- readTMVar respVar |
|
|
|
resp <- readTMVar respVar |
|
|
|
putTMVar resp ResponseOrderSubmitted |
|
|
|
putTMVar resp ResponseOrderSubmitted |
|
|
|
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) |
|
|
|
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) |
|
|
|
case maybeCb of |
|
|
|
case maybeCb of |
|
|
|
Just cb -> do |
|
|
|
Just cb -> do |
|
|
|
let notif = BackendOrderNotification (orderId order) Submitted |
|
|
|
let notif = BackendOrderNotification (orderId order) Submitted |
|
|
|
|
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Passing notification" <> (T.pack . show) notif |
|
|
|
liftIO $ cb notif |
|
|
|
liftIO $ cb notif |
|
|
|
_ -> pure () |
|
|
|
_ -> log Warning "TXMLConnector.WorkThread" "No backend notification callback" |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> |
|
|
|
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId' |
|
|
|
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId' |
|
|
|
((TransaqResponseResult (ResponseFailure err)):_) -> do |
|
|
|
((TransaqResponseResult (ResponseFailure err)):_) -> do |
|
|
|
brState <- asks brokerState |
|
|
|
brState <- asks brokerState |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err |
|
|
|
log Warning "TXMLConnector.WorkThread" $ "Order submission failure: " <> err |
|
|
|
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) |
|
|
|
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) |
|
|
|
case maybeCb of |
|
|
|
case maybeCb of |
|
|
|
Just cb -> do |
|
|
|
Just cb -> do |
|
|
|
let notif = BackendOrderNotification (orderId order) Rejected |
|
|
|
let notif = BackendOrderNotification (orderId order) Rejected |
|
|
|
liftIO $ cb notif |
|
|
|
liftIO $ cb notif |
|
|
|
_ -> pure () |
|
|
|
_ -> log Warning "TXMLConnector.WorkThread" "No callback" |
|
|
|
|
|
|
|
respVar <- asks responseVar |
|
|
|
|
|
|
|
liftIO $ atomically $ do |
|
|
|
|
|
|
|
resp <- readTMVar respVar |
|
|
|
|
|
|
|
putTMVar resp ResponseOrderSubmitted |
|
|
|
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" |
|
|
|
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" |
|
|
|
pure Nothing |
|
|
|
pure Nothing |
|
|
|
Right _ -> do |
|
|
|
Right _ -> do |
|
|
|
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" |
|
|
|
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" |
|
|
|
pure Nothing |
|
|
|
pure Nothing |
|
|
|
_ -> pure Nothing |
|
|
|
_ -> do |
|
|
|
|
|
|
|
log Warning "TXMLConnector.WorkThread" "Unable to make command" |
|
|
|
|
|
|
|
pure Nothing |
|
|
|
|
|
|
|
|
|
|
|
processCancelOrderRequest oid = do |
|
|
|
processCancelOrderRequest oid = do |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Incoming request: cancel order " <> (T.pack . show) oid |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Incoming request: cancel order " <> (T.pack . show) oid |
|
|
|
@ -479,9 +499,13 @@ handleConnected = do |
|
|
|
transactionMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) |
|
|
|
transactionMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) |
|
|
|
case BM.lookup oid transactionMap of |
|
|
|
case BM.lookup oid transactionMap of |
|
|
|
Just (TransactionId transactionId') -> sendCancelOrder transactionId' |
|
|
|
Just (TransactionId transactionId') -> sendCancelOrder transactionId' |
|
|
|
Just (ExchangeOrderId eoid) -> sendCancelOrder eoid |
|
|
|
Just (ExchangeOrderId eoid) -> do |
|
|
|
|
|
|
|
initialTransactionIdMap <- liftIO $ readTVarIO (bsInitialTransactionIdMap brState) |
|
|
|
|
|
|
|
case M.lookup oid initialTransactionIdMap of |
|
|
|
|
|
|
|
Just trId -> sendCancelOrder trId |
|
|
|
|
|
|
|
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to obtain transaction id for order " <> (T.pack . show) oid |
|
|
|
_ -> do |
|
|
|
_ -> do |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Unable to locate transaction ID for order: " <> (T.pack . show) oid |
|
|
|
log Warning "TXMLConnector.WorkThread" $ "Unable to locate transaction ID for order: " <> (T.pack . show) oid |
|
|
|
liftIO . atomically $ putTMVar resp ResponseOrderCancelled |
|
|
|
liftIO . atomically $ putTMVar resp ResponseOrderCancelled |
|
|
|
pure Nothing |
|
|
|
pure Nothing |
|
|
|
|
|
|
|
|
|
|
|
@ -491,7 +515,7 @@ handleConnected = do |
|
|
|
v <- sendCommand . toXml $ CommandCancelOrder (toInteger transactionId') |
|
|
|
v <- sendCommand . toXml $ CommandCancelOrder (toInteger transactionId') |
|
|
|
case v of |
|
|
|
case v of |
|
|
|
Left result -> do |
|
|
|
Left result -> do |
|
|
|
log Debug "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result |
|
|
|
log Info "TXMLConnector.WorkThread" $ "Cancellation result: " <> (T.pack . show) result |
|
|
|
liftIO . atomically $ putTMVar resp ResponseOrderCancelled |
|
|
|
liftIO . atomically $ putTMVar resp ResponseOrderCancelled |
|
|
|
_ -> liftIO . atomically $ putTMVar resp ResponseOrderCancelled |
|
|
|
_ -> liftIO . atomically $ putTMVar resp ResponseOrderCancelled |
|
|
|
|
|
|
|
|
|
|
|
|