Browse Source

Fix reconnection

master
Denis Tereshkin 3 years ago
parent
commit
14b94cdd6f
  1. 4
      src/Main.hs
  2. 46
      src/TXMLConnector.hs

4
src/Main.hs

@ -70,10 +70,10 @@ main = do
(quotesourceEndpoint cfg) (quotesourceEndpoint cfg)
defaultServerSecurityParams) defaultServerSecurityParams)
stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do stopQuoteSourceServer $ \_ -> withTickerInfoServer logger ctx (tisEndpoint cfg) $ \tisH -> do
withZMQTradeSink ctx (tradesinkDashboard cfg) $ \tsDashboard -> withZMQTradeSink ctx (tradesinkDashboard cfg) logger $ \tsDashboard ->
case parseURI (T.unpack $ mqttUri cfg) of case parseURI (T.unpack $ mqttUri cfg) of
Just uri -> do Just uri -> do
withMQTTTradeSink uri mqttTradeSinkTopic $ \tsMqtt -> do withMQTTTradeSink uri mqttTradeSinkTopic logger $ \tsMqtt -> do
txml <- Connector.start logger cfg qssChannel tisH txml <- Connector.start logger cfg qssChannel tisH
bracket (startBrokerServer bracket (startBrokerServer
[Connector.makeBrokerBackend txml (account cfg)] [Connector.makeBrokerBackend txml (account cfg)]

46
src/TXMLConnector.hs

@ -33,8 +33,9 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar',
readTVarIO, takeTMVar, readTVarIO, takeTMVar,
tryPutTMVar, tryReadTMVar, tryPutTMVar, tryReadTMVar,
writeTVar) writeTVar)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
readTBQueue, writeTBQueue) newTBQueue, readTBQueue,
writeTBQueue)
import Control.Monad (forever, void, when) import Control.Monad (forever, void, when)
import Control.Monad.Extra (whileM) import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM import qualified Data.Bimap as BM
@ -350,9 +351,10 @@ workThread = do
runVar' <- asks runVar runVar' <- asks runVar
queue <- asks transaqQueue queue <- asks transaqQueue
timerVar' <- asks timerVar timerVar' <- asks timerVar
item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` item <- liftIO . atomically $
(readTMVar runVar' >> pure MainQueueShutdown) `orElse`
(MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> takeTMVar rqVar) `orElse` (MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar runVar' >> pure MainQueueShutdown) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer) (takeTMVar timerVar' >> pure MainQueuePingServer)
case item of case item of
MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown
@ -560,12 +562,13 @@ workThread = do
v <- makeSubscriptions cfg v <- makeSubscriptions cfg
case v of case v of
Left errmsg -> do Left errmsg -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg log Warning "TXMLConnector.WorkThread" "Unable to subscribe"
void $ liftIO . sendCommand $ toXml CommandDisconnect void $ liftIO . sendCommand $ toXml CommandDisconnect
Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done"
Transaq.Error errmsg -> do Transaq.Error errmsg -> do
log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg log Warning "TXMLConnector.WorkThread" "Connection error"
liftIO . atomically $ writeTVar conn StageConnection liftIO . atomically $ writeTVar conn StageConnection
void $ liftIO $ sendCommand $ toXml $ CommandDisconnect
TransaqResponseResult result -> TransaqResponseResult result ->
log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result
-- TODO: handle order response -- TODO: handle order response
@ -618,35 +621,16 @@ workThread = do
pushPosEquity = Nothing pushPosEquity = Nothing
} }
case v of case v of
Left err -> do Left _ -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" log Warning "TXMLConnector.WorkThread" "Unable to connect"
void $ liftIO $ sendCommand $ toXml $ CommandDisconnect
liftIO $ threadDelay (1000 * 1000 * 10) liftIO $ threadDelay (1000 * 1000 * 10)
queue <- asks transaqQueue
void $ liftIO $ atomically $ flushTBQueue queue
Right _ -> do Right _ -> do
log Info "TXMLConnector.WorkThread" "Connected" log Info "TXMLConnector.WorkThread" "Connected"
conn <- asks serverConnected conn <- asks serverConnected
liftIO . atomically $ writeTVar conn StageGetInfo liftIO . atomically $ writeTVar conn StageGetInfo
-- item <- atomically $ readTBQueue queue
-- case item of
-- TransaqResponseServerStatus status -> do
-- case state status of
-- Transaq.Error errmsg -> do
-- log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> errmsg
-- void $ sendCommand $ toXml CommandDisconnect
-- threadDelay (10 * 1000 * 1000)
-- Transaq.Connected -> do
-- atomically $ writeTVar serverConnected StageGetInfo
-- -- v <- makeSubscriptions config
-- -- case v of
-- -- Left errmsg -> do
-- -- log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
-- -- void $ sendCommand $ toXml CommandDisconnect
-- -- Right _ ->
-- Transaq.Disconnected -> do
-- log Warning "TXMLConnector.WorkThread" "Unable to connect (disconnected)"
-- threadDelay (10 * 1000 * 1000)
-- other -> do
-- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other
-- threadDelay (1000 * 1000)
makeSubscriptions config = makeSubscriptions config =
liftIO . sendCommand . toXml $ liftIO . sendCommand . toXml $
CommandSubscribe CommandSubscribe
@ -762,6 +746,8 @@ mkNewOrderCommand order =
, nosplit = False , nosplit = False
} }
_ -> Nothing _ -> Nothing
_ -> Nothing
_ -> Nothing
where where
toDirection AT.Buy = Transaq.Buy toDirection AT.Buy = Transaq.Buy
toDirection AT.Sell = Transaq.Sell toDirection AT.Sell = Transaq.Sell

Loading…
Cancel
Save