Browse Source

TXMLConnector: handle timeout

master
Denis Tereshkin 3 years ago
parent
commit
c59dc4cff2
  1. 27
      src/TXMLConnector.hs
  2. 8
      src/Transaq.hs

27
src/TXMLConnector.hs

@ -34,7 +34,8 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar',
newTVarIO, orElse, putTMVar, newTVarIO, orElse, putTMVar,
readTMVar, readTVar, readTMVar, readTVar,
readTVarIO, takeTMVar, readTVarIO, takeTMVar,
tryReadTMVar, writeTVar) tryPutTMVar, tryReadTMVar,
writeTVar)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue,
readTBQueue, writeTBQueue) readTBQueue, writeTBQueue)
import Control.Monad (forever, void, when) import Control.Monad (forever, void, when)
@ -53,6 +54,7 @@ import Transaq (AllTradesTrade (..),
CommandConnect (..), CommandConnect (..),
CommandDisconnect (CommandDisconnect), CommandDisconnect (CommandDisconnect),
CommandGetHistoryData (CommandGetHistoryData), CommandGetHistoryData (CommandGetHistoryData),
CommandServerStatus (..),
CommandSubscribe (..), CommandSubscribe (..),
ConnectionState (Disconnected), ConnectionState (Disconnected),
Language (LanguageEn), Language (LanguageEn),
@ -176,6 +178,7 @@ data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageSh
data MainQueueData = data MainQueueData =
MainQueueTransaqData TransaqResponse MainQueueTransaqData TransaqResponse
| MainQueueRequest Request | MainQueueRequest Request
| MainQueuePingServer
| MainQueueShutdown | MainQueueShutdown
deriving (Eq, Show) deriving (Eq, Show)
@ -213,6 +216,7 @@ data Env =
, candleKindMap :: TVar (M.Map Int Int) , candleKindMap :: TVar (M.Map Int Int)
, brokerState :: BrokerState , brokerState :: BrokerState
, runVar :: TMVar () , runVar :: TMVar ()
, timerVar :: TMVar ()
} }
newtype App a = App { unApp :: ReaderT Env IO a } newtype App a = App { unApp :: ReaderT Env IO a }
@ -243,6 +247,7 @@ start logger config qssChannel tisH = do
orderTransactionIdMap <- newTVarIO BM.empty orderTransactionIdMap <- newTVarIO BM.empty
pendingOrders <- newTVarIO (fromList []) pendingOrders <- newTVarIO (fromList [])
runVar <- newEmptyTMVarIO runVar <- newEmptyTMVarIO
timerVar <- newEmptyTMVarIO
let brokerState = let brokerState =
BrokerState BrokerState
{ {
@ -268,6 +273,7 @@ start logger config qssChannel tisH = do
, candleKindMap = candleKindMap , candleKindMap = candleKindMap
, brokerState = brokerState , brokerState = brokerState
, runVar = runVar , runVar = runVar
, timerVar = timerVar
} }
threadId <- forkIO $ (runReaderT . unApp) workThread env threadId <- forkIO $ (runReaderT . unApp) workThread env
return $ TXMLConnectorHandle return $ TXMLConnectorHandle
@ -298,6 +304,13 @@ workThread = do
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
Just cb -> do Just cb -> do
serverConnectionState <- asks serverConnected serverConnectionState <- asks serverConnected
timerVar' <- asks timerVar
void $ liftIO $ forkIO $ whileM $ do
threadDelay 1000000
void . liftIO . atomically $ tryPutTMVar timerVar' ()
connStatus <- liftIO . readTVarIO $ serverConnectionState
pure $ connStatus /= StageShutdown
void $ whileM $ do void $ whileM $ do
connStatus <- liftIO . readTVarIO $ serverConnectionState connStatus <- liftIO . readTVarIO $ serverConnectionState
case connStatus of case connStatus of
@ -339,11 +352,18 @@ workThread = do
rqVar <- asks requestVar rqVar <- asks requestVar
runVar' <- asks runVar runVar' <- asks runVar
queue <- asks transaqQueue queue <- asks transaqQueue
timerVar' <- asks timerVar
item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> takeTMVar rqVar) `orElse` (MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar runVar' >> pure MainQueueShutdown) (takeTMVar runVar' >> pure MainQueueShutdown) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer)
case item of case item of
MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown
MainQueuePingServer -> do
maybeServerStatus<- liftIO $ sendCommand $ toXml CommandServerStatus
case maybeServerStatus of
Left serverStatusRaw -> void $ liftIO $ parseAndWrite queue logger serverStatusRaw
Right () -> pure ()
MainQueueTransaqData transaqData -> do MainQueueTransaqData transaqData -> do
tm <- asks tickMap tm <- asks tickMap
case transaqData of case transaqData of
@ -512,7 +532,8 @@ workThread = do
item <- liftIO . atomically $ readTBQueue queue item <- liftIO . atomically $ readTBQueue queue
conn <- asks serverConnected conn <- asks serverConnected
case item of case item of
TransaqResponseServerStatus serverStatus -> TransaqResponseServerStatus serverStatus -> do
log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus
case state serverStatus of case state serverStatus of
Transaq.Disconnected -> do Transaq.Disconnected -> do
log Warning "TXMLConnector.WorkThread" "Server disconnected" log Warning "TXMLConnector.WorkThread" "Server disconnected"

8
src/Transaq.hs

@ -10,6 +10,7 @@ module Transaq
TransaqResponse(..), TransaqResponse(..),
SecurityId(..), SecurityId(..),
CommandDisconnect(..), CommandDisconnect(..),
CommandServerStatus(..),
CommandSubscribe(..), CommandSubscribe(..),
CommandNewOrder(..), CommandNewOrder(..),
CommandCancelOrder(..), CommandCancelOrder(..),
@ -180,6 +181,13 @@ data SecurityId =
, seccode :: T.Text , seccode :: T.Text
} deriving (Show, Eq, Ord) } deriving (Show, Eq, Ord)
data CommandServerStatus = CommandServerStatus
deriving (Show, Eq, Ord)
instance TransaqCommand CommandServerStatus where
toXml CommandServerStatus = T.pack . showElement $ unode "command" [strAttr "id" "server_status"]
instance Node SecurityId where instance Node SecurityId where
node n SecurityId {..} = node n node n SecurityId {..} = node n
[ unode "board" (T.unpack board) [ unode "board" (T.unpack board)

Loading…
Cancel
Save