Browse Source

remove message and presence channels (use stanza channel instead)

rename forkSession to dupSession
master
Philipp Balzarek 13 years ago
parent
commit
461bbd0692
  1. 41
      examples/EchoClient.hs
  2. 4
      source/Network/Xmpp.hs
  3. 41
      source/Network/Xmpp/Concurrent/Channels.hs
  4. 16
      source/Network/Xmpp/Concurrent/Channels/Basic.hs
  5. 25
      source/Network/Xmpp/Concurrent/Channels/Message.hs
  6. 24
      source/Network/Xmpp/Concurrent/Channels/Presence.hs
  7. 13
      source/Network/Xmpp/Concurrent/Channels/Types.hs

41
examples/EchoClient.hs

@ -26,35 +26,40 @@ import Network.Xmpp.IM
-- Server and authentication details. -- Server and authentication details.
host = "localhost" host = "localhost"
port = PortNumber 5222 port = PortNumber 5222
realm = "host.com" realm = "server.com"
username = "echo" username = "echo"
password = "pwd" password = "pwd"
resource = Just "bot" resource = Just "bot"
-- | Automatically accept all subscription requests from other entities -- | Automatically accept all subscription requests from other entities
autoAccept :: Session -> IO () autoAccept :: Session -> IO ()
autoAccept context = forever $ do autoAccept session = forever $ do
st <- waitForPresence isPresenceSubscribe context st <- waitForPresence isPresenceSubscribe session
let Just friend = presenceFrom st let Just friend = presenceFrom st
sendPresence (presenceSubscribed friend) context sendPresence (presenceSubscribed friend) session
printf "Hello %s !" (show friend) printf "Hello %s !" (show friend)
main :: IO () main :: IO ()
main = do main = do
con <- simpleConnect sess <- simpleConnect
host host
port port
realm realm
username username
password password
resource resource
putStrLn "connected" -- We won't be able to receive stanzas before we set out status to online
sendPresence presenceOnline con sendPresence presenceOnline sess
_thread <- forkIO $ autoAccept con putStrLn "Connected."
forever $ do -- echo all messages back to the user -- We want to see all incoming stanzas in the auto-accept thread as well.
msg <- getMessage con sess' <- dupSession sess
_thread <- forkIO $ autoAccept sess'
forever $ do
-- Echo all messages back to the user.
msg <- getMessage sess
sendMessage (answerIM (bodies msg) [] msg) sess
-- Print the received message to the screen.
let sender = show . fromJust $ messageFrom msg let sender = show . fromJust $ messageFrom msg
let contents = maybe "nothing" Text.unpack $ body msg let contents = maybe "nothing" Text.unpack $ body msg
printf "%s sayd \"%s\"\n" sender contents printf "%s says \"%s\"\n" sender contents
sendMessage (answerIM (bodies msg) [] msg) con
return () return ()

4
source/Network/Xmpp.hs

@ -78,7 +78,7 @@ module Network.Xmpp
-- presence, or IQ stanza. The particular allowable values for the 'type' -- presence, or IQ stanza. The particular allowable values for the 'type'
-- attribute vary depending on whether the stanza is a message, presence, -- attribute vary depending on whether the stanza is a message, presence,
-- or IQ stanza. -- or IQ stanza.
, getStanzaChan --
-- ** Messages -- ** Messages
-- | The /message/ stanza is a /push/ mechanism whereby one entity -- | The /message/ stanza is a /push/ mechanism whereby one entity
-- pushes information to another entity, similar to the communications that -- pushes information to another entity, similar to the communications that
@ -144,7 +144,7 @@ module Network.Xmpp
, iqRequestPayload , iqRequestPayload
, iqResultPayload , iqResultPayload
-- * Threads -- * Threads
, forkSession , dupSession
-- * Miscellaneous -- * Miscellaneous
, LangTag(..) , LangTag(..)
, exampleParams , exampleParams

41
source/Network/Xmpp/Concurrent/Channels.hs

@ -34,38 +34,17 @@ import Network.Xmpp.Pickle
import Network.Xmpp.Types import Network.Xmpp.Types
import Text.XML.Stream.Elements import Text.XML.Stream.Elements
toChans :: TChan (Either MessageError Message) toChans :: TChan Stanza
-> TChan (Either PresenceError Presence)
-> TChan Stanza
-> TVar IQHandlers -> TVar IQHandlers
-> Stanza -> Stanza
-> IO () -> IO ()
toChans messageC presenceC stanzaC iqHands sta = atomically $ do toChans stanzaC iqHands sta = atomically $ do
writeTChan stanzaC sta writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
return ()
-- this may seem ridiculous, but to prevent
-- the channel from filling up we
-- immedtiately remove the
-- Stanza we just put in. It will still be
-- available in duplicates.
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i) IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i) IQErrorS i -> handleIQResponse iqHands (Left i)
_ -> return ()
where where
-- If the IQ request has a namespace, send it through the appropriate channel. -- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
@ -94,17 +73,13 @@ toChans messageC presenceC stanzaC iqHands sta = atomically $ do
-- | Creates and initializes a new Xmpp context. -- | Creates and initializes a new Xmpp context.
newSession :: Connection -> IO Session newSession :: Connection -> IO Session
newSession con = do newSession con = do
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO outC <- newTChanIO
stanzaC <- newTChanIO stanzaChan <- newTChanIO
iqHandlers <- newTVarIO (Map.empty, Map.empty) iqHandlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers let stanzaHandler = toChans stanzaChan iqHandlers
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con
writer <- forkIO $ writeWorker outC wLock writer <- forkIO $ writeWorker outC wLock
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1 idRef <- newTVarIO 1
let getId = atomically $ do let getId = atomically $ do
curId <- readTVar idRef curId <- readTVar idRef
@ -118,11 +93,7 @@ newSession con = do
, stopThreads = kill >> killThread writer , stopThreads = kill >> killThread writer
} }
return $ Session { context = cont return $ Session { context = cont
, mShadow = messageC , stanzaCh = stanzaChan
, pShadow = presenceC
, sShadow = stanzaC
, messagesRef = workermCh
, presenceRef = workerpCh
, outCh = outC , outCh = outC
, iqHandlers = iqHandlers , iqHandlers = iqHandlers
} }

16
source/Network/Xmpp/Concurrent/Channels/Basic.hs

@ -2,21 +2,15 @@
module Network.Xmpp.Concurrent.Channels.Basic where module Network.Xmpp.Concurrent.Channels.Basic where
import Control.Concurrent.STM import Control.Concurrent.STM
import Data.IORef
import Network.Xmpp.Concurrent.Channels.Types import Network.Xmpp.Concurrent.Channels.Types
import Network.Xmpp.Types import Network.Xmpp.Types
-- | Get a duplicate of the stanza channel
getStanzaChan :: Session -> IO (TChan Stanza)
getStanzaChan session = atomically $ dupTChan (sShadow session)
-- | Send a stanza to the server. -- | Send a stanza to the server.
sendStanza :: Stanza -> Session -> IO () sendStanza :: Stanza -> Session -> IO ()
sendStanza a session = atomically $ writeTChan (outCh session) a sendStanza a session = atomically $ writeTChan (outCh session) a
-- | Create a forked session object -- | Create a new session object with the inbound channel duplicated
forkSession :: Session -> IO Session dupSession :: Session -> IO Session
forkSession session = do dupSession session = do
mCH' <- newIORef Nothing stanzaCh' <- atomically $ dupTChan (stanzaCh session)
pCH' <- newIORef Nothing return $ session {stanzaCh = stanzaCh'}
return $ session {messagesRef = mCH' , presenceRef = pCH'}

25
source/Network/Xmpp/Concurrent/Channels/Message.hs

@ -8,30 +8,15 @@ import Network.Xmpp.Types
import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Channels.Basic import Network.Xmpp.Concurrent.Channels.Basic
-- | Get the inbound stanza channel, duplicates from master if necessary. Please
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to
-- allow it to be garbage collected.
getMessageChan :: Session -> IO (TChan (Either MessageError Message))
getMessageChan session = do
mCh <- readIORef . messagesRef $ session
case mCh of
Nothing -> do
mCh' <- atomically $ dupTChan (mShadow session)
writeIORef (messagesRef session) (Just mCh')
return mCh'
Just mCh' -> return mCh'
-- | Drop the local end of the inbound stanza channel from our context so it can
-- be GC-ed.
dropMessageChan :: Session -> IO ()
dropMessageChan session = writeIORef (messagesRef session) Nothing
-- | Read an element from the inbound stanza channel, acquiring a copy of the -- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary. -- channel as necessary.
pullMessage :: Session -> IO (Either MessageError Message) pullMessage :: Session -> IO (Either MessageError Message)
pullMessage session = do pullMessage session = do
c <- getMessageChan session stanza <- atomically . readTChan $ stanzaCh session
atomically $ readTChan c case stanza of
MessageS m -> return $ Right m
MessageErrorS e -> return $ Left e
_ -> pullMessage session
-- | Get the next received message -- | Get the next received message
getMessage :: Session -> IO Message getMessage :: Session -> IO Message

24
source/Network/Xmpp/Concurrent/Channels/Presence.hs

@ -8,29 +8,15 @@ import Network.Xmpp.Types
import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Channels.Basic import Network.Xmpp.Concurrent.Channels.Basic
-- | Analogous to 'getMessageChan'.
getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence))
getPresenceChan session = do
pCh <- readIORef $ (presenceRef session)
case pCh of
Nothing -> do
pCh' <- atomically $ dupTChan (pShadow session)
writeIORef (presenceRef session) (Just pCh')
return pCh'
Just pCh' -> return pCh'
-- | Analogous to 'dropMessageChan'.
dropPresenceChan :: Session -> IO ()
dropPresenceChan session = writeIORef (presenceRef session) Nothing
-- | Read an element from the inbound stanza channel, acquiring a copy of the -- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary. -- channel as necessary.
pullPresence :: Session -> IO (Either PresenceError Presence) pullPresence :: Session -> IO (Either PresenceError Presence)
pullPresence session = do pullPresence session = do
c <- getPresenceChan session stanza <- atomically . readTChan $ stanzaCh session
atomically $ readTChan c case stanza of
PresenceS p -> return $ Right p
PresenceErrorS e -> return $ Left e
_ -> pullPresence session
-- | Pulls a (non-error) presence and returns it if the given predicate returns -- | Pulls a (non-error) presence and returns it if the given predicate returns
-- @True@. -- @True@.

13
source/Network/Xmpp/Concurrent/Channels/Types.hs

@ -11,18 +11,7 @@ import Network.Xmpp.Types
-- | An XMPP session context -- | An XMPP session context
data Session = Session data Session = Session
{ context :: Context { context :: Context
-- The original master channels that the reader puts stanzas , stanzaCh :: TChan Stanza -- All stanzas
-- into. These are cloned by @get{STanza,Message,Presence}Chan
-- on demand when first used by the thread and are stored in the
-- {message,presence}Ref fields below.
, mShadow :: TChan (Either MessageError Message)
, pShadow :: TChan (Either PresenceError Presence)
, sShadow :: TChan Stanza -- All stanzas
-- The cloned copies of the original/shadow channels. They are
-- thread-local (as opposed to the shadow channels) and contains all
-- stanzas received after the cloning of the shadow channels.
, messagesRef :: IORef (Maybe (TChan (Either MessageError Message)))
, presenceRef :: IORef (Maybe (TChan (Either PresenceError Presence)))
, outCh :: TChan Stanza , outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers , iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any -- Writing lock, so that only one thread could write to the stream at any

Loading…
Cancel
Save