diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index 4f33461..5ecf681 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE TupleSections #-} {-# OPTIONS_HADDOCK hide #-} {-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Concurrent @@ -8,7 +9,6 @@ module Network.Xmpp.Concurrent , module Network.Xmpp.Concurrent.Message , module Network.Xmpp.Concurrent.Presence , module Network.Xmpp.Concurrent.IQ - , StanzaHandler , newSession , session , newStanzaID @@ -49,25 +49,30 @@ import System.Random (randomRIO) import Control.Monad.State.Strict -runHandlers :: [Stanza -> IO [Stanza]] -> Stanza -> IO () -runHandlers [] _ = return () -runHandlers (h:hands) sta = do - res <- h sta - forM_ res $ runHandlers hands -toChan :: TChan Stanza -> StanzaHandler -toChan stanzaC _ sta = do - atomically $ writeTChan stanzaC sta - return [sta] +runHandlers [] sta = do + errorM "Pontarius.Xmpp" $ + "No stanza handlers set, discarding stanza" ++ show sta + return () +runHandlers hs sta = go hs sta [] + where go [] _ _ = return () + go (h:hands) sta' as = do + res <- h sta' as + forM_ res $ uncurry (go hands) + +toChan :: TChan (Annotated Stanza) -> StanzaHandler +toChan stanzaC _ sta as = do + atomically $ writeTChan stanzaC (sta, as) + return [(sta, as)] handleIQ :: TVar IQHandlers -> StanzaHandler -handleIQ iqHands out sta = do +handleIQ iqHands out sta as = do case sta of IQRequestS i -> handleIQRequest iqHands i >> return [] IQResultS i -> handleIQResponse iqHands (Right i) >> return [] IQErrorS i -> handleIQResponse iqHands (Left i) >> return [] - _ -> return [sta] + _ -> return [(sta, as)] where -- If the IQ request has a namespace, send it through the appropriate channel. handleIQRequest :: TVar IQHandlers -> IQRequest -> IO () @@ -104,7 +109,7 @@ handleIQ iqHands out sta = do False -> do atomically $ putTMVar sentRef False return $ Just False - writeTChan ch $ IQRequestTicket answerT iq + writeTChan ch $ IQRequestTicket answerT iq as return Nothing maybe (return ()) (void . out) res serviceUnavailable (IQRequest iqid from _to lang _tp bd) = @@ -117,7 +122,7 @@ handleIQ iqHands out sta = do case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of (Nothing, _) -> return () -- We are not supposed to send an error. (Just tmvar, byID') -> do - let answer = Just $ either IQResponseError IQResponseResult iq + let answer = Just (either IQResponseError IQResponseResult iq, as) _ <- tryPutTMVar tmvar answer -- Don't block. writeTVar handlers (byNS, byID') where @@ -139,7 +144,7 @@ newSession stream config realm mbSasl = runErrorT $ do ros <- liftIO . newTVarIO $ Roster Nothing Map.empty rew <- lift $ newTVarIO 60 let out = writeStanza writeSem - let rosterH = if (enableRoster config) then [handleRoster ros out ] + let rosterH = if (enableRoster config) then [handleRoster ros out] else [] (sStanza, ps) <- initPlugins out $ plugins config let stanzaHandler = runHandlers $ List.concat diff --git a/source/Network/Xmpp/Concurrent/Basic.hs b/source/Network/Xmpp/Concurrent/Basic.hs index 271995f..3731cbb 100644 --- a/source/Network/Xmpp/Concurrent/Basic.hs +++ b/source/Network/Xmpp/Concurrent/Basic.hs @@ -32,11 +32,11 @@ sendStanza :: Stanza -> Session -> IO Bool sendStanza = flip sendStanza' -- | Get the channel of incoming stanzas. -getStanzaChan :: Session -> TChan Stanza +getStanzaChan :: Session -> TChan (Stanza, [Annotation]) getStanzaChan session = stanzaCh session -- | Get the next incoming stanza -getStanza :: Session -> IO Stanza +getStanza :: Session -> IO (Stanza, [Annotation]) getStanza session = atomically . readTChan $ stanzaCh session -- | Create a new session object with the inbound channel duplicated diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs index 718ba03..acd1fe8 100644 --- a/source/Network/Xmpp/Concurrent/IQ.hs +++ b/source/Network/Xmpp/Concurrent/IQ.hs @@ -28,7 +28,7 @@ sendIQ :: Maybe Integer -- ^ Timeout . When the timeout is reached the response -- default) -> Element -- ^ The IQ body (there has to be exactly one) -> Session - -> IO (Maybe (TMVar (Maybe IQResponse))) + -> IO (Maybe (TMVar ( Maybe (Annotated IQResponse)))) sendIQ timeOut to tp lang body session = do -- TODO: Add timeout newId <- idGenerator session ref <- atomically $ do @@ -62,7 +62,7 @@ sendIQ' :: Maybe Integer -> Maybe LangTag -> Element -> Session - -> IO (Either IQSendError IQResponse) + -> IO (Either IQSendError (Annotated IQResponse)) sendIQ' timeout to tp lang body session = do ref <- sendIQ timeout to tp lang body session maybe (return $ Left IQSendError) (fmap (maybe (Left IQTimeOut) Right) diff --git a/source/Network/Xmpp/Concurrent/Message.hs b/source/Network/Xmpp/Concurrent/Message.hs index c3c7bef..c35157f 100644 --- a/source/Network/Xmpp/Concurrent/Message.hs +++ b/source/Network/Xmpp/Concurrent/Message.hs @@ -8,21 +8,21 @@ import Network.Xmpp.Concurrent.Basic -- | Read an element from the inbound stanza channel, discardes any -- non-Message stanzas from the channel -pullMessage :: Session -> IO (Either MessageError Message) +pullMessage :: Session -> IO (Either (Annotated MessageError) (Annotated Message)) pullMessage session = do - stanza <- atomically . readTChan $ stanzaCh session + (stanza, as) <- atomically . readTChan $ stanzaCh session case stanza of - MessageS m -> return $ Right m - MessageErrorS e -> return $ Left e + MessageS m -> return $ Right (m, as) + MessageErrorS e -> return $ Left (e, as) _ -> pullMessage session -- | Get the next received message -getMessage :: Session -> IO Message +getMessage :: Session -> IO (Annotated Message) getMessage = waitForMessage (const True) -- | Pulls a (non-error) message and returns it if the given predicate returns -- @True@. -waitForMessage :: (Message -> Bool) -> Session -> IO Message +waitForMessage :: (Annotated Message -> Bool) -> Session -> IO (Annotated Message) waitForMessage f session = do s <- pullMessage session case s of @@ -31,7 +31,9 @@ waitForMessage f session = do | otherwise -> waitForMessage f session -- | Pulls an error message and returns it if the given predicate returns @True@. -waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError +waitForMessageError :: (Annotated MessageError -> Bool) + -> Session + -> IO (Annotated MessageError) waitForMessageError f session = do s <- pullMessage session case s of @@ -41,9 +43,10 @@ waitForMessageError f session = do -- | Pulls a message and returns it if the given predicate returns @True@. -filterMessages :: (MessageError -> Bool) - -> (Message -> Bool) - -> Session -> IO (Either MessageError Message) +filterMessages :: (Annotated MessageError -> Bool) + -> (Annotated Message -> Bool) + -> Session -> IO (Either (Annotated MessageError) + (Annotated Message)) filterMessages f g session = do s <- pullMessage session case s of diff --git a/source/Network/Xmpp/Concurrent/Presence.hs b/source/Network/Xmpp/Concurrent/Presence.hs index 6c08298..f3a5c2e 100644 --- a/source/Network/Xmpp/Concurrent/Presence.hs +++ b/source/Network/Xmpp/Concurrent/Presence.hs @@ -8,17 +8,20 @@ import Network.Xmpp.Concurrent.Basic -- | Read an element from the inbound stanza channel, discardes any non-Presence -- stanzas from the channel -pullPresence :: Session -> IO (Either PresenceError Presence) +pullPresence :: Session -> IO (Either (Annotated PresenceError) + (Annotated Presence)) pullPresence session = do - stanza <- atomically . readTChan $ stanzaCh session + (stanza, as) <- atomically . readTChan $ stanzaCh session case stanza of - PresenceS p -> return $ Right p - PresenceErrorS e -> return $ Left e + PresenceS p -> return $ Right (p, as) + PresenceErrorS e -> return $ Left (e, as) _ -> pullPresence session -- | Pulls a (non-error) presence and returns it if the given predicate returns -- @True@. -waitForPresence :: (Presence -> Bool) -> Session -> IO Presence +waitForPresence :: (Annotated Presence -> Bool) + -> Session + -> IO (Annotated Presence) waitForPresence f session = do s <- pullPresence session case s of diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 5d7eef2..3dde2a9 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE ExistentialQuantification #-} {-# OPTIONS_HADDOCK hide #-} {-# LANGUAGE DeriveDataTypeable #-} @@ -19,8 +20,18 @@ import Network.Xmpp.IM.Roster.Types import Network.Xmpp.Sasl.Types import Network.Xmpp.Types +type StanzaHandler = (Stanza -> IO Bool) -- ^ outgoing stanza + -> Stanza -- ^ stanza to handle + -> [Annotation] + -> IO [(Stanza, [Annotation])] -- ^ modified stanzas (if any) -data Plugin' = Plugin' { inHandler :: Stanza -> IO [Stanza] +data Annotation = forall f. (Typeable f, Show f) => Annotation f + +type Annotated a = (a, [Annotation]) + +data Plugin' = Plugin' { inHandler :: Stanza + -> [Annotation] + -> IO [(Stanza, [Annotation])] , outHandler :: Stanza -> IO Bool -- | In order to allow plugins to tie the knot (Plugin -- / Session) we pass the plugin the completed Session @@ -72,7 +83,7 @@ type WriteSemaphore = TMVar (BS.ByteString -> IO Bool) -- | The Session object represents a single session with an XMPP server. You can -- use 'session' to establish a session data Session = Session - { stanzaCh :: TChan Stanza -- All stanzas + { stanzaCh :: TChan (Stanza, [Annotation]) -- All stanzas , iqHandlers :: TVar IQHandlers -- Writing lock, so that only one thread could write to the stream at any -- given time. @@ -97,7 +108,7 @@ data Session = Session -- TMVars of and TMVars for expected IQ responses (the second Text represent a -- stanza identifier. type IQHandlers = ( Map.Map (IQRequestType, Text) (TChan IQRequestTicket) - , Map.Map Text (TMVar (Maybe IQResponse)) + , Map.Map Text (TMVar (Maybe (Annotated IQResponse))) ) -- | Contains whether or not a reply has been sent, and the IQ request body to @@ -109,6 +120,8 @@ data IQRequestTicket = IQRequestTicket -- answered and Just False when the answer was attempted, -- but failed (e.g. there is a connection failure) , iqRequestBody :: IQRequest + -- | Annotations set by plugins in receive + , iqRequestAnnotations :: [Annotation] } -- | Error that can occur during sendIQ' diff --git a/source/Network/Xmpp/IM/Roster.hs b/source/Network/Xmpp/IM/Roster.hs index 324d667..03b76b0 100644 --- a/source/Network/Xmpp/IM/Roster.hs +++ b/source/Network/Xmpp/IM/Roster.hs @@ -33,10 +33,10 @@ timeout = Just 3000000 -- 3 seconds -- | Push a roster item to the server. The values for approved and ask are -- ignored and all values for subsciption except "remove" are ignored -rosterPush :: Item -> Session -> IO (Either IQSendError IQResponse) +rosterPush :: Item -> Session -> IO (Either IQSendError (Annotated IQResponse)) rosterPush item session = do let el = pickleElem xpQuery (Query Nothing [fromItem item]) - sendIQ' timeout Nothing Set Nothing el session + sendIQ' timeout Nothing Set Nothing el session -- | Add or update an item to the roster. -- @@ -45,7 +45,7 @@ rosterAdd :: Jid -- ^ JID of the item -> Maybe Text -- ^ Name alias -> [Text] -- ^ Groups (duplicates will be removed) -> Session - -> IO (Either IQSendError IQResponse) + -> IO (Either IQSendError (Annotated IQResponse)) rosterAdd j n gs session = do let el = pickleElem xpQuery (Query Nothing [QueryItem { qiApproved = Nothing @@ -67,7 +67,7 @@ rosterRemove j sess = do Just _ -> do res <- rosterPush (Item False False j Nothing Remove []) sess case res of - Right (IQResponseResult IQResult{}) -> return True + Right (IQResponseResult IQResult{}, _) -> return True _ -> return False -- | Retrieve the current Roster state @@ -86,13 +86,13 @@ initRoster session = do Just roster -> atomically $ writeTVar (rosterRef session) roster handleRoster :: TVar Roster -> StanzaHandler -handleRoster ref out sta = case sta of +handleRoster ref out sta as = case sta of IQRequestS (iqr@IQRequest{iqRequestPayload = iqb@Element{elementName = en}}) | nameNamespace en == Just "jabber:iq:roster" -> do case iqRequestFrom iqr of - Just _from -> return [sta] -- Don't handle roster pushes from - -- unauthorized sources + Just _from -> return [(sta, as)] -- Don't handle roster pushes + -- from unauthorized sources Nothing -> case unpickleElem xpQuery iqb of Right Query{ queryVer = v , queryItems = [update] @@ -104,7 +104,7 @@ handleRoster ref out sta = case sta of errorM "Pontarius.Xmpp" "Invalid roster query" _ <- out $ badRequest iqr return [] - _ -> return [sta] + _ -> return [(sta, as)] where handleUpdate v' update = atomically $ modifyTVar ref $ \(Roster v is) -> Roster (v' `mplus` v) $ case qiSubscription update of @@ -132,16 +132,16 @@ retrieveRoster mbOldRoster sess = do Left e -> do errorM "Pontarius.Xmpp.Roster" $ "getRoster: " ++ show e return Nothing - Right (IQResponseResult (IQResult{iqResultPayload = Just ros})) + Right (IQResponseResult (IQResult{iqResultPayload = Just ros}), _) -> case unpickleElem xpQuery ros of Left _e -> do errorM "Pontarius.Xmpp.Roster" "getRoster: invalid query element" return Nothing Right ros' -> return . Just $ toRoster ros' - Right (IQResponseResult (IQResult{iqResultPayload = Nothing})) -> do + Right (IQResponseResult (IQResult{iqResultPayload = Nothing}), _) -> do return mbOldRoster -- sever indicated that no roster updates are necessary - Right (IQResponseError e) -> do + Right (IQResponseError e, _) -> do errorM "Pontarius.Xmpp.Roster" $ "getRoster: server returned error" ++ show e return Nothing diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index f411403..57d9a76 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -45,7 +45,6 @@ module Network.Xmpp.Types , StreamState(..) , ConnectionState(..) , StreamErrorInfo(..) - , StanzaHandler , ConnectionDetails(..) , StreamConfiguration(..) , Jid(..) @@ -1042,10 +1041,6 @@ instance Default StreamConfiguration where } } -type StanzaHandler = (Stanza -> IO Bool) -- ^ outgoing stanza - -> Stanza -- ^ stanza to handle - -> IO [Stanza] -- ^ modified stanzas (if any) - -- | How the client should behave in regards to TLS. data TlsBehaviour = RequireTls -- ^ Require the use of TLS; disconnect if it's -- not offered.