From 14b94cdd6f935c81696e148450ace6e0f6b0ebb3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 3 Jun 2023 12:10:43 +0700 Subject: [PATCH] Fix reconnection --- src/Main.hs | 4 ++-- src/TXMLConnector.hs | 46 +++++++++++++++----------------------------- 2 files changed, 18 insertions(+), 32 deletions(-) diff --git a/src/Main.hs b/src/Main.hs index 17b6c1e..f4ffd0b 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -70,10 +70,10 @@ main = do (quotesourceEndpoint cfg) defaultServerSecurityParams) stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do - withZMQTradeSink ctx (tradesinkDashboard cfg) $ \tsDashboard -> + withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard -> case parseURI (T.unpack $ mqttUri cfg) of Just uri -> do - withMQTTTradeSink uri mqttTradeSinkTopic $ \tsMqtt -> do + withMQTTTradeSink uri mqttTradeSinkTopic logger $ \tsMqtt -> do txml <- Connector.start logger cfg qssChannel tisH bracket (startBrokerServer [Connector.makeBrokerBackend txml (account cfg)] diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index c4fa42e..e731bdc 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -33,8 +33,9 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', readTVarIO, takeTMVar, tryPutTMVar, tryReadTMVar, writeTVar) -import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, - readTBQueue, writeTBQueue) +import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, + newTBQueue, readTBQueue, + writeTBQueue) import Control.Monad (forever, void, when) import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM @@ -350,9 +351,10 @@ workThread = do runVar' <- asks runVar queue <- asks transaqQueue timerVar' <- asks timerVar - item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` + item <- liftIO . atomically $ + (readTMVar runVar' >> pure MainQueueShutdown) `orElse` + (MainQueueTransaqData <$> readTBQueue queue) `orElse` (MainQueueRequest <$> takeTMVar rqVar) `orElse` - (takeTMVar runVar' >> pure MainQueueShutdown) `orElse` (takeTMVar timerVar' >> pure MainQueuePingServer) case item of MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown @@ -560,12 +562,13 @@ workThread = do v <- makeSubscriptions cfg case v of Left errmsg -> do - log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg + log Warning "TXMLConnector.WorkThread" "Unable to subscribe" void $ liftIO . sendCommand $ toXml CommandDisconnect Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" Transaq.Error errmsg -> do - log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg + log Warning "TXMLConnector.WorkThread" "Connection error" liftIO . atomically $ writeTVar conn StageConnection + void $ liftIO $ sendCommand $ toXml $ CommandDisconnect TransaqResponseResult result -> log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result -- TODO: handle order response @@ -618,35 +621,16 @@ workThread = do pushPosEquity = Nothing } case v of - Left err -> do - log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" + Left _ -> do + log Warning "TXMLConnector.WorkThread" "Unable to connect" + void $ liftIO $ sendCommand $ toXml $ CommandDisconnect liftIO $ threadDelay (1000 * 1000 * 10) + queue <- asks transaqQueue + void $ liftIO $ atomically $ flushTBQueue queue Right _ -> do log Info "TXMLConnector.WorkThread" "Connected" conn <- asks serverConnected liftIO . atomically $ writeTVar conn StageGetInfo - -- item <- atomically $ readTBQueue queue - -- case item of - -- TransaqResponseServerStatus status -> do - -- case state status of - -- Transaq.Error errmsg -> do - -- log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> errmsg - -- void $ sendCommand $ toXml CommandDisconnect - -- threadDelay (10 * 1000 * 1000) - -- Transaq.Connected -> do - -- atomically $ writeTVar serverConnected StageGetInfo - -- -- v <- makeSubscriptions config - -- -- case v of - -- -- Left errmsg -> do - -- -- log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg - -- -- void $ sendCommand $ toXml CommandDisconnect - -- -- Right _ -> - -- Transaq.Disconnected -> do - -- log Warning "TXMLConnector.WorkThread" "Unable to connect (disconnected)" - -- threadDelay (10 * 1000 * 1000) - -- other -> do - -- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other - -- threadDelay (1000 * 1000) makeSubscriptions config = liftIO . sendCommand . toXml $ CommandSubscribe @@ -762,6 +746,8 @@ mkNewOrderCommand order = , nosplit = False } _ -> Nothing + _ -> Nothing + _ -> Nothing where toDirection AT.Buy = Transaq.Buy toDirection AT.Sell = Transaq.Sell