Browse Source

split off channels

master
Philipp Balzarek 13 years ago
parent
commit
c0af09d40c
  1. 11
      source/Network/Xmpp.hs
  2. 12
      source/Network/Xmpp/Concurrent/IQ.hs
  3. 110
      source/Network/Xmpp/Concurrent/Monad.hs
  4. 140
      source/Network/Xmpp/Concurrent/Threads.hs
  5. 25
      source/Network/Xmpp/Concurrent/Types.hs
  6. 16
      source/Network/Xmpp/Session.hs
  7. 2
      source/Network/Xmpp/Xep/ServiceDiscovery.hs

11
source/Network/Xmpp.hs

@ -29,7 +29,7 @@ @@ -29,7 +29,7 @@
module Network.Xmpp
( -- * Session management
newSession
newSessionChans
, withConnection
, connect
, simpleConnect
@ -140,7 +140,7 @@ module Network.Xmpp @@ -140,7 +140,7 @@ module Network.Xmpp
, iqRequestPayload
, iqResultPayload
-- * Threads
, forkSession
, forkChans
-- * Misc
, LangTag(..)
, exampleParams
@ -152,6 +152,7 @@ import Network @@ -152,6 +152,7 @@ import Network
import qualified Network.TLS as TLS
import Network.Xmpp.Bind
import Network.Xmpp.Concurrent
import Network.Xmpp.Concurrent.Channels
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Marshal
import Network.Xmpp.Message
@ -169,9 +170,9 @@ import Network.Xmpp.Types @@ -169,9 +170,9 @@ import Network.Xmpp.Types
import Control.Monad.Error
-- | Connect to host with given address.
connect :: HostName -> Text -> XmppConMonad (Either StreamError ())
connect address hostname = do
xmppRawConnect address hostname
connect :: HostName -> PortID -> Text -> XmppConMonad (Either StreamError ())
connect address port hostname = do
xmppRawConnect address port hostname
result <- xmppStartStream
case result of
Left e -> do

12
source/Network/Xmpp/Concurrent/IQ.hs

@ -27,22 +27,22 @@ sendIQ timeOut to tp lang body session = do -- TODO: Add timeout @@ -27,22 +27,22 @@ sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
newId <- idGenerator session
ref <- atomically $ do
resRef <- newEmptyTMVar
(byNS, byId) <- readTVar (iqHandlers session)
writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId)
(byNS, byId) <- readTVar (iqHandlers . chans $ session)
writeTVar (iqHandlers . chans $ session) (byNS, Map.insert newId resRef byId)
-- TODO: Check for id collisions (shouldn't happen?)
return resRef
sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session
sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) (chans session)
case timeOut of
Nothing -> return ()
Just t -> void . forkIO $ do
threadDelay t
doTimeOut (iqHandlers session) newId ref
doTimeOut (iqHandlers . chans $ session) newId ref
return ref
where
doTimeOut handlers iqid var = atomically $ do
p <- tryPutTMVar var IQResponseTimeout
when p $ do
(byNS, byId) <- readTVar (iqHandlers session)
(byNS, byId) <- readTVar (iqHandlers . chans $ session)
writeTVar handlers (byNS, Map.delete iqid byId)
return ()
@ -76,6 +76,6 @@ answerIQ (IQRequestTicket @@ -76,6 +76,6 @@ answerIQ (IQRequestTicket
False -> do
writeTVar sentRef True
writeTChan (outCh session) response
writeTChan (outCh . chans $ session) response
return True
True -> return False

110
source/Network/Xmpp/Concurrent/Monad.hs

@ -26,10 +26,10 @@ import Network.Xmpp.Monad @@ -26,10 +26,10 @@ import Network.Xmpp.Monad
-- to interfere with existing consumers.
listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@)
-> Text -- ^ Namespace of the child element
-> Session
-> Chans
-> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket))
listenIQChan tp ns session = do
let handlers = iqHandlers session
listenIQChan tp ns chans = do
let handlers = iqHandlers chans
atomically $ do
(byNS, byID) <- readTVar handlers
iqCh <- newTChan
@ -44,110 +44,110 @@ listenIQChan tp ns session = do @@ -44,110 +44,110 @@ listenIQChan tp ns session = do
Just iqCh' -> Left iqCh'
-- | Get a duplicate of the stanza channel
getStanzaChan :: Session -> IO (TChan Stanza)
getStanzaChan session = atomically $ dupTChan (sShadow session)
getStanzaChan :: Chans -> IO (TChan Stanza)
getStanzaChan chans = atomically $ dupTChan (sShadow chans)
-- | Get the inbound stanza channel, duplicates from master if necessary. Please
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to
-- allow it to be garbage collected.
getMessageChan :: Session -> IO (TChan (Either MessageError Message))
getMessageChan session = do
mCh <- readIORef $ messagesRef session
getMessageChan :: Chans -> IO (TChan (Either MessageError Message))
getMessageChan chans = do
mCh <- readIORef $ messagesRef chans
case mCh of
Nothing -> do
mCh' <- atomically $ dupTChan (mShadow session)
writeIORef (messagesRef session) (Just mCh')
mCh' <- atomically $ dupTChan (mShadow chans)
writeIORef (messagesRef chans) (Just mCh')
return mCh'
Just mCh' -> return mCh'
-- | Analogous to 'getMessageChan'.
getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence))
getPresenceChan session = do
pCh <- readIORef $ presenceRef session
getPresenceChan :: Chans -> IO (TChan (Either PresenceError Presence))
getPresenceChan chans = do
pCh <- readIORef $ presenceRef chans
case pCh of
Nothing -> do
pCh' <- atomically $ dupTChan (pShadow session)
writeIORef (presenceRef session) (Just pCh')
pCh' <- atomically $ dupTChan (pShadow chans)
writeIORef (presenceRef chans) (Just pCh')
return pCh'
Just pCh' -> return pCh'
-- | Drop the local end of the inbound stanza channel from our context so it can
-- be GC-ed.
dropMessageChan :: Session -> IO ()
dropMessageChan session = writeIORef (messagesRef session) Nothing
dropMessageChan :: Chans -> IO ()
dropMessageChan chans = writeIORef (messagesRef chans) Nothing
-- | Analogous to 'dropMessageChan'.
dropPresenceChan :: Session -> IO ()
dropPresenceChan session = writeIORef (presenceRef session) Nothing
dropPresenceChan :: Chans -> IO ()
dropPresenceChan chans = writeIORef (presenceRef chans) Nothing
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.
pullMessage :: Session -> IO (Either MessageError Message)
pullMessage session = do
c <- getMessageChan session
pullMessage :: Chans -> IO (Either MessageError Message)
pullMessage chans = do
c <- getMessageChan chans
atomically $ readTChan c
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.
pullPresence :: Session -> IO (Either PresenceError Presence)
pullPresence session = do
c <- getPresenceChan session
pullPresence :: Chans -> IO (Either PresenceError Presence)
pullPresence chans = do
c <- getPresenceChan chans
atomically $ readTChan c
-- | Send a stanza to the server.
sendStanza :: Stanza -> Session -> IO ()
sendStanza a session = atomically $ writeTChan (outCh session) a
sendStanza :: Stanza -> Chans -> IO ()
sendStanza a chans = atomically $ writeTChan (outCh chans) a
-- | Create a forked session object
forkSession :: Session -> IO Session
forkSession session = do
-- | Create a forked chans object
forkChans :: Chans -> IO Chans
forkChans chans = do
mCH' <- newIORef Nothing
pCH' <- newIORef Nothing
return $ session {messagesRef = mCH', presenceRef = pCH'}
return $ chans {messagesRef = mCH', presenceRef = pCH'}
-- | Pulls a message and returns it if the given predicate returns @True@.
filterMessages :: (MessageError -> Bool)
-> (Message -> Bool)
-> Session -> IO (Either MessageError Message)
filterMessages f g session = do
s <- pullMessage session
-> Chans -> IO (Either MessageError Message)
filterMessages f g chans = do
s <- pullMessage chans
case s of
Left e | f e -> return $ Left e
| otherwise -> filterMessages f g session
| otherwise -> filterMessages f g chans
Right m | g m -> return $ Right m
| otherwise -> filterMessages f g session
| otherwise -> filterMessages f g chans
-- | Pulls a (non-error) message and returns it if the given predicate returns
-- @True@.
waitForMessage :: (Message -> Bool) -> Session -> IO Message
waitForMessage f session = do
s <- pullMessage session
waitForMessage :: (Message -> Bool) -> Chans -> IO Message
waitForMessage f chans = do
s <- pullMessage chans
case s of
Left _ -> waitForMessage f session
Left _ -> waitForMessage f chans
Right m | f m -> return m
| otherwise -> waitForMessage f session
| otherwise -> waitForMessage f chans
-- | Pulls an error message and returns it if the given predicate returns @True@.
waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError
waitForMessageError f session = do
s <- pullMessage session
waitForMessageError :: (MessageError -> Bool) -> Chans -> IO MessageError
waitForMessageError f chans = do
s <- pullMessage chans
case s of
Right _ -> waitForMessageError f session
Right _ -> waitForMessageError f chans
Left m | f m -> return m
| otherwise -> waitForMessageError f session
| otherwise -> waitForMessageError f chans
-- | Pulls a (non-error) presence and returns it if the given predicate returns
-- @True@.
waitForPresence :: (Presence -> Bool) -> Session -> IO Presence
waitForPresence f session = do
s <- pullPresence session
waitForPresence :: (Presence -> Bool) -> Chans -> IO Presence
waitForPresence f chans = do
s <- pullPresence chans
case s of
Left _ -> waitForPresence f session
Left _ -> waitForPresence f chans
Right m | f m -> return m
| otherwise -> waitForPresence f session
| otherwise -> waitForPresence f chans
-- TODO: Wait for presence error?
@ -194,12 +194,12 @@ withConnection a session = do @@ -194,12 +194,12 @@ withConnection a session = do
]
-- | Send a presence stanza.
sendPresence :: Presence -> Session -> IO ()
sendPresence p session = sendStanza (PresenceS p) session
sendPresence :: Presence -> Chans -> IO ()
sendPresence p chans = sendStanza (PresenceS p) chans
-- | Send a message stanza.
sendMessage :: Message -> Session -> IO ()
sendMessage m session = sendStanza (MessageS m) session
sendMessage :: Message -> Chans -> IO ()
sendMessage m chans = sendStanza (MessageS m) chans
-- | Executes a function to update the event handlers.

140
source/Network/Xmpp/Concurrent/Threads.hs

@ -1,32 +1,22 @@ @@ -1,32 +1,22 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
import Network.Xmpp.Types
import Control.Applicative((<$>),(<*>))
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Monad.State.Strict
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe
import Data.XML.Types
import Network.Xmpp.Monad
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle
import Network.Xmpp.Concurrent.Types
import Text.XML.Stream.Elements
import GHC.IO (unsafeUnmask)
-- Worker to read stanzas from the stream and concurrently distribute them to
@ -73,55 +63,25 @@ readWorker onStanza onConnectionClosed stateRef = @@ -73,55 +63,25 @@ readWorker onStanza onConnectionClosed stateRef =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
startThreadsWith stanzaHandler outC eh = do
startThreadsWith :: (Stanza -> IO ())
-> TVar EventHandlers
-> IO
(IO (),
TMVar (BS.ByteString -> IO Bool),
TMVar XmppConnection,
ThreadId)
startThreadsWith stanzaHandler eh = do
writeLock <- newTMVarIO (\_ -> return False)
conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock
-- lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS
return ( killConnection writeLock [lw, rd, cp]
return ( killConnection writeLock [rd, cp]
, writeLock
, conS
, rd
@ -131,39 +91,12 @@ startThreadsWith stanzaHandler outC eh = do @@ -131,39 +91,12 @@ startThreadsWith stanzaHandler outC eh = do
_ <- atomically $ takeTMVar writeLock -- Should we put it back?
_ <- forM threads killThread
return ()
-- | Creates and initializes a new concurrent session.
newSessionChans :: IO Session
newSessionChans = do
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
stanzaC <- newTChanIO
iqHandlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler outC eh
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
return $ Session { mShadow = messageC
, pShadow = presenceC
, sShadow = stanzaC
, messagesRef = workermCh
, presenceRef = workerpCh
, outCh = outC
, iqHandlers = iqHandlers
, writeRef = wLock
, readerThread = readerThread
, idGenerator = getId
, conStateRef = conState
, eventHandlers = eh
, stopThreads = kill
}
-- Call the connection closed handlers.
noCon :: TVar EventHandlers -> StreamError -> IO ()
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return ()
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
@ -173,38 +106,3 @@ connPersist lock = forever $ do @@ -173,38 +106,3 @@ connPersist lock = forever $ do
_ <- pushBS " "
atomically $ putTMVar lock pushBS
threadDelay 30000000 -- 30s
toChans messageC presenceC stanzaC iqHands sta = atomically $ do
writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
return ()
-- this may seem ridiculous, but to prevent
-- the channel from filling up we
-- immedtiately remove the
-- Stanza we just put in. It will still be
-- available in duplicates.
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
-- Call the connection closed handlers.
noCon :: TVar EventHandlers -> StreamError -> IO ()
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return ()

25
source/Network/Xmpp/Concurrent/Types.hs

@ -6,7 +6,6 @@ module Network.Xmpp.Concurrent.Types where @@ -6,7 +6,6 @@ module Network.Xmpp.Concurrent.Types where
import qualified Control.Exception.Lifted as Ex
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Trans.Reader
import qualified Data.ByteString as BS
import Data.IORef
@ -30,7 +29,20 @@ data EventHandlers = EventHandlers @@ -30,7 +29,20 @@ data EventHandlers = EventHandlers
-- The Session object is the Xmpp (ReaderT) state.
data Session = Session
{ -- The original master channels that the reader puts stanzas
{ writeRef :: TMVar (BS.ByteString -> IO Bool)
, readerThread :: ThreadId
, idGenerator :: IO StanzaId
-- Lock (used by withConnection) to make sure that a maximum of one
-- XmppConMonad calculation is executed at any given time.
, conStateRef :: TMVar XmppConnection
, eventHandlers :: TVar EventHandlers
, stopThreads :: IO ()
, chans :: Chans
}
data Chans = Chans
{
-- The original master channels that the reader puts stanzas
-- into. These are cloned by @get{STanza,Message,Presence}Chan
-- on demand when first used by the thread and are stored in the
-- {message,presence}Ref fields below.
@ -46,17 +58,8 @@ data Session = Session @@ -46,17 +58,8 @@ data Session = Session
, iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any
-- given time.
, writeRef :: TMVar (BS.ByteString -> IO Bool)
, readerThread :: ThreadId
, idGenerator :: IO StanzaId
-- Lock (used by withConnection) to make sure that a maximum of one
-- XmppConMonad calculation is executed at any given time.
, conStateRef :: TMVar XmppConnection
, eventHandlers :: TVar EventHandlers
, stopThreads :: IO ()
}
-- Interrupt is used to signal to the reader thread that it should stop.
data Interrupt = Interrupt (TMVar ()) deriving Typeable
instance Show Interrupt where show _ = "<Interrupt>"

16
source/Network/Xmpp/Session.hs

@ -33,11 +33,11 @@ xmppStartSession = do @@ -33,11 +33,11 @@ xmppStartSession = do
Left e -> error $ show e
Right _ -> return ()
-- Sends the session IQ set element and waits for an answer. Throws an error if
-- if an IQ error stanza is returned from the server.
startSession :: Session -> IO ()
startSession session = do
answer <- sendIQ' Nothing Set Nothing sessionXML session
case answer of
IQResponseResult _ -> return ()
e -> error $ show e
-- -- Sends the session IQ set element and waits for an answer. Throws an error if
-- -- if an IQ error stanza is returned from the server.
-- startSession :: Session -> IO ()
-- startSession session = do
-- answer <- sendIQ' Nothing Set Nothing sessionXML session
-- case answer of
-- IQResponseResult _ -> return ()
-- e -> error $ show e

2
source/Network/Xmpp/Xep/ServiceDiscovery.hs

@ -28,6 +28,8 @@ import Network.Xmpp.Monad @@ -28,6 +28,8 @@ import Network.Xmpp.Monad
import Network.Xmpp.Pickle
import Network.Xmpp.Types
import Network.Xmpp.Concurrent
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Channels
data DiscoError = DiscoNoQueryElement
| DiscoIQError IQError

Loading…
Cancel
Save