From ad51a56e3c3c4479a9dade765476d56172a83df1 Mon Sep 17 00:00:00 2001 From: Jon Kristensen Date: Wed, 9 May 2012 23:00:14 +0200 Subject: [PATCH] minor formatting and documentation additions --- src/Network/XMPP/Concurrent/Monad.hs | 276 ++++++++++++++------------- 1 file changed, 144 insertions(+), 132 deletions(-) diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index 5ed5508..299d39e 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -17,211 +17,223 @@ import Network.XMPP.Concurrent.Types import Network.XMPP.Monad --- | Register a new IQ listener. IQ requests matching the type and namespace will --- be put in the channel. +-- | Register a new IQ listener. IQ requests matching the type and namespace +-- will be put in the channel. -- -- Return the new channel or Nothing if this namespace/'IQRequestType' --- combination was alread handled -listenIQChan :: IQRequestType -- ^ type of IQs to receive (Get / Set) - -> Text -- ^ namespace of the child element - -> XMPP (Maybe ( TChan (IQRequest, TVar Bool))) +-- combination was alread handled. +listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) + -> Text -- ^ Namespace of the child element + -> XMPP (Maybe (TChan (IQRequest, TVar Bool))) listenIQChan tp ns = do - handlers <- asks iqHandlers - liftIO . atomically $ do - (byNS, byID) <- readTVar handlers - iqCh <- newTChan - let (present, byNS') = Map.insertLookupWithKey' (\_ _ old -> old) - (tp,ns) iqCh byNS - writeTVar handlers (byNS', byID) - return $ case present of - Nothing -> Just iqCh - Just _iqCh' -> Nothing - --- | get the inbound stanza channel, duplicates from master if necessary --- please note that once duplicated it will keep filling up, call --- 'dropMessageChan' to allow it to be garbage collected + handlers <- asks iqHandlers + liftIO . atomically $ do + (byNS, byID) <- readTVar handlers + iqCh <- newTChan + let (present, byNS') = Map.insertLookupWithKey' + (\_ _ old -> old) + (tp, ns) + iqCh + byNS + writeTVar handlers (byNS', byID) + return $ case present of + Nothing -> Just iqCh + Just _iqCh' -> Nothing + +-- | Get the inbound stanza channel, duplicates from master if necessary. Please +-- note that once duplicated it will keep filling up, call 'dropMessageChan' to +-- allow it to be garbage collected. getMessageChan :: XMPP (TChan (Either MessageError Message)) getMessageChan = do - mChR <- asks messagesRef - mCh <- liftIO $ readIORef mChR - case mCh of - Nothing -> do - shadow <- asks mShadow - mCh' <- liftIO $ atomically $ dupTChan shadow - liftIO $ writeIORef mChR (Just mCh') - return mCh' - Just mCh' -> return mCh' - --- | see 'getMessageChan' + mChR <- asks messagesRef + mCh <- liftIO $ readIORef mChR + case mCh of + Nothing -> do + shadow <- asks mShadow + mCh' <- liftIO $ atomically $ dupTChan shadow + liftIO $ writeIORef mChR (Just mCh') + return mCh' + Just mCh' -> return mCh' + +-- | Analogous to 'getMessageChan'. getPresenceChan :: XMPP (TChan (Either PresenceError Presence)) getPresenceChan = do - pChR <- asks presenceRef - pCh <- liftIO $ readIORef pChR - case pCh of - Nothing -> do - shadow <- asks pShadow - pCh' <- liftIO $ atomically $ dupTChan shadow - liftIO $ writeIORef pChR (Just pCh') - return pCh' - Just pCh' -> return pCh' - --- | Drop the local end of the inbound stanza channel --- from our context so it can be GC-ed + pChR <- asks presenceRef + pCh <- liftIO $ readIORef pChR + case pCh of + Nothing -> do + shadow <- asks pShadow + pCh' <- liftIO $ atomically $ dupTChan shadow + liftIO $ writeIORef pChR (Just pCh') + return pCh' + Just pCh' -> return pCh' + +-- | Drop the local end of the inbound stanza channel from our context so it can +-- be GC-ed. dropMessageChan :: XMPP () dropMessageChan = do - r <- asks messagesRef - liftIO $ writeIORef r Nothing + r <- asks messagesRef + liftIO $ writeIORef r Nothing --- | see 'dropMessageChan' +-- | Abakigiys to 'dropMessageChan'. dropPresenceChan :: XMPP () dropPresenceChan = do - r <- asks presenceRef - liftIO $ writeIORef r Nothing + r <- asks presenceRef + liftIO $ writeIORef r Nothing --- | Read an element from the inbound stanza channel, acquiring a copy --- of the channel as necessary +-- | Read an element from the inbound stanza channel, acquiring a copy of the +-- channel as necessary. pullMessage :: XMPP (Either MessageError Message) pullMessage = do - c <- getMessageChan - liftIO $ atomically $ readTChan c + c <- getMessageChan + liftIO $ atomically $ readTChan c --- | Read an element from the inbound stanza channel, acquiring a copy --- of the channel as necessary +-- | Read an element from the inbound stanza channel, acquiring a copy of the +-- channel as necessary. pullPresence :: XMPP (Either PresenceError Presence) pullPresence = do - c <- getPresenceChan - liftIO $ atomically $ readTChan c + c <- getPresenceChan + liftIO $ atomically $ readTChan c --- | Send a stanza to the server +-- | Send a stanza to the server. sendStanza :: Stanza -> XMPP () sendStanza a = do - out <- asks outCh - liftIO . atomically $ writeTChan out a - return () + out <- asks outCh + liftIO . atomically $ writeTChan out a + return () --- | Create a forked session object without forking a thread +-- | Create a forked session object without forking a thread. forkSession :: Session -> IO Session forkSession sess = do mCH' <- newIORef Nothing pCH' <- newIORef Nothing - return $ sess {messagesRef = mCH' ,presenceRef = pCH'} + return $ sess {messagesRef = mCH', presenceRef = pCH'} --- | Fork a new thread +-- | Fork a new thread. fork :: XMPP () -> XMPP ThreadId fork a = do - sess <- ask - sess' <- liftIO $ forkSession sess - liftIO $ forkIO $ runReaderT a sess' + sess <- ask + sess' <- liftIO $ forkSession sess + liftIO $ forkIO $ runReaderT a sess' +-- | Pulls a message and returns it if the given predicate returns @True@. filterMessages :: (MessageError -> Bool) -> (Message -> Bool) -> XMPP (Either MessageError Message) filterMessages f g = do - s <- pullMessage - case s of - Left e | f e -> return $ Left e - | otherwise -> filterMessages f g - Right m | g m -> return $ Right m - | otherwise -> filterMessages f g - + s <- pullMessage + case s of + Left e | f e -> return $ Left e + | otherwise -> filterMessages f g + Right m | g m -> return $ Right m + | otherwise -> filterMessages f g + +-- | Pulls a (non-error) message and returns it if the given predicate returns +-- @True@. waitForMessage :: (Message -> Bool) -> XMPP Message waitForMessage f = do - s <- pullMessage - case s of - Left _ -> waitForMessage f - Right m | f m -> return m - | otherwise -> waitForMessage f + s <- pullMessage + case s of + Left _ -> waitForMessage f + Right m | f m -> return m + | otherwise -> waitForMessage f +-- | Pulls an error message and returns it if the given predicate returns @True@. waitForMessageError :: (MessageError -> Bool) -> XMPP MessageError waitForMessageError f = do - s <- pullMessage - case s of - Right _ -> waitForMessageError f - Left m | f m -> return m - | otherwise -> waitForMessageError f - + s <- pullMessage + case s of + Right _ -> waitForMessageError f + Left m | f m -> return m + | otherwise -> waitForMessageError f + +-- | Pulls a (non-error) presence and returns it if the given predicate returns +-- @True@. waitForPresence :: (Presence -> Bool) -> XMPP Presence waitForPresence f = do - s <- pullPresence - case s of - Left _ -> waitForPresence f - Right m | f m -> return m - | otherwise -> waitForPresence f - --- | Run an XMPPMonad action in isolation. --- Reader and writer workers will be temporarily stopped --- and resumed with the new session details once the action returns. --- The Action will run in the calling thread/ --- Any uncaught exceptions will be interpreted as connection failure + s <- pullPresence + case s of + Left _ -> waitForPresence f + Right m | f m -> return m + | otherwise -> waitForPresence f + +-- TODO: Wait for presence error? + +-- | Run an XMPPMonad action in isolation. Reader and writer workers will be +-- temporarily stopped and resumed with the new session details once the action +-- returns. The action will run in the calling thread. Any uncaught exceptions +-- will be interpreted as connection failure. withConnection :: XMPPConMonad a -> XMPP (Either StreamError a) withConnection a = do - readerId <- asks readerThread - stateRef <- asks conStateRef - write <- asks writeRef - wait <- liftIO $ newEmptyTMVarIO - liftIO . Ex.mask_ $ do - throwTo readerId $ Interrupt wait - s <- Ex.catch ( atomically $ do - _ <- takeTMVar write - s <- takeTMVar stateRef - putTMVar wait () - return s - ) - (\e -> atomically (putTMVar wait ()) - >> Ex.throwIO (e :: Ex.SomeException) - -- No MVar taken - ) - Ex.catches ( do - (res, s') <- runStateT a s - atomically $ do - putTMVar write (sConPushBS s') - putTMVar stateRef s' - return $ Right res - ) - -- we treat all Exceptions as fatal - [ Ex.Handler $ \e -> return $ Left (e :: StreamError) - , Ex.Handler $ \e -> runStateT xmppKillConnection s - >> Ex.throwIO (e :: Ex.SomeException) - ] - --- | Send a presence Stanza + readerId <- asks readerThread + stateRef <- asks conStateRef + write <- asks writeRef + wait <- liftIO $ newEmptyTMVarIO + liftIO . Ex.mask_ $ do + throwTo readerId $ Interrupt wait + s <- Ex.catch + (atomically $ do + _ <- takeTMVar write + s <- takeTMVar stateRef + putTMVar wait () + return s + ) + (\e -> atomically (putTMVar wait ()) >> + Ex.throwIO (e :: Ex.SomeException) -- No MVar taken + ) + Ex.catches + (do + (res, s') <- runStateT a s + atomically $ do + putTMVar write (sConPushBS s') + putTMVar stateRef s' + return $ Right res + ) + -- Ee treat all Exceptions as fatal. + [ Ex.Handler $ \e -> return $ Left (e :: StreamError) + , Ex.Handler $ \e -> runStateT xmppKillConnection s + >> Ex.throwIO (e :: Ex.SomeException) + ] + +-- | Send a presence stanza. sendPresence :: Presence -> XMPP () sendPresence = sendStanza . PresenceS --- | Send a Message Stanza +-- | Send a message stanza. sendMessage :: Message -> XMPP () sendMessage = sendStanza . MessageS - +-- | Executes a function to update the event handlers. modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPP () modifyHandlers f = do eh <- asks eventHandlers liftIO . atomically $ writeTVar eh . f =<< readTVar eh +-- | Sets the handler to be executed when the session ends. setSessionEndHandler :: XMPP () -> XMPP () setSessionEndHandler eh = do r <- ask modifyHandlers (\s -> s{sessionEndHandler = runReaderT eh r}) +-- | Sets the handler to be executed when the server connection is closed. setConnectionClosedHandler :: (StreamError -> XMPP ()) -> XMPP () setConnectionClosedHandler eh = do r <- ask modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r}) --- | run an event handler +-- | Run an event handler. runHandler :: (EventHandlers -> IO a) -> XMPP a runHandler h = do eh <- liftIO . atomically . readTVar =<< asks eventHandlers liftIO $ h eh --- | End the current xmpp session +-- | End the current XMPP session. endSession :: XMPP () endSession = do -- TODO: This has to be idempotent (is it?) void $ withConnection xmppKillConnection liftIO =<< asks stopThreads runHandler sessionEndHandler --- | Close the connection to the server +-- | Close the connection to the server. closeConnection :: XMPP () -closeConnection = void $ withConnection xmppKillConnection - +closeConnection = void $ withConnection xmppKillConnection \ No newline at end of file