Philipp Balzarek 10 years ago
parent
commit
3b02df0ba4
  1. 32
      source/Network/Xmpp/Concurrent.hs
  2. 9
      source/Network/Xmpp/Concurrent/Basic.hs
  3. 10
      source/Network/Xmpp/Concurrent/Threads.hs
  4. 19
      source/Network/Xmpp/Concurrent/Types.hs
  5. 2
      source/Network/Xmpp/IM/PresenceTracker.hs
  6. 8
      source/Network/Xmpp/IM/Roster.hs
  7. 15
      source/Network/Xmpp/Lens.hs
  8. 11
      source/Network/Xmpp/Marshal.hs
  9. 17
      source/Network/Xmpp/Stream.hs
  10. 7
      source/Network/Xmpp/Types.hs

32
source/Network/Xmpp/Concurrent.hs

@ -53,7 +53,12 @@ import System.Random (randomRIO)
import Control.Monad.State.Strict import Control.Monad.State.Strict
runHandlers :: [Stanza -> [Annotation] -> IO [Annotated Stanza]] -> Stanza -> IO () runHandlers :: [ XmppElement
-> [Annotation]
-> IO [Annotated XmppElement]
]
-> XmppElement
-> IO ()
runHandlers [] sta = do runHandlers [] sta = do
errorM "Pontarius.Xmpp" $ errorM "Pontarius.Xmpp" $
"No stanza handlers set, discarding stanza" ++ show sta "No stanza handlers set, discarding stanza" ++ show sta
@ -66,17 +71,20 @@ runHandlers hs sta = go hs sta []
toChan :: TChan (Annotated Stanza) -> StanzaHandler toChan :: TChan (Annotated Stanza) -> StanzaHandler
toChan stanzaC _ sta as = do toChan stanzaC _ sta as = do
atomically $ writeTChan stanzaC (sta, as) case sta of
XmppStanza s -> atomically $ writeTChan stanzaC (s, as)
_ -> return ()
return [(sta, [])] return [(sta, [])]
handleIQ :: TVar IQHandlers handleIQ :: TVar IQHandlers
-> StanzaHandler -> StanzaHandler
handleIQ iqHands out sta as = do handleIQ _ _ s@XmppNonza{} _ = return [(s, [])]
handleIQ iqHands out s@(XmppStanza sta) as = do
case sta of case sta of
IQRequestS i -> handleIQRequest iqHands i >> return [] IQRequestS i -> handleIQRequest iqHands i >> return []
IQResultS i -> handleIQResponse iqHands (Right i) >> return [] IQResultS i -> handleIQResponse iqHands (Right i) >> return []
IQErrorS i -> handleIQResponse iqHands (Left i) >> return [] IQErrorS i -> handleIQResponse iqHands (Left i) >> return []
_ -> return [(sta, [])] _ -> return [(s, [])]
where where
-- If the IQ request has a namespace, send it through the appropriate channel. -- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> IO () handleIQRequest :: TVar IQHandlers -> IQRequest -> IO ()
@ -106,7 +114,7 @@ handleIQ iqHands out sta as = do
atomically $ putTMVar sentRef True atomically $ putTMVar sentRef True
return Nothing return Nothing
False -> do False -> do
didSend <- out response didSend <- out $ XmppStanza response
case didSend of case didSend of
Right () -> do Right () -> do
atomically $ putTMVar sentRef True atomically $ putTMVar sentRef True
@ -116,7 +124,7 @@ handleIQ iqHands out sta as = do
return $ Just er return $ Just er
writeTChan ch $ IQRequestTicket answerT iq as writeTChan ch $ IQRequestTicket answerT iq as
return Nothing return Nothing
maybe (return ()) (void . out) res maybe (return ()) (void . out . XmppStanza) res
serviceUnavailable (IQRequest iqid from _to lang _tp bd _attrs) = serviceUnavailable (IQRequest iqid from _to lang _tp bd _attrs) =
IQErrorS $ IQError iqid Nothing from lang err (Just bd) [] IQErrorS $ IQError iqid Nothing from lang err (Just bd) []
err = StanzaError Cancel ServiceUnavailable Nothing Nothing err = StanzaError Cancel ServiceUnavailable Nothing Nothing
@ -176,23 +184,23 @@ newSession stream config realm mbSasl = runErrorT $ do
rosRef <- liftIO $ newTVarIO ros rosRef <- liftIO $ newTVarIO ros
peers <- liftIO . newTVarIO $ Peers Map.empty peers <- liftIO . newTVarIO $ Peers Map.empty
rew <- lift $ newTVarIO 60 rew <- lift $ newTVarIO 60
let out = writeStanza writeSem let out = writeXmppElem writeSem
boundJid <- liftIO $ withStream' (gets streamJid) stream boundJid <- liftIO $ withStream' (gets streamJid) stream
let rosterH = if (enableRoster config) let rosterH = if (enableRoster config)
then [handleRoster boundJid rosRef then [handleRoster boundJid rosRef
(fromMaybe (\_ -> return ()) $ onRosterPush config) (fromMaybe (\_ -> return ()) $ onRosterPush config)
out] (out)]
else [] else []
let presenceH = if (enablePresenceTracking config) let presenceH = if (enablePresenceTracking config)
then [handlePresence (onPresenceChange config) peers out] then [handlePresence (onPresenceChange config) peers out]
else [] else []
(sStanza, ps) <- initPlugins out $ plugins config (sXmppElement, ps) <- initPlugins out $ plugins config
let stanzaHandler = runHandlers $ List.concat let stanzaHandler = runHandlers $ List.concat
[ inHandler <$> ps [ inHandler <$> ps
, [ toChan stanzaChan sStanza] , [ toChan stanzaChan sXmppElement]
, presenceH , presenceH
, rosterH , rosterH
, [ handleIQ iqHands sStanza] , [ handleIQ iqHands sXmppElement]
] ]
(kill, sState, reader) <- ErrorT $ startThreadsWith writeSem stanzaHandler (kill, sState, reader) <- ErrorT $ startThreadsWith writeSem stanzaHandler
eh stream eh stream
@ -209,7 +217,7 @@ newSession stream config realm mbSasl = runErrorT $ do
, conf = config , conf = config
, rosterRef = rosRef , rosterRef = rosRef
, presenceRef = peers , presenceRef = peers
, sendStanza' = sStanza , sendStanza' = sXmppElement . XmppStanza
, sRealm = realm , sRealm = realm
, sSaslCredentials = mbSasl , sSaslCredentials = mbSasl
, reconnectWait = rew , reconnectWait = rew

9
source/Network/Xmpp/Concurrent/Basic.hs

@ -17,6 +17,15 @@ semWrite sem bs = Ex.bracket (atomically $ takeTMVar sem)
(atomically . putTMVar sem) (atomically . putTMVar sem)
($ bs) ($ bs)
writeXmppElem :: WriteSemaphore -> XmppElement -> IO (Either XmppFailure ())
writeXmppElem sem a = do
let el = case a of
XmppStanza s -> pickleElem xpStanza s
XmppNonza n -> n
outData = renderElement $ nsHack el
debugOut outData
semWrite sem outData
writeStanza :: WriteSemaphore -> Stanza -> IO (Either XmppFailure ()) writeStanza :: WriteSemaphore -> Stanza -> IO (Either XmppFailure ())
writeStanza sem a = do writeStanza sem a = do
let outData = renderElement $ nsHack (pickleElem xpStanza a) let outData = renderElement $ nsHack (pickleElem xpStanza a)

10
source/Network/Xmpp/Concurrent/Threads.hs

@ -19,11 +19,11 @@ import System.Log.Logger
-- Worker to read stanzas from the stream and concurrently distribute them to -- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads. -- all listener threads.
readWorker :: (Stanza -> IO ()) readWorker :: (XmppElement -> IO ())
-> (XmppFailure -> IO ()) -> (XmppFailure -> IO ())
-> TMVar Stream -> TMVar Stream
-> IO a -> IO a
readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
s' <- Ex.catches ( do s' <- Ex.catches ( do
atomically $ do atomically $ do
s@(Stream con) <- readTMVar stateRef s@(Stream con) <- readTMVar stateRef
@ -44,7 +44,7 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
-- we don't know whether pull will -- we don't know whether pull will
-- necessarily be interruptible -- necessarily be interruptible
allowInterrupt allowInterrupt
res <- pullStanza s res <- pullXmppElement s
case res of case res of
Left e -> do Left e -> do
errorM "Pontarius.Xmpp" $ "Read error: " errorM "Pontarius.Xmpp" $ "Read error: "
@ -61,7 +61,7 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
case res of case res of
Nothing -> return () -- Caught an exception, nothing to Nothing -> return () -- Caught an exception, nothing to
-- do. TODO: Can this happen? -- do. TODO: Can this happen?
Just sta -> void $ onStanza sta Just sta -> void $ onElement sta
where where
-- Defining an Control.Exception.allowInterrupt equivalent for GHC 7 -- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
-- compatibility. -- compatibility.
@ -82,7 +82,7 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
-- stances, respectively, and an Action to stop the Threads and close the -- stances, respectively, and an Action to stop the Threads and close the
-- connection. -- connection.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ())) startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
-> (Stanza -> IO ()) -> (XmppElement -> IO ())
-> TMVar EventHandlers -> TMVar EventHandlers
-> Stream -> Stream
-> Maybe Int -> Maybe Int

19
source/Network/Xmpp/Concurrent/Types.hs

@ -21,11 +21,12 @@ import Network.Xmpp.IM.PresenceTracker.Types
import Network.Xmpp.Sasl.Types import Network.Xmpp.Sasl.Types
import Network.Xmpp.Types import Network.Xmpp.Types
type StanzaHandler = (Stanza -> IO (Either XmppFailure ()) ) -- ^ outgoing stanza type StanzaHandler = (XmppElement -> IO (Either XmppFailure ()) ) -- ^ outgoing
-> Stanza -- ^ stanza to handle -- stanza
-> XmppElement -- ^ stanza to handle
-> [Annotation] -- ^ annotations added by previous handlers -> [Annotation] -- ^ annotations added by previous handlers
-> IO [(Stanza, [Annotation])] -- ^ modified stanzas and -> IO [(XmppElement, [Annotation])] -- ^ modified stanzas and
-- /additional/ annotations -- /additional/ annotations
type Resource = Text type Resource = Text
@ -56,17 +57,17 @@ getAnnotation = foldr (\(Annotation a) b -> maybe b Just $ cast a) Nothing . snd
data Plugin' = Plugin' data Plugin' = Plugin'
{ -- | Resulting stanzas and additional Annotations { -- | Resulting stanzas and additional Annotations
inHandler :: Stanza inHandler :: XmppElement
-> [Annotation] -> [Annotation]
-> IO [(Stanza, [Annotation])] -> IO [(XmppElement, [Annotation])]
, outHandler :: Stanza -> IO (Either XmppFailure ()) , outHandler :: XmppElement -> IO (Either XmppFailure ())
-- | In order to allow plugins to tie the knot (Plugin / Session) we pass -- | In order to allow plugins to tie the knot (Plugin / Session) we pass
-- the plugin the completed Session once it exists -- the plugin the completed Session once it exists
, onSessionUp :: Session -> IO () , onSessionUp :: Session -> IO ()
} }
type Plugin = (Stanza -> IO (Either XmppFailure ())) -- ^ pass stanza to next type Plugin = (XmppElement -> IO (Either XmppFailure ())) -- ^ pass stanza to
-- plugin -- next plugin
-> ErrorT XmppFailure IO Plugin' -> ErrorT XmppFailure IO Plugin'
-- | Configuration for the @Session@ object. -- | Configuration for the @Session@ object.

2
source/Network/Xmpp/IM/PresenceTracker.hs

@ -64,7 +64,7 @@ handlePresence :: Maybe (Jid -> PeerStatus -> PeerStatus -> IO ())
-> StanzaHandler -> StanzaHandler
handlePresence onChange peers _ st _ = do handlePresence onChange peers _ st _ = do
let mbPr = do let mbPr = do
pr <- st ^? _Presence -- Only act on presence stanzas pr <- st ^? _Stanza . _Presence -- Only act on presence stanzas
fr <- pr ^? from . _Just . _isFull -- Only act on full JIDs fr <- pr ^? from . _Just . _isFull -- Only act on full JIDs
return (pr, fr) return (pr, fr)
Foldable.forM_ mbPr $ \(pr, fr) -> Foldable.forM_ mbPr $ \(pr, fr) ->

8
source/Network/Xmpp/IM/Roster.hs

@ -104,8 +104,8 @@ handleRoster :: Maybe Jid
-> StanzaHandler -> StanzaHandler
handleRoster mbBoundJid ref onUpdate out sta _ = do handleRoster mbBoundJid ref onUpdate out sta _ = do
case sta of case sta of
IQRequestS (iqr@IQRequest{iqRequestPayload = XmppStanza (IQRequestS (iqr@IQRequest{iqRequestPayload =
iqb@Element{elementName = en}}) iqb@Element{elementName = en}}))
| nameNamespace en == Just "jabber:iq:roster" -> do | nameNamespace en == Just "jabber:iq:roster" -> do
let doHandle = case (iqRequestFrom iqr, mbBoundJid) of let doHandle = case (iqRequestFrom iqr, mbBoundJid) of
-- We don't need to check our own JID when the IQ -- We don't need to check our own JID when the IQ
@ -124,11 +124,11 @@ handleRoster mbBoundJid ref onUpdate out sta _ = do
} -> do } -> do
handleUpdate v update handleUpdate v update
onUpdate update onUpdate update
_ <- out $ result iqr _ <- out . XmppStanza $ result iqr
return [] return []
_ -> do _ -> do
errorM "Pontarius.Xmpp" "Invalid roster query" errorM "Pontarius.Xmpp" "Invalid roster query"
_ <- out $ badRequest iqr _ <- out . XmppStanza $ badRequest iqr
return [] return []
-- Don't handle roster pushes from unauthorized sources -- Don't handle roster pushes from unauthorized sources
else return [(sta, [])] else return [(sta, [])]

15
source/Network/Xmpp/Lens.hs

@ -51,7 +51,9 @@ module Network.Xmpp.Lens
, _isFull , _isFull
, _isBare , _isBare
-- ** Stanzas -- ** Stanzas and Nonzas
, _Stanza
, _Nonza
, _IQRequest , _IQRequest
, _IQResult , _IQResult
, _IQError , _IQError
@ -212,6 +214,17 @@ _isFull = prism' id (\j -> if isFull j then Just j else Nothing)
_isBare :: Prism Jid Jid _isBare :: Prism Jid Jid
_isBare = prism' toBare (\j -> if isBare j then Just j else Nothing) _isBare = prism' toBare (\j -> if isBare j then Just j else Nothing)
_Stanza :: Prism XmppElement Stanza
_Stanza = prism' XmppStanza (\v -> case v of
XmppStanza s -> Just s
_ -> Nothing)
_Nonza :: Prism XmppElement Element
_Nonza = prism' XmppNonza (\v -> case v of
XmppNonza n -> Just n
_ -> Nothing)
class IsStanza s where class IsStanza s where
-- | From-attribute of the stanza -- | From-attribute of the stanza
from :: Lens s (Maybe Jid) from :: Lens s (Maybe Jid)

11
source/Network/Xmpp/Marshal.hs

@ -22,6 +22,17 @@ import Network.Xmpp.Types
xpNonemptyText :: PU Text NonemptyText xpNonemptyText :: PU Text NonemptyText
xpNonemptyText = ("xpNonemptyText" , "") <?+> xpWrap Nonempty fromNonempty xpText xpNonemptyText = ("xpNonemptyText" , "") <?+> xpWrap Nonempty fromNonempty xpText
xpStreamElement :: PU [Node] (Either StreamErrorInfo XmppElement)
xpStreamElement = xpEither xpStreamError $
xpWrap (\v -> case v of
Left l -> XmppStanza l
Right r -> XmppNonza r
)
( \v -> case v of
XmppStanza l -> Left l
XmppNonza r -> Right r)
$ xpEither xpStanza xpElemVerbatim
xpStreamStanza :: PU [Node] (Either StreamErrorInfo Stanza) xpStreamStanza :: PU [Node] (Either StreamErrorInfo Stanza)
xpStreamStanza = xpEither xpStreamError xpStanza xpStreamStanza = xpEither xpStreamError xpStanza

17
source/Network/Xmpp/Stream.hs

@ -239,6 +239,7 @@ restartStream = do
startStream startStream
-- Creates a conduit from a StreamHandle
sourceStreamHandle :: (MonadIO m, MonadError XmppFailure m) sourceStreamHandle :: (MonadIO m, MonadError XmppFailure m)
=> StreamHandle -> ConduitM i ByteString m () => StreamHandle -> ConduitM i ByteString m ()
sourceStreamHandle s = loopRead $ streamReceive s sourceStreamHandle s = loopRead $ streamReceive s
@ -395,11 +396,10 @@ pushElement x = do
let outData = renderElement $ nsHack x let outData = renderElement $ nsHack x
debugOut outData debugOut outData
lift $ send outData lift $ send outData
where
-- HACK: We remove the "jabber:client" namespace because it is set as
-- default in the stream. This is to make isode's M-LINK server happy and
-- should be removed once jabber.org accepts prefix-free canonicalization
-- HACK: We remove the "jabber:client" namespace because it is set as
-- default in the stream. This is to make isode's M-LINK server happy and
-- should be removed once jabber.org accepts prefix-free canonicalization
nsHack :: Element -> Element nsHack :: Element -> Element
nsHack e@(Element{elementName = n}) nsHack e@(Element{elementName = n})
| nameNamespace n == Just "jabber:client" = | nameNamespace n == Just "jabber:client" =
@ -477,6 +477,15 @@ pullStanza = withStream' $ do
Right (Left e) -> return $ Left $ StreamErrorFailure e Right (Left e) -> return $ Left $ StreamErrorFailure e
Right (Right r) -> return $ Right r Right (Right r) -> return $ Right r
-- | Pulls a stanza, nonza or stream error from the stream.
pullXmppElement :: Stream -> IO (Either XmppFailure XmppElement)
pullXmppElement = withStream' $ do
res <- pullUnpickle xpStreamElement
case res of
Left e -> return $ Left e
Right (Left e) -> return $ Left $ StreamErrorFailure e
Right (Right r) -> return $ Right r
-- Performs the given IO operation, catches any errors and re-throws everything -- Performs the given IO operation, catches any errors and re-throws everything
-- except 'ResourceVanished' and IllegalOperation, which it will return. -- except 'ResourceVanished' and IllegalOperation, which it will return.
catchPush :: IO () -> IO (Either XmppFailure ()) catchPush :: IO () -> IO (Either XmppFailure ())

7
source/Network/Xmpp/Types.hs

@ -42,6 +42,7 @@ module Network.Xmpp.Types
, SaslFailure(..) , SaslFailure(..)
, StreamFeatures(..) , StreamFeatures(..)
, Stanza(..) , Stanza(..)
, XmppElement(..)
, messageS , messageS
, messageErrorS , messageErrorS
, presenceS , presenceS
@ -138,7 +139,11 @@ nonEmpty txt = if Text.all isSpace txt then Nothing else Just (Nonempty txt)
text :: NonemptyText -> Text text :: NonemptyText -> Text
text (Nonempty txt) = txt text (Nonempty txt) = txt
-- | The Xmpp communication primities (Message, Presence and Info/Query) are data XmppElement = XmppStanza !Stanza
| XmppNonza !Element
deriving (Eq, Show)
-- | The Xmpp communication primitives (Message, Presence and Info/Query) are
-- called stanzas. -- called stanzas.
data Stanza = IQRequestS !IQRequest data Stanza = IQRequestS !IQRequest
| IQResultS !IQResult | IQResultS !IQResult

Loading…
Cancel
Save