diff --git a/src/Network/XMPP.hs b/src/Network/XMPP.hs index 8047c81..4de7ce9 100644 --- a/src/Network/XMPP.hs +++ b/src/Network/XMPP.hs @@ -66,7 +66,7 @@ module Network.XMPP -- presence, or IQ stanza. The particular allowable values for the 'type' -- attribute vary depending on whether the stanza is a message, presence, -- or IQ stanza. - + , getStanzaChan -- ** Messages -- | The /message/ stanza is a /push/ mechanism whereby one entity pushes -- information to another entity, similar to the communications that occur in diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index 815ce8a..4a5f51d 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -37,7 +37,7 @@ listenIQChan tp ns = do Nothing -> Just iqCh Just _iqCh' -> Nothing --- | get the inbound stanza channel, duplicates from master if necessary +-- | get the inbound message channel, duplicates from master if necessary -- please note that once duplicated it will keep filling up, call -- 'dropMessageChan' to allow it to be garbage collected getMessageChan :: XMPP (TChan (Either MessageError Message)) @@ -52,6 +52,12 @@ getMessageChan = do return mCh' Just mCh' -> return mCh' +-- | Get a duplicate of the stanza channel +getStanzaChan :: XMPP (TChan Stanza) +getStanzaChan = do + shadow <- asks sShadow + liftIO $ atomically $ dupTChan shadow + -- | see 'getMessageChan' getPresenceChan :: XMPP (TChan (Either PresenceError Presence)) getPresenceChan = do diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs index 4f6a1d7..43717ae 100644 --- a/src/Network/XMPP/Concurrent/Threads.hs +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -31,11 +31,12 @@ import GHC.IO (unsafeUnmask) readWorker :: TChan (Either MessageError Message) -> TChan (Either PresenceError Presence) + -> TChan Stanza -> TVar IQHandlers -> TVar EventHandlers -> TMVar XmppConnection -> IO () -readWorker messageC presenceC iqHands handlers stateRef = +readWorker messageC presenceC stanzaC iqHands handlers stateRef = Ex.mask_ . forever $ do res <- liftIO $ Ex.catches ( do -- we don't know whether pull will @@ -57,6 +58,8 @@ readWorker messageC presenceC iqHands handlers stateRef = case res of Nothing -> return () Just sta -> do + writeTChan stanzaC sta + void $ readTChan stanzaC -- sic case sta of MessageS m -> do writeTChan messageC $ Right m _ <- readTChan messageC -- Sic! @@ -139,6 +142,7 @@ writeWorker stCh writeR = forever $ do startThreads :: IO ( TChan (Either MessageError Message) , TChan (Either PresenceError Presence) + , TChan Stanza , TVar IQHandlers , TChan Stanza , IO () @@ -153,13 +157,14 @@ startThreads = do messageC <- newTChanIO presenceC <- newTChanIO outC <- newTChanIO + stanzaC <- newTChanIO handlers <- newTVarIO ( Map.empty, Map.empty) eh <- newTVarIO zeroEventHandlers conS <- newTMVarIO xmppNoConnection lw <- forkIO $ writeWorker outC writeLock cp <- forkIO $ connPersist writeLock - rd <- forkIO $ readWorker messageC presenceC handlers eh conS - return (messageC, presenceC, handlers, outC + rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS + return (messageC, presenceC, stanzaC, handlers, outC , killConnection writeLock [lw, rd, cp] , writeLock, conS ,rd, eh) where @@ -171,7 +176,7 @@ startThreads = do -- | Creates and initializes a new XMPP session. newSession :: IO Session newSession = do - (mC, pC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads + (mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads workermCh <- newIORef $ Nothing workerpCh <- newIORef $ Nothing idRef <- newTVarIO 1 @@ -179,7 +184,8 @@ newSession = do curId <- readTVar idRef writeTVar idRef (curId + 1 :: Integer) return . read. show $ curId - return (Session workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads') + return (Session workermCh workerpCh mC pC sC outC hand writeR rdr getId + conS eh stopThreads') withNewSession :: XMPP b -> IO (Session, b) withNewSession a = do diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs index 964d126..b8f3e0c 100644 --- a/src/Network/XMPP/Concurrent/Types.hs +++ b/src/Network/XMPP/Concurrent/Types.hs @@ -45,6 +45,7 @@ data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either , pShadow :: TChan (Either PresenceError Presence) -- the original chan + , sShadow :: TChan Stanza -- All stanzas , outCh :: TChan Stanza , iqHandlers :: TVar IQHandlers , writeRef :: TMVar (BS.ByteString -> IO Bool )