diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 199e81b..133987e 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -112,6 +112,9 @@ import Data.Int (Int64) import qualified Data.Map.Strict as M import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) +import FSM (FSMCallback (..), + FSMState (isTerminalState), + makeFsm, runFsm) import GHC.Exts (IsList (..)) import Prelude hiding (log) import TickerInfoServer (TickerInfo (..), @@ -176,6 +179,10 @@ data TXMLConnectorHandle = data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown deriving (Eq, Show, Ord) +instance FSMState ConnectionStage where + isTerminalState StageShutdown = True + isTerminalState _ = False + data MainQueueData = MainQueueTransaqData TransaqResponse | MainQueueRequest Request @@ -310,15 +317,10 @@ workThread = do connStatus <- liftIO . readTVarIO $ serverConnectionState pure $ connStatus /= StageShutdown - void $ whileM $ do - connStatus <- liftIO . readTVarIO $ serverConnectionState - case connStatus of - StageConnection -> handleUnconnected - StageGetInfo -> handleGetInfo - StageConnected -> handleConnected - StageShutdown -> pure () - pure $ connStatus /= StageShutdown - + fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected) + , (StageGetInfo, FSMCallback handleGetInfo) + , (StageConnected, FSMCallback handleConnected)] + runFsm fsm liftIO $ freeCallback cb where parseTransaqLogLevel 1 = TXML.Warning @@ -345,8 +347,10 @@ workThread = do "result" -> TransaqResponseResult <$> fromXml el _ -> Nothing writeToQueue queue resp = atomically $ writeTBQueue queue resp - handleConnected :: App () + handleConnected :: App (Maybe ConnectionStage) handleConnected = do + checkRequestTimeout + serverConn <- asks serverConnected rqVar <- asks requestVar runVar' <- asks runVar @@ -358,14 +362,17 @@ workThread = do (MainQueueRequest <$> takeTMVar rqVar) `orElse` (takeTMVar timerVar' >> pure MainQueuePingServer) case item of - MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown + MainQueueShutdown -> pure $ Just StageShutdown MainQueuePingServer -> do maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus case maybeServerStatus of Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of - ((TransaqResponseResult (ResponseFailure _)):_) -> liftIO $ atomically $ writeTVar serverConn StageConnection - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw - Right () -> pure () + ((TransaqResponseResult (ResponseFailure _)):_) -> do + pure $ Just StageConnection + _ -> do + log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw + pure Nothing + Right () -> pure Nothing MainQueueTransaqData transaqData -> do tm <- asks tickMap case transaqData of @@ -374,12 +381,14 @@ workThread = do let ticks = fmap allTradeToTick trades forM_ ticks (liftIO . writeChan qssChan . QSSTick) forM_ ticks (insertToTickMap tm) + pure Nothing TransaqResponseQuotations (ResponseQuotations quotations) -> do qssChan <- asks qssChannel now <- liftIO getCurrentTime let ticks = concatMap (quotationToTicks now) quotations forM_ ticks (liftIO . writeChan qssChan . QSSTick) forM_ ticks (insertToTickMap tm) + pure Nothing TransaqResponseCandles respCandle -> do resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) @@ -398,9 +407,10 @@ workThread = do , hrMoreData = False } _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" - TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder - TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade - _ -> pure () + pure Nothing + TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing + TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing + _ -> pure Nothing MainQueueRequest (RequestHistory request) -> do cur <- asks currentCandles liftIO $ atomically $ writeTVar cur [] @@ -418,6 +428,7 @@ workThread = do } _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request _ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request) + pure Nothing MainQueueRequest (RequestSubmitOrder order) -> do case mkNewOrderCommand order of Just cmd -> do @@ -451,12 +462,12 @@ workThread = do liftIO $ cb notif _ -> pure () _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" + pure Nothing Right _ -> do log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" - _ -> pure () - _ -> pure () - - checkRequestTimeout + pure Nothing + _ -> pure Nothing + _ -> pure Nothing requestTimeout = 10 @@ -542,7 +553,7 @@ workThread = do liftIO . atomically . putTMVar tmvar $ ResponseTimeout _ -> pure () - handleGetInfo :: App () + handleGetInfo :: App (Maybe ConnectionStage) handleGetInfo = do queue <- asks transaqQueue cfg <- asks config @@ -554,52 +565,58 @@ workThread = do case state serverStatus of Transaq.Disconnected -> do log Warning "TXMLConnector.WorkThread" "Server disconnected" - liftIO . atomically $ writeTVar conn StageConnection + pure $ Just StageConnection Transaq.Connected -> do log Info "TXMLConnector.WorkThread" "Server connected" void $ liftIO . sendCommand $ toXml $ CommandChangePass (transaqPassword cfg) "goobaka12" - liftIO . atomically $ writeTVar conn StageConnected v <- makeSubscriptions cfg case v of Left errmsg -> do log Warning "TXMLConnector.WorkThread" "Unable to subscribe" void $ liftIO . sendCommand $ toXml CommandDisconnect - Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" + pure $ Just StageConnection + Right _ -> do + log Info "TXMLConnector.WorkThread" "Subscriptions done" + pure $ Just StageConnected Transaq.Error errmsg -> do log Warning "TXMLConnector.WorkThread" "Connection error" liftIO . atomically $ writeTVar conn StageConnection void $ liftIO $ sendCommand $ toXml $ CommandDisconnect - TransaqResponseResult result -> + pure $ Just StageConnection + TransaqResponseResult result -> do log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result - -- TODO: handle order response - TransaqResponseCandles candles -> + pure Nothing + TransaqResponseCandles candles -> do log Debug "TXMLConnector.WorkThread" $ "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles) - -- TODO: Pass to qhp + pure Nothing TransaqResponseMarkets (ResponseMarkets markets) -> do log Debug "TXMLConnector.WorkThread" "Incoming markets:" forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) - -- TODO: Pass to qtis + pure Nothing TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do ckMap <- asks candleKindMap log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k))) + pure Nothing TransaqResponseSecurities (ResponseSecurities securities) -> do tisH <- asks tisHandle let tickerInfos = securityToTickerInfo <$> securities log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) forM_ tickerInfos (liftIO . putTickerInfo tisH) - TransaqResponseSecInfo secInfo -> + pure Nothing + TransaqResponseSecInfo secInfo -> do log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo - -- TODO: Pass to qtis + pure Nothing TransaqResponseClient (ResponseClient clientData) -> do log Debug "TXMLConnector.WorkThread" $ "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData) - _ -> pure () + pure Nothing + _ -> pure Nothing - handleUnconnected :: App () + handleUnconnected :: App (Maybe ConnectionStage) handleUnconnected = do cfg <- asks config log Debug "TXMLConnector.WorkThread" "Sending connect command" @@ -612,10 +629,10 @@ workThread = do threadDelay reconnectionDelay queue <- asks transaqQueue void $ liftIO $ atomically $ flushTBQueue queue + pure Nothing Right _ -> do log Info "TXMLConnector.WorkThread" "Connected" - conn <- asks serverConnected - liftIO . atomically $ writeTVar conn StageGetInfo + pure $ Just StageGetInfo makeSubscriptions config = liftIO . sendCommand . toXml $ cmdSubscription config