Browse Source

renamed XMPPThread to XMPP

renamed Thread to Session
split runThreaded in newSession and WithNewSession
master
Philipp Balzarek 14 years ago
parent
commit
70d9b5b47d
  1. 16
      src/Network/XMPP.hs
  2. 2
      src/Network/XMPP/Bind.hs
  3. 4
      src/Network/XMPP/Concurrent.hs
  4. 6
      src/Network/XMPP/Concurrent/IQ.hs
  5. 42
      src/Network/XMPP/Concurrent/Monad.hs
  6. 46
      src/Network/XMPP/Concurrent/Threads.hs
  7. 18
      src/Network/XMPP/Concurrent/Types.hs
  8. 4
      src/Network/XMPP/Monad.hs
  9. 2
      src/Network/XMPP/Session.hs

16
src/Network/XMPP.hs

@ -34,7 +34,7 @@ @@ -34,7 +34,7 @@
module Network.XMPP
( -- * Session management
xmppNewSession
withNewSession
, connect
, startTLS
, auth
@ -132,7 +132,7 @@ module Network.XMPP @@ -132,7 +132,7 @@ module Network.XMPP
, iqRequestPayload
, iqResultPayload
-- * Threads
, XMPPThread
, XMPP
, forkXMPP
-- * Misc
, exampleParams
@ -155,10 +155,6 @@ import Network.XMPP.Types @@ -155,10 +155,6 @@ import Network.XMPP.Types
import Control.Monad.Error
-- | Create a new, pristine session without an active connection.
xmppNewSession :: XMPPThread a -> IO (a, XMPPConState)
xmppNewSession = withNewSession . runThreaded
-- | Connect to host with given address.
xmppConnect :: HostName -> Text -> XMPPConMonad (Either StreamError ())
xmppConnect address hostname = xmppRawConnect address hostname >> xmppStartStream
@ -166,18 +162,16 @@ xmppConnect address hostname = xmppRawConnect address hostname >> xmppStartStre @@ -166,18 +162,16 @@ xmppConnect address hostname = xmppRawConnect address hostname >> xmppStartStre
-- | Attempts to secure the connection using TLS. Will return
-- 'TLSNoServerSupport' when the server does not offer TLS or does not
-- expect it at this time.
startTLS :: TLS.TLSParams -> XMPPThread (Either XMPPTLSError ())
startTLS :: TLS.TLSParams -> XMPP (Either XMPPTLSError ())
startTLS = withConnection . xmppStartTLS
-- | Authenticate to the server with the given username and password
-- and bind a resource
auth :: Text.Text -- ^ The username
-> Text.Text -- ^ The password
-> Maybe Text -- ^ The desired resource or 'Nothing' to let the server
-- assign one
-> XMPPThread (Either SaslError Text.Text)
-> XMPP (Either SaslError Text.Text)
auth username passwd resource = runErrorT $ do
ErrorT . withConnection $ xmppSASL username passwd
res <- lift $ xmppBind resource
@ -185,5 +179,5 @@ auth username passwd resource = runErrorT $ do @@ -185,5 +179,5 @@ auth username passwd resource = runErrorT $ do
return res
-- | Connect to an xmpp server
connect :: HostName -> Text -> XMPPThread (Either StreamError ())
connect :: HostName -> Text -> XMPP (Either StreamError ())
connect address hostname = withConnection $ xmppConnect address hostname

2
src/Network/XMPP/Bind.hs

@ -40,7 +40,7 @@ jidP = bindP $ xpElemNodes "jid" (xpContent xpPrim) @@ -40,7 +40,7 @@ jidP = bindP $ xpElemNodes "jid" (xpContent xpPrim)
-- server-generated resource and extract the JID from the non-error
-- response.
xmppBind :: Maybe Text -> XMPPThread Text
xmppBind :: Maybe Text -> XMPP Text
xmppBind rsrc = do
answer <- sendIQ' Nothing Set Nothing (bindBody rsrc)
let (Right IQResult{iqResultPayload = Just b}) = answer -- TODO: Error handling

4
src/Network/XMPP/Concurrent.hs

@ -1,6 +1,6 @@ @@ -1,6 +1,6 @@
module Network.XMPP.Concurrent
( Thread
, XMPPThread
( Session
, XMPP
, module Network.XMPP.Concurrent.Monad
, module Network.XMPP.Concurrent.Threads
, module Network.XMPP.Concurrent.IQ

6
src/Network/XMPP/Concurrent/IQ.hs

@ -17,7 +17,7 @@ sendIQ :: Maybe JID -- ^ Recipient (to) @@ -17,7 +17,7 @@ sendIQ :: Maybe JID -- ^ Recipient (to)
-> IQRequestType -- ^ IQ type (Get or Set)
-> Maybe LangTag -- ^ Language tag of the payload (Nothing for default)
-> Element -- ^ The iq body (there has to be exactly one)
-> XMPPThread (TMVar IQResponse)
-> XMPP (TMVar IQResponse)
sendIQ to tp lang body = do -- TODO: add timeout
newId <- liftIO =<< asks idGenerator
handlers <- asks iqHandlers
@ -35,14 +35,14 @@ sendIQ' :: Maybe JID @@ -35,14 +35,14 @@ sendIQ' :: Maybe JID
-> IQRequestType
-> Maybe LangTag
-> Element
-> XMPPThread IQResponse
-> XMPP IQResponse
sendIQ' to tp lang body = do
ref <- sendIQ to tp lang body
liftIO . atomically $ takeTMVar ref
answerIQ :: (IQRequest, TVar Bool)
-> Either StanzaError (Maybe Element)
-> XMPPThread Bool
-> XMPP Bool
answerIQ ((IQRequest iqid from _to lang _tp bd), sentRef) answer = do
out <- asks outCh
let response = case answer of

42
src/Network/XMPP/Concurrent/Monad.hs

@ -23,7 +23,7 @@ import Network.XMPP.Monad @@ -23,7 +23,7 @@ import Network.XMPP.Monad
-- combination was alread handled
listenIQChan :: IQRequestType -- ^ type of IQs to receive (Get / Set)
-> Text -- ^ namespace of the child element
-> XMPPThread (Maybe ( TChan (IQRequest, TVar Bool)))
-> XMPP (Maybe ( TChan (IQRequest, TVar Bool)))
listenIQChan tp ns = do
handlers <- asks iqHandlers
liftIO . atomically $ do
@ -39,7 +39,7 @@ listenIQChan tp ns = do @@ -39,7 +39,7 @@ listenIQChan tp ns = do
-- | get the inbound stanza channel, duplicates from master if necessary
-- please note that once duplicated it will keep filling up, call
-- 'dropMessageChan' to allow it to be garbage collected
getMessageChan :: XMPPThread (TChan (Either MessageError Message))
getMessageChan :: XMPP (TChan (Either MessageError Message))
getMessageChan = do
mChR <- asks messagesRef
mCh <- liftIO $ readIORef mChR
@ -52,7 +52,7 @@ getMessageChan = do @@ -52,7 +52,7 @@ getMessageChan = do
Just mCh' -> return mCh'
-- | see 'getMessageChan'
getPresenceChan :: XMPPThread (TChan (Either PresenceError Presence))
getPresenceChan :: XMPP (TChan (Either PresenceError Presence))
getPresenceChan = do
pChR <- asks presenceRef
pCh <- liftIO $ readIORef pChR
@ -66,40 +66,40 @@ getPresenceChan = do @@ -66,40 +66,40 @@ getPresenceChan = do
-- | Drop the local end of the inbound stanza channel
-- from our context so it can be GC-ed
dropMessageChan :: XMPPThread ()
dropMessageChan :: XMPP ()
dropMessageChan = do
r <- asks messagesRef
liftIO $ writeIORef r Nothing
-- | see 'dropMessageChan'
dropPresenceChan :: XMPPThread ()
dropPresenceChan :: XMPP ()
dropPresenceChan = do
r <- asks presenceRef
liftIO $ writeIORef r Nothing
-- | Read an element from the inbound stanza channel, acquiring a copy
-- of the channel as necessary
pullMessage :: XMPPThread (Either MessageError Message)
pullMessage :: XMPP (Either MessageError Message)
pullMessage = do
c <- getMessageChan
liftIO $ atomically $ readTChan c
-- | Read an element from the inbound stanza channel, acquiring a copy
-- of the channel as necessary
pullPresence :: XMPPThread (Either PresenceError Presence)
pullPresence :: XMPP (Either PresenceError Presence)
pullPresence = do
c <- getPresenceChan
liftIO $ atomically $ readTChan c
-- | Send a stanza to the server
sendS :: Stanza -> XMPPThread ()
sendS :: Stanza -> XMPP ()
sendS a = do
out <- asks outCh
liftIO . atomically $ writeTChan out a
return ()
-- | Fork a new thread
forkXMPP :: XMPPThread () -> XMPPThread ThreadId
forkXMPP :: XMPP () -> XMPP ThreadId
forkXMPP a = do
thread <- ask
mCH' <- liftIO $ newIORef Nothing
@ -110,7 +110,7 @@ forkXMPP a = do @@ -110,7 +110,7 @@ forkXMPP a = do
filterMessages :: (MessageError -> Bool)
-> (Message -> Bool)
-> XMPPThread (Either MessageError Message)
-> XMPP (Either MessageError Message)
filterMessages f g = do
s <- pullMessage
case s of
@ -119,7 +119,7 @@ filterMessages f g = do @@ -119,7 +119,7 @@ filterMessages f g = do
Right m | g m -> return $ Right m
| otherwise -> filterMessages f g
waitForMessage :: (Message -> Bool) -> XMPPThread Message
waitForMessage :: (Message -> Bool) -> XMPP Message
waitForMessage f = do
s <- pullMessage
case s of
@ -127,7 +127,7 @@ waitForMessage f = do @@ -127,7 +127,7 @@ waitForMessage f = do
Right m | f m -> return m
| otherwise -> waitForMessage f
waitForMessageError :: (MessageError -> Bool) -> XMPPThread MessageError
waitForMessageError :: (MessageError -> Bool) -> XMPP MessageError
waitForMessageError f = do
s <- pullMessage
case s of
@ -135,7 +135,7 @@ waitForMessageError f = do @@ -135,7 +135,7 @@ waitForMessageError f = do
Left m | f m -> return m
| otherwise -> waitForMessageError f
waitForPresence :: (Presence -> Bool) -> XMPPThread Presence
waitForPresence :: (Presence -> Bool) -> XMPP Presence
waitForPresence f = do
s <- pullPresence
case s of
@ -149,7 +149,7 @@ waitForPresence f = do @@ -149,7 +149,7 @@ waitForPresence f = do
-- The Action will run in the calling thread/
-- NB: This will /not/ catch any exceptions. If you action dies, deadlocks
-- or otherwisely exits abnormaly the XMPP session will be dead.
withConnection :: XMPPConMonad a -> XMPPThread a
withConnection :: XMPPConMonad a -> XMPP a
withConnection a = do
readerId <- asks readerThread
stateRef <- asks conStateRef
@ -167,36 +167,36 @@ withConnection a = do @@ -167,36 +167,36 @@ withConnection a = do
return res
-- | Send a presence Stanza
sendPresence :: Presence -> XMPPThread ()
sendPresence :: Presence -> XMPP ()
sendPresence = sendS . PresenceS
-- | Send a Message Stanza
sendMessage :: Message -> XMPPThread ()
sendMessage :: Message -> XMPP ()
sendMessage = sendS . MessageS
modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPPThread ()
modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPP ()
modifyHandlers f = do
eh <- asks eventHandlers
liftIO . atomically $ writeTVar eh . f =<< readTVar eh
setSessionEndHandler :: XMPPThread () -> XMPPThread ()
setSessionEndHandler :: XMPP () -> XMPP ()
setSessionEndHandler eh = modifyHandlers (\s -> s{sessionEndHandler = eh})
-- | run an event handler
runHandler :: (EventHandlers -> XMPPThread a) -> XMPPThread a
runHandler :: (EventHandlers -> XMPP a) -> XMPP a
runHandler h = do
eh <- liftIO . atomically . readTVar =<< asks eventHandlers
h eh
-- | End the current xmpp session
endSession :: XMPPThread ()
endSession :: XMPP ()
endSession = do -- TODO: This has to be idempotent (is it?)
withConnection xmppKillConnection
liftIO =<< asks stopThreads
runHandler sessionEndHandler
-- | Close the connection to the server
closeConnection :: XMPPThread ()
closeConnection :: XMPP ()
closeConnection = withConnection xmppKillConnection

46
src/Network/XMPP/Concurrent/Threads.hs

@ -119,7 +119,7 @@ writeWorker stCh writeR = forever $ do @@ -119,7 +119,7 @@ writeWorker stCh writeR = forever $ do
-- returns channel of incoming and outgoing stances, respectively
-- and an Action to stop the Threads and close the connection
startThreads
:: XMPPConMonad ( TChan (Either MessageError Message)
:: IO ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence)
, TVar IQHandlers
, TChan Stanza
@ -131,16 +131,16 @@ startThreads @@ -131,16 +131,16 @@ startThreads
)
startThreads = do
writeLock <- liftIO . newTMVarIO =<< gets sConPushBS
messageC <- liftIO newTChanIO
presenceC <- liftIO newTChanIO
outC <- liftIO newTChanIO
handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty)
eh <- liftIO $ newTVarIO zeroEventHandlers
conS <- liftIO . newTMVarIO =<< get
lw <- liftIO . forkIO $ writeWorker outC writeLock
cp <- liftIO . forkIO $ connPersist writeLock
rd <- liftIO . forkIO $ readWorker messageC presenceC handlers conS
writeLock <- newEmptyTMVarIO
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
handlers <- newTVarIO ( Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers
conS <- newEmptyTMVarIO
lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC handlers conS
return (messageC, presenceC, handlers, outC
, killConnection writeLock [lw, rd, cp]
, writeLock, conS ,rd, eh)
@ -150,24 +150,24 @@ startThreads = do @@ -150,24 +150,24 @@ startThreads = do
_ <- forM threads killThread
return()
-- | Start worker threads and run action. The supplied action will run
-- in the calling thread. use 'forkXMPP' to start another thread.
runThreaded :: XMPPThread a
-> XMPPConMonad a
runThreaded a = do
liftIO . putStrLn $ "starting threads"
-- | Creates and initializes a new XMPP session.
newSession :: IO Session
newSession = do
(mC, pC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
liftIO . putStrLn $ "threads running"
workermCh <- liftIO . newIORef $ Nothing
workerpCh <- liftIO . newIORef $ Nothing
idRef <- liftIO $ newTVarIO 1
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
liftIO . putStrLn $ "starting application"
liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads')
return (Session workermCh workerpCh mC pC outC hand writeR rdr getId conS eh stopThreads')
withNewSession :: XMPP b -> IO b
withNewSession a = newSession >>= runReaderT a
withSession :: Session -> XMPP a -> IO a
withSession = flip runReaderT
-- | Sends a blank space every 30 seconds to keep the connection alive
connPersist :: TMVar (BS.ByteString -> IO ()) -> IO ()

18
src/Network/XMPP/Concurrent/Types.hs

@ -23,8 +23,8 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan (IQRequest, TVar Bool)) @@ -23,8 +23,8 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan (IQRequest, TVar Bool))
)
data EventHandlers = EventHandlers
{ sessionEndHandler :: XMPPThread ()
, connectionClosedHandler :: XMPPThread ()
{ sessionEndHandler :: XMPP ()
, connectionClosedHandler :: XMPP ()
}
zeroEventHandlers :: EventHandlers
@ -33,18 +33,18 @@ zeroEventHandlers = EventHandlers @@ -33,18 +33,18 @@ zeroEventHandlers = EventHandlers
, connectionClosedHandler = return ()
}
data Thread = Thread { messagesRef :: IORef (Maybe ( TChan (Either
data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either
MessageError
Message
)))
, presenceRef :: IORef (Maybe (TChan (Either
PresenceError
Presence
)))
PresenceError Presence )))
, mShadow :: TChan (Either MessageError
Message) -- the original chan
Message)
-- the original chan
, pShadow :: TChan (Either PresenceError
Presence) -- the original chan
Presence)
-- the original chan
, outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers
, writeRef :: TMVar (BS.ByteString -> IO () )
@ -55,7 +55,7 @@ data Thread = Thread { messagesRef :: IORef (Maybe ( TChan (Either @@ -55,7 +55,7 @@ data Thread = Thread { messagesRef :: IORef (Maybe ( TChan (Either
, stopThreads :: IO ()
}
type XMPPThread a = ReaderT Thread IO a
type XMPP a = ReaderT Session IO a
data Interrupt = Interrupt (TMVar ()) deriving Typeable
instance Show Interrupt where show _ = "<Interrupt>"

4
src/Network/XMPP/Monad.hs

@ -120,8 +120,8 @@ xmppRawConnect host hostname = do @@ -120,8 +120,8 @@ xmppRawConnect host hostname = do
put st
withNewSession :: XMPPConMonad a -> IO (a, XMPPConState)
withNewSession action = do
xmppNewSession :: XMPPConMonad a -> IO (a, XMPPConState)
xmppNewSession action = do
runStateT action xmppZeroConState
xmppKillConnection :: XMPPConMonad ()

2
src/Network/XMPP/Session.hs

@ -32,7 +32,7 @@ xmppSession = do @@ -32,7 +32,7 @@ xmppSession = do
let IQResultS (IQResult "sess" Nothing Nothing _lang _body) = answer
return ()
startSession :: XMPPThread ()
startSession :: XMPP ()
startSession = do
answer <- sendIQ' Nothing Set Nothing sessionXML
case answer of

Loading…
Cancel
Save