From bcb0f9177b953c35f0a148d340995fe3e8ad5c02 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Wed, 14 Nov 2012 18:06:57 +0100 Subject: [PATCH] un-ReaderT Xmpp actions remove Xmpp type synonym --- source/Network/Xmpp.hs | 6 +- source/Network/Xmpp/Concurrent.hs | 2 - source/Network/Xmpp/Concurrent/IQ.hs | 42 ++-- source/Network/Xmpp/Concurrent/Monad.hs | 205 +++++++++----------- source/Network/Xmpp/Concurrent/Threads.hs | 11 -- source/Network/Xmpp/Concurrent/Types.hs | 2 - source/Network/Xmpp/Session.hs | 6 +- source/Network/Xmpp/Xep/ServiceDiscovery.hs | 15 +- 8 files changed, 126 insertions(+), 163 deletions(-) diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 8a10bfe..5b23db8 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -29,9 +29,7 @@ module Network.Xmpp ( -- * Session management - withNewSession - , withSession - , newSession + newSession , withConnection , connect , simpleConnect @@ -142,8 +140,6 @@ module Network.Xmpp , iqRequestPayload , iqResultPayload -- * Threads - , Xmpp - , fork , forkSession -- * Misc , LangTag(..) diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index e7ed6ec..a797b1d 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -1,6 +1,5 @@ module Network.Xmpp.Concurrent ( Session - , Xmpp , module Network.Xmpp.Concurrent.Monad , module Network.Xmpp.Concurrent.Threads , module Network.Xmpp.Concurrent.IQ @@ -10,4 +9,3 @@ import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Monad import Network.Xmpp.Concurrent.Threads import Network.Xmpp.Concurrent.IQ - diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs index c77bb2b..25e0c3d 100644 --- a/source/Network/Xmpp/Concurrent/IQ.hs +++ b/source/Network/Xmpp/Concurrent/IQ.hs @@ -21,59 +21,61 @@ sendIQ :: Maybe Int -- ^ Timeout -> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for -- default) -> Element -- ^ The IQ body (there has to be exactly one) - -> Xmpp (TMVar IQResponse) -sendIQ timeOut to tp lang body = do -- TODO: Add timeout - newId <- liftIO =<< asks idGenerator - handlers <- asks iqHandlers - ref <- liftIO . atomically $ do + -> Session + -> IO (TMVar IQResponse) +sendIQ timeOut to tp lang body session = do -- TODO: Add timeout + newId <- idGenerator session + ref <- atomically $ do resRef <- newEmptyTMVar - (byNS, byId) <- readTVar handlers - writeTVar handlers (byNS, Map.insert newId resRef byId) + (byNS, byId) <- readTVar (iqHandlers session) + writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId) -- TODO: Check for id collisions (shouldn't happen?) return resRef - sendStanza . IQRequestS $ IQRequest newId Nothing to lang tp body + sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session case timeOut of Nothing -> return () - Just t -> void . liftIO . forkIO $ do + Just t -> void . forkIO $ do threadDelay t - doTimeOut handlers newId ref + doTimeOut (iqHandlers session) newId ref return ref where doTimeOut handlers iqid var = atomically $ do p <- tryPutTMVar var IQResponseTimeout when p $ do - (byNS, byId) <- readTVar handlers + (byNS, byId) <- readTVar (iqHandlers session) writeTVar handlers (byNS, Map.delete iqid byId) return () + -- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds sendIQ' :: Maybe Jid -> IQRequestType -> Maybe LangTag -> Element - -> Xmpp IQResponse -sendIQ' to tp lang body = do - ref <- sendIQ (Just 3000000) to tp lang body - liftIO . atomically $ takeTMVar ref + -> Session + -> IO IQResponse +sendIQ' to tp lang body session = do + ref <- sendIQ (Just 3000000) to tp lang body session + atomically $ takeTMVar ref answerIQ :: IQRequestTicket -> Either StanzaError (Maybe Element) - -> Xmpp Bool + -> Session + -> IO Bool answerIQ (IQRequestTicket sentRef (IQRequest iqid from _to lang _tp bd)) - answer = do - out <- asks outCh + answer session = do let response = case answer of Left err -> IQErrorS $ IQError iqid Nothing from lang err (Just bd) Right res -> IQResultS $ IQResult iqid Nothing from lang res - liftIO . atomically $ do + atomically $ do sent <- readTVar sentRef case sent of False -> do writeTVar sentRef True - writeTChan out response + writeTChan (outCh session) response return True True -> return False diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 960cb10..6a3de26 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -26,10 +26,11 @@ import Network.Xmpp.Monad -- to interfere with existing consumers. listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) -> Text -- ^ Namespace of the child element - -> Xmpp (Either (TChan IQRequestTicket) (TChan IQRequestTicket)) -listenIQChan tp ns = do - handlers <- asks iqHandlers - liftIO . atomically $ do + -> Session + -> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket)) +listenIQChan tp ns session = do + let handlers = iqHandlers session + atomically $ do (byNS, byID) <- readTVar handlers iqCh <- newTChan let (present, byNS') = Map.insertLookupWithKey' @@ -43,127 +44,110 @@ listenIQChan tp ns = do Just iqCh' -> Left iqCh' -- | Get a duplicate of the stanza channel -getStanzaChan :: Xmpp (TChan Stanza) -getStanzaChan = do - shadow <- asks sShadow - liftIO $ atomically $ dupTChan shadow +getStanzaChan :: Session -> IO (TChan Stanza) +getStanzaChan session = atomically $ dupTChan (sShadow session) -- | Get the inbound stanza channel, duplicates from master if necessary. Please -- note that once duplicated it will keep filling up, call 'dropMessageChan' to -- allow it to be garbage collected. -getMessageChan :: Xmpp (TChan (Either MessageError Message)) -getMessageChan = do - mChR <- asks messagesRef - mCh <- liftIO $ readIORef mChR +getMessageChan :: Session -> IO (TChan (Either MessageError Message)) +getMessageChan session = do + mCh <- readIORef $ messagesRef session case mCh of Nothing -> do - shadow <- asks mShadow - mCh' <- liftIO $ atomically $ dupTChan shadow - liftIO $ writeIORef mChR (Just mCh') + mCh' <- atomically $ dupTChan (mShadow session) + writeIORef (messagesRef session) (Just mCh') return mCh' Just mCh' -> return mCh' -- | Analogous to 'getMessageChan'. -getPresenceChan :: Xmpp (TChan (Either PresenceError Presence)) -getPresenceChan = do - pChR <- asks presenceRef - pCh <- liftIO $ readIORef pChR +getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence)) +getPresenceChan session = do + pCh <- readIORef $ presenceRef session case pCh of Nothing -> do - shadow <- asks pShadow - pCh' <- liftIO $ atomically $ dupTChan shadow - liftIO $ writeIORef pChR (Just pCh') + pCh' <- atomically $ dupTChan (pShadow session) + writeIORef (presenceRef session) (Just pCh') return pCh' Just pCh' -> return pCh' -- | Drop the local end of the inbound stanza channel from our context so it can -- be GC-ed. -dropMessageChan :: Xmpp () -dropMessageChan = do - r <- asks messagesRef - liftIO $ writeIORef r Nothing +dropMessageChan :: Session -> IO () +dropMessageChan session = writeIORef (messagesRef session) Nothing -- | Analogous to 'dropMessageChan'. -dropPresenceChan :: Xmpp () -dropPresenceChan = do - r <- asks presenceRef - liftIO $ writeIORef r Nothing +dropPresenceChan :: Session -> IO () +dropPresenceChan session = writeIORef (presenceRef session) Nothing -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. -pullMessage :: Xmpp (Either MessageError Message) -pullMessage = do - c <- getMessageChan - liftIO $ atomically $ readTChan c +pullMessage :: Session -> IO (Either MessageError Message) +pullMessage session = do + c <- getMessageChan session + atomically $ readTChan c -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. -pullPresence :: Xmpp (Either PresenceError Presence) -pullPresence = do - c <- getPresenceChan - liftIO $ atomically $ readTChan c +pullPresence :: Session -> IO (Either PresenceError Presence) +pullPresence session = do + c <- getPresenceChan session + atomically $ readTChan c -- | Send a stanza to the server. -sendStanza :: Stanza -> Xmpp () -sendStanza a = do - out <- asks outCh - liftIO . atomically $ writeTChan out a - return () +sendStanza :: Stanza -> Session -> IO () +sendStanza a session = atomically $ writeTChan (outCh session) a --- | Create a forked session object without forking a thread. + +-- | Create a forked session object forkSession :: Session -> IO Session -forkSession sess = do +forkSession session = do mCH' <- newIORef Nothing pCH' <- newIORef Nothing - return $ sess {messagesRef = mCH', presenceRef = pCH'} - --- | Fork a new thread. -fork :: Xmpp () -> Xmpp ThreadId -fork a = do - sess <- ask - sess' <- liftIO $ forkSession sess - liftIO $ forkIO $ runReaderT a sess' + return $ session {messagesRef = mCH', presenceRef = pCH'} -- | Pulls a message and returns it if the given predicate returns @True@. filterMessages :: (MessageError -> Bool) -> (Message -> Bool) - -> Xmpp (Either MessageError Message) -filterMessages f g = do - s <- pullMessage + -> Session -> IO (Either MessageError Message) +filterMessages f g session = do + s <- pullMessage session case s of Left e | f e -> return $ Left e - | otherwise -> filterMessages f g + | otherwise -> filterMessages f g session Right m | g m -> return $ Right m - | otherwise -> filterMessages f g + | otherwise -> filterMessages f g session -- | Pulls a (non-error) message and returns it if the given predicate returns -- @True@. -waitForMessage :: (Message -> Bool) -> Xmpp Message -waitForMessage f = do - s <- pullMessage +waitForMessage :: (Message -> Bool) -> Session -> IO Message +waitForMessage f session = do + s <- pullMessage session case s of - Left _ -> waitForMessage f + Left _ -> waitForMessage f session Right m | f m -> return m - | otherwise -> waitForMessage f + | otherwise -> waitForMessage f session + -- | Pulls an error message and returns it if the given predicate returns @True@. -waitForMessageError :: (MessageError -> Bool) -> Xmpp MessageError -waitForMessageError f = do - s <- pullMessage +waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError +waitForMessageError f session = do + s <- pullMessage session case s of - Right _ -> waitForMessageError f + Right _ -> waitForMessageError f session Left m | f m -> return m - | otherwise -> waitForMessageError f + | otherwise -> waitForMessageError f session + -- | Pulls a (non-error) presence and returns it if the given predicate returns -- @True@. -waitForPresence :: (Presence -> Bool) -> Xmpp Presence -waitForPresence f = do - s <- pullPresence +waitForPresence :: (Presence -> Bool) -> Session -> IO Presence +waitForPresence f session = do + s <- pullPresence session case s of - Left _ -> waitForPresence f + Left _ -> waitForPresence f session Right m | f m -> return m - | otherwise -> waitForPresence f + | otherwise -> waitForPresence f session -- TODO: Wait for presence error? @@ -171,23 +155,20 @@ waitForPresence f = do -- temporarily stopped and resumed with the new session details once the action -- returns. The action will run in the calling thread. Any uncaught exceptions -- will be interpreted as connection failure. -withConnection :: XmppConMonad a -> Xmpp (Either StreamError a) -withConnection a = do - readerId <- asks readerThread - stateRef <- asks conStateRef - write <- asks writeRef - wait <- liftIO $ newEmptyTMVarIO - liftIO . Ex.mask_ $ do +withConnection :: XmppConMonad a -> Session -> IO (Either StreamError a) +withConnection a session = do + wait <- newEmptyTMVarIO + Ex.mask_ $ do -- Suspends the reader until the lock (wait) is released (set to `()'). - throwTo readerId $ Interrupt wait + throwTo (readerThread session) $ Interrupt wait -- We acquire the write and stateRef locks, to make sure that this is -- the only thread that can write to the stream and to perform a -- withConnection calculation. Afterwards, we release the lock and -- fetches an updated state. s <- Ex.catch (atomically $ do - _ <- takeTMVar write - s <- takeTMVar stateRef + _ <- takeTMVar (writeRef session) + s <- takeTMVar (conStateRef session) putTMVar wait () return s ) @@ -201,8 +182,8 @@ withConnection a = do (do (res, s') <- runStateT a s atomically $ do - putTMVar write (sConPushBS s') - putTMVar stateRef s' + putTMVar (writeRef session) (sConPushBS s') + putTMVar (conStateRef session) s' return $ Right res ) -- We treat all Exceptions as fatal. If we catch a StreamError, we @@ -213,52 +194,48 @@ withConnection a = do ] -- | Send a presence stanza. -sendPresence :: Presence -> Xmpp () -sendPresence = sendStanza . PresenceS +sendPresence :: Presence -> Session -> IO () +sendPresence p session = sendStanza (PresenceS p) session -- | Send a message stanza. -sendMessage :: Message -> Xmpp () -sendMessage = sendStanza . MessageS +sendMessage :: Message -> Session -> IO () +sendMessage m session = sendStanza (MessageS m) session + -- | Executes a function to update the event handlers. -modifyHandlers :: (EventHandlers -> EventHandlers) -> Xmpp () -modifyHandlers f = do - eh <- asks eventHandlers - liftIO . atomically $ writeTVar eh . f =<< readTVar eh +modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO () +modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f -- | Sets the handler to be executed when the server connection is closed. -setConnectionClosedHandler :: (StreamError -> Xmpp ()) -> Xmpp () -setConnectionClosedHandler eh = do - r <- ask - modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r}) +setConnectionClosedHandler :: (StreamError -> Session -> IO ()) -> Session -> IO () +setConnectionClosedHandler eh session = do + modifyHandlers (\s -> s{connectionClosedHandler = + \e -> eh e session}) session -- | Run an event handler. -runHandler :: (EventHandlers -> IO a) -> Xmpp a -runHandler h = do - eh <- liftIO . atomically . readTVar =<< asks eventHandlers - liftIO $ h eh +runHandler :: (EventHandlers -> IO a) -> Session -> IO a +runHandler h session = h =<< atomically (readTVar $ eventHandlers session) + -- | End the current Xmpp session. -endSession :: Xmpp () -endSession = do -- TODO: This has to be idempotent (is it?) - void $ withConnection xmppKillConnection - liftIO =<< asks stopThreads +endSession :: Session -> IO () +endSession session = do -- TODO: This has to be idempotent (is it?) + void $ withConnection xmppKillConnection session + stopThreads session -- | Close the connection to the server. Closes the stream (by enforcing a -- write lock and sending a element), waits (blocks) for three -- seconds, and then closes the connection. -closeConnection :: Xmpp () -closeConnection = Ex.mask_ $ do - write <- asks writeRef - send <- liftIO . atomically $ takeTMVar write - cc <- sCloseConnection <$> - (liftIO . atomically . readTMVar =<< asks conStateRef) - liftIO . send $ "" - void . liftIO . forkIO $ do +closeConnection :: Session -> IO () +closeConnection session = Ex.mask_ $ do + send <- atomically $ takeTMVar (writeRef session) + cc <- sCloseConnection <$> ( atomically $ readTMVar (conStateRef session)) + send "" + void . forkIO $ do threadDelay 3000000 -- When we close the connection, we close the handle that was used in the -- sCloseConnection above. So even if a new connection has been -- established at this point, it will not be affected by this action. (Ex.try cc) :: IO (Either Ex.SomeException ()) return () - liftIO . atomically $ putTMVar write (\_ -> return False) \ No newline at end of file + atomically $ putTMVar (writeRef session) (\_ -> return False) diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index b2c4656..8788996 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -225,17 +225,6 @@ newSession = do eh stopThreads' --- | Creates a new session and runs the given Xmpp computation. -withNewSession :: Xmpp b -> IO (Session, b) -withNewSession a = do - sess <- newSession - ret <- runReaderT a sess - return (sess, ret) - --- | Runs the given Xmpp computation in the given session. -withSession :: Session -> Xmpp a -> IO a -withSession = flip runReaderT - -- Acquires the write lock, pushes a space, and releases the lock. -- | Sends a blank space every 30 seconds to keep the connection alive. connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index c5be122..60a465d 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -56,8 +56,6 @@ data Session = Session , stopThreads :: IO () } --- Xmpp is a monad for concurrent Xmpp usage. -type Xmpp a = ReaderT Session IO a -- Interrupt is used to signal to the reader thread that it should stop. data Interrupt = Interrupt (TMVar ()) deriving Typeable diff --git a/source/Network/Xmpp/Session.hs b/source/Network/Xmpp/Session.hs index 1c783d8..94e34f5 100644 --- a/source/Network/Xmpp/Session.hs +++ b/source/Network/Xmpp/Session.hs @@ -35,9 +35,9 @@ xmppStartSession = do -- Sends the session IQ set element and waits for an answer. Throws an error if -- if an IQ error stanza is returned from the server. -startSession :: Xmpp () -startSession = do - answer <- sendIQ' Nothing Set Nothing sessionXML +startSession :: Session -> IO () +startSession session = do + answer <- sendIQ' Nothing Set Nothing sessionXML session case answer of IQResponseResult _ -> return () e -> error $ show e diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs index 24440c4..2d45618 100644 --- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs +++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs @@ -27,6 +27,7 @@ import Network.Xmpp import Network.Xmpp.Monad import Network.Xmpp.Pickle import Network.Xmpp.Types +import Network.Xmpp.Concurrent data DiscoError = DiscoNoQueryElement | DiscoIQError IQError @@ -83,9 +84,10 @@ xpQueryInfo = xpWrap (\(nd, (feats, ids)) -> QIR nd ids feats) -- | Query an entity for it's identity and features queryInfo :: Jid -- ^ Entity to query -> Maybe Text.Text -- ^ Node - -> Xmpp (Either DiscoError QueryInfoResult) -queryInfo to node = do - res <- sendIQ' (Just to) Get Nothing queryBody + -> Session + -> IO (Either DiscoError QueryInfoResult) +queryInfo to node session = do + res <- sendIQ' (Just to) Get Nothing queryBody session return $ case res of IQResponseError e -> Left $ DiscoIQError e IQResponseTimeout -> Left $ DiscoTimeout @@ -145,9 +147,10 @@ xpQueryItems = xpElem (itemsN "query") -- | Query an entity for Items of a node queryItems :: Jid -- ^ Entity to query -> Maybe Text.Text -- ^ Node - -> Xmpp (Either DiscoError (Maybe Text.Text, [Item])) -queryItems to node = do - res <- sendIQ' (Just to) Get Nothing queryBody + -> Session + -> IO (Either DiscoError (Maybe Text.Text, [Item])) +queryItems to node session = do + res <- sendIQ' (Just to) Get Nothing queryBody session return $ case res of IQResponseError e -> Left $ DiscoIQError e IQResponseTimeout -> Left $ DiscoTimeout