diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index a77ec6a..8deb01b 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -34,7 +34,8 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', newTVarIO, orElse, putTMVar, readTMVar, readTVar, readTVarIO, takeTMVar, - tryReadTMVar, writeTVar) + tryPutTMVar, tryReadTMVar, + writeTVar) import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, readTBQueue, writeTBQueue) import Control.Monad (forever, void, when) @@ -53,6 +54,7 @@ import Transaq (AllTradesTrade (..), CommandConnect (..), CommandDisconnect (CommandDisconnect), CommandGetHistoryData (CommandGetHistoryData), + CommandServerStatus (..), CommandSubscribe (..), ConnectionState (Disconnected), Language (LanguageEn), @@ -176,6 +178,7 @@ data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageSh data MainQueueData = MainQueueTransaqData TransaqResponse | MainQueueRequest Request + | MainQueuePingServer | MainQueueShutdown deriving (Eq, Show) @@ -213,6 +216,7 @@ data Env = , candleKindMap :: TVar (M.Map Int Int) , brokerState :: BrokerState , runVar :: TMVar () + , timerVar :: TMVar () } newtype App a = App { unApp :: ReaderT Env IO a } @@ -243,6 +247,7 @@ start logger config qssChannel tisH = do orderTransactionIdMap <- newTVarIO BM.empty pendingOrders <- newTVarIO (fromList []) runVar <- newEmptyTMVarIO + timerVar <- newEmptyTMVarIO let brokerState = BrokerState { @@ -268,6 +273,7 @@ start logger config qssChannel tisH = do , candleKindMap = candleKindMap , brokerState = brokerState , runVar = runVar + , timerVar = timerVar } threadId <- forkIO $ (runReaderT . unApp) workThread env return $ TXMLConnectorHandle @@ -298,6 +304,13 @@ workThread = do Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Just cb -> do serverConnectionState <- asks serverConnected + timerVar' <- asks timerVar + void $ liftIO $ forkIO $ whileM $ do + threadDelay 1000000 + void . liftIO . atomically $ tryPutTMVar timerVar' () + connStatus <- liftIO . readTVarIO $ serverConnectionState + pure $ connStatus /= StageShutdown + void $ whileM $ do connStatus <- liftIO . readTVarIO $ serverConnectionState case connStatus of @@ -339,11 +352,18 @@ workThread = do rqVar <- asks requestVar runVar' <- asks runVar queue <- asks transaqQueue + timerVar' <- asks timerVar item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` (MainQueueRequest <$> takeTMVar rqVar) `orElse` - (takeTMVar runVar' >> pure MainQueueShutdown) + (takeTMVar runVar' >> pure MainQueueShutdown) `orElse` + (takeTMVar timerVar' >> pure MainQueuePingServer) case item of MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown + MainQueuePingServer -> do + maybeServerStatus<- liftIO $ sendCommand $ toXml CommandServerStatus + case maybeServerStatus of + Left serverStatusRaw -> void $ liftIO $ parseAndWrite queue logger serverStatusRaw + Right () -> pure () MainQueueTransaqData transaqData -> do tm <- asks tickMap case transaqData of @@ -512,7 +532,8 @@ workThread = do item <- liftIO . atomically $ readTBQueue queue conn <- asks serverConnected case item of - TransaqResponseServerStatus serverStatus -> + TransaqResponseServerStatus serverStatus -> do + log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus case state serverStatus of Transaq.Disconnected -> do log Warning "TXMLConnector.WorkThread" "Server disconnected" diff --git a/src/Transaq.hs b/src/Transaq.hs index 7439c91..89ede1a 100644 --- a/src/Transaq.hs +++ b/src/Transaq.hs @@ -10,6 +10,7 @@ module Transaq TransaqResponse(..), SecurityId(..), CommandDisconnect(..), + CommandServerStatus(..), CommandSubscribe(..), CommandNewOrder(..), CommandCancelOrder(..), @@ -180,6 +181,13 @@ data SecurityId = , seccode :: T.Text } deriving (Show, Eq, Ord) +data CommandServerStatus = CommandServerStatus + deriving (Show, Eq, Ord) + +instance TransaqCommand CommandServerStatus where + toXml CommandServerStatus = T.pack . showElement $ unode "command" [strAttr "id" "server_status"] + + instance Node SecurityId where node n SecurityId {..} = node n [ unode "board" (T.unpack board)