Browse Source

documentation clarifications

master
Jon Kristensen 14 years ago
parent
commit
4db30a67bb
  1. 13
      src/Network/XMPP/Concurrent/Monad.hs
  2. 19
      src/Network/XMPP/Concurrent/Threads.hs

13
src/Network/XMPP/Concurrent/Monad.hs

@ -170,7 +170,12 @@ withConnection a = do
write <- asks writeRef write <- asks writeRef
wait <- liftIO $ newEmptyTMVarIO wait <- liftIO $ newEmptyTMVarIO
liftIO . Ex.mask_ $ do liftIO . Ex.mask_ $ do
-- Kills the reader until the lock (wait) is released (set to `()').
throwTo readerId $ Interrupt wait throwTo readerId $ Interrupt wait
-- We acquire the write and stateRef locks, to make sure that this is
-- the only thread that can write to the stream and to perform a
-- withConnection calculation. Afterwards, we release the lock and
-- fetches an updated state.
s <- Ex.catch s <- Ex.catch
(atomically $ do (atomically $ do
_ <- takeTMVar write _ <- takeTMVar write
@ -178,9 +183,12 @@ withConnection a = do
putTMVar wait () putTMVar wait ()
return s return s
) )
-- If we catch an exception, we have failed to take the MVars above.
(\e -> atomically (putTMVar wait ()) >> (\e -> atomically (putTMVar wait ()) >>
Ex.throwIO (e :: Ex.SomeException) -- No MVar taken Ex.throwIO (e :: Ex.SomeException)
) )
-- Run the XMPPMonad action, save the (possibly updated) states, release
-- the locks, and return the result.
Ex.catches Ex.catches
(do (do
(res, s') <- runStateT a s (res, s') <- runStateT a s
@ -189,7 +197,8 @@ withConnection a = do
putTMVar stateRef s' putTMVar stateRef s'
return $ Right res return $ Right res
) )
-- Ee treat all Exceptions as fatal. -- We treat all Exceptions as fatal. If we catch a StreamError, we
-- return it. Otherwise, we throw an exception.
[ Ex.Handler $ \e -> return $ Left (e :: StreamError) [ Ex.Handler $ \e -> return $ Left (e :: StreamError)
, Ex.Handler $ \e -> runStateT xmppKillConnection s , Ex.Handler $ \e -> runStateT xmppKillConnection s
>> Ex.throwIO (e :: Ex.SomeException) >> Ex.throwIO (e :: Ex.SomeException)

19
src/Network/XMPP/Concurrent/Threads.hs

@ -47,7 +47,7 @@ readWorker messageC presenceC iqHands handlers stateRef =
sr <- readTMVar stateRef sr <- readTMVar stateRef
when (sConnectionState sr == XmppConnectionClosed) retry when (sConnectionState sr == XmppConnectionClosed) retry
return sr return sr
allowInterrupt allowInterrupt -- Allow exceptions while pulling the stanzas.
Just . fst <$> runStateT pullStanza s Just . fst <$> runStateT pullStanza s
) )
[ Ex.Handler $ \(Interrupt t) -> do [ Ex.Handler $ \(Interrupt t) -> do
@ -97,7 +97,9 @@ readWorker messageC presenceC iqHands handlers stateRef =
-- While waiting for the first semaphore(s) to flip we might receive another -- While waiting for the first semaphore(s) to flip we might receive another
-- interrupt. When that happens we add it's semaphore to the list and retry -- interrupt. When that happens we add it's semaphore to the list and retry
-- waiting. We do this because we might receive another interrupt while -- waiting. We do this because we might receive another interrupt while
-- recovering from the last one. -- recovering from the last one. We do this because we might receive another
-- interrupt while we're waiting for a mutex to unlock; if that happens, the
-- new interrupt is added to the list and is waited for as well.
handleInterrupts :: [TMVar ()] -> IO [()] handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts = handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar) Ex.catch (atomically $ forM ts takeTMVar)
@ -159,15 +161,21 @@ startThreads = do
messageC <- newTChanIO messageC <- newTChanIO
presenceC <- newTChanIO presenceC <- newTChanIO
outC <- newTChanIO outC <- newTChanIO
handlers <- newTVarIO ( Map.empty, Map.empty) handlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC handlers eh conS rd <- forkIO $ readWorker messageC presenceC handlers eh conS
return (messageC, presenceC, handlers, outC return ( messageC
, presenceC
, handlers
, outC
, killConnection writeLock [lw, rd, cp] , killConnection writeLock [lw, rd, cp]
, writeLock, conS ,rd, eh) , writeLock
, conS
, rd
, eh)
where where
killConnection writeLock threads = liftIO $ do killConnection writeLock threads = liftIO $ do
_ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- atomically $ takeTMVar writeLock -- Should we put it back?
@ -210,6 +218,7 @@ withNewSession a = do
withSession :: Session -> XMPP a -> IO a withSession :: Session -> XMPP a -> IO a
withSession = flip runReaderT withSession = flip runReaderT
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive. -- | Sends a blank space every 30 seconds to keep the connection alive.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist lock = forever $ do connPersist lock = forever $ do

Loading…
Cancel
Save