Browse Source

add stanza channel

master
Philipp Balzarek 14 years ago
parent
commit
0f43a5f63c
  1. 2
      src/Network/XMPP.hs
  2. 8
      src/Network/XMPP/Concurrent/Monad.hs
  3. 16
      src/Network/XMPP/Concurrent/Threads.hs
  4. 1
      src/Network/XMPP/Concurrent/Types.hs

2
src/Network/XMPP.hs

@ -66,7 +66,7 @@ module Network.XMPP
-- presence, or IQ stanza. The particular allowable values for the 'type' -- presence, or IQ stanza. The particular allowable values for the 'type'
-- attribute vary depending on whether the stanza is a message, presence, -- attribute vary depending on whether the stanza is a message, presence,
-- or IQ stanza. -- or IQ stanza.
, getStanzaChan
-- ** Messages -- ** Messages
-- | The /message/ stanza is a /push/ mechanism whereby one entity pushes -- | The /message/ stanza is a /push/ mechanism whereby one entity pushes
-- information to another entity, similar to the communications that occur in -- information to another entity, similar to the communications that occur in

8
src/Network/XMPP/Concurrent/Monad.hs

@ -37,7 +37,7 @@ listenIQChan tp ns = do
Nothing -> Just iqCh Nothing -> Just iqCh
Just _iqCh' -> Nothing Just _iqCh' -> Nothing
-- | get the inbound stanza channel, duplicates from master if necessary -- | get the inbound message channel, duplicates from master if necessary
-- please note that once duplicated it will keep filling up, call -- please note that once duplicated it will keep filling up, call
-- 'dropMessageChan' to allow it to be garbage collected -- 'dropMessageChan' to allow it to be garbage collected
getMessageChan :: XMPP (TChan (Either MessageError Message)) getMessageChan :: XMPP (TChan (Either MessageError Message))
@ -52,6 +52,12 @@ getMessageChan = do
return mCh' return mCh'
Just mCh' -> return mCh' Just mCh' -> return mCh'
-- | Get a duplicate of the stanza channel
getStanzaChan :: XMPP (TChan Stanza)
getStanzaChan = do
shadow <- asks sShadow
liftIO $ atomically $ dupTChan shadow
-- | see 'getMessageChan' -- | see 'getMessageChan'
getPresenceChan :: XMPP (TChan (Either PresenceError Presence)) getPresenceChan :: XMPP (TChan (Either PresenceError Presence))
getPresenceChan = do getPresenceChan = do

16
src/Network/XMPP/Concurrent/Threads.hs

@ -31,11 +31,12 @@ import GHC.IO (unsafeUnmask)
readWorker :: TChan (Either MessageError Message) readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence) -> TChan (Either PresenceError Presence)
-> TChan Stanza
-> TVar IQHandlers -> TVar IQHandlers
-> TVar EventHandlers -> TVar EventHandlers
-> TMVar XmppConnection -> TMVar XmppConnection
-> IO () -> IO ()
readWorker messageC presenceC iqHands handlers stateRef = readWorker messageC presenceC stanzaC iqHands handlers stateRef =
Ex.mask_ . forever $ do Ex.mask_ . forever $ do
res <- liftIO $ Ex.catches ( do res <- liftIO $ Ex.catches ( do
-- we don't know whether pull will -- we don't know whether pull will
@ -57,6 +58,8 @@ readWorker messageC presenceC iqHands handlers stateRef =
case res of case res of
Nothing -> return () Nothing -> return ()
Just sta -> do Just sta -> do
writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of case sta of
MessageS m -> do writeTChan messageC $ Right m MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic! _ <- readTChan messageC -- Sic!
@ -139,6 +142,7 @@ writeWorker stCh writeR = forever $ do
startThreads startThreads
:: IO ( TChan (Either MessageError Message) :: IO ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence) , TChan (Either PresenceError Presence)
, TChan Stanza
, TVar IQHandlers , TVar IQHandlers
, TChan Stanza , TChan Stanza
, IO () , IO ()
@ -153,13 +157,14 @@ startThreads = do
messageC <- newTChanIO messageC <- newTChanIO
presenceC <- newTChanIO presenceC <- newTChanIO
outC <- newTChanIO outC <- newTChanIO
stanzaC <- newTChanIO
handlers <- newTVarIO ( Map.empty, Map.empty) handlers <- newTVarIO ( Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC handlers eh conS rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS
return (messageC, presenceC, handlers, outC return (messageC, presenceC, stanzaC, handlers, outC
, killConnection writeLock [lw, rd, cp] , killConnection writeLock [lw, rd, cp]
, writeLock, conS ,rd, eh) , writeLock, conS ,rd, eh)
where where
@ -171,7 +176,7 @@ startThreads = do
-- | Creates and initializes a new XMPP session. -- | Creates and initializes a new XMPP session.
newSession :: IO Session newSession :: IO Session
newSession = do newSession = do
(mC, pC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads (mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
workermCh <- newIORef $ Nothing workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1 idRef <- newTVarIO 1
@ -179,7 +184,8 @@ newSession = do
curId <- readTVar idRef curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer) writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId return . read. show $ curId
return (Session workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads') return (Session workermCh workerpCh mC pC sC outC hand writeR rdr getId
conS eh stopThreads')
withNewSession :: XMPP b -> IO (Session, b) withNewSession :: XMPP b -> IO (Session, b)
withNewSession a = do withNewSession a = do

1
src/Network/XMPP/Concurrent/Types.hs

@ -45,6 +45,7 @@ data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either
, pShadow :: TChan (Either PresenceError , pShadow :: TChan (Either PresenceError
Presence) Presence)
-- the original chan -- the original chan
, sShadow :: TChan Stanza -- All stanzas
, outCh :: TChan Stanza , outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers , iqHandlers :: TVar IQHandlers
, writeRef :: TMVar (BS.ByteString -> IO Bool ) , writeRef :: TMVar (BS.ByteString -> IO Bool )

Loading…
Cancel
Save