Browse Source

un-ReaderT Xmpp actions

remove Xmpp type synonym
master
Philipp Balzarek 13 years ago
parent
commit
bcb0f9177b
  1. 6
      source/Network/Xmpp.hs
  2. 2
      source/Network/Xmpp/Concurrent.hs
  3. 42
      source/Network/Xmpp/Concurrent/IQ.hs
  4. 205
      source/Network/Xmpp/Concurrent/Monad.hs
  5. 11
      source/Network/Xmpp/Concurrent/Threads.hs
  6. 2
      source/Network/Xmpp/Concurrent/Types.hs
  7. 6
      source/Network/Xmpp/Session.hs
  8. 15
      source/Network/Xmpp/Xep/ServiceDiscovery.hs

6
source/Network/Xmpp.hs

@ -29,9 +29,7 @@ @@ -29,9 +29,7 @@
module Network.Xmpp
( -- * Session management
withNewSession
, withSession
, newSession
newSession
, withConnection
, connect
, simpleConnect
@ -142,8 +140,6 @@ module Network.Xmpp @@ -142,8 +140,6 @@ module Network.Xmpp
, iqRequestPayload
, iqResultPayload
-- * Threads
, Xmpp
, fork
, forkSession
-- * Misc
, LangTag(..)

2
source/Network/Xmpp/Concurrent.hs

@ -1,6 +1,5 @@ @@ -1,6 +1,5 @@
module Network.Xmpp.Concurrent
( Session
, Xmpp
, module Network.Xmpp.Concurrent.Monad
, module Network.Xmpp.Concurrent.Threads
, module Network.Xmpp.Concurrent.IQ
@ -10,4 +9,3 @@ import Network.Xmpp.Concurrent.Types @@ -10,4 +9,3 @@ import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Monad
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.IQ

42
source/Network/Xmpp/Concurrent/IQ.hs

@ -21,59 +21,61 @@ sendIQ :: Maybe Int -- ^ Timeout @@ -21,59 +21,61 @@ sendIQ :: Maybe Int -- ^ Timeout
-> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for
-- default)
-> Element -- ^ The IQ body (there has to be exactly one)
-> Xmpp (TMVar IQResponse)
sendIQ timeOut to tp lang body = do -- TODO: Add timeout
newId <- liftIO =<< asks idGenerator
handlers <- asks iqHandlers
ref <- liftIO . atomically $ do
-> Session
-> IO (TMVar IQResponse)
sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
newId <- idGenerator session
ref <- atomically $ do
resRef <- newEmptyTMVar
(byNS, byId) <- readTVar handlers
writeTVar handlers (byNS, Map.insert newId resRef byId)
(byNS, byId) <- readTVar (iqHandlers session)
writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId)
-- TODO: Check for id collisions (shouldn't happen?)
return resRef
sendStanza . IQRequestS $ IQRequest newId Nothing to lang tp body
sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session
case timeOut of
Nothing -> return ()
Just t -> void . liftIO . forkIO $ do
Just t -> void . forkIO $ do
threadDelay t
doTimeOut handlers newId ref
doTimeOut (iqHandlers session) newId ref
return ref
where
doTimeOut handlers iqid var = atomically $ do
p <- tryPutTMVar var IQResponseTimeout
when p $ do
(byNS, byId) <- readTVar handlers
(byNS, byId) <- readTVar (iqHandlers session)
writeTVar handlers (byNS, Map.delete iqid byId)
return ()
-- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds
sendIQ' :: Maybe Jid
-> IQRequestType
-> Maybe LangTag
-> Element
-> Xmpp IQResponse
sendIQ' to tp lang body = do
ref <- sendIQ (Just 3000000) to tp lang body
liftIO . atomically $ takeTMVar ref
-> Session
-> IO IQResponse
sendIQ' to tp lang body session = do
ref <- sendIQ (Just 3000000) to tp lang body session
atomically $ takeTMVar ref
answerIQ :: IQRequestTicket
-> Either StanzaError (Maybe Element)
-> Xmpp Bool
-> Session
-> IO Bool
answerIQ (IQRequestTicket
sentRef
(IQRequest iqid from _to lang _tp bd))
answer = do
out <- asks outCh
answer session = do
let response = case answer of
Left err -> IQErrorS $ IQError iqid Nothing from lang err (Just bd)
Right res -> IQResultS $ IQResult iqid Nothing from lang res
liftIO . atomically $ do
atomically $ do
sent <- readTVar sentRef
case sent of
False -> do
writeTVar sentRef True
writeTChan out response
writeTChan (outCh session) response
return True
True -> return False

205
source/Network/Xmpp/Concurrent/Monad.hs

@ -26,10 +26,11 @@ import Network.Xmpp.Monad @@ -26,10 +26,11 @@ import Network.Xmpp.Monad
-- to interfere with existing consumers.
listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@)
-> Text -- ^ Namespace of the child element
-> Xmpp (Either (TChan IQRequestTicket) (TChan IQRequestTicket))
listenIQChan tp ns = do
handlers <- asks iqHandlers
liftIO . atomically $ do
-> Session
-> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket))
listenIQChan tp ns session = do
let handlers = iqHandlers session
atomically $ do
(byNS, byID) <- readTVar handlers
iqCh <- newTChan
let (present, byNS') = Map.insertLookupWithKey'
@ -43,127 +44,110 @@ listenIQChan tp ns = do @@ -43,127 +44,110 @@ listenIQChan tp ns = do
Just iqCh' -> Left iqCh'
-- | Get a duplicate of the stanza channel
getStanzaChan :: Xmpp (TChan Stanza)
getStanzaChan = do
shadow <- asks sShadow
liftIO $ atomically $ dupTChan shadow
getStanzaChan :: Session -> IO (TChan Stanza)
getStanzaChan session = atomically $ dupTChan (sShadow session)
-- | Get the inbound stanza channel, duplicates from master if necessary. Please
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to
-- allow it to be garbage collected.
getMessageChan :: Xmpp (TChan (Either MessageError Message))
getMessageChan = do
mChR <- asks messagesRef
mCh <- liftIO $ readIORef mChR
getMessageChan :: Session -> IO (TChan (Either MessageError Message))
getMessageChan session = do
mCh <- readIORef $ messagesRef session
case mCh of
Nothing -> do
shadow <- asks mShadow
mCh' <- liftIO $ atomically $ dupTChan shadow
liftIO $ writeIORef mChR (Just mCh')
mCh' <- atomically $ dupTChan (mShadow session)
writeIORef (messagesRef session) (Just mCh')
return mCh'
Just mCh' -> return mCh'
-- | Analogous to 'getMessageChan'.
getPresenceChan :: Xmpp (TChan (Either PresenceError Presence))
getPresenceChan = do
pChR <- asks presenceRef
pCh <- liftIO $ readIORef pChR
getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence))
getPresenceChan session = do
pCh <- readIORef $ presenceRef session
case pCh of
Nothing -> do
shadow <- asks pShadow
pCh' <- liftIO $ atomically $ dupTChan shadow
liftIO $ writeIORef pChR (Just pCh')
pCh' <- atomically $ dupTChan (pShadow session)
writeIORef (presenceRef session) (Just pCh')
return pCh'
Just pCh' -> return pCh'
-- | Drop the local end of the inbound stanza channel from our context so it can
-- be GC-ed.
dropMessageChan :: Xmpp ()
dropMessageChan = do
r <- asks messagesRef
liftIO $ writeIORef r Nothing
dropMessageChan :: Session -> IO ()
dropMessageChan session = writeIORef (messagesRef session) Nothing
-- | Analogous to 'dropMessageChan'.
dropPresenceChan :: Xmpp ()
dropPresenceChan = do
r <- asks presenceRef
liftIO $ writeIORef r Nothing
dropPresenceChan :: Session -> IO ()
dropPresenceChan session = writeIORef (presenceRef session) Nothing
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.
pullMessage :: Xmpp (Either MessageError Message)
pullMessage = do
c <- getMessageChan
liftIO $ atomically $ readTChan c
pullMessage :: Session -> IO (Either MessageError Message)
pullMessage session = do
c <- getMessageChan session
atomically $ readTChan c
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.
pullPresence :: Xmpp (Either PresenceError Presence)
pullPresence = do
c <- getPresenceChan
liftIO $ atomically $ readTChan c
pullPresence :: Session -> IO (Either PresenceError Presence)
pullPresence session = do
c <- getPresenceChan session
atomically $ readTChan c
-- | Send a stanza to the server.
sendStanza :: Stanza -> Xmpp ()
sendStanza a = do
out <- asks outCh
liftIO . atomically $ writeTChan out a
return ()
sendStanza :: Stanza -> Session -> IO ()
sendStanza a session = atomically $ writeTChan (outCh session) a
-- | Create a forked session object without forking a thread.
-- | Create a forked session object
forkSession :: Session -> IO Session
forkSession sess = do
forkSession session = do
mCH' <- newIORef Nothing
pCH' <- newIORef Nothing
return $ sess {messagesRef = mCH', presenceRef = pCH'}
-- | Fork a new thread.
fork :: Xmpp () -> Xmpp ThreadId
fork a = do
sess <- ask
sess' <- liftIO $ forkSession sess
liftIO $ forkIO $ runReaderT a sess'
return $ session {messagesRef = mCH', presenceRef = pCH'}
-- | Pulls a message and returns it if the given predicate returns @True@.
filterMessages :: (MessageError -> Bool)
-> (Message -> Bool)
-> Xmpp (Either MessageError Message)
filterMessages f g = do
s <- pullMessage
-> Session -> IO (Either MessageError Message)
filterMessages f g session = do
s <- pullMessage session
case s of
Left e | f e -> return $ Left e
| otherwise -> filterMessages f g
| otherwise -> filterMessages f g session
Right m | g m -> return $ Right m
| otherwise -> filterMessages f g
| otherwise -> filterMessages f g session
-- | Pulls a (non-error) message and returns it if the given predicate returns
-- @True@.
waitForMessage :: (Message -> Bool) -> Xmpp Message
waitForMessage f = do
s <- pullMessage
waitForMessage :: (Message -> Bool) -> Session -> IO Message
waitForMessage f session = do
s <- pullMessage session
case s of
Left _ -> waitForMessage f
Left _ -> waitForMessage f session
Right m | f m -> return m
| otherwise -> waitForMessage f
| otherwise -> waitForMessage f session
-- | Pulls an error message and returns it if the given predicate returns @True@.
waitForMessageError :: (MessageError -> Bool) -> Xmpp MessageError
waitForMessageError f = do
s <- pullMessage
waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError
waitForMessageError f session = do
s <- pullMessage session
case s of
Right _ -> waitForMessageError f
Right _ -> waitForMessageError f session
Left m | f m -> return m
| otherwise -> waitForMessageError f
| otherwise -> waitForMessageError f session
-- | Pulls a (non-error) presence and returns it if the given predicate returns
-- @True@.
waitForPresence :: (Presence -> Bool) -> Xmpp Presence
waitForPresence f = do
s <- pullPresence
waitForPresence :: (Presence -> Bool) -> Session -> IO Presence
waitForPresence f session = do
s <- pullPresence session
case s of
Left _ -> waitForPresence f
Left _ -> waitForPresence f session
Right m | f m -> return m
| otherwise -> waitForPresence f
| otherwise -> waitForPresence f session
-- TODO: Wait for presence error?
@ -171,23 +155,20 @@ waitForPresence f = do @@ -171,23 +155,20 @@ waitForPresence f = do
-- temporarily stopped and resumed with the new session details once the action
-- returns. The action will run in the calling thread. Any uncaught exceptions
-- will be interpreted as connection failure.
withConnection :: XmppConMonad a -> Xmpp (Either StreamError a)
withConnection a = do
readerId <- asks readerThread
stateRef <- asks conStateRef
write <- asks writeRef
wait <- liftIO $ newEmptyTMVarIO
liftIO . Ex.mask_ $ do
withConnection :: XmppConMonad a -> Session -> IO (Either StreamError a)
withConnection a session = do
wait <- newEmptyTMVarIO
Ex.mask_ $ do
-- Suspends the reader until the lock (wait) is released (set to `()').
throwTo readerId $ Interrupt wait
throwTo (readerThread session) $ Interrupt wait
-- We acquire the write and stateRef locks, to make sure that this is
-- the only thread that can write to the stream and to perform a
-- withConnection calculation. Afterwards, we release the lock and
-- fetches an updated state.
s <- Ex.catch
(atomically $ do
_ <- takeTMVar write
s <- takeTMVar stateRef
_ <- takeTMVar (writeRef session)
s <- takeTMVar (conStateRef session)
putTMVar wait ()
return s
)
@ -201,8 +182,8 @@ withConnection a = do @@ -201,8 +182,8 @@ withConnection a = do
(do
(res, s') <- runStateT a s
atomically $ do
putTMVar write (sConPushBS s')
putTMVar stateRef s'
putTMVar (writeRef session) (sConPushBS s')
putTMVar (conStateRef session) s'
return $ Right res
)
-- We treat all Exceptions as fatal. If we catch a StreamError, we
@ -213,52 +194,48 @@ withConnection a = do @@ -213,52 +194,48 @@ withConnection a = do
]
-- | Send a presence stanza.
sendPresence :: Presence -> Xmpp ()
sendPresence = sendStanza . PresenceS
sendPresence :: Presence -> Session -> IO ()
sendPresence p session = sendStanza (PresenceS p) session
-- | Send a message stanza.
sendMessage :: Message -> Xmpp ()
sendMessage = sendStanza . MessageS
sendMessage :: Message -> Session -> IO ()
sendMessage m session = sendStanza (MessageS m) session
-- | Executes a function to update the event handlers.
modifyHandlers :: (EventHandlers -> EventHandlers) -> Xmpp ()
modifyHandlers f = do
eh <- asks eventHandlers
liftIO . atomically $ writeTVar eh . f =<< readTVar eh
modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO ()
modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f
-- | Sets the handler to be executed when the server connection is closed.
setConnectionClosedHandler :: (StreamError -> Xmpp ()) -> Xmpp ()
setConnectionClosedHandler eh = do
r <- ask
modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r})
setConnectionClosedHandler :: (StreamError -> Session -> IO ()) -> Session -> IO ()
setConnectionClosedHandler eh session = do
modifyHandlers (\s -> s{connectionClosedHandler =
\e -> eh e session}) session
-- | Run an event handler.
runHandler :: (EventHandlers -> IO a) -> Xmpp a
runHandler h = do
eh <- liftIO . atomically . readTVar =<< asks eventHandlers
liftIO $ h eh
runHandler :: (EventHandlers -> IO a) -> Session -> IO a
runHandler h session = h =<< atomically (readTVar $ eventHandlers session)
-- | End the current Xmpp session.
endSession :: Xmpp ()
endSession = do -- TODO: This has to be idempotent (is it?)
void $ withConnection xmppKillConnection
liftIO =<< asks stopThreads
endSession :: Session -> IO ()
endSession session = do -- TODO: This has to be idempotent (is it?)
void $ withConnection xmppKillConnection session
stopThreads session
-- | Close the connection to the server. Closes the stream (by enforcing a
-- write lock and sending a </stream:stream> element), waits (blocks) for three
-- seconds, and then closes the connection.
closeConnection :: Xmpp ()
closeConnection = Ex.mask_ $ do
write <- asks writeRef
send <- liftIO . atomically $ takeTMVar write
cc <- sCloseConnection <$>
(liftIO . atomically . readTMVar =<< asks conStateRef)
liftIO . send $ "</stream:stream>"
void . liftIO . forkIO $ do
closeConnection :: Session -> IO ()
closeConnection session = Ex.mask_ $ do
send <- atomically $ takeTMVar (writeRef session)
cc <- sCloseConnection <$> ( atomically $ readTMVar (conStateRef session))
send "</stream:stream>"
void . forkIO $ do
threadDelay 3000000
-- When we close the connection, we close the handle that was used in the
-- sCloseConnection above. So even if a new connection has been
-- established at this point, it will not be affected by this action.
(Ex.try cc) :: IO (Either Ex.SomeException ())
return ()
liftIO . atomically $ putTMVar write (\_ -> return False)
atomically $ putTMVar (writeRef session) (\_ -> return False)

11
source/Network/Xmpp/Concurrent/Threads.hs

@ -225,17 +225,6 @@ newSession = do @@ -225,17 +225,6 @@ newSession = do
eh
stopThreads'
-- | Creates a new session and runs the given Xmpp computation.
withNewSession :: Xmpp b -> IO (Session, b)
withNewSession a = do
sess <- newSession
ret <- runReaderT a sess
return (sess, ret)
-- | Runs the given Xmpp computation in the given session.
withSession :: Session -> Xmpp a -> IO a
withSession = flip runReaderT
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()

2
source/Network/Xmpp/Concurrent/Types.hs

@ -56,8 +56,6 @@ data Session = Session @@ -56,8 +56,6 @@ data Session = Session
, stopThreads :: IO ()
}
-- Xmpp is a monad for concurrent Xmpp usage.
type Xmpp a = ReaderT Session IO a
-- Interrupt is used to signal to the reader thread that it should stop.
data Interrupt = Interrupt (TMVar ()) deriving Typeable

6
source/Network/Xmpp/Session.hs

@ -35,9 +35,9 @@ xmppStartSession = do @@ -35,9 +35,9 @@ xmppStartSession = do
-- Sends the session IQ set element and waits for an answer. Throws an error if
-- if an IQ error stanza is returned from the server.
startSession :: Xmpp ()
startSession = do
answer <- sendIQ' Nothing Set Nothing sessionXML
startSession :: Session -> IO ()
startSession session = do
answer <- sendIQ' Nothing Set Nothing sessionXML session
case answer of
IQResponseResult _ -> return ()
e -> error $ show e

15
source/Network/Xmpp/Xep/ServiceDiscovery.hs

@ -27,6 +27,7 @@ import Network.Xmpp @@ -27,6 +27,7 @@ import Network.Xmpp
import Network.Xmpp.Monad
import Network.Xmpp.Pickle
import Network.Xmpp.Types
import Network.Xmpp.Concurrent
data DiscoError = DiscoNoQueryElement
| DiscoIQError IQError
@ -83,9 +84,10 @@ xpQueryInfo = xpWrap (\(nd, (feats, ids)) -> QIR nd ids feats) @@ -83,9 +84,10 @@ xpQueryInfo = xpWrap (\(nd, (feats, ids)) -> QIR nd ids feats)
-- | Query an entity for it's identity and features
queryInfo :: Jid -- ^ Entity to query
-> Maybe Text.Text -- ^ Node
-> Xmpp (Either DiscoError QueryInfoResult)
queryInfo to node = do
res <- sendIQ' (Just to) Get Nothing queryBody
-> Session
-> IO (Either DiscoError QueryInfoResult)
queryInfo to node session = do
res <- sendIQ' (Just to) Get Nothing queryBody session
return $ case res of
IQResponseError e -> Left $ DiscoIQError e
IQResponseTimeout -> Left $ DiscoTimeout
@ -145,9 +147,10 @@ xpQueryItems = xpElem (itemsN "query") @@ -145,9 +147,10 @@ xpQueryItems = xpElem (itemsN "query")
-- | Query an entity for Items of a node
queryItems :: Jid -- ^ Entity to query
-> Maybe Text.Text -- ^ Node
-> Xmpp (Either DiscoError (Maybe Text.Text, [Item]))
queryItems to node = do
res <- sendIQ' (Just to) Get Nothing queryBody
-> Session
-> IO (Either DiscoError (Maybe Text.Text, [Item]))
queryItems to node session = do
res <- sendIQ' (Just to) Get Nothing queryBody session
return $ case res of
IQResponseError e -> Left $ DiscoIQError e
IQResponseTimeout -> Left $ DiscoTimeout

Loading…
Cancel
Save