From 8430e7419573ccf428b934c07c7620387440b8b7 Mon Sep 17 00:00:00 2001 From: Jon Kristensen Date: Thu, 10 May 2012 01:31:08 +0200 Subject: [PATCH] minor formatting and documentation additions --- src/Network/XMPP/Concurrent/Monad.hs | 2 +- src/Network/XMPP/Concurrent/Threads.hs | 257 +++++++++++++------------ 2 files changed, 139 insertions(+), 120 deletions(-) diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index 299d39e..9f5b279 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -75,7 +75,7 @@ dropMessageChan = do r <- asks messagesRef liftIO $ writeIORef r Nothing --- | Abakigiys to 'dropMessageChan'. +-- | Analogous to 'dropMessageChan'. dropPresenceChan :: XMPP () dropPresenceChan = do r <- asks presenceRef diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs index 3109771..7d2accc 100644 --- a/src/Network/XMPP/Concurrent/Threads.hs +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -29,6 +29,8 @@ import Text.XML.Stream.Elements import GHC.IO (unsafeUnmask) +-- Worker to read stanzas from the stream and concurrently distribute them to +-- all listener threads. readWorker :: TChan (Either MessageError Message) -> TChan (Either PresenceError Presence) -> TVar IQHandlers @@ -36,138 +38,141 @@ readWorker :: TChan (Either MessageError Message) -> TMVar XmppConnection -> IO () readWorker messageC presenceC iqHands handlers stateRef = - Ex.mask_ . forever $ do - res <- liftIO $ Ex.catches ( do - -- we don't know whether pull will - -- necessarily be interruptible - s <- liftIO . atomically $ do - sr <- readTMVar stateRef - when (sConnectionState sr == XmppConnectionClosed) - retry - return sr - allowInterrupt - Just . fst <$> runStateT pullStanza s - ) - [ Ex.Handler $ \(Interrupt t) -> do - void $ handleInterrupts [t] - return Nothing - , Ex.Handler $ \e -> noCon handlers (e :: StreamError) - ] + Ex.mask_ . forever $ do -- Suppress exceptions for the time-being. + res <- liftIO $ Ex.catches + (do + -- We don't know whether pull will necessarily be. + -- interruptible. TODO: Will this matter? + s <- liftIO . atomically $ do + sr <- readTMVar stateRef + when (sConnectionState sr == XmppConnectionClosed) retry + return sr + allowInterrupt + Just . fst <$> runStateT pullStanza s + ) + [ Ex.Handler $ \(Interrupt t) -> do + void $ handleInterrupts [t] + return Nothing + , Ex.Handler $ \e -> noCon handlers (e :: StreamError) + ] liftIO . atomically $ do - case res of - Nothing -> return () - Just sta -> do - case sta of - MessageS m -> do writeTChan messageC $ Right m - _ <- readTChan messageC -- Sic! - return () - -- this may seem ridiculous, but to prevent - -- the channel from filling up we - -- immedtiately remove the - -- Stanza we just put in. It will still be - -- available in duplicates. - MessageErrorS m -> do writeTChan messageC $ Left m - _ <- readTChan messageC - return () - PresenceS p -> do - writeTChan presenceC $ Right p - _ <- readTChan presenceC - return () - PresenceErrorS p -> do - writeTChan presenceC $ Left p - _ <- readTChan presenceC - return () - - IQRequestS i -> handleIQRequest iqHands i - IQResultS i -> handleIQResponse iqHands (Right i) - IQErrorS i -> handleIQResponse iqHands (Left i) + case res of + Nothing -> return () + Just sta -> do + case sta of + MessageS m -> do + writeTChan messageC $ Right m + _ <- readTChan messageC -- Sic! + return () + -- This may seem ridiculous, but to prevent the + -- channel from filling up we immedtiately remove the + -- Stanza we just put in. It will still be available + -- in duplicates. + MessageErrorS m -> do + writeTChan messageC $ Left m + _ <- readTChan messageC + return () + PresenceS p -> do + writeTChan presenceC $ Right p + _ <- readTChan presenceC + return () + PresenceErrorS p -> do + writeTChan presenceC $ Left p + _ <- readTChan presenceC + return () + IQRequestS i -> handleIQRequest iqHands i + IQResultS i -> handleIQResponse iqHands (Right i) + IQErrorS i -> handleIQResponse iqHands (Left i) where - -- Defining an Control.Exception.allowInterrupt equivalent for - -- GHC 7 compatibility. + -- Defining an Control.Exception.allowInterrupt equivalent for GHC 7 + -- compatibility. allowInterrupt :: IO () allowInterrupt = unsafeUnmask $ return () + -- Call the connection closed handlers. noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a) noCon h e = do hands <- atomically $ readTVar h _ <- forkIO $ connectionClosedHandler hands e return Nothing - -- While waiting for the first semaphore(s) to flip we might receive - -- another interrupt. When that happens we add it's semaphore to the - -- list and retry waiting + -- While waiting for the first semaphore(s) to flip we might receive another + -- interrupt. When that happens we add it's semaphore to the list and retry + -- waiting. We do this because we might receive another interrupt while + -- recovering from the last one. handleInterrupts :: [TMVar ()] -> IO [()] handleInterrupts ts = Ex.catch (atomically $ forM ts takeTMVar) - ( \(Interrupt t) -> handleInterrupts (t:ts)) + (\(Interrupt t) -> handleInterrupts (t:ts)) +-- If the IQ request has a namespace, sent it through the appropriate channel. handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () handleIQRequest handlers iq = do - (byNS, _) <- readTVar handlers - let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) - case Map.lookup (iqRequestType iq, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> do - sent <- newTVar False - writeTChan ch (iq, sent) - + (byNS, _) <- readTVar handlers + let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) + case Map.lookup (iqRequestType iq, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> do + sent <- newTVar False + writeTChan ch (iq, sent) + +-- Update the TMVar to contain the IQ response. handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () handleIQResponse handlers iq = do - (byNS, byID) <- readTVar handlers - case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of - (Nothing, _) -> return () -- we are not supposed - -- to send an error - (Just tmvar, byID') -> do - _ <- tryPutTMVar tmvar iq -- don't block - writeTVar handlers (byNS, byID') - where - iqID (Left err) = iqErrorID err - iqID (Right iq') = iqResultID iq' + (byNS, byID) <- readTVar handlers + case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of + (Nothing, _) -> return () -- We are not supposed to send an error. + (Just tmvar, byID') -> do + _ <- tryPutTMVar tmvar iq -- Don't block. + writeTVar handlers (byNS, byID') + where + iqID (Left err) = iqErrorID err + iqID (Right iq') = iqResultID iq' +-- Worker to write stanzas to the stream concurrently. writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () writeWorker stCh writeR = forever $ do - (write, next) <- atomically $ (,) <$> - takeTMVar writeR <*> - readTChan stCh - r <- write $ renderElement (pickleElem xpStanza next) - unless r $ do - atomically $ unGetTChan stCh next -- connection is dead - threadDelay 250000 -- avoid free spinning - atomically $ putTMVar writeR write - --- Two streams: input and output. Threads read from input stream and write to output stream. --- | Runs thread in XmppState monad --- returns channel of incoming and outgoing stances, respectively --- and an Action to stop the Threads and close the connection -startThreads - :: IO ( TChan (Either MessageError Message) - , TChan (Either PresenceError Presence) - , TVar IQHandlers - , TChan Stanza - , IO () - , TMVar (BS.ByteString -> IO Bool) - , TMVar XmppConnection - , ThreadId - , TVar EventHandlers - ) - + (write, next) <- atomically $ (,) <$> + takeTMVar writeR <*> + readTChan stCh + r <- write $ renderElement (pickleElem xpStanza next) + unless r $ do -- If the writing failed, the connection is dead. + atomically $ unGetTChan stCh next + threadDelay 250000 -- Avoid free spinning. + atomically $ putTMVar writeR write -- Put it back. + +-- Two streams: input and output. Threads read from input stream and write to +-- output stream. +-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing +-- stances, respectively, and an Action to stop the Threads and close the +-- connection. +startThreads :: IO ( TChan (Either MessageError Message) + , TChan (Either PresenceError Presence) + , TVar IQHandlers + , TChan Stanza + , IO () + , TMVar (BS.ByteString -> IO Bool) + , TMVar XmppConnection + , ThreadId + , TVar EventHandlers + ) startThreads = do - writeLock <- newTMVarIO (\_ -> return False) - messageC <- newTChanIO - presenceC <- newTChanIO - outC <- newTChanIO - handlers <- newTVarIO ( Map.empty, Map.empty) - eh <- newTVarIO zeroEventHandlers - conS <- newTMVarIO xmppNoConnection - lw <- forkIO $ writeWorker outC writeLock - cp <- forkIO $ connPersist writeLock - rd <- forkIO $ readWorker messageC presenceC handlers eh conS - return (messageC, presenceC, handlers, outC - , killConnection writeLock [lw, rd, cp] - , writeLock, conS ,rd, eh) + writeLock <- newTMVarIO (\_ -> return False) + messageC <- newTChanIO + presenceC <- newTChanIO + outC <- newTChanIO + handlers <- newTVarIO ( Map.empty, Map.empty) + eh <- newTVarIO zeroEventHandlers + conS <- newTMVarIO xmppNoConnection + lw <- forkIO $ writeWorker outC writeLock + cp <- forkIO $ connPersist writeLock + rd <- forkIO $ readWorker messageC presenceC handlers eh conS + return (messageC, presenceC, handlers, outC + , killConnection writeLock [lw, rd, cp] + , writeLock, conS ,rd, eh) where - killConnection writeLock threads = liftIO $ do + killConnection writeLock threads = liftIO $ do _ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- forM threads killThread - return() + return () -- | Creates and initializes a new XMPP session. newSession :: IO Session @@ -180,21 +185,35 @@ newSession = do curId <- readTVar idRef writeTVar idRef (curId + 1 :: Integer) return . read. show $ curId - return (Session workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads') - + return $ Session + workermCh + workerpCh + mC + pC + outC + hand + writeR + rdr + getId + conS + eh + stopThreads' + +-- | Creates a new session and runs the given XMPP computation. withNewSession :: XMPP b -> IO (Session, b) withNewSession a = do - sess <- newSession - ret <- runReaderT a sess - return (sess, ret) + sess <- newSession + ret <- runReaderT a sess + return (sess, ret) +-- | Runs the given XMPP computation in the given session. withSession :: Session -> XMPP a -> IO a withSession = flip runReaderT --- | Sends a blank space every 30 seconds to keep the connection alive -connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () +-- | Sends a blank space every 30 seconds to keep the connection alive. +connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () connPersist lock = forever $ do - pushBS <- atomically $ takeTMVar lock - _ <- pushBS " " - atomically $ putTMVar lock pushBS - threadDelay 30000000 + pushBS <- atomically $ takeTMVar lock + _ <- pushBS " " + atomically $ putTMVar lock pushBS + threadDelay 30000000 \ No newline at end of file