diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index 9f5b279..d89d51e 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -170,7 +170,12 @@ withConnection a = do write <- asks writeRef wait <- liftIO $ newEmptyTMVarIO liftIO . Ex.mask_ $ do + -- Kills the reader until the lock (wait) is released (set to `()'). throwTo readerId $ Interrupt wait + -- We acquire the write and stateRef locks, to make sure that this is + -- the only thread that can write to the stream and to perform a + -- withConnection calculation. Afterwards, we release the lock and + -- fetches an updated state. s <- Ex.catch (atomically $ do _ <- takeTMVar write @@ -178,9 +183,12 @@ withConnection a = do putTMVar wait () return s ) + -- If we catch an exception, we have failed to take the MVars above. (\e -> atomically (putTMVar wait ()) >> - Ex.throwIO (e :: Ex.SomeException) -- No MVar taken + Ex.throwIO (e :: Ex.SomeException) ) + -- Run the XMPPMonad action, save the (possibly updated) states, release + -- the locks, and return the result. Ex.catches (do (res, s') <- runStateT a s @@ -189,7 +197,8 @@ withConnection a = do putTMVar stateRef s' return $ Right res ) - -- Ee treat all Exceptions as fatal. + -- We treat all Exceptions as fatal. If we catch a StreamError, we + -- return it. Otherwise, we throw an exception. [ Ex.Handler $ \e -> return $ Left (e :: StreamError) , Ex.Handler $ \e -> runStateT xmppKillConnection s >> Ex.throwIO (e :: Ex.SomeException) diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs index 7d2accc..654e94e 100644 --- a/src/Network/XMPP/Concurrent/Threads.hs +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -47,7 +47,7 @@ readWorker messageC presenceC iqHands handlers stateRef = sr <- readTMVar stateRef when (sConnectionState sr == XmppConnectionClosed) retry return sr - allowInterrupt + allowInterrupt -- Allow exceptions while pulling the stanzas. Just . fst <$> runStateT pullStanza s ) [ Ex.Handler $ \(Interrupt t) -> do @@ -97,7 +97,9 @@ readWorker messageC presenceC iqHands handlers stateRef = -- While waiting for the first semaphore(s) to flip we might receive another -- interrupt. When that happens we add it's semaphore to the list and retry -- waiting. We do this because we might receive another interrupt while - -- recovering from the last one. + -- recovering from the last one. We do this because we might receive another + -- interrupt while we're waiting for a mutex to unlock; if that happens, the + -- new interrupt is added to the list and is waited for as well. handleInterrupts :: [TMVar ()] -> IO [()] handleInterrupts ts = Ex.catch (atomically $ forM ts takeTMVar) @@ -159,15 +161,21 @@ startThreads = do messageC <- newTChanIO presenceC <- newTChanIO outC <- newTChanIO - handlers <- newTVarIO ( Map.empty, Map.empty) - eh <- newTVarIO zeroEventHandlers + handlers <- newTVarIO (Map.empty, Map.empty) + eh <- newTVarIO zeroEventHandlers conS <- newTMVarIO xmppNoConnection lw <- forkIO $ writeWorker outC writeLock cp <- forkIO $ connPersist writeLock rd <- forkIO $ readWorker messageC presenceC handlers eh conS - return (messageC, presenceC, handlers, outC + return ( messageC + , presenceC + , handlers + , outC , killConnection writeLock [lw, rd, cp] - , writeLock, conS ,rd, eh) + , writeLock + , conS + , rd + , eh) where killConnection writeLock threads = liftIO $ do _ <- atomically $ takeTMVar writeLock -- Should we put it back? @@ -210,6 +218,7 @@ withNewSession a = do withSession :: Session -> XMPP a -> IO a withSession = flip runReaderT +-- Acquires the write lock, pushes a space, and releases the lock. -- | Sends a blank space every 30 seconds to keep the connection alive. connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () connPersist lock = forever $ do