|
|
|
@ -1,6 +1,6 @@ |
|
|
|
module Network.XMPP.Concurrent.Monad where |
|
|
|
module Network.Xmpp.Concurrent.Monad where |
|
|
|
|
|
|
|
|
|
|
|
import Network.XMPP.Types |
|
|
|
import Network.Xmpp.Types |
|
|
|
|
|
|
|
|
|
|
|
import Control.Concurrent |
|
|
|
import Control.Concurrent |
|
|
|
import Control.Concurrent.STM |
|
|
|
import Control.Concurrent.STM |
|
|
|
@ -13,8 +13,8 @@ import Data.IORef |
|
|
|
import qualified Data.Map as Map |
|
|
|
import qualified Data.Map as Map |
|
|
|
import Data.Text(Text) |
|
|
|
import Data.Text(Text) |
|
|
|
|
|
|
|
|
|
|
|
import Network.XMPP.Concurrent.Types |
|
|
|
import Network.Xmpp.Concurrent.Types |
|
|
|
import Network.XMPP.Monad |
|
|
|
import Network.Xmpp.Monad |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | Register a new IQ listener. IQ requests matching the type and namespace |
|
|
|
-- | Register a new IQ listener. IQ requests matching the type and namespace |
|
|
|
@ -24,7 +24,7 @@ import Network.XMPP.Monad |
|
|
|
-- combination was alread handled. |
|
|
|
-- combination was alread handled. |
|
|
|
listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) |
|
|
|
listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) |
|
|
|
-> Text -- ^ Namespace of the child element |
|
|
|
-> Text -- ^ Namespace of the child element |
|
|
|
-> XMPP (Maybe (TChan IQRequestTicket)) |
|
|
|
-> Xmpp (Maybe (TChan IQRequestTicket)) |
|
|
|
listenIQChan tp ns = do |
|
|
|
listenIQChan tp ns = do |
|
|
|
handlers <- asks iqHandlers |
|
|
|
handlers <- asks iqHandlers |
|
|
|
liftIO . atomically $ do |
|
|
|
liftIO . atomically $ do |
|
|
|
@ -41,7 +41,7 @@ listenIQChan tp ns = do |
|
|
|
Just _iqCh' -> Nothing |
|
|
|
Just _iqCh' -> Nothing |
|
|
|
|
|
|
|
|
|
|
|
-- | Get a duplicate of the stanza channel |
|
|
|
-- | Get a duplicate of the stanza channel |
|
|
|
getStanzaChan :: XMPP (TChan Stanza) |
|
|
|
getStanzaChan :: Xmpp (TChan Stanza) |
|
|
|
getStanzaChan = do |
|
|
|
getStanzaChan = do |
|
|
|
shadow <- asks sShadow |
|
|
|
shadow <- asks sShadow |
|
|
|
liftIO $ atomically $ dupTChan shadow |
|
|
|
liftIO $ atomically $ dupTChan shadow |
|
|
|
@ -49,7 +49,7 @@ getStanzaChan = do |
|
|
|
-- | Get the inbound stanza channel, duplicates from master if necessary. Please |
|
|
|
-- | Get the inbound stanza channel, duplicates from master if necessary. Please |
|
|
|
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to |
|
|
|
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to |
|
|
|
-- allow it to be garbage collected. |
|
|
|
-- allow it to be garbage collected. |
|
|
|
getMessageChan :: XMPP (TChan (Either MessageError Message)) |
|
|
|
getMessageChan :: Xmpp (TChan (Either MessageError Message)) |
|
|
|
getMessageChan = do |
|
|
|
getMessageChan = do |
|
|
|
mChR <- asks messagesRef |
|
|
|
mChR <- asks messagesRef |
|
|
|
mCh <- liftIO $ readIORef mChR |
|
|
|
mCh <- liftIO $ readIORef mChR |
|
|
|
@ -62,7 +62,7 @@ getMessageChan = do |
|
|
|
Just mCh' -> return mCh' |
|
|
|
Just mCh' -> return mCh' |
|
|
|
|
|
|
|
|
|
|
|
-- | Analogous to 'getMessageChan'. |
|
|
|
-- | Analogous to 'getMessageChan'. |
|
|
|
getPresenceChan :: XMPP (TChan (Either PresenceError Presence)) |
|
|
|
getPresenceChan :: Xmpp (TChan (Either PresenceError Presence)) |
|
|
|
getPresenceChan = do |
|
|
|
getPresenceChan = do |
|
|
|
pChR <- asks presenceRef |
|
|
|
pChR <- asks presenceRef |
|
|
|
pCh <- liftIO $ readIORef pChR |
|
|
|
pCh <- liftIO $ readIORef pChR |
|
|
|
@ -76,33 +76,33 @@ getPresenceChan = do |
|
|
|
|
|
|
|
|
|
|
|
-- | Drop the local end of the inbound stanza channel from our context so it can |
|
|
|
-- | Drop the local end of the inbound stanza channel from our context so it can |
|
|
|
-- be GC-ed. |
|
|
|
-- be GC-ed. |
|
|
|
dropMessageChan :: XMPP () |
|
|
|
dropMessageChan :: Xmpp () |
|
|
|
dropMessageChan = do |
|
|
|
dropMessageChan = do |
|
|
|
r <- asks messagesRef |
|
|
|
r <- asks messagesRef |
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
|
|
|
|
|
|
|
|
-- | Analogous to 'dropMessageChan'. |
|
|
|
-- | Analogous to 'dropMessageChan'. |
|
|
|
dropPresenceChan :: XMPP () |
|
|
|
dropPresenceChan :: Xmpp () |
|
|
|
dropPresenceChan = do |
|
|
|
dropPresenceChan = do |
|
|
|
r <- asks presenceRef |
|
|
|
r <- asks presenceRef |
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
|
|
|
|
|
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
|
|
|
-- channel as necessary. |
|
|
|
-- channel as necessary. |
|
|
|
pullMessage :: XMPP (Either MessageError Message) |
|
|
|
pullMessage :: Xmpp (Either MessageError Message) |
|
|
|
pullMessage = do |
|
|
|
pullMessage = do |
|
|
|
c <- getMessageChan |
|
|
|
c <- getMessageChan |
|
|
|
liftIO $ atomically $ readTChan c |
|
|
|
liftIO $ atomically $ readTChan c |
|
|
|
|
|
|
|
|
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
|
|
|
-- channel as necessary. |
|
|
|
-- channel as necessary. |
|
|
|
pullPresence :: XMPP (Either PresenceError Presence) |
|
|
|
pullPresence :: Xmpp (Either PresenceError Presence) |
|
|
|
pullPresence = do |
|
|
|
pullPresence = do |
|
|
|
c <- getPresenceChan |
|
|
|
c <- getPresenceChan |
|
|
|
liftIO $ atomically $ readTChan c |
|
|
|
liftIO $ atomically $ readTChan c |
|
|
|
|
|
|
|
|
|
|
|
-- | Send a stanza to the server. |
|
|
|
-- | Send a stanza to the server. |
|
|
|
sendStanza :: Stanza -> XMPP () |
|
|
|
sendStanza :: Stanza -> Xmpp () |
|
|
|
sendStanza a = do |
|
|
|
sendStanza a = do |
|
|
|
out <- asks outCh |
|
|
|
out <- asks outCh |
|
|
|
liftIO . atomically $ writeTChan out a |
|
|
|
liftIO . atomically $ writeTChan out a |
|
|
|
@ -116,7 +116,7 @@ forkSession sess = do |
|
|
|
return $ sess {messagesRef = mCH', presenceRef = pCH'} |
|
|
|
return $ sess {messagesRef = mCH', presenceRef = pCH'} |
|
|
|
|
|
|
|
|
|
|
|
-- | Fork a new thread. |
|
|
|
-- | Fork a new thread. |
|
|
|
fork :: XMPP () -> XMPP ThreadId |
|
|
|
fork :: Xmpp () -> Xmpp ThreadId |
|
|
|
fork a = do |
|
|
|
fork a = do |
|
|
|
sess <- ask |
|
|
|
sess <- ask |
|
|
|
sess' <- liftIO $ forkSession sess |
|
|
|
sess' <- liftIO $ forkSession sess |
|
|
|
@ -125,7 +125,7 @@ fork a = do |
|
|
|
-- | Pulls a message and returns it if the given predicate returns @True@. |
|
|
|
-- | Pulls a message and returns it if the given predicate returns @True@. |
|
|
|
filterMessages :: (MessageError -> Bool) |
|
|
|
filterMessages :: (MessageError -> Bool) |
|
|
|
-> (Message -> Bool) |
|
|
|
-> (Message -> Bool) |
|
|
|
-> XMPP (Either MessageError Message) |
|
|
|
-> Xmpp (Either MessageError Message) |
|
|
|
filterMessages f g = do |
|
|
|
filterMessages f g = do |
|
|
|
s <- pullMessage |
|
|
|
s <- pullMessage |
|
|
|
case s of |
|
|
|
case s of |
|
|
|
@ -136,7 +136,7 @@ filterMessages f g = do |
|
|
|
|
|
|
|
|
|
|
|
-- | Pulls a (non-error) message and returns it if the given predicate returns |
|
|
|
-- | Pulls a (non-error) message and returns it if the given predicate returns |
|
|
|
-- @True@. |
|
|
|
-- @True@. |
|
|
|
waitForMessage :: (Message -> Bool) -> XMPP Message |
|
|
|
waitForMessage :: (Message -> Bool) -> Xmpp Message |
|
|
|
waitForMessage f = do |
|
|
|
waitForMessage f = do |
|
|
|
s <- pullMessage |
|
|
|
s <- pullMessage |
|
|
|
case s of |
|
|
|
case s of |
|
|
|
@ -145,7 +145,7 @@ waitForMessage f = do |
|
|
|
| otherwise -> waitForMessage f |
|
|
|
| otherwise -> waitForMessage f |
|
|
|
|
|
|
|
|
|
|
|
-- | Pulls an error message and returns it if the given predicate returns @True@. |
|
|
|
-- | Pulls an error message and returns it if the given predicate returns @True@. |
|
|
|
waitForMessageError :: (MessageError -> Bool) -> XMPP MessageError |
|
|
|
waitForMessageError :: (MessageError -> Bool) -> Xmpp MessageError |
|
|
|
waitForMessageError f = do |
|
|
|
waitForMessageError f = do |
|
|
|
s <- pullMessage |
|
|
|
s <- pullMessage |
|
|
|
case s of |
|
|
|
case s of |
|
|
|
@ -155,7 +155,7 @@ waitForMessageError f = do |
|
|
|
|
|
|
|
|
|
|
|
-- | Pulls a (non-error) presence and returns it if the given predicate returns |
|
|
|
-- | Pulls a (non-error) presence and returns it if the given predicate returns |
|
|
|
-- @True@. |
|
|
|
-- @True@. |
|
|
|
waitForPresence :: (Presence -> Bool) -> XMPP Presence |
|
|
|
waitForPresence :: (Presence -> Bool) -> Xmpp Presence |
|
|
|
waitForPresence f = do |
|
|
|
waitForPresence f = do |
|
|
|
s <- pullPresence |
|
|
|
s <- pullPresence |
|
|
|
case s of |
|
|
|
case s of |
|
|
|
@ -165,11 +165,11 @@ waitForPresence f = do |
|
|
|
|
|
|
|
|
|
|
|
-- TODO: Wait for presence error? |
|
|
|
-- TODO: Wait for presence error? |
|
|
|
|
|
|
|
|
|
|
|
-- | Run an XMPPMonad action in isolation. Reader and writer workers will be |
|
|
|
-- | Run an XmppMonad action in isolation. Reader and writer workers will be |
|
|
|
-- temporarily stopped and resumed with the new session details once the action |
|
|
|
-- temporarily stopped and resumed with the new session details once the action |
|
|
|
-- returns. The action will run in the calling thread. Any uncaught exceptions |
|
|
|
-- returns. The action will run in the calling thread. Any uncaught exceptions |
|
|
|
-- will be interpreted as connection failure. |
|
|
|
-- will be interpreted as connection failure. |
|
|
|
withConnection :: XMPPConMonad a -> XMPP (Either StreamError a) |
|
|
|
withConnection :: XmppConMonad a -> Xmpp (Either StreamError a) |
|
|
|
withConnection a = do |
|
|
|
withConnection a = do |
|
|
|
readerId <- asks readerThread |
|
|
|
readerId <- asks readerThread |
|
|
|
stateRef <- asks conStateRef |
|
|
|
stateRef <- asks conStateRef |
|
|
|
@ -193,7 +193,7 @@ withConnection a = do |
|
|
|
(\e -> atomically (putTMVar wait ()) >> |
|
|
|
(\e -> atomically (putTMVar wait ()) >> |
|
|
|
Ex.throwIO (e :: Ex.SomeException) |
|
|
|
Ex.throwIO (e :: Ex.SomeException) |
|
|
|
) |
|
|
|
) |
|
|
|
-- Run the XMPPMonad action, save the (possibly updated) states, release |
|
|
|
-- Run the XmppMonad action, save the (possibly updated) states, release |
|
|
|
-- the locks, and return the result. |
|
|
|
-- the locks, and return the result. |
|
|
|
Ex.catches |
|
|
|
Ex.catches |
|
|
|
(do |
|
|
|
(do |
|
|
|
@ -211,44 +211,44 @@ withConnection a = do |
|
|
|
] |
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
-- | Send a presence stanza. |
|
|
|
-- | Send a presence stanza. |
|
|
|
sendPresence :: Presence -> XMPP () |
|
|
|
sendPresence :: Presence -> Xmpp () |
|
|
|
sendPresence = sendStanza . PresenceS |
|
|
|
sendPresence = sendStanza . PresenceS |
|
|
|
|
|
|
|
|
|
|
|
-- | Send a message stanza. |
|
|
|
-- | Send a message stanza. |
|
|
|
sendMessage :: Message -> XMPP () |
|
|
|
sendMessage :: Message -> Xmpp () |
|
|
|
sendMessage = sendStanza . MessageS |
|
|
|
sendMessage = sendStanza . MessageS |
|
|
|
|
|
|
|
|
|
|
|
-- | Executes a function to update the event handlers. |
|
|
|
-- | Executes a function to update the event handlers. |
|
|
|
modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPP () |
|
|
|
modifyHandlers :: (EventHandlers -> EventHandlers) -> Xmpp () |
|
|
|
modifyHandlers f = do |
|
|
|
modifyHandlers f = do |
|
|
|
eh <- asks eventHandlers |
|
|
|
eh <- asks eventHandlers |
|
|
|
liftIO . atomically $ writeTVar eh . f =<< readTVar eh |
|
|
|
liftIO . atomically $ writeTVar eh . f =<< readTVar eh |
|
|
|
|
|
|
|
|
|
|
|
-- | Sets the handler to be executed when the session ends. |
|
|
|
-- | Sets the handler to be executed when the session ends. |
|
|
|
setSessionEndHandler :: XMPP () -> XMPP () |
|
|
|
setSessionEndHandler :: Xmpp () -> Xmpp () |
|
|
|
setSessionEndHandler eh = do |
|
|
|
setSessionEndHandler eh = do |
|
|
|
r <- ask |
|
|
|
r <- ask |
|
|
|
modifyHandlers (\s -> s{sessionEndHandler = runReaderT eh r}) |
|
|
|
modifyHandlers (\s -> s{sessionEndHandler = runReaderT eh r}) |
|
|
|
|
|
|
|
|
|
|
|
-- | Sets the handler to be executed when the server connection is closed. |
|
|
|
-- | Sets the handler to be executed when the server connection is closed. |
|
|
|
setConnectionClosedHandler :: (StreamError -> XMPP ()) -> XMPP () |
|
|
|
setConnectionClosedHandler :: (StreamError -> Xmpp ()) -> Xmpp () |
|
|
|
setConnectionClosedHandler eh = do |
|
|
|
setConnectionClosedHandler eh = do |
|
|
|
r <- ask |
|
|
|
r <- ask |
|
|
|
modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r}) |
|
|
|
modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r}) |
|
|
|
|
|
|
|
|
|
|
|
-- | Run an event handler. |
|
|
|
-- | Run an event handler. |
|
|
|
runHandler :: (EventHandlers -> IO a) -> XMPP a |
|
|
|
runHandler :: (EventHandlers -> IO a) -> Xmpp a |
|
|
|
runHandler h = do |
|
|
|
runHandler h = do |
|
|
|
eh <- liftIO . atomically . readTVar =<< asks eventHandlers |
|
|
|
eh <- liftIO . atomically . readTVar =<< asks eventHandlers |
|
|
|
liftIO $ h eh |
|
|
|
liftIO $ h eh |
|
|
|
|
|
|
|
|
|
|
|
-- | End the current XMPP session. |
|
|
|
-- | End the current Xmpp session. |
|
|
|
endSession :: XMPP () |
|
|
|
endSession :: Xmpp () |
|
|
|
endSession = do -- TODO: This has to be idempotent (is it?) |
|
|
|
endSession = do -- TODO: This has to be idempotent (is it?) |
|
|
|
void $ withConnection xmppKillConnection |
|
|
|
void $ withConnection xmppKillConnection |
|
|
|
liftIO =<< asks stopThreads |
|
|
|
liftIO =<< asks stopThreads |
|
|
|
runHandler sessionEndHandler |
|
|
|
runHandler sessionEndHandler |
|
|
|
|
|
|
|
|
|
|
|
-- | Close the connection to the server. |
|
|
|
-- | Close the connection to the server. |
|
|
|
closeConnection :: XMPP () |
|
|
|
closeConnection :: Xmpp () |
|
|
|
closeConnection = void $ withConnection xmppKillConnection |
|
|
|
closeConnection = void $ withConnection xmppKillConnection |