You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

224 lines
7.0 KiB

14 years ago
module Network.XMPP.Concurrent.Monad where
import Network.XMPP.Types
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
14 years ago
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Monad.State.Strict
14 years ago
import Data.IORef
import qualified Data.Map as Map
import Data.Text(Text)
import Network.XMPP.Concurrent.Types
import Network.XMPP.Monad
14 years ago
-- | Register a new IQ listener. IQ requests matching the type and namespace
-- will be put in the channel.
--
-- Returns the new channel, or 'Nothing' if this namespace/'IQRequestType'
-- combination is already handled. In the latter case the handler table is
-- left untouched and no channel is allocated.
listenIQChan :: IQRequestType  -- ^ type of IQs to receive (Get / Set)
             -> Text           -- ^ namespace of the child element
             -> XMPP (Maybe (TChan (IQRequest, TVar Bool)))
listenIQChan tp ns = do
    handlers <- asks iqHandlers
    -- The lookup and the insert happen inside a single STM transaction, so
    -- two concurrent registrations for the same key cannot both succeed.
    liftIO . atomically $ do
        (byNS, byID) <- readTVar handlers
        case Map.lookup (tp, ns) byNS of
            -- Someone already listens on this (type, namespace) pair.
            Just _  -> return Nothing
            Nothing -> do
                iqCh <- newTChan
                writeTVar handlers (Map.insert (tp, ns) iqCh byNS, byID)
                return (Just iqCh)
14 years ago
-- | Get this context's inbound message channel.
--
-- If no channel is cached in 'messagesRef' yet, one is duplicated from the
-- master channel ('mShadow') and cached. Please note that once duplicated
-- the channel will keep filling up; call 'dropMessageChan' to allow it to be
-- garbage collected.
getMessageChan :: XMPP (TChan (Either MessageError Message))
getMessageChan = do
    ref    <- asks messagesRef
    cached <- liftIO $ readIORef ref
    case cached of
        Just chan -> return chan
        Nothing   -> do
            master <- asks mShadow
            chan   <- liftIO . atomically $ dupTChan master
            -- Remember the duplicate so later calls reuse it.
            liftIO $ writeIORef ref (Just chan)
            return chan
-- | Get this context's inbound presence channel.
--
-- If no channel is cached in 'presenceRef' yet, one is duplicated from the
-- master channel ('pShadow') and cached. Once duplicated the channel keeps
-- filling up; call 'dropPresenceChan' to allow it to be garbage collected.
getPresenceChan :: XMPP (TChan (Either PresenceError Presence))
getPresenceChan = do
    ref    <- asks presenceRef
    cached <- liftIO $ readIORef ref
    case cached of
        Just chan -> return chan
        Nothing   -> do
            master <- asks pShadow
            chan   <- liftIO . atomically $ dupTChan master
            -- Remember the duplicate so later calls reuse it.
            liftIO $ writeIORef ref (Just chan)
            return chan
-- | Drop the local end of the inbound message channel from our context
-- (resets 'messagesRef' to 'Nothing') so it can be GC-ed.
dropMessageChan :: XMPP ()
dropMessageChan = asks messagesRef >>= \ref -> liftIO (writeIORef ref Nothing)
-- | Drop the local end of the inbound presence channel from our context
-- (resets 'presenceRef' to 'Nothing') so it can be GC-ed.
dropPresenceChan :: XMPP ()
dropPresenceChan = asks presenceRef >>= \ref -> liftIO (writeIORef ref Nothing)
-- | Read the next element from the inbound message channel, acquiring a
-- copy of the channel first if necessary (see 'getMessageChan').
-- Blocks until an element is available.
pullMessage :: XMPP (Either MessageError Message)
pullMessage = getMessageChan >>= liftIO . atomically . readTChan
14 years ago
-- | Read the next element from the inbound presence channel, acquiring a
-- copy of the channel first if necessary (see 'getPresenceChan').
-- Blocks until an element is available.
pullPresence :: XMPP (Either PresenceError Presence)
pullPresence = getPresenceChan >>= liftIO . atomically . readTChan
14 years ago
-- | Send a stanza to the server by writing it to the session's 'outCh'
-- channel.
sendStanza :: Stanza -> XMPP ()
sendStanza a = asks outCh >>= \out -> liftIO (atomically (writeTChan out a))
-- | Create a forked session object without forking a thread.
--
-- The result is the given session with fresh, empty 'messagesRef' and
-- 'presenceRef' cells; every other field is carried over unchanged.
forkSession :: Session -> IO Session
forkSession sess =
    newIORef Nothing >>= \msgRef  ->
    newIORef Nothing >>= \presRef ->
    return sess { messagesRef = msgRef, presenceRef = presRef }
14 years ago
-- | Fork a new thread that runs the given action against a forked copy of
-- the current session (see 'forkSession'). Returns the new thread's id.
fork :: XMPP () -> XMPP ThreadId
fork a = ask >>= \parent ->
    liftIO (forkSession parent >>= forkIO . runReaderT a)
14 years ago
-- | Pull elements from the inbound message channel until one is accepted:
-- errors are tested with the first predicate, regular messages with the
-- second. Elements that are rejected are not returned.
filterMessages :: (MessageError -> Bool)
               -> (Message -> Bool)
               -> XMPP (Either MessageError Message)
filterMessages f g = loop
  where
    loop = do
        next <- pullMessage
        if either f g next
            then return next
            else loop
-- | Pull from the inbound message channel until a regular message
-- satisfying the predicate arrives. Message errors and non-matching
-- messages are skipped.
waitForMessage :: (Message -> Bool) -> XMPP Message
waitForMessage f = do
    next <- pullMessage
    case next of
        Right msg | f msg -> return msg
        _                 -> waitForMessage f
-- | Pull from the inbound message channel until a message error satisfying
-- the predicate arrives. Regular messages and non-matching errors are
-- skipped.
waitForMessageError :: (MessageError -> Bool) -> XMPP MessageError
waitForMessageError f = do
    next <- pullMessage
    case next of
        Left err | f err -> return err
        _                -> waitForMessageError f
14 years ago
-- | Pull from the inbound presence channel until a presence satisfying the
-- predicate arrives. Presence errors and non-matching presences are skipped.
waitForPresence :: (Presence -> Bool) -> XMPP Presence
waitForPresence f = do
    next <- pullPresence
    case next of
        Right pres | f pres -> return pres
        _                   -> waitForPresence f
14 years ago
-- | Run an XMPPMonad action in isolation.
-- Reader and writer workers will be temporarily stopped
-- and resumed with the new session details once the action returns.
-- The action will run in the calling thread.
-- Any uncaught exceptions will be interpreted as connection failure.
--
-- NOTE(review): when the action throws, the handler kills the connection and
-- rethrows without refilling 'writeRef' / 'conStateRef'; both stay empty
-- afterwards.
withConnection :: XMPPConMonad a -> XMPP a
withConnection a = do
    readerId <- asks readerThread
    stateRef <- asks conStateRef
    write    <- asks writeRef
    wait     <- liftIO newEmptyTMVarIO
    liftIO . Ex.mask_ $ do
        -- Interrupt the reader thread, handing it the TMVar we signal on
        -- once the connection state has been taken (or taking it failed).
        throwTo readerId (Interrupt wait)
        conState <- atomically
            ( do _  <- takeTMVar write
                 st <- takeTMVar stateRef
                 putTMVar wait ()
                 return st
            ) `Ex.catch` \e -> do
                -- No MVar taken: the transaction did not commit, so only
                -- the reader needs to be signalled before rethrowing.
                atomically (putTMVar wait ())
                Ex.throwIO (e :: Ex.SomeException)
        ( do (res, conState') <- runStateT a conState
             -- Hand the (possibly updated) connection back.
             atomically $ do
                 putTMVar write (sConPushBS conState')
                 putTMVar stateRef conState'
             return res
          ) `Ex.catch` \e -> do
                -- We treat all exceptions as fatal: tear the connection
                -- down using the state from before the action ran.
                _ <- runStateT xmppKillConnection conState
                Ex.throwIO (e :: Ex.SomeException)
-- | Send a presence stanza: wraps the presence in 'PresenceS' and passes it
-- to 'sendStanza'.
sendPresence :: Presence -> XMPP ()
sendPresence pres = sendStanza (PresenceS pres)
-- | Send a message stanza: wraps the message in 'MessageS' and passes it to
-- 'sendStanza'.
sendMessage :: Message -> XMPP ()
sendMessage msg = sendStanza (MessageS msg)
-- | Apply a function to the session's 'EventHandlers', reading and writing
-- the 'eventHandlers' TVar in a single STM transaction.
modifyHandlers :: (EventHandlers -> EventHandlers) -> XMPP ()
modifyHandlers f = do
    handlersVar <- asks eventHandlers
    liftIO . atomically $ do
        current <- readTVar handlersVar
        writeTVar handlersVar (f current)
-- | Replace the 'sessionEndHandler' stored in the session's event handlers.
setSessionEndHandler :: XMPP () -> XMPP ()
setSessionEndHandler eh = modifyHandlers install
  where
    install handlers = handlers { sessionEndHandler = eh }
-- | Replace the 'connectionClosedHandler' stored in the session's event
-- handlers.
setConnectionClosedHandler :: XMPP () -> XMPP ()
setConnectionClosedHandler eh = modifyHandlers install
  where
    install handlers = handlers { connectionClosedHandler = eh }
-- | Run an event handler: reads the current 'EventHandlers' from the
-- session and runs the action the given selector picks from them.
runHandler :: (EventHandlers -> XMPP a) -> XMPP a
runHandler h = do
    handlersVar <- asks eventHandlers
    current     <- liftIO (atomically (readTVar handlersVar))
    h current
-- | End the current xmpp session: kill the connection, run the session's
-- 'stopThreads' action, then invoke the registered 'sessionEndHandler'.
endSession :: XMPP ()
endSession = do -- TODO: This has to be idempotent (is it?)
    withConnection xmppKillConnection
    stopWorkers <- asks stopThreads
    liftIO stopWorkers
    runHandler sessionEndHandler
-- | Close the connection to the server by running 'xmppKillConnection'
-- under 'withConnection'. Unlike 'endSession', this neither runs
-- 'stopThreads' nor invokes the 'sessionEndHandler'.
closeConnection :: XMPP ()
closeConnection = withConnection xmppKillConnection