From f0d558a5281d9808509b2cf1e942e5e482b41694 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:50:21 +0200 Subject: [PATCH] reinstate withConnection and fix closeConnection --- source/Network/Xmpp/Concurrent/Monad.hs | 105 ++++++++++++------------ 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 06c0711..858dce8 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -2,61 +2,63 @@ {-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Concurrent.Monad where -import Network.Xmpp.Types - +import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex import Control.Monad.Reader - +import Control.Monad.State import Network.Xmpp.Concurrent.Types import Network.Xmpp.Stream - - +import Network.Xmpp.Types -- TODO: Wait for presence error? --- -- | Run an XmppConMonad action in isolation. Reader and writer workers will be --- -- temporarily stopped and resumed with the new session details once the action --- -- returns. The action will run in the calling thread. Any uncaught exceptions --- -- will be interpreted as connection failure. +-- | Run an XmppConMonad action in isolation. Reader and writer workers will be +-- temporarily stopped and resumed with the new session details once the action +-- returns. The action will run in the calling thread. Any uncaught exceptions +-- will be interpreted as connection failure. -- withConnection :: XmppConMonad a -> Context -> IO (Either StreamError a) --- withConnection a session = do --- wait <- newEmptyTMVarIO --- Ex.mask_ $ do --- -- Suspends the reader until the lock (wait) is released (set to `()'). --- throwTo (readerThread session) $ Interrupt wait --- -- We acquire the write and stateRef locks, to make sure that this is --- -- the only thread that can write to the stream and to perform a --- -- withConnection calculation. Afterwards, we release the lock and --- -- fetches an updated state. --- s <- Ex.catch --- (atomically $ do --- _ <- takeTMVar (writeRef session) --- s <- takeTMVar (conStateRef session) --- putTMVar wait () --- return s --- ) --- -- If we catch an exception, we have failed to take the MVars above. --- (\e -> atomically (putTMVar wait ()) >> --- Ex.throwIO (e :: Ex.SomeException) --- ) --- -- Run the XmppMonad action, save the (possibly updated) states, release --- -- the locks, and return the result. --- Ex.catches --- (do --- (res, s') <- runStateT a s --- atomically $ do --- putTMVar (writeRef session) (cSend . sCon $ s') --- putTMVar (conStateRef session) s' --- return $ Right res --- ) --- -- We treat all Exceptions as fatal. If we catch a StreamError, we --- -- return it. Otherwise, we throw an exception. --- [ Ex.Handler $ \e -> return $ Left (e :: StreamError) --- , Ex.Handler $ \e -> runStateT xmppKillConnection s --- >> Ex.throwIO (e :: Ex.SomeException) --- ] +withConnection :: (Stream -> IO (b, Stream)) + -> Session + -> IO (Either XmppFailure b) +withConnection a session = do + wait <- newEmptyTMVarIO + Ex.mask_ $ do + -- Suspends the reader until the lock (wait) is released (set to `()'). + throwTo (readerThread session) $ Interrupt wait + -- We acquire the write and stateRef locks, to make sure that this is + -- the only thread that can write to the stream and to perform a + -- withConnection calculation. Afterwards, we release the lock and + -- fetches an updated state. + s <- Ex.catch + (atomically $ do + _ <- takeTMVar (writeRef session) + s <- takeTMVar (streamRef session) + putTMVar wait () + return s + ) + -- If we catch an exception, we have failed to take the MVars above. + (\e -> atomically (putTMVar wait ()) >> + Ex.throwIO (e :: Ex.SomeException) + ) + -- Run the XmppMonad action, save the (possibly updated) states, release + -- the locks, and return the result. + Ex.catches + (do + (res, s') <- a s + wl <- withStream' (gets $ streamSend . streamHandle) s' + atomically $ do + putTMVar (writeRef session) wl + putTMVar (streamRef session) s' + return $ Right res + ) + -- We treat all Exceptions as fatal. If we catch a StreamError, we + -- return it. Otherwise, we throw an exception. + [ Ex.Handler $ \e -> return $ Left (e :: XmppFailure) + , Ex.Handler $ \e -> killStream s + >> Ex.throwIO (e :: Ex.SomeException) + ] -- | Executes a function to update the event handlers. modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO () @@ -86,16 +88,13 @@ runHandler h session = h =<< atomically (readTVar $ eventHandlers session) -- | End the current Xmpp session. endSession :: Session -> IO () endSession session = do -- TODO: This has to be idempotent (is it?) - closeConnection session + _ <- closeConnection session stopThreads session -- | Close the connection to the server. Closes the stream (by enforcing a -- write lock and sending a element), waits (blocks) for three -- seconds, and then closes the connection. -closeConnection :: Session -> IO () -closeConnection session = Ex.mask_ $ do - (_send, connection) <- atomically $ liftM2 (,) - (takeTMVar $ writeRef session) - (takeTMVar $ streamRef session) - _ <- closeStreams connection - return () +closeConnection :: Session -> IO (Either XmppFailure ()) +closeConnection = withConnection $ \stream -> do + _ <- closeStreams stream + return ((), stream)