From 461bbd06927c8eaa21fc3b8fe18d8cf83ff71463 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sun, 9 Dec 2012 18:06:25 +0100 Subject: [PATCH] remove message and presence channels (use stanza channel instead) rename forkSession to dupSession --- examples/EchoClient.hs | 41 +++++++++++-------- source/Network/Xmpp.hs | 4 +- source/Network/Xmpp/Concurrent/Channels.hs | 41 +++---------------- .../Network/Xmpp/Concurrent/Channels/Basic.hs | 16 +++----- .../Xmpp/Concurrent/Channels/Message.hs | 25 +++-------- .../Xmpp/Concurrent/Channels/Presence.hs | 24 +++-------- .../Network/Xmpp/Concurrent/Channels/Types.hs | 13 +----- 7 files changed, 47 insertions(+), 117 deletions(-) diff --git a/examples/EchoClient.hs b/examples/EchoClient.hs index 0a2f9f3..953cc2b 100644 --- a/examples/EchoClient.hs +++ b/examples/EchoClient.hs @@ -26,35 +26,40 @@ import Network.Xmpp.IM -- Server and authentication details. host = "localhost" port = PortNumber 5222 -realm = "host.com" +realm = "server.com" username = "echo" password = "pwd" resource = Just "bot" -- | Automatically accept all subscription requests from other entities autoAccept :: Session -> IO () -autoAccept context = forever $ do - st <- waitForPresence isPresenceSubscribe context +autoAccept session = forever $ do + st <- waitForPresence isPresenceSubscribe session let Just friend = presenceFrom st - sendPresence (presenceSubscribed friend) context + sendPresence (presenceSubscribed friend) session printf "Hello %s !" (show friend) main :: IO () main = do - con <- simpleConnect - host - port - realm - username - password - resource - putStrLn "connected" - sendPresence presenceOnline con - _thread <- forkIO $ autoAccept con - forever $ do -- echo all messages back to the user - msg <- getMessage con + sess <- simpleConnect + host + port + realm + username + password + resource + -- We won't be able to receive stanzas before we set out status to online + sendPresence presenceOnline sess + putStrLn "Connected." + -- We want to see all incoming stanzas in the auto-accept thread as well. + sess' <- dupSession sess + _thread <- forkIO $ autoAccept sess' + forever $ do + -- Echo all messages back to the user. + msg <- getMessage sess + sendMessage (answerIM (bodies msg) [] msg) sess + -- Print the received message to the screen. let sender = show . fromJust $ messageFrom msg let contents = maybe "nothing" Text.unpack $ body msg - printf "%s sayd \"%s\"\n" sender contents - sendMessage (answerIM (bodies msg) [] msg) con + printf "%s says \"%s\"\n" sender contents return () diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 0d8400c..d2547da 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -78,7 +78,7 @@ module Network.Xmpp -- presence, or IQ stanza. The particular allowable values for the 'type' -- attribute vary depending on whether the stanza is a message, presence, -- or IQ stanza. - , getStanzaChan + -- -- ** Messages -- | The /message/ stanza is a /push/ mechanism whereby one entity -- pushes information to another entity, similar to the communications that @@ -144,7 +144,7 @@ module Network.Xmpp , iqRequestPayload , iqResultPayload -- * Threads - , forkSession + , dupSession -- * Miscellaneous , LangTag(..) , exampleParams diff --git a/source/Network/Xmpp/Concurrent/Channels.hs b/source/Network/Xmpp/Concurrent/Channels.hs index 933f168..31c294a 100644 --- a/source/Network/Xmpp/Concurrent/Channels.hs +++ b/source/Network/Xmpp/Concurrent/Channels.hs @@ -34,38 +34,17 @@ import Network.Xmpp.Pickle import Network.Xmpp.Types import Text.XML.Stream.Elements -toChans :: TChan (Either MessageError Message) - -> TChan (Either PresenceError Presence) - -> TChan Stanza +toChans :: TChan Stanza -> TVar IQHandlers -> Stanza -> IO () -toChans messageC presenceC stanzaC iqHands sta = atomically $ do +toChans stanzaC iqHands sta = atomically $ do writeTChan stanzaC sta - void $ readTChan stanzaC -- sic case sta of - MessageS m -> do writeTChan messageC $ Right m - _ <- readTChan messageC -- Sic! - return () - -- this may seem ridiculous, but to prevent - -- the channel from filling up we - -- immedtiately remove the - -- Stanza we just put in. It will still be - -- available in duplicates. - MessageErrorS m -> do writeTChan messageC $ Left m - _ <- readTChan messageC - return () - PresenceS p -> do - writeTChan presenceC $ Right p - _ <- readTChan presenceC - return () - PresenceErrorS p -> do - writeTChan presenceC $ Left p - _ <- readTChan presenceC - return () IQRequestS i -> handleIQRequest iqHands i IQResultS i -> handleIQResponse iqHands (Right i) IQErrorS i -> handleIQResponse iqHands (Left i) + _ -> return () where -- If the IQ request has a namespace, send it through the appropriate channel. handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () @@ -94,17 +73,13 @@ toChans messageC presenceC stanzaC iqHands sta = atomically $ do -- | Creates and initializes a new Xmpp context. newSession :: Connection -> IO Session newSession con = do - messageC <- newTChanIO - presenceC <- newTChanIO outC <- newTChanIO - stanzaC <- newTChanIO + stanzaChan <- newTChanIO iqHandlers <- newTVarIO (Map.empty, Map.empty) eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } - let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers + let stanzaHandler = toChans stanzaChan iqHandlers (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con writer <- forkIO $ writeWorker outC wLock - workermCh <- newIORef $ Nothing - workerpCh <- newIORef $ Nothing idRef <- newTVarIO 1 let getId = atomically $ do curId <- readTVar idRef @@ -118,11 +93,7 @@ newSession con = do , stopThreads = kill >> killThread writer } return $ Session { context = cont - , mShadow = messageC - , pShadow = presenceC - , sShadow = stanzaC - , messagesRef = workermCh - , presenceRef = workerpCh + , stanzaCh = stanzaChan , outCh = outC , iqHandlers = iqHandlers } diff --git a/source/Network/Xmpp/Concurrent/Channels/Basic.hs b/source/Network/Xmpp/Concurrent/Channels/Basic.hs index cb820fe..e01d920 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Basic.hs +++ b/source/Network/Xmpp/Concurrent/Channels/Basic.hs @@ -2,21 +2,15 @@ module Network.Xmpp.Concurrent.Channels.Basic where import Control.Concurrent.STM -import Data.IORef import Network.Xmpp.Concurrent.Channels.Types import Network.Xmpp.Types --- | Get a duplicate of the stanza channel -getStanzaChan :: Session -> IO (TChan Stanza) -getStanzaChan session = atomically $ dupTChan (sShadow session) - -- | Send a stanza to the server. sendStanza :: Stanza -> Session -> IO () sendStanza a session = atomically $ writeTChan (outCh session) a --- | Create a forked session object -forkSession :: Session -> IO Session -forkSession session = do - mCH' <- newIORef Nothing - pCH' <- newIORef Nothing - return $ session {messagesRef = mCH' , presenceRef = pCH'} +-- | Create a new session object with the inbound channel duplicated +dupSession :: Session -> IO Session +dupSession session = do + stanzaCh' <- atomically $ dupTChan (stanzaCh session) + return $ session {stanzaCh = stanzaCh'} diff --git a/source/Network/Xmpp/Concurrent/Channels/Message.hs b/source/Network/Xmpp/Concurrent/Channels/Message.hs index c3619a0..5cff80a 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Message.hs +++ b/source/Network/Xmpp/Concurrent/Channels/Message.hs @@ -8,30 +8,15 @@ import Network.Xmpp.Types import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Channels.Basic --- | Get the inbound stanza channel, duplicates from master if necessary. Please --- note that once duplicated it will keep filling up, call 'dropMessageChan' to --- allow it to be garbage collected. -getMessageChan :: Session -> IO (TChan (Either MessageError Message)) -getMessageChan session = do - mCh <- readIORef . messagesRef $ session - case mCh of - Nothing -> do - mCh' <- atomically $ dupTChan (mShadow session) - writeIORef (messagesRef session) (Just mCh') - return mCh' - Just mCh' -> return mCh' - --- | Drop the local end of the inbound stanza channel from our context so it can --- be GC-ed. -dropMessageChan :: Session -> IO () -dropMessageChan session = writeIORef (messagesRef session) Nothing - -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. pullMessage :: Session -> IO (Either MessageError Message) pullMessage session = do - c <- getMessageChan session - atomically $ readTChan c + stanza <- atomically . readTChan $ stanzaCh session + case stanza of + MessageS m -> return $ Right m + MessageErrorS e -> return $ Left e + _ -> pullMessage session -- | Get the next received message getMessage :: Session -> IO Message diff --git a/source/Network/Xmpp/Concurrent/Channels/Presence.hs b/source/Network/Xmpp/Concurrent/Channels/Presence.hs index abcd367..32ec83f 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Presence.hs +++ b/source/Network/Xmpp/Concurrent/Channels/Presence.hs @@ -8,29 +8,15 @@ import Network.Xmpp.Types import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Channels.Basic --- | Analogous to 'getMessageChan'. -getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence)) -getPresenceChan session = do - pCh <- readIORef $ (presenceRef session) - case pCh of - Nothing -> do - pCh' <- atomically $ dupTChan (pShadow session) - writeIORef (presenceRef session) (Just pCh') - return pCh' - Just pCh' -> return pCh' - - --- | Analogous to 'dropMessageChan'. -dropPresenceChan :: Session -> IO () -dropPresenceChan session = writeIORef (presenceRef session) Nothing - - -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. pullPresence :: Session -> IO (Either PresenceError Presence) pullPresence session = do - c <- getPresenceChan session - atomically $ readTChan c + stanza <- atomically . readTChan $ stanzaCh session + case stanza of + PresenceS p -> return $ Right p + PresenceErrorS e -> return $ Left e + _ -> pullPresence session -- | Pulls a (non-error) presence and returns it if the given predicate returns -- @True@. diff --git a/source/Network/Xmpp/Concurrent/Channels/Types.hs b/source/Network/Xmpp/Concurrent/Channels/Types.hs index a70e4c2..8de98f1 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Types.hs +++ b/source/Network/Xmpp/Concurrent/Channels/Types.hs @@ -11,18 +11,7 @@ import Network.Xmpp.Types -- | An XMPP session context data Session = Session { context :: Context - -- The original master channels that the reader puts stanzas - -- into. These are cloned by @get{STanza,Message,Presence}Chan - -- on demand when first used by the thread and are stored in the - -- {message,presence}Ref fields below. - , mShadow :: TChan (Either MessageError Message) - , pShadow :: TChan (Either PresenceError Presence) - , sShadow :: TChan Stanza -- All stanzas - -- The cloned copies of the original/shadow channels. They are - -- thread-local (as opposed to the shadow channels) and contains all - -- stanzas received after the cloning of the shadow channels. - , messagesRef :: IORef (Maybe (TChan (Either MessageError Message))) - , presenceRef :: IORef (Maybe (TChan (Either PresenceError Presence))) + , stanzaCh :: TChan Stanza -- All stanzas , outCh :: TChan Stanza , iqHandlers :: TVar IQHandlers -- Writing lock, so that only one thread could write to the stream at any