From 13723bb436937f4dc9ee95bf643d78acd4efcf21 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 18 Jun 2023 13:57:11 +0700 Subject: [PATCH] TXMLConnector: extract handleTransaqData --- src/TXMLConnector/Internal.hs | 242 ++++++++++++++++++---------------- 1 file changed, 130 insertions(+), 112 deletions(-) diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs index 05b7699..7b3d454 100644 --- a/src/TXMLConnector/Internal.hs +++ b/src/TXMLConnector/Internal.hs @@ -221,7 +221,7 @@ workThread = do serverConnectionState <- asks serverConnected timerVar' <- asks timerVar void $ liftIO $ forkIO $ whileM $ do - threadDelay 1000000 + threadDelay 5000000 void . liftIO . atomically $ tryPutTMVar timerVar' () connStatus <- liftIO . readTVarIO $ serverConnectionState pure $ connStatus /= StageShutdown @@ -260,6 +260,128 @@ parseContent (Elem el) = parseElement _ -> Nothing parseContent _ = Nothing +handleTransaqData :: (MonadIO m, + MonadReader Env m, + MonadTXML m, + HasLog Env Message m) => TransaqResponse -> m (Maybe ConnectionStage) +handleTransaqData transaqData = do + tm <- asks tickMap + case transaqData of + TransaqResponseAllTrades (ResponseAllTrades trades) -> do + qssChan <- asks qssChannel + let ticks = fmap allTradeToTick trades + liftIO $ forM_ ticks (writeChan qssChan . QSSTick) + liftIO $ forM_ ticks (insertTick tm) + pure Nothing + TransaqResponseQuotations (ResponseQuotations quotations) -> do + qssChan <- asks qssChannel + now <- liftIO getCurrentTime + let ticks = concatMap (quotationToTicks now) quotations + liftIO $ forM_ ticks (writeChan qssChan . QSSTick) + liftIO $ forM_ ticks (insertTick tm) + pure Nothing + TransaqResponseCandles respCandle -> do + resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar + log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) + case resp of + Just !tmvar -> if cStatus respCandle == StatusPending + then do + cur <- asks currentCandles + liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c) + else do + cur <- asks currentCandles + liftIO $ atomically $ do + candles <- readTVar cur + putTMVar tmvar $ ResponseHistory $ HistoryResponse + { + hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) + , hrMoreData = False + } + _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" + pure Nothing + TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing + TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing + TransaqResponseSecurities (ResponseSecurities securities) -> do + tisH <- asks tisHandle + cfg <- asks config + let tickerInfos = securityToTickerInfo <$> securities + log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities + forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) + forM_ tickerInfos (liftIO . putTickerInfo tisH) + makeSubscriptions cfg + pure Nothing + _ -> pure Nothing + where + handleTrade transaqTrade = do + brState <- asks brokerState + trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) + maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) + orderMap <- liftIO $ readTVarIO (bsOrderMap brState) + case maybeCb of + Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of + Just oid -> case M.lookup oid orderMap of + Just order -> do + let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order) + log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif + liftIO $ cb notif + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade + Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!" + + fromTransaqTrade transaqTrade order = + Trade + { + tradeOrderId = orderId order + , tradePrice = fromDouble (tPrice transaqTrade) + , tradeQuantity = fromIntegral $ tQuantity transaqTrade + , tradeVolume = fromDouble $ tValue transaqTrade + , tradeVolumeCurrency = "" + , tradeOperation = fromDirection (tBuysell transaqTrade) + , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade + , tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade + , tradeTimestamp = tTimestamp transaqTrade + , tradeCommission = fromDouble $ tComission transaqTrade + , tradeSignalId = orderSignalId order + } + + fromDirection Transaq.Buy = AT.Buy + fromDirection Transaq.Sell = AT.Sell + + handleOrder orderUpdate = do + brState <- asks brokerState + trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) + maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) + case maybeCb of + Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|> + BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of + Just oid -> do + let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate) + log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif + liftIO $ atomically $ do + m <- readTVar (bsOrderTransactionIdMap brState) + when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do + modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate)) + liftIO $ cb notif + _ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification" + Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification" + + orderStateFromTransaq orderUpdate = + case oStatus orderUpdate of + OrderActive -> Submitted + OrderCancelled -> Cancelled + OrderDenied -> Rejected + OrderDisabled -> Rejected + OrderExpired -> Cancelled + OrderFailed -> Rejected + OrderForwarding -> Unsubmitted + OrderInactive -> OrderError + OrderMatched -> Executed + OrderRefused -> Rejected + OrderRemoved -> Rejected + OrderWait -> Unsubmitted + OrderWatching -> Unsubmitted + _ -> OrderError + handleConnected :: (MonadIO m, MonadReader Env m, @@ -289,44 +411,7 @@ handleConnected = do log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw pure Nothing Right () -> pure Nothing - MainQueueTransaqData transaqData -> do - tm <- asks tickMap - case transaqData of - TransaqResponseAllTrades (ResponseAllTrades trades) -> do - qssChan <- asks qssChannel - let ticks = fmap allTradeToTick trades - forM_ ticks (liftIO . writeChan qssChan . QSSTick) - forM_ ticks (insertToTickMap tm) - pure Nothing - TransaqResponseQuotations (ResponseQuotations quotations) -> do - qssChan <- asks qssChannel - now <- liftIO getCurrentTime - let ticks = concatMap (quotationToTicks now) quotations - forM_ ticks (liftIO . writeChan qssChan . QSSTick) - forM_ ticks (insertToTickMap tm) - pure Nothing - TransaqResponseCandles respCandle -> do - resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar - log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) - case resp of - Just !tmvar -> if cStatus respCandle == StatusPending - then do - cur <- asks currentCandles - liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c) - else do - cur <- asks currentCandles - liftIO $ atomically $ do - candles <- readTVar cur - putTMVar tmvar $ ResponseHistory $ HistoryResponse - { - hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) - , hrMoreData = False - } - _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" - pure Nothing - TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing - TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing - _ -> pure Nothing + MainQueueTransaqData transaqData -> handleTransaqData transaqData MainQueueRequest (RequestHistory request) -> do cur <- asks currentCandles liftIO $ atomically $ writeTVar cur [] @@ -387,75 +472,6 @@ handleConnected = do where requestTimeout = 10 - handleTrade transaqTrade = do - brState <- asks brokerState - trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) - maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) - orderMap <- liftIO $ readTVarIO (bsOrderMap brState) - case maybeCb of - Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of - Just oid -> case M.lookup oid orderMap of - Just order -> do - let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order) - log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif - liftIO $ cb notif - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade - Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!" - - fromTransaqTrade transaqTrade order = - Trade - { - tradeOrderId = orderId order - , tradePrice = fromDouble (tPrice transaqTrade) - , tradeQuantity = fromIntegral $ tQuantity transaqTrade - , tradeVolume = fromDouble $ tValue transaqTrade - , tradeVolumeCurrency = "" - , tradeOperation = fromDirection (tBuysell transaqTrade) - , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade - , tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade - , tradeTimestamp = tTimestamp transaqTrade - , tradeCommission = fromDouble $ tComission transaqTrade - , tradeSignalId = orderSignalId order - } - - fromDirection Transaq.Buy = AT.Buy - fromDirection Transaq.Sell = AT.Sell - - handleOrder orderUpdate = do - brState <- asks brokerState - trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) - maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) - case maybeCb of - Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|> - BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of - Just oid -> do - let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate) - log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif - liftIO $ atomically $ do - m <- readTVar (bsOrderTransactionIdMap brState) - when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do - modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate)) - liftIO $ cb notif - _ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification" - Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification" - - orderStateFromTransaq orderUpdate = - case oStatus orderUpdate of - OrderActive -> Submitted - OrderCancelled -> Cancelled - OrderDenied -> Rejected - OrderDisabled -> Rejected - OrderExpired -> Cancelled - OrderFailed -> Rejected - OrderForwarding -> Unsubmitted - OrderInactive -> OrderError - OrderMatched -> Executed - OrderRefused -> Rejected - OrderRemoved -> Rejected - OrderWait -> Unsubmitted - OrderWatching -> Unsubmitted - _ -> OrderError checkRequestTimeout = do now <- liftIO getCurrentTime @@ -469,8 +485,6 @@ handleConnected = do liftIO . atomically . putTMVar tmvar $ ResponseTimeout _ -> pure () - insertToTickMap = insertTick - handleGetInfo :: (MonadIO m, MonadReader Env m, MonadTXML m, @@ -525,7 +539,7 @@ handleGetInfo = do tisH <- asks tisHandle let tickerInfos = securityToTickerInfo <$> securities log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities - forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) + --forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) forM_ tickerInfos (liftIO . putTickerInfo tisH) pure Nothing TransaqResponseSecInfo secInfo -> do @@ -536,8 +550,12 @@ handleGetInfo = do "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData) pure Nothing _ -> pure Nothing + +makeSubscriptions :: (MonadIO m, + MonadTXML m, + HasLog Env Message m) => TransaqConnectorConfig -> m (Either T.Text ()) +makeSubscriptions config = sendCommand . toXml $ cmdSubscription config where - makeSubscriptions config = sendCommand . toXml $ cmdSubscription config cmdSubscription config = CommandSubscribe {