From 0f43a5f63ce9541895bf07bc93f75fe1acfc79fd Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Thu, 10 May 2012 17:13:11 +0200
Subject: [PATCH] add stanza channel
---
src/Network/XMPP.hs | 2 +-
src/Network/XMPP/Concurrent/Monad.hs | 8 +++++++-
src/Network/XMPP/Concurrent/Threads.hs | 16 +++++++++++-----
src/Network/XMPP/Concurrent/Types.hs | 1 +
4 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/src/Network/XMPP.hs b/src/Network/XMPP.hs
index 8047c81..4de7ce9 100644
--- a/src/Network/XMPP.hs
+++ b/src/Network/XMPP.hs
@@ -66,7 +66,7 @@ module Network.XMPP
-- presence, or IQ stanza. The particular allowable values for the 'type'
-- attribute vary depending on whether the stanza is a message, presence,
-- or IQ stanza.
-
+ , getStanzaChan
-- ** Messages
-- | The /message/ stanza is a /push/ mechanism whereby one entity pushes
-- information to another entity, similar to the communications that occur in
diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs
index 815ce8a..4a5f51d 100644
--- a/src/Network/XMPP/Concurrent/Monad.hs
+++ b/src/Network/XMPP/Concurrent/Monad.hs
@@ -37,7 +37,7 @@ listenIQChan tp ns = do
Nothing -> Just iqCh
Just _iqCh' -> Nothing
--- | get the inbound stanza channel, duplicates from master if necessary
+-- | get the inbound message channel, duplicates from master if necessary
-- please note that once duplicated it will keep filling up, call
-- 'dropMessageChan' to allow it to be garbage collected
getMessageChan :: XMPP (TChan (Either MessageError Message))
@@ -52,6 +52,12 @@ getMessageChan = do
return mCh'
Just mCh' -> return mCh'
+-- | Get a duplicate of the stanza channel
+getStanzaChan :: XMPP (TChan Stanza)
+getStanzaChan = do
+ shadow <- asks sShadow
+ liftIO $ atomically $ dupTChan shadow
+
-- | see 'getMessageChan'
getPresenceChan :: XMPP (TChan (Either PresenceError Presence))
getPresenceChan = do
diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs
index 4f6a1d7..43717ae 100644
--- a/src/Network/XMPP/Concurrent/Threads.hs
+++ b/src/Network/XMPP/Concurrent/Threads.hs
@@ -31,11 +31,12 @@ import GHC.IO (unsafeUnmask)
readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence)
+ -> TChan Stanza
-> TVar IQHandlers
-> TVar EventHandlers
-> TMVar XmppConnection
-> IO ()
-readWorker messageC presenceC iqHands handlers stateRef =
+readWorker messageC presenceC stanzaC iqHands handlers stateRef =
Ex.mask_ . forever $ do
res <- liftIO $ Ex.catches ( do
-- we don't know whether pull will
@@ -57,6 +58,8 @@ readWorker messageC presenceC iqHands handlers stateRef =
case res of
Nothing -> return ()
Just sta -> do
+ writeTChan stanzaC sta
+ void $ readTChan stanzaC -- sic
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
@@ -139,6 +142,7 @@ writeWorker stCh writeR = forever $ do
startThreads
:: IO ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence)
+ , TChan Stanza
, TVar IQHandlers
, TChan Stanza
, IO ()
@@ -153,13 +157,14 @@ startThreads = do
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
+ stanzaC <- newTChanIO
handlers <- newTVarIO ( Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
- rd <- forkIO $ readWorker messageC presenceC handlers eh conS
- return (messageC, presenceC, handlers, outC
+ rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS
+ return (messageC, presenceC, stanzaC, handlers, outC
, killConnection writeLock [lw, rd, cp]
, writeLock, conS ,rd, eh)
where
@@ -171,7 +176,7 @@ startThreads = do
-- | Creates and initializes a new XMPP session.
newSession :: IO Session
newSession = do
- (mC, pC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
+ (mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
@@ -179,7 +184,8 @@ newSession = do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
- return (Session workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads')
+ return (Session workermCh workerpCh mC pC sC outC hand writeR rdr getId
+ conS eh stopThreads')
withNewSession :: XMPP b -> IO (Session, b)
withNewSession a = do
diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs
index 964d126..b8f3e0c 100644
--- a/src/Network/XMPP/Concurrent/Types.hs
+++ b/src/Network/XMPP/Concurrent/Types.hs
@@ -45,6 +45,7 @@ data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either
, pShadow :: TChan (Either PresenceError
Presence)
-- the original chan
+ , sShadow :: TChan Stanza -- All stanzas
, outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers
, writeRef :: TMVar (BS.ByteString -> IO Bool )