Browse Source

add Annotations to received stanzas

Annotations allow plugins to attach information to processed stanzas, for example to signal that the stanza was received encrypted.
master
Philipp Balzarek 12 years ago
parent
commit
ce97a57392
  1. 35
      source/Network/Xmpp/Concurrent.hs
  2. 4
      source/Network/Xmpp/Concurrent/Basic.hs
  3. 4
      source/Network/Xmpp/Concurrent/IQ.hs
  4. 23
      source/Network/Xmpp/Concurrent/Message.hs
  5. 13
      source/Network/Xmpp/Concurrent/Presence.hs
  6. 19
      source/Network/Xmpp/Concurrent/Types.hs
  7. 22
      source/Network/Xmpp/IM/Roster.hs
  8. 5
      source/Network/Xmpp/Types.hs

35
source/Network/Xmpp/Concurrent.hs

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
{-# LANGUAGE TupleSections #-}
{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent
@ -8,7 +9,6 @@ module Network.Xmpp.Concurrent @@ -8,7 +9,6 @@ module Network.Xmpp.Concurrent
, module Network.Xmpp.Concurrent.Message
, module Network.Xmpp.Concurrent.Presence
, module Network.Xmpp.Concurrent.IQ
, StanzaHandler
, newSession
, session
, newStanzaID
@ -49,25 +49,30 @@ import System.Random (randomRIO) @@ -49,25 +49,30 @@ import System.Random (randomRIO)
import Control.Monad.State.Strict
runHandlers :: [Stanza -> IO [Stanza]] -> Stanza -> IO ()
runHandlers [] _ = return ()
runHandlers (h:hands) sta = do
res <- h sta
forM_ res $ runHandlers hands
toChan :: TChan Stanza -> StanzaHandler
toChan stanzaC _ sta = do
atomically $ writeTChan stanzaC sta
return [sta]
runHandlers [] sta = do
errorM "Pontarius.Xmpp" $
"No stanza handlers set, discarding stanza" ++ show sta
return ()
runHandlers hs sta = go hs sta []
where go [] _ _ = return ()
go (h:hands) sta' as = do
res <- h sta' as
forM_ res $ uncurry (go hands)
toChan :: TChan (Annotated Stanza) -> StanzaHandler
toChan stanzaC _ sta as = do
atomically $ writeTChan stanzaC (sta, as)
return [(sta, as)]
handleIQ :: TVar IQHandlers
-> StanzaHandler
handleIQ iqHands out sta = do
handleIQ iqHands out sta as = do
case sta of
IQRequestS i -> handleIQRequest iqHands i >> return []
IQResultS i -> handleIQResponse iqHands (Right i) >> return []
IQErrorS i -> handleIQResponse iqHands (Left i) >> return []
_ -> return [sta]
_ -> return [(sta, as)]
where
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> IO ()
@ -104,7 +109,7 @@ handleIQ iqHands out sta = do @@ -104,7 +109,7 @@ handleIQ iqHands out sta = do
False -> do
atomically $ putTMVar sentRef False
return $ Just False
writeTChan ch $ IQRequestTicket answerT iq
writeTChan ch $ IQRequestTicket answerT iq as
return Nothing
maybe (return ()) (void . out) res
serviceUnavailable (IQRequest iqid from _to lang _tp bd) =
@ -117,7 +122,7 @@ handleIQ iqHands out sta = do @@ -117,7 +122,7 @@ handleIQ iqHands out sta = do
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = Just $ either IQResponseError IQResponseResult iq
let answer = Just (either IQResponseError IQResponseResult iq, as)
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
@ -139,7 +144,7 @@ newSession stream config realm mbSasl = runErrorT $ do @@ -139,7 +144,7 @@ newSession stream config realm mbSasl = runErrorT $ do
ros <- liftIO . newTVarIO $ Roster Nothing Map.empty
rew <- lift $ newTVarIO 60
let out = writeStanza writeSem
let rosterH = if (enableRoster config) then [handleRoster ros out ]
let rosterH = if (enableRoster config) then [handleRoster ros out]
else []
(sStanza, ps) <- initPlugins out $ plugins config
let stanzaHandler = runHandlers $ List.concat

4
source/Network/Xmpp/Concurrent/Basic.hs

@ -32,11 +32,11 @@ sendStanza :: Stanza -> Session -> IO Bool @@ -32,11 +32,11 @@ sendStanza :: Stanza -> Session -> IO Bool
sendStanza = flip sendStanza'
-- | Get the channel of incoming stanzas.
getStanzaChan :: Session -> TChan Stanza
getStanzaChan :: Session -> TChan (Stanza, [Annotation])
getStanzaChan session = stanzaCh session
-- | Get the next incoming stanza
getStanza :: Session -> IO Stanza
getStanza :: Session -> IO (Stanza, [Annotation])
getStanza session = atomically . readTChan $ stanzaCh session
-- | Create a new session object with the inbound channel duplicated

4
source/Network/Xmpp/Concurrent/IQ.hs

@ -28,7 +28,7 @@ sendIQ :: Maybe Integer -- ^ Timeout . When the timeout is reached the response @@ -28,7 +28,7 @@ sendIQ :: Maybe Integer -- ^ Timeout . When the timeout is reached the response
-- default)
-> Element -- ^ The IQ body (there has to be exactly one)
-> Session
-> IO (Maybe (TMVar (Maybe IQResponse)))
-> IO (Maybe (TMVar ( Maybe (Annotated IQResponse))))
sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
newId <- idGenerator session
ref <- atomically $ do
@ -62,7 +62,7 @@ sendIQ' :: Maybe Integer @@ -62,7 +62,7 @@ sendIQ' :: Maybe Integer
-> Maybe LangTag
-> Element
-> Session
-> IO (Either IQSendError IQResponse)
-> IO (Either IQSendError (Annotated IQResponse))
sendIQ' timeout to tp lang body session = do
ref <- sendIQ timeout to tp lang body session
maybe (return $ Left IQSendError) (fmap (maybe (Left IQTimeOut) Right)

23
source/Network/Xmpp/Concurrent/Message.hs

@ -8,21 +8,21 @@ import Network.Xmpp.Concurrent.Basic @@ -8,21 +8,21 @@ import Network.Xmpp.Concurrent.Basic
-- | Read an element from the inbound stanza channel, discardes any
-- non-Message stanzas from the channel
pullMessage :: Session -> IO (Either MessageError Message)
pullMessage :: Session -> IO (Either (Annotated MessageError) (Annotated Message))
pullMessage session = do
stanza <- atomically . readTChan $ stanzaCh session
(stanza, as) <- atomically . readTChan $ stanzaCh session
case stanza of
MessageS m -> return $ Right m
MessageErrorS e -> return $ Left e
MessageS m -> return $ Right (m, as)
MessageErrorS e -> return $ Left (e, as)
_ -> pullMessage session
-- | Get the next received message
getMessage :: Session -> IO Message
getMessage :: Session -> IO (Annotated Message)
getMessage = waitForMessage (const True)
-- | Pulls a (non-error) message and returns it if the given predicate returns
-- @True@.
waitForMessage :: (Message -> Bool) -> Session -> IO Message
waitForMessage :: (Annotated Message -> Bool) -> Session -> IO (Annotated Message)
waitForMessage f session = do
s <- pullMessage session
case s of
@ -31,7 +31,9 @@ waitForMessage f session = do @@ -31,7 +31,9 @@ waitForMessage f session = do
| otherwise -> waitForMessage f session
-- | Pulls an error message and returns it if the given predicate returns @True@.
waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError
waitForMessageError :: (Annotated MessageError -> Bool)
-> Session
-> IO (Annotated MessageError)
waitForMessageError f session = do
s <- pullMessage session
case s of
@ -41,9 +43,10 @@ waitForMessageError f session = do @@ -41,9 +43,10 @@ waitForMessageError f session = do
-- | Pulls a message and returns it if the given predicate returns @True@.
filterMessages :: (MessageError -> Bool)
-> (Message -> Bool)
-> Session -> IO (Either MessageError Message)
filterMessages :: (Annotated MessageError -> Bool)
-> (Annotated Message -> Bool)
-> Session -> IO (Either (Annotated MessageError)
(Annotated Message))
filterMessages f g session = do
s <- pullMessage session
case s of

13
source/Network/Xmpp/Concurrent/Presence.hs

@ -8,17 +8,20 @@ import Network.Xmpp.Concurrent.Basic @@ -8,17 +8,20 @@ import Network.Xmpp.Concurrent.Basic
-- | Read an element from the inbound stanza channel, discardes any non-Presence
-- stanzas from the channel
pullPresence :: Session -> IO (Either PresenceError Presence)
pullPresence :: Session -> IO (Either (Annotated PresenceError)
(Annotated Presence))
pullPresence session = do
stanza <- atomically . readTChan $ stanzaCh session
(stanza, as) <- atomically . readTChan $ stanzaCh session
case stanza of
PresenceS p -> return $ Right p
PresenceErrorS e -> return $ Left e
PresenceS p -> return $ Right (p, as)
PresenceErrorS e -> return $ Left (e, as)
_ -> pullPresence session
-- | Pulls a (non-error) presence and returns it if the given predicate returns
-- @True@.
waitForPresence :: (Presence -> Bool) -> Session -> IO Presence
waitForPresence :: (Annotated Presence -> Bool)
-> Session
-> IO (Annotated Presence)
waitForPresence f session = do
s <- pullPresence session
case s of

19
source/Network/Xmpp/Concurrent/Types.hs

@ -1,3 +1,4 @@ @@ -1,3 +1,4 @@
{-# LANGUAGE ExistentialQuantification #-}
{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE DeriveDataTypeable #-}
@ -19,8 +20,18 @@ import Network.Xmpp.IM.Roster.Types @@ -19,8 +20,18 @@ import Network.Xmpp.IM.Roster.Types
import Network.Xmpp.Sasl.Types
import Network.Xmpp.Types
type StanzaHandler = (Stanza -> IO Bool) -- ^ outgoing stanza
-> Stanza -- ^ stanza to handle
-> [Annotation]
-> IO [(Stanza, [Annotation])] -- ^ modified stanzas (if any)
data Plugin' = Plugin' { inHandler :: Stanza -> IO [Stanza]
data Annotation = forall f. (Typeable f, Show f) => Annotation f
type Annotated a = (a, [Annotation])
data Plugin' = Plugin' { inHandler :: Stanza
-> [Annotation]
-> IO [(Stanza, [Annotation])]
, outHandler :: Stanza -> IO Bool
-- | In order to allow plugins to tie the knot (Plugin
-- / Session) we pass the plugin the completed Session
@ -72,7 +83,7 @@ type WriteSemaphore = TMVar (BS.ByteString -> IO Bool) @@ -72,7 +83,7 @@ type WriteSemaphore = TMVar (BS.ByteString -> IO Bool)
-- | The Session object represents a single session with an XMPP server. You can
-- use 'session' to establish a session
data Session = Session
{ stanzaCh :: TChan Stanza -- All stanzas
{ stanzaCh :: TChan (Stanza, [Annotation]) -- All stanzas
, iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any
-- given time.
@ -97,7 +108,7 @@ data Session = Session @@ -97,7 +108,7 @@ data Session = Session
-- TMVars of and TMVars for expected IQ responses (the second Text represent a
-- stanza identifier.
type IQHandlers = ( Map.Map (IQRequestType, Text) (TChan IQRequestTicket)
, Map.Map Text (TMVar (Maybe IQResponse))
, Map.Map Text (TMVar (Maybe (Annotated IQResponse)))
)
-- | Contains whether or not a reply has been sent, and the IQ request body to
@ -109,6 +120,8 @@ data IQRequestTicket = IQRequestTicket @@ -109,6 +120,8 @@ data IQRequestTicket = IQRequestTicket
-- answered and Just False when the answer was attempted,
-- but failed (e.g. there is a connection failure)
, iqRequestBody :: IQRequest
-- | Annotations set by plugins in receive
, iqRequestAnnotations :: [Annotation]
}
-- | Error that can occur during sendIQ'

22
source/Network/Xmpp/IM/Roster.hs

@ -33,10 +33,10 @@ timeout = Just 3000000 -- 3 seconds @@ -33,10 +33,10 @@ timeout = Just 3000000 -- 3 seconds
-- | Push a roster item to the server. The values for approved and ask are
-- ignored and all values for subsciption except "remove" are ignored
rosterPush :: Item -> Session -> IO (Either IQSendError IQResponse)
rosterPush :: Item -> Session -> IO (Either IQSendError (Annotated IQResponse))
rosterPush item session = do
let el = pickleElem xpQuery (Query Nothing [fromItem item])
sendIQ' timeout Nothing Set Nothing el session
sendIQ' timeout Nothing Set Nothing el session
-- | Add or update an item to the roster.
--
@ -45,7 +45,7 @@ rosterAdd :: Jid -- ^ JID of the item @@ -45,7 +45,7 @@ rosterAdd :: Jid -- ^ JID of the item
-> Maybe Text -- ^ Name alias
-> [Text] -- ^ Groups (duplicates will be removed)
-> Session
-> IO (Either IQSendError IQResponse)
-> IO (Either IQSendError (Annotated IQResponse))
rosterAdd j n gs session = do
let el = pickleElem xpQuery (Query Nothing
[QueryItem { qiApproved = Nothing
@ -67,7 +67,7 @@ rosterRemove j sess = do @@ -67,7 +67,7 @@ rosterRemove j sess = do
Just _ -> do
res <- rosterPush (Item False False j Nothing Remove []) sess
case res of
Right (IQResponseResult IQResult{}) -> return True
Right (IQResponseResult IQResult{}, _) -> return True
_ -> return False
-- | Retrieve the current Roster state
@ -86,13 +86,13 @@ initRoster session = do @@ -86,13 +86,13 @@ initRoster session = do
Just roster -> atomically $ writeTVar (rosterRef session) roster
handleRoster :: TVar Roster -> StanzaHandler
handleRoster ref out sta = case sta of
handleRoster ref out sta as = case sta of
IQRequestS (iqr@IQRequest{iqRequestPayload =
iqb@Element{elementName = en}})
| nameNamespace en == Just "jabber:iq:roster" -> do
case iqRequestFrom iqr of
Just _from -> return [sta] -- Don't handle roster pushes from
-- unauthorized sources
Just _from -> return [(sta, as)] -- Don't handle roster pushes
-- from unauthorized sources
Nothing -> case unpickleElem xpQuery iqb of
Right Query{ queryVer = v
, queryItems = [update]
@ -104,7 +104,7 @@ handleRoster ref out sta = case sta of @@ -104,7 +104,7 @@ handleRoster ref out sta = case sta of
errorM "Pontarius.Xmpp" "Invalid roster query"
_ <- out $ badRequest iqr
return []
_ -> return [sta]
_ -> return [(sta, as)]
where
handleUpdate v' update = atomically $ modifyTVar ref $ \(Roster v is) ->
Roster (v' `mplus` v) $ case qiSubscription update of
@ -132,16 +132,16 @@ retrieveRoster mbOldRoster sess = do @@ -132,16 +132,16 @@ retrieveRoster mbOldRoster sess = do
Left e -> do
errorM "Pontarius.Xmpp.Roster" $ "getRoster: " ++ show e
return Nothing
Right (IQResponseResult (IQResult{iqResultPayload = Just ros}))
Right (IQResponseResult (IQResult{iqResultPayload = Just ros}), _)
-> case unpickleElem xpQuery ros of
Left _e -> do
errorM "Pontarius.Xmpp.Roster" "getRoster: invalid query element"
return Nothing
Right ros' -> return . Just $ toRoster ros'
Right (IQResponseResult (IQResult{iqResultPayload = Nothing})) -> do
Right (IQResponseResult (IQResult{iqResultPayload = Nothing}), _) -> do
return mbOldRoster
-- sever indicated that no roster updates are necessary
Right (IQResponseError e) -> do
Right (IQResponseError e, _) -> do
errorM "Pontarius.Xmpp.Roster" $ "getRoster: server returned error"
++ show e
return Nothing

5
source/Network/Xmpp/Types.hs

@ -45,7 +45,6 @@ module Network.Xmpp.Types @@ -45,7 +45,6 @@ module Network.Xmpp.Types
, StreamState(..)
, ConnectionState(..)
, StreamErrorInfo(..)
, StanzaHandler
, ConnectionDetails(..)
, StreamConfiguration(..)
, Jid(..)
@ -1042,10 +1041,6 @@ instance Default StreamConfiguration where @@ -1042,10 +1041,6 @@ instance Default StreamConfiguration where
}
}
type StanzaHandler = (Stanza -> IO Bool) -- ^ outgoing stanza
-> Stanza -- ^ stanza to handle
-> IO [Stanza] -- ^ modified stanzas (if any)
-- | How the client should behave in regards to TLS.
data TlsBehaviour = RequireTls -- ^ Require the use of TLS; disconnect if it's
-- not offered.

Loading…
Cancel
Save