From 70d9b5b47d44f9f99f21514ea6bdd765012ea99f Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Wed, 25 Apr 2012 11:43:35 +0200 Subject: [PATCH] renamed XMPPThread to XMPP renamed Thread to Session split runThreaded in newSession and WithNewSession --- src/Network/XMPP.hs | 16 ++----- src/Network/XMPP/Bind.hs | 2 +- src/Network/XMPP/Concurrent.hs | 4 +- src/Network/XMPP/Concurrent/IQ.hs | 6 +-- src/Network/XMPP/Concurrent/Monad.hs | 42 ++++++++--------- src/Network/XMPP/Concurrent/Threads.hs | 64 +++++++++++++------------- src/Network/XMPP/Concurrent/Types.hs | 46 +++++++++--------- src/Network/XMPP/Monad.hs | 4 +- src/Network/XMPP/Session.hs | 2 +- 9 files changed, 90 insertions(+), 96 deletions(-) diff --git a/src/Network/XMPP.hs b/src/Network/XMPP.hs index 1a78a42..b2a1e54 100644 --- a/src/Network/XMPP.hs +++ b/src/Network/XMPP.hs @@ -34,7 +34,7 @@ module Network.XMPP ( -- * Session management - xmppNewSession + withNewSession , connect , startTLS , auth @@ -132,7 +132,7 @@ module Network.XMPP , iqRequestPayload , iqResultPayload -- * Threads - , XMPPThread + , XMPP , forkXMPP -- * Misc , exampleParams @@ -155,10 +155,6 @@ import Network.XMPP.Types import Control.Monad.Error --- | Create a new, pristine session without an active connection. -xmppNewSession :: XMPPThread a -> IO (a, XMPPConState) -xmppNewSession = withNewSession . runThreaded - -- | Connect to host with given address. xmppConnect :: HostName -> Text -> XMPPConMonad (Either StreamError ()) xmppConnect address hostname = xmppRawConnect address hostname >> xmppStartStream @@ -166,18 +162,16 @@ xmppConnect address hostname = xmppRawConnect address hostname >> xmppStartStre -- | Attempts to secure the connection using TLS. Will return -- 'TLSNoServerSupport' when the server does not offer TLS or does not -- expect it at this time. -startTLS :: TLS.TLSParams -> XMPPThread (Either XMPPTLSError ()) +startTLS :: TLS.TLSParams -> XMPP (Either XMPPTLSError ()) startTLS = withConnection . xmppStartTLS - - -- | Authenticate to the server with the given username and password -- and bind a resource auth :: Text.Text -- ^ The username -> Text.Text -- ^ The password -> Maybe Text -- ^ The desired resource or 'Nothing' to let the server -- assign one - -> XMPPThread (Either SaslError Text.Text) + -> XMPP (Either SaslError Text.Text) auth username passwd resource = runErrorT $ do ErrorT . withConnection $ xmppSASL username passwd res <- lift $ xmppBind resource @@ -185,5 +179,5 @@ auth username passwd resource = runErrorT $ do return res -- | Connect to an xmpp server -connect :: HostName -> Text -> XMPPThread (Either StreamError ()) +connect :: HostName -> Text -> XMPP (Either StreamError ()) connect address hostname = withConnection $ xmppConnect address hostname diff --git a/src/Network/XMPP/Bind.hs b/src/Network/XMPP/Bind.hs index b525923..77b25c3 100644 --- a/src/Network/XMPP/Bind.hs +++ b/src/Network/XMPP/Bind.hs @@ -40,7 +40,7 @@ jidP = bindP $ xpElemNodes "jid" (xpContent xpPrim) -- server-generated resource and extract the JID from the non-error -- response. -xmppBind :: Maybe Text -> XMPPThread Text +xmppBind :: Maybe Text -> XMPP Text xmppBind rsrc = do answer <- sendIQ' Nothing Set Nothing (bindBody rsrc) let (Right IQResult{iqResultPayload = Just b}) = answer -- TODO: Error handling diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs index fe15713..2750ff1 100644 --- a/src/Network/XMPP/Concurrent.hs +++ b/src/Network/XMPP/Concurrent.hs @@ -1,6 +1,6 @@ module Network.XMPP.Concurrent - ( Thread - , XMPPThread + ( Session + , XMPP , module Network.XMPP.Concurrent.Monad , module Network.XMPP.Concurrent.Threads , module Network.XMPP.Concurrent.IQ diff --git a/src/Network/XMPP/Concurrent/IQ.hs b/src/Network/XMPP/Concurrent/IQ.hs index cc97898..6693397 100644 --- a/src/Network/XMPP/Concurrent/IQ.hs +++ b/src/Network/XMPP/Concurrent/IQ.hs @@ -17,7 +17,7 @@ sendIQ :: Maybe JID -- ^ Recipient (to) -> IQRequestType -- ^ IQ type (Get or Set) -> Maybe LangTag -- ^ Language tag of the payload (Nothing for default) -> Element -- ^ The iq body (there has to be exactly one) - -> XMPPThread (TMVar IQResponse) + -> XMPP (TMVar IQResponse) sendIQ to tp lang body = do -- TODO: add timeout newId <- liftIO =<< asks idGenerator handlers <- asks iqHandlers @@ -35,14 +35,14 @@ sendIQ' :: Maybe JID -> IQRequestType -> Maybe LangTag -> Element - -> XMPPThread IQResponse + -> XMPP IQResponse sendIQ' to tp lang body = do ref <- sendIQ to tp lang body liftIO . atomically $ takeTMVar ref answerIQ :: (IQRequest, TVar Bool) -> Either StanzaError (Maybe Element) - -> XMPPThread Bool + -> XMPP Bool answerIQ ((IQRequest iqid from _to lang _tp bd), sentRef) answer = do out <- asks outCh let response = case answer of diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index f4a9f23..2d97372 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -23,7 +23,7 @@ import Network.XMPP.Monad -- combination was alread handled listenIQChan :: IQRequestType -- ^ type of IQs to receive (Get / Set) -> Text -- ^ namespace of the child element - -> XMPPThread (Maybe ( TChan (IQRequest, TVar Bool))) + -> XMPP (Maybe ( TChan (IQRequest, TVar Bool))) listenIQChan tp ns = do handlers <- asks iqHandlers liftIO . atomically $ do @@ -39,7 +39,7 @@ listenIQChan tp ns = do -- | get the inbound stanza channel, duplicates from master if necessary -- please note that once duplicated it will keep filling up, call -- 'dropMessageChan' to allow it to be garbage collected -getMessageChan :: XMPPThread (TChan (Either MessageError Message)) +getMessageChan :: XMPP (TChan (Either MessageError Message)) getMessageChan = do mChR <- asks messagesRef mCh <- liftIO $ readIORef mChR @@ -52,7 +52,7 @@ getMessageChan = do Just mCh' -> return mCh' -- | see 'getMessageChan' -getPresenceChan :: XMPPThread (TChan (Either PresenceError Presence)) +getPresenceChan :: XMPP (TChan (Either PresenceError Presence)) getPresenceChan = do pChR <- asks presenceRef pCh <- liftIO $ readIORef pChR @@ -66,40 +66,40 @@ getPresenceChan = do -- | Drop the local end of the inbound stanza channel -- from our context so it can be GC-ed -dropMessageChan :: XMPPThread () +dropMessageChan :: XMPP () dropMessageChan = do r <- asks messagesRef liftIO $ writeIORef r Nothing -- | see 'dropMessageChan' -dropPresenceChan :: XMPPThread () +dropPresenceChan :: XMPP () dropPresenceChan = do r <- asks presenceRef liftIO $ writeIORef r Nothing -- | Read an element from the inbound stanza channel, acquiring a copy -- of the channel as necessary -pullMessage :: XMPPThread (Either MessageError Message) +pullMessage :: XMPP (Either MessageError Message) pullMessage = do c <- getMessageChan liftIO $ atomically $ readTChan c -- | Read an element from the inbound stanza channel, acquiring a copy -- of the channel as necessary -pullPresence :: XMPPThread (Either PresenceError Presence) +pullPresence :: XMPP (Either PresenceError Presence) pullPresence = do c <- getPresenceChan liftIO $ atomically $ readTChan c -- | Send a stanza to the server -sendS :: Stanza -> XMPPThread () +sendS :: Stanza -> XMPP () sendS a = do out <- asks outCh liftIO . atomically $ writeTChan out a return () -- | Fork a new thread -forkXMPP :: XMPPThread () -> XMPPThread ThreadId +forkXMPP :: XMPP () -> XMPP ThreadId forkXMPP a = do thread <- ask mCH' <- liftIO $ newIORef Nothing @@ -110,7 +110,7 @@ forkXMPP a = do filterMessages :: (MessageError -> Bool) -> (Message -> Bool) - -> XMPPThread (Either MessageError Message) + -> XMPP (Either MessageError Message) filterMessages f g = do s <- pullMessage case s of @@ -119,7 +119,7 @@ filterMessages f g = do Right m | g m -> return $ Right m | otherwise -> filterMessages f g -waitForMessage :: (Message -> Bool) -> XMPPThread Message +waitForMessage :: (Message -> Bool) -> XMPP Message waitForMessage f = do s <- pullMessage case s of @@ -127,7 +127,7 @@ waitForMessage f = do Right m | f m -> return m | otherwise -> waitForMessage f -waitForMessageError :: (MessageError -> Bool) -> XMPPThread MessageError +waitForMessageError :: (MessageError -> Bool) -> XMPP MessageError waitForMessageError f = do s <- pullMessage case s of @@ -135,7 +135,7 @@ waitForMessageError f = do Left m | f m -> return m | otherwise -> waitForMessageError f -waitForPresence :: (Presence -> Bool) -> XMPPThread Presence +waitForPresence :: (Presence -> Bool) -> XMPP Presence waitForPresence f = do s <- pullPresence case s of @@ -149,7 +149,7 @@ waitForPresence f = do -- The Action will run in the calling thread/ -- NB: This will /not/ catch any exceptions. If you action dies, deadlocks -- or otherwisely exits abnormaly the XMPP session will be dead. -withConnection :: XMPPConMonad a -> XMPPThread a +withConnection :: XMPPConMonad a -> XMPP a withConnection a = do readerId <- asks readerThread stateRef <- asks conStateRef @@ -167,36 +167,36 @@ withConnection a = do return res -- | Send a presence Stanza -sendPresence :: Presence -> XMPPThread () +sendPresence :: Presence -> XMPP () sendPresence = sendS . PresenceS -- | Send a Message Stanza -sendMessage :: Message -> XMPPThread () +sendMessage :: Message -> XMPP () sendMessage = sendS . MessageS -modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPPThread () +modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPP () modifyHandlers f = do eh <- asks eventHandlers liftIO . atomically $ writeTVar eh . f =<< readTVar eh -setSessionEndHandler :: XMPPThread () -> XMPPThread () +setSessionEndHandler :: XMPP () -> XMPP () setSessionEndHandler eh = modifyHandlers (\s -> s{sessionEndHandler = eh}) -- | run an event handler -runHandler :: (EventHandlers -> XMPPThread a) -> XMPPThread a +runHandler :: (EventHandlers -> XMPP a) -> XMPP a runHandler h = do eh <- liftIO . atomically . readTVar =<< asks eventHandlers h eh -- | End the current xmpp session -endSession :: XMPPThread () +endSession :: XMPP () endSession = do -- TODO: This has to be idempotent (is it?) withConnection xmppKillConnection liftIO =<< asks stopThreads runHandler sessionEndHandler -- | Close the connection to the server -closeConnection :: XMPPThread () +closeConnection :: XMPP () closeConnection = withConnection xmppKillConnection diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs index a801b05..7a4309a 100644 --- a/src/Network/XMPP/Concurrent/Threads.hs +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -119,28 +119,28 @@ writeWorker stCh writeR = forever $ do -- returns channel of incoming and outgoing stances, respectively -- and an Action to stop the Threads and close the connection startThreads - :: XMPPConMonad ( TChan (Either MessageError Message) - , TChan (Either PresenceError Presence) - , TVar IQHandlers - , TChan Stanza - , IO () - , TMVar (BS.ByteString -> IO ()) - , TMVar XMPPConState - , ThreadId - , TVar EventHandlers - ) + :: IO ( TChan (Either MessageError Message) + , TChan (Either PresenceError Presence) + , TVar IQHandlers + , TChan Stanza + , IO () + , TMVar (BS.ByteString -> IO ()) + , TMVar XMPPConState + , ThreadId + , TVar EventHandlers + ) startThreads = do - writeLock <- liftIO . newTMVarIO =<< gets sConPushBS - messageC <- liftIO newTChanIO - presenceC <- liftIO newTChanIO - outC <- liftIO newTChanIO - handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) - eh <- liftIO $ newTVarIO zeroEventHandlers - conS <- liftIO . newTMVarIO =<< get - lw <- liftIO . forkIO $ writeWorker outC writeLock - cp <- liftIO . forkIO $ connPersist writeLock - rd <- liftIO . forkIO $ readWorker messageC presenceC handlers conS + writeLock <- newEmptyTMVarIO + messageC <- newTChanIO + presenceC <- newTChanIO + outC <- newTChanIO + handlers <- newTVarIO ( Map.empty, Map.empty) + eh <- newTVarIO zeroEventHandlers + conS <- newEmptyTMVarIO + lw <- forkIO $ writeWorker outC writeLock + cp <- forkIO $ connPersist writeLock + rd <- forkIO $ readWorker messageC presenceC handlers conS return (messageC, presenceC, handlers, outC , killConnection writeLock [lw, rd, cp] , writeLock, conS ,rd, eh) @@ -150,24 +150,24 @@ startThreads = do _ <- forM threads killThread return() --- | Start worker threads and run action. The supplied action will run --- in the calling thread. use 'forkXMPP' to start another thread. -runThreaded :: XMPPThread a - -> XMPPConMonad a -runThreaded a = do - liftIO . putStrLn $ "starting threads" +-- | Creates and initializes a new XMPP session. +newSession :: IO Session +newSession = do (mC, pC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads - liftIO . putStrLn $ "threads running" - workermCh <- liftIO . newIORef $ Nothing - workerpCh <- liftIO . newIORef $ Nothing - idRef <- liftIO $ newTVarIO 1 + workermCh <- newIORef $ Nothing + workerpCh <- newIORef $ Nothing + idRef <- newTVarIO 1 let getId = atomically $ do curId <- readTVar idRef writeTVar idRef (curId + 1 :: Integer) return . read. show $ curId - liftIO . putStrLn $ "starting application" - liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads') + return (Session workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads') +withNewSession :: XMPP b -> IO b +withNewSession a = newSession >>= runReaderT a + +withSession :: Session -> XMPP a -> IO a +withSession = flip runReaderT -- | Sends a blank space every 30 seconds to keep the connection alive connPersist :: TMVar (BS.ByteString -> IO ()) -> IO () diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs index fa15f7e..37aa821 100644 --- a/src/Network/XMPP/Concurrent/Types.hs +++ b/src/Network/XMPP/Concurrent/Types.hs @@ -23,8 +23,8 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan (IQRequest, TVar Bool)) ) data EventHandlers = EventHandlers - { sessionEndHandler :: XMPPThread () - , connectionClosedHandler :: XMPPThread () + { sessionEndHandler :: XMPP () + , connectionClosedHandler :: XMPP () } zeroEventHandlers :: EventHandlers @@ -33,29 +33,29 @@ zeroEventHandlers = EventHandlers , connectionClosedHandler = return () } -data Thread = Thread { messagesRef :: IORef (Maybe ( TChan (Either +data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either MessageError Message - ))) - , presenceRef :: IORef (Maybe (TChan (Either - PresenceError - Presence - ))) - , mShadow :: TChan (Either MessageError - Message) -- the original chan - , pShadow :: TChan (Either PresenceError - Presence) -- the original chan - , outCh :: TChan Stanza - , iqHandlers :: TVar IQHandlers - , writeRef :: TMVar (BS.ByteString -> IO () ) - , readerThread :: ThreadId - , idGenerator :: IO StanzaId - , conStateRef :: TMVar XMPPConState - , eventHandlers :: TVar EventHandlers - , stopThreads :: IO () - } - -type XMPPThread a = ReaderT Thread IO a + ))) + , presenceRef :: IORef (Maybe (TChan (Either + PresenceError Presence ))) + , mShadow :: TChan (Either MessageError + Message) + -- the original chan + , pShadow :: TChan (Either PresenceError + Presence) + -- the original chan + , outCh :: TChan Stanza + , iqHandlers :: TVar IQHandlers + , writeRef :: TMVar (BS.ByteString -> IO () ) + , readerThread :: ThreadId + , idGenerator :: IO StanzaId + , conStateRef :: TMVar XMPPConState + , eventHandlers :: TVar EventHandlers + , stopThreads :: IO () + } + +type XMPP a = ReaderT Session IO a data Interrupt = Interrupt (TMVar ()) deriving Typeable instance Show Interrupt where show _ = "" diff --git a/src/Network/XMPP/Monad.hs b/src/Network/XMPP/Monad.hs index f860c15..013f186 100644 --- a/src/Network/XMPP/Monad.hs +++ b/src/Network/XMPP/Monad.hs @@ -120,8 +120,8 @@ xmppRawConnect host hostname = do put st -withNewSession :: XMPPConMonad a -> IO (a, XMPPConState) -withNewSession action = do +xmppNewSession :: XMPPConMonad a -> IO (a, XMPPConState) +xmppNewSession action = do runStateT action xmppZeroConState xmppKillConnection :: XMPPConMonad () diff --git a/src/Network/XMPP/Session.hs b/src/Network/XMPP/Session.hs index 5a355b0..8e3082f 100644 --- a/src/Network/XMPP/Session.hs +++ b/src/Network/XMPP/Session.hs @@ -32,7 +32,7 @@ xmppSession = do let IQResultS (IQResult "sess" Nothing Nothing _lang _body) = answer return () -startSession :: XMPPThread () +startSession :: XMPP () startSession = do answer <- sendIQ' Nothing Set Nothing sessionXML case answer of