diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 542d901..a77ec6a 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -9,6 +9,7 @@ module TXMLConnector ( start + , TXMLConnector.stop , Request(..) , HistoryRequest(..) , Response(..) @@ -37,6 +38,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, readTBQueue, writeTBQueue) import Control.Monad (forever, void, when) +import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM import Data.Maybe (mapMaybe) import qualified Data.Text as T @@ -165,14 +167,16 @@ data TXMLConnectorHandle = , hResponseVar :: TMVar (TMVar Response) , hRequestTimestamp :: TVar UTCTime , hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) + , hRunVar :: TMVar () } -data ConnectionStage = StageConnection | StageGetInfo | StageConnected +data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown deriving (Eq, Show, Ord) data MainQueueData = MainQueueTransaqData TransaqResponse | MainQueueRequest Request + | MainQueueShutdown deriving (Eq, Show) data TickKey = TickKey TickerId DataType @@ -208,6 +212,7 @@ data Env = , serverConnected :: TVar ConnectionStage , candleKindMap :: TVar (M.Map Int Int) , brokerState :: BrokerState + , runVar :: TMVar () } newtype App a = App { unApp :: ReaderT Env IO a } @@ -237,6 +242,7 @@ start logger config qssChannel tisH = do notificationCallback <- newTVarIO Nothing orderTransactionIdMap <- newTVarIO BM.empty pendingOrders <- newTVarIO (fromList []) + runVar <- newEmptyTMVarIO let brokerState = BrokerState { @@ -261,6 +267,7 @@ start logger config qssChannel tisH = do , serverConnected = serverConnected , candleKindMap = candleKindMap , brokerState = brokerState + , runVar = runVar } threadId <- forkIO $ (runReaderT . unApp) workThread env return $ TXMLConnectorHandle @@ -271,8 +278,12 @@ start logger config qssChannel tisH = do , hResponseVar = responseVar , hRequestTimestamp = requestTimestamp , hNotificationCallback = notificationCallback + , hRunVar = runVar } +stop :: TXMLConnectorHandle -> IO () +stop h = atomically $ putTMVar (hRunVar h) () + workThread :: App () workThread = do cfg <- asks config @@ -286,12 +297,16 @@ workThread = do case rc of Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Just cb -> do - void $ forever $ do - connStatus <- asks serverConnected >>= (liftIO . readTVarIO) + serverConnectionState <- asks serverConnected + void $ whileM $ do + connStatus <- liftIO . readTVarIO $ serverConnectionState case connStatus of StageConnection -> handleUnconnected StageGetInfo -> handleGetInfo StageConnected -> handleConnected + StageShutdown -> pure () + pure $ connStatus /= StageShutdown + liftIO $ freeCallback cb where parseTransaqLogLevel 1 = TXML.Warning @@ -299,7 +314,6 @@ workThread = do parseTransaqLogLevel _ = TXML.Info parseAndWrite queue logger xml = do let parsed = mapMaybe parseContent $ parseXML xml - logWith logger Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed mapM_ (writeToQueue queue) parsed pure True parseContent (Elem el) = parseElement el @@ -321,11 +335,15 @@ workThread = do writeToQueue queue resp = atomically $ writeTBQueue queue resp handleConnected :: App () handleConnected = do + serverConn <- asks serverConnected rqVar <- asks requestVar + runVar' <- asks runVar queue <- asks transaqQueue item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` - (MainQueueRequest <$> takeTMVar rqVar) + (MainQueueRequest <$> takeTMVar rqVar) `orElse` + (takeTMVar runVar' >> pure MainQueueShutdown) case item of + MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown MainQueueTransaqData transaqData -> do tm <- asks tickMap case transaqData of @@ -569,6 +587,7 @@ workThread = do log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" liftIO $ threadDelay (1000 * 1000 * 10) Right _ -> do + log Warning "TXMLConnector.WorkThread" "Connected" conn <- asks serverConnected liftIO . atomically $ writeTVar conn StageGetInfo -- item <- atomically $ readTBQueue queue @@ -737,5 +756,5 @@ brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend makeBrokerBackend h account = - BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (pure ()) + BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h)