Browse Source

TXMLConnector: refactor to FSM

master
Denis Tereshkin 3 years ago
parent
commit
41c9df214a
  1. 89
      src/TXMLConnector.hs

89
src/TXMLConnector.hs

@ -112,6 +112,9 @@ import Data.Int (Int64)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, diffUTCTime, import Data.Time.Clock (UTCTime, diffUTCTime,
getCurrentTime) getCurrentTime)
import FSM (FSMCallback (..),
FSMState (isTerminalState),
makeFsm, runFsm)
import GHC.Exts (IsList (..)) import GHC.Exts (IsList (..))
import Prelude hiding (log) import Prelude hiding (log)
import TickerInfoServer (TickerInfo (..), import TickerInfoServer (TickerInfo (..),
@ -176,6 +179,10 @@ data TXMLConnectorHandle =
data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown
deriving (Eq, Show, Ord) deriving (Eq, Show, Ord)
instance FSMState ConnectionStage where
isTerminalState StageShutdown = True
isTerminalState _ = False
data MainQueueData = data MainQueueData =
MainQueueTransaqData TransaqResponse MainQueueTransaqData TransaqResponse
| MainQueueRequest Request | MainQueueRequest Request
@ -310,15 +317,10 @@ workThread = do
connStatus <- liftIO . readTVarIO $ serverConnectionState connStatus <- liftIO . readTVarIO $ serverConnectionState
pure $ connStatus /= StageShutdown pure $ connStatus /= StageShutdown
void $ whileM $ do fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected)
connStatus <- liftIO . readTVarIO $ serverConnectionState , (StageGetInfo, FSMCallback handleGetInfo)
case connStatus of , (StageConnected, FSMCallback handleConnected)]
StageConnection -> handleUnconnected runFsm fsm
StageGetInfo -> handleGetInfo
StageConnected -> handleConnected
StageShutdown -> pure ()
pure $ connStatus /= StageShutdown
liftIO $ freeCallback cb liftIO $ freeCallback cb
where where
parseTransaqLogLevel 1 = TXML.Warning parseTransaqLogLevel 1 = TXML.Warning
@ -345,8 +347,10 @@ workThread = do
"result" -> TransaqResponseResult <$> fromXml el "result" -> TransaqResponseResult <$> fromXml el
_ -> Nothing _ -> Nothing
writeToQueue queue resp = atomically $ writeTBQueue queue resp writeToQueue queue resp = atomically $ writeTBQueue queue resp
handleConnected :: App () handleConnected :: App (Maybe ConnectionStage)
handleConnected = do handleConnected = do
checkRequestTimeout
serverConn <- asks serverConnected serverConn <- asks serverConnected
rqVar <- asks requestVar rqVar <- asks requestVar
runVar' <- asks runVar runVar' <- asks runVar
@ -358,14 +362,17 @@ workThread = do
(MainQueueRequest <$> takeTMVar rqVar) `orElse` (MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer) (takeTMVar timerVar' >> pure MainQueuePingServer)
case item of case item of
MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown MainQueueShutdown -> pure $ Just StageShutdown
MainQueuePingServer -> do MainQueuePingServer -> do
maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus
case maybeServerStatus of case maybeServerStatus of
Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> liftIO $ atomically $ writeTVar serverConn StageConnection ((TransaqResponseResult (ResponseFailure _)):_) -> do
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw pure $ Just StageConnection
Right () -> pure () _ -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw
pure Nothing
Right () -> pure Nothing
MainQueueTransaqData transaqData -> do MainQueueTransaqData transaqData -> do
tm <- asks tickMap tm <- asks tickMap
case transaqData of case transaqData of
@ -374,12 +381,14 @@ workThread = do
let ticks = fmap allTradeToTick trades let ticks = fmap allTradeToTick trades
forM_ ticks (liftIO . writeChan qssChan . QSSTick) forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm) forM_ ticks (insertToTickMap tm)
pure Nothing
TransaqResponseQuotations (ResponseQuotations quotations) -> do TransaqResponseQuotations (ResponseQuotations quotations) -> do
qssChan <- asks qssChannel qssChan <- asks qssChannel
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
let ticks = concatMap (quotationToTicks now) quotations let ticks = concatMap (quotationToTicks now) quotations
forM_ ticks (liftIO . writeChan qssChan . QSSTick) forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm) forM_ ticks (insertToTickMap tm)
pure Nothing
TransaqResponseCandles respCandle -> do TransaqResponseCandles respCandle -> do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
@ -398,9 +407,10 @@ workThread = do
, hrMoreData = False , hrMoreData = False
} }
_ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder pure Nothing
TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing
_ -> pure () TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing
_ -> pure Nothing
MainQueueRequest (RequestHistory request) -> do MainQueueRequest (RequestHistory request) -> do
cur <- asks currentCandles cur <- asks currentCandles
liftIO $ atomically $ writeTVar cur [] liftIO $ atomically $ writeTVar cur []
@ -418,6 +428,7 @@ workThread = do
} }
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request) _ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
pure Nothing
MainQueueRequest (RequestSubmitOrder order) -> do MainQueueRequest (RequestSubmitOrder order) -> do
case mkNewOrderCommand order of case mkNewOrderCommand order of
Just cmd -> do Just cmd -> do
@ -451,12 +462,12 @@ workThread = do
liftIO $ cb notif liftIO $ cb notif
_ -> pure () _ -> pure ()
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
pure Nothing
Right _ -> do Right _ -> do
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
_ -> pure () pure Nothing
_ -> pure () _ -> pure Nothing
_ -> pure Nothing
checkRequestTimeout
requestTimeout = 10 requestTimeout = 10
@ -542,7 +553,7 @@ workThread = do
liftIO . atomically . putTMVar tmvar $ ResponseTimeout liftIO . atomically . putTMVar tmvar $ ResponseTimeout
_ -> pure () _ -> pure ()
handleGetInfo :: App () handleGetInfo :: App (Maybe ConnectionStage)
handleGetInfo = do handleGetInfo = do
queue <- asks transaqQueue queue <- asks transaqQueue
cfg <- asks config cfg <- asks config
@ -554,52 +565,58 @@ workThread = do
case state serverStatus of case state serverStatus of
Transaq.Disconnected -> do Transaq.Disconnected -> do
log Warning "TXMLConnector.WorkThread" "Server disconnected" log Warning "TXMLConnector.WorkThread" "Server disconnected"
liftIO . atomically $ writeTVar conn StageConnection pure $ Just StageConnection
Transaq.Connected -> do Transaq.Connected -> do
log Info "TXMLConnector.WorkThread" "Server connected" log Info "TXMLConnector.WorkThread" "Server connected"
void $ liftIO . sendCommand $ toXml $ void $ liftIO . sendCommand $ toXml $
CommandChangePass (transaqPassword cfg) "goobaka12" CommandChangePass (transaqPassword cfg) "goobaka12"
liftIO . atomically $ writeTVar conn StageConnected
v <- makeSubscriptions cfg v <- makeSubscriptions cfg
case v of case v of
Left errmsg -> do Left errmsg -> do
log Warning "TXMLConnector.WorkThread" "Unable to subscribe" log Warning "TXMLConnector.WorkThread" "Unable to subscribe"
void $ liftIO . sendCommand $ toXml CommandDisconnect void $ liftIO . sendCommand $ toXml CommandDisconnect
Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" pure $ Just StageConnection
Right _ -> do
log Info "TXMLConnector.WorkThread" "Subscriptions done"
pure $ Just StageConnected
Transaq.Error errmsg -> do Transaq.Error errmsg -> do
log Warning "TXMLConnector.WorkThread" "Connection error" log Warning "TXMLConnector.WorkThread" "Connection error"
liftIO . atomically $ writeTVar conn StageConnection liftIO . atomically $ writeTVar conn StageConnection
void $ liftIO $ sendCommand $ toXml $ CommandDisconnect void $ liftIO $ sendCommand $ toXml $ CommandDisconnect
TransaqResponseResult result -> pure $ Just StageConnection
TransaqResponseResult result -> do
log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result
-- TODO: handle order response pure Nothing
TransaqResponseCandles candles -> TransaqResponseCandles candles -> do
log Debug "TXMLConnector.WorkThread" $ log Debug "TXMLConnector.WorkThread" $
"Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles) "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
-- TODO: Pass to qhp pure Nothing
TransaqResponseMarkets (ResponseMarkets markets) -> do TransaqResponseMarkets (ResponseMarkets markets) -> do
log Debug "TXMLConnector.WorkThread" "Incoming markets:" log Debug "TXMLConnector.WorkThread" "Incoming markets:"
forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
-- TODO: Pass to qtis pure Nothing
TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
ckMap <- asks candleKindMap ckMap <- asks candleKindMap
log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k))) forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
pure Nothing
TransaqResponseSecurities (ResponseSecurities securities) -> do TransaqResponseSecurities (ResponseSecurities securities) -> do
tisH <- asks tisHandle tisH <- asks tisHandle
let tickerInfos = securityToTickerInfo <$> securities let tickerInfos = securityToTickerInfo <$> securities
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH) forM_ tickerInfos (liftIO . putTickerInfo tisH)
TransaqResponseSecInfo secInfo -> pure Nothing
TransaqResponseSecInfo secInfo -> do
log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo
-- TODO: Pass to qtis pure Nothing
TransaqResponseClient (ResponseClient clientData) -> do TransaqResponseClient (ResponseClient clientData) -> do
log Debug "TXMLConnector.WorkThread" $ log Debug "TXMLConnector.WorkThread" $
"Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData) "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
_ -> pure () pure Nothing
_ -> pure Nothing
handleUnconnected :: App () handleUnconnected :: App (Maybe ConnectionStage)
handleUnconnected = do handleUnconnected = do
cfg <- asks config cfg <- asks config
log Debug "TXMLConnector.WorkThread" "Sending connect command" log Debug "TXMLConnector.WorkThread" "Sending connect command"
@ -612,10 +629,10 @@ workThread = do
threadDelay reconnectionDelay threadDelay reconnectionDelay
queue <- asks transaqQueue queue <- asks transaqQueue
void $ liftIO $ atomically $ flushTBQueue queue void $ liftIO $ atomically $ flushTBQueue queue
pure Nothing
Right _ -> do Right _ -> do
log Info "TXMLConnector.WorkThread" "Connected" log Info "TXMLConnector.WorkThread" "Connected"
conn <- asks serverConnected pure $ Just StageGetInfo
liftIO . atomically $ writeTVar conn StageGetInfo
makeSubscriptions config = liftIO . sendCommand . toXml $ cmdSubscription config makeSubscriptions config = liftIO . sendCommand . toXml $ cmdSubscription config

Loading…
Cancel
Save