@ -5,6 +5,7 @@
{- # LANGUAGE OverloadedStrings # -}
{- # LANGUAGE OverloadedStrings # -}
{- # LANGUAGE TupleSections # -}
{- # LANGUAGE TupleSections # -}
{- # LANGUAGE ScopedTypeVariables # -}
{- # LANGUAGE ScopedTypeVariables # -}
{- # LANGUAGE FlexibleContexts # -}
module Network.Xmpp.Stream where
module Network.Xmpp.Stream where
@ -16,7 +17,6 @@ import qualified Control.Exception.Lifted as ExL
import Control.Monad
import Control.Monad
import Control.Monad.Error
import Control.Monad.Error
import Control.Monad.State.Strict
import Control.Monad.State.Strict
import Control.Monad.Trans.Resource as R
import Data.ByteString ( ByteString )
import Data.ByteString ( ByteString )
import qualified Data.ByteString as BS
import qualified Data.ByteString as BS
import qualified Data.ByteString.Char8 as BSC8
import qualified Data.ByteString.Char8 as BSC8
@ -44,7 +44,6 @@ import System.IO
import System.Log.Logger
import System.Log.Logger
import System.Random ( randomRIO )
import System.Random ( randomRIO )
import Text.XML.Stream.Parse as XP
import Text.XML.Stream.Parse as XP
import Text.XML.Unresolved ( InvalidEventStream ( .. ) )
import Network.Xmpp.Utilities
import Network.Xmpp.Utilities
@ -65,17 +64,6 @@ lmb :: [t] -> Maybe [t]
lmb [] = Nothing
lmb [] = Nothing
lmb x = Just x
lmb x = Just x
pushing :: MonadIO m =>
m ( Either XmppFailure Bool )
-> ErrorT XmppFailure m ()
pushing m = do
res <- ErrorT m
case res of
True -> return ()
False -> do
liftIO $ debugM " Pontarius.Xmpp " " Failed to send data. "
throwError XmppOtherFailure
-- Unpickles and returns a stream element.
-- Unpickles and returns a stream element.
streamUnpickleElem :: PU [ Node ] a
streamUnpickleElem :: PU [ Node ] a
-> Element
-> Element
@ -89,7 +77,7 @@ streamUnpickleElem p x = do
-- This is the conduit sink that handles the stream XML events. We extend it
-- This is the conduit sink that handles the stream XML events. We extend it
-- with ErrorT capabilities.
-- with ErrorT capabilities.
type StreamSink a = ErrorT XmppFailure ( ConduitM Event Void IO ) a
type StreamSink a = ConduitM Event Void ( ErrorT XmppFailure IO ) a
-- Discards all events before the first EventBeginElement.
-- Discards all events before the first EventBeginElement.
throwOutJunk :: Monad m => ConduitM Event a m ()
throwOutJunk :: Monad m => ConduitM Event a m ()
@ -103,8 +91,8 @@ throwOutJunk = do
-- Returns an (empty) Element from a stream of XML events.
-- Returns an (empty) Element from a stream of XML events.
openElementFromEvents :: StreamSink Element
openElementFromEvents :: StreamSink Element
openElementFromEvents = do
openElementFromEvents = do
lift throwOutJunk
throwOutJunk
hd <- lift CL . head
hd <- await
case hd of
case hd of
Just ( EventBeginElement name attrs ) -> return $ Element name attrs []
Just ( EventBeginElement name attrs ) -> return $ Element name attrs []
_ -> do
_ -> do
@ -134,15 +122,15 @@ startStream = runErrorT $ do
lift $ lift $ errorM " Pontarius.Xmpp " " Server sent no hostname. "
lift $ lift $ errorM " Pontarius.Xmpp " " Server sent no hostname. "
throwError XmppOtherFailure
throwError XmppOtherFailure
Just address -> do
Just address -> do
pushing pushXmlDecl
ErrorT $ pushXmlDecl
pushing . pushOpenElement . streamNSHack $
ErrorT . pushOpenElement . streamNSHack $
pickleElem xpStream ( " 1.0 "
pickleElem xpStream ( " 1.0 "
, expectedTo
, expectedTo
, Just ( Jid Nothing address Nothing )
, Just ( Jid Nothing address Nothing )
, Nothing
, Nothing
, preferredLang $ streamConfiguration st
, preferredLang $ streamConfiguration st
)
)
response <- ErrorT $ runEventsSink $ runErrorT $ streamS expectedTo
response <- ErrorT $ runEventsSink $ streamS expectedTo
case response of
case response of
Right ( ver , from , to , sid , lt , features )
Right ( ver , from , to , sid , lt , features )
| ( Text . unpack ver ) /= " 1.0 " ->
| ( Text . unpack ver ) /= " 1.0 " ->
@ -244,11 +232,15 @@ restartStream = do
startStream
startStream
sourceStreamHandle :: MonadIO m => StreamHandle -> ConduitM i ByteString m ()
sourceStreamHandle :: ( MonadIO m , MonadError XmppFailure m )
=> StreamHandle -> ConduitM i ByteString m ()
sourceStreamHandle s = loopRead $ streamReceive s
sourceStreamHandle s = loopRead $ streamReceive s
where
where
loopRead rd = do
loopRead rd = do
bs <- liftIO ( rd 4096 )
bs' <- liftIO ( rd 4096 )
bs <- case bs' of
Left e -> throwError e
Right r -> return r
if BS . null bs
if BS . null bs
then return ()
then return ()
else do
else do
@ -260,25 +252,31 @@ sourceStreamHandle s = loopRead $ streamReceive s
-- We buffer sources because we don't want to lose data when multiple
-- We buffer sources because we don't want to lose data when multiple
-- xml-entities are sent with the same packet and we don't want to eternally
-- xml-entities are sent with the same packet and we don't want to eternally
-- block the StreamState while waiting for data to arrive
-- block the StreamState while waiting for data to arrive
bufferSrc :: MonadIO m => Source IO o -> IO ( ConduitM i o m () )
bufferSrc :: Source ( ErrorT XmppFailure IO ) o
-> IO ( ConduitM i o ( ErrorT XmppFailure IO ) () )
bufferSrc src = do
bufferSrc src = do
ref <- newTMVarIO $ DCI . ResumableSource src ( return () )
ref <- newTMVarIO $ DCI . ResumableSource src ( return () )
let go = do
let go = do
dt <- liftIO $ Ex . bracketOnError ( atomically $ takeTMVar ref )
dt <- liftIO $ Ex . bracketOnError
( \ _ -> atomically . putTMVar ref $
( atomically $ takeTMVar ref )
DCI . ResumableSource zeroSource
( \ _ -> atomically . putTMVar ref $ zeroResumableSource )
( return () )
)
( \ s -> do
( \ s -> do
( s' , dt ) <- s $$++ CL . head
res <- runErrorT ( s $$++ await )
case res of
Left e -> do
atomically $ putTMVar ref zeroResumableSource
return $ Left e
Right ( s' , b ) -> do
atomically $ putTMVar ref s'
atomically $ putTMVar ref s'
return dt
return $ Right b
)
)
case dt of
case dt of
Nothing -> return ()
Left e -> throwError e
Just d -> yield d >> go
Right Nothing -> return ()
Right ( Just d ) -> yield d >> go
return go
return go
where
zeroResumableSource = DCI . ResumableSource zeroSource ( return () )
-- Reads the (partial) stream:stream and the server features from the stream.
-- Reads the (partial) stream:stream and the server features from the stream.
-- Returns the (unvalidated) stream attributes, the unparsed element, or
-- Returns the (unvalidated) stream attributes, the unparsed element, or
@ -302,7 +300,7 @@ streamS _expectedTo = do -- TODO: check expectedTo
where
where
xmppStreamHeader :: StreamSink ( Either Element ( Text , Maybe Jid , Maybe Jid , Maybe Text . Text , Maybe LangTag ) )
xmppStreamHeader :: StreamSink ( Either Element ( Text , Maybe Jid , Maybe Jid , Maybe Text . Text , Maybe LangTag ) )
xmppStreamHeader = do
xmppStreamHeader = do
lift throwOutJunk
throwOutJunk
-- Get the stream:stream element (or whatever it is) from the server,
-- Get the stream:stream element (or whatever it is) from the server,
-- and validate what we get.
-- and validate what we get.
el <- openElementFromEvents -- May throw `XmppOtherFailure' if an
el <- openElementFromEvents -- May throw `XmppOtherFailure' if an
@ -312,7 +310,7 @@ streamS _expectedTo = do -- TODO: check expectedTo
Right r -> return $ Right r
Right r -> return $ Right r
xmppStreamFeatures :: StreamSink StreamFeatures
xmppStreamFeatures :: StreamSink StreamFeatures
xmppStreamFeatures = do
xmppStreamFeatures = do
e <- lift $ elements =$ CL . head
e <- elements =$ await
case e of
case e of
Nothing -> do
Nothing -> do
lift $ lift $ errorM " Pontarius.Xmpp " " streamS: Stream ended. "
lift $ lift $ errorM " Pontarius.Xmpp " " streamS: Stream ended. "
@ -367,21 +365,22 @@ debugOut :: MonadIO m => ByteString -> m ()
debugOut outData = liftIO $ debugM " Pontarius.Xmpp "
debugOut outData = liftIO $ debugM " Pontarius.Xmpp "
( " Out: " ++ ( Text . unpack . Text . decodeUtf8 $ outData ) )
( " Out: " ++ ( Text . unpack . Text . decodeUtf8 $ outData ) )
wrapIOException :: IO a -> StateT StreamState IO ( Either XmppFailure a )
wrapIOException :: MonadIO m =>
IO a -> m ( Either XmppFailure a )
wrapIOException action = do
wrapIOException action = do
r <- liftIO $ tryIOError action
r <- liftIO $ tryIOError action
case r of
case r of
Right b -> return $ Right b
Right b -> return $ Right b
Left e -> do
Left e -> do
lift $ warningM " Pontarius.Xmpp " $ " wrapIOException: Exception wrapped: " ++ ( show e )
liftIO $ warningM " Pontarius.Xmpp " $ " wrapIOException: Exception wrapped: " ++ ( show e )
return $ Left $ XmppIOException e
return $ Left $ XmppIOException e
pushElement :: Element -> StateT StreamState IO ( Either XmppFailure Bool )
pushElement :: Element -> StateT StreamState IO ( Either XmppFailure () )
pushElement x = do
pushElement x = do
send <- gets ( streamSend . streamHandle )
send <- gets ( streamSend . streamHandle )
let outData = renderElement $ nsHack x
let outData = renderElement $ nsHack x
debugOut outData
debugOut outData
wrapIOException $ send outData
lift $ send outData
where
where
-- HACK: We remove the "jabber:client" namespace because it is set as
-- HACK: We remove the "jabber:client" namespace because it is set as
-- default in the stream. This is to make isode's M-LINK server happy and
-- default in the stream. This is to make isode's M-LINK server happy and
@ -400,53 +399,46 @@ nsHack e@(Element{elementName = n})
mapNSHack nd = nd
mapNSHack nd = nd
-- | Encode and send stanza
-- | Encode and send stanza
pushStanza :: Stanza -> Stream -> IO ( Either XmppFailure Bool )
pushStanza :: Stanza -> Stream -> IO ( Either XmppFailure () )
pushStanza s = withStream' . pushElement $ pickleElem xpStanza s
pushStanza s = withStream' . pushElement $ pickleElem xpStanza s
-- XML documents and XMPP streams SHOULD be preceeded by an XML declaration.
-- XML documents and XMPP streams SHOULD be preceeded by an XML declaration.
-- UTF-8 is the only supported XMPP encoding. The standalone document
-- UTF-8 is the only supported XMPP encoding. The standalone document
-- declaration (matching "SDDecl" in the XML standard) MUST NOT be included in
-- declaration (matching "SDDecl" in the XML standard) MUST NOT be included in
-- XMPP streams. RFC 6120 defines XMPP only in terms of XML 1.0.
-- XMPP streams. RFC 6120 defines XMPP only in terms of XML 1.0.
pushXmlDecl :: StateT StreamState IO ( Either XmppFailure Bool )
pushXmlDecl :: StateT StreamState IO ( Either XmppFailure () )
pushXmlDecl = do
pushXmlDecl = do
con <- gets streamHandle
con <- gets streamHandle
wrapIOException $ ( streamSend con ) " <?xml version= \ " 1.0 \ " encoding= \ " UTF-8 \ " ?> "
lift $ streamSend con " <?xml version= \ " 1.0 \ " encoding= \ " UTF-8 \ " ?> "
pushOpenElement :: Element -> StateT StreamState IO ( Either XmppFailure Bool )
pushOpenElement :: Element -> StateT StreamState IO ( Either XmppFailure () )
pushOpenElement e = do
pushOpenElement e = do
send <- gets ( streamSend . streamHandle )
send <- gets ( streamSend . streamHandle )
let outData = renderOpenElement e
let outData = renderOpenElement e
debugOut outData
debugOut outData
wrapIOException $ send outData
lift $ send outData
-- `Connect-and-resumes' the given sink to the stream source, and pulls a
-- `Connect-and-resumes' the given sink to the stream source, and pulls a
-- `b' value.
-- `b' value.
runEventsSink :: Sink Event IO b -> StateT StreamState IO b
runEventsSink :: Sink Event ( ErrorT XmppFailure IO ) b
-> StateT StreamState IO ( Either XmppFailure b )
runEventsSink snk = do -- TODO: Wrap exceptions?
runEventsSink snk = do -- TODO: Wrap exceptions?
src <- gets streamEventSource
src <- gets streamEventSource
r <- liftIO $ src $$ snk
lift . runErrorT $ src $$ snk
return r
pullElement :: StateT StreamState IO ( Either XmppFailure Element )
pullElement :: StateT StreamState IO ( Either XmppFailure Element )
pullElement = do
pullElement = do
ExL . catches ( do
e <- runEventsSink ( elements =$ await )
e <- runEventsSink ( elements =$ await )
case e of
case e of
Nothing -> do
Left l -> do
lift $ errorM " Pontarius.Xmpp " " pullElement: Stream ended. "
liftIO . errorM " Pontarius.Xmpp " $
return . Left $ XmppOtherFailure
" Error while retrieving XML element: " ++ show l
Just r -> return $ Right r
return $ Left l
)
[ ExL . Handler ( \ StreamEnd -> return $ Left StreamEndFailure )
Right Nothing -> do
, ExL . Handler ( \ ( InvalidXmppXml s ) -- Invalid XML `Event' encountered, or missing element close tag
liftIO $ errorM " Pontarius.Xmpp " " pullElement: Stream ended. "
-> do
lift $ errorM " Pontarius.Xmpp " $ " pullElement: Invalid XML: " ++ ( show s )
return . Left $ XmppOtherFailure )
, ExL . Handler $ \ ( e :: InvalidEventStream )
-> do
lift $ errorM " Pontarius.Xmpp " $ " pullElement: Invalid event stream: " ++ ( show e )
return . Left $ XmppOtherFailure
return . Left $ XmppOtherFailure
]
Right ( Just r ) -> return $ Right r
-- Pulls an element and unpickles it.
-- Pulls an element and unpickles it.
pullUnpickle :: PU [ Node ] a -> StateT StreamState IO ( Either XmppFailure a )
pullUnpickle :: PU [ Node ] a -> StateT StreamState IO ( Either XmppFailure a )
@ -473,21 +465,21 @@ pullStanza = withStream' $ do
-- Performs the given IO operation, catches any errors and re-throws everything
-- Performs the given IO operation, catches any errors and re-throws everything
-- except 'ResourceVanished' and IllegalOperation, in which case it will return False instead
-- except 'ResourceVanished' and IllegalOperation, in which case it will return False instead
catchPush :: IO () -> IO Bool
catchPush :: IO () -> IO ( Either XmppFailure () )
catchPush p = ExL . catch
catchPush p = ExL . catch
( p >> return True )
( p >> return ( Right () ) )
( \ e -> case GIE . ioe_type e of
( \ e -> case GIE . ioe_type e of
GIE . ResourceVanished -> return Fals e
GIE . ResourceVanished -> return . Left $ XmppIOException e
GIE . IllegalOperation -> return Fals e
GIE . IllegalOperation -> return . Left $ XmppIOException e
_ -> ExL . throwIO e
_ -> ExL . throwIO e
)
)
zeroHandle :: StreamHandle
zeroHandle :: StreamHandle
zeroHandle = StreamHandle { streamSend = \ _ -> return False
zeroHandle = StreamHandle { streamSend = \ _ -> return ( Left XmppNoStream )
, streamReceive = \ _ -> do
, streamReceive = \ _ -> do
errorM " Pontarius.Xmpp "
errorM " Pontarius.Xmpp "
" xmppNoStream: Stream is closed. "
" xmppNoStream: Stream is closed. "
ExL . throwIO XmppOtherFailure
return $ Left XmppNoStream
, streamFlush = return ()
, streamFlush = return ()
, streamClose = return ()
, streamClose = return ()
}
}
@ -507,14 +499,16 @@ xmppNoStream = StreamState {
, streamConfiguration = def
, streamConfiguration = def
}
}
zeroSource :: Source IO output
zeroSource :: Source ( ErrorT XmppFailure IO ) a
zeroSource = liftIO $ do
zeroSource = do
debugM " Pontarius.Xmpp " " zeroSource "
liftIO $ debugM " Pontarius.Xmpp " " zeroSource "
ExL . throwIO XmppOtherFailure
throwError XmppNoStream
handleToStreamHandle :: Handle -> StreamHandle
handleToStreamHandle :: Handle -> StreamHandle
handleToStreamHandle h = StreamHandle { streamSend = \ d -> catchPush $ BS . hPut h d
handleToStreamHandle h = StreamHandle { streamSend = \ d ->
, streamReceive = \ n -> BS . hGetSome h n
wrapIOException $ BS . hPut h d
, streamReceive = \ n ->
wrapIOException $ BS . hGetSome h n
, streamFlush = hFlush h
, streamFlush = hFlush h
, streamClose = hClose h
, streamClose = hClose h
}
}
@ -547,9 +541,9 @@ createStream realm config = do
lift $ debugM " Pontarius.Xmpp " " Did not acquire handle. "
lift $ debugM " Pontarius.Xmpp " " Did not acquire handle. "
throwError TcpConnectionFailure
throwError TcpConnectionFailure
where
where
logConduit :: Conduit ByteString IO ByteString
logConduit :: MonadIO m => Conduit ByteString m ByteString
logConduit = CL . mapM $ \ d -> do
logConduit = CL . mapM $ \ d -> do
debugM " Pontarius.Xmpp " $ " In: " ++ ( BSC8 . unpack d ) ++
liftIO . debugM " Pontarius.Xmpp " $ " In: " ++ ( BSC8 . unpack d ) ++
" . "
" . "
return d
return d
@ -780,7 +774,7 @@ pushIQ :: Text
-> Stream
-> Stream
-> IO ( Either XmppFailure ( Either IQError IQResult ) )
-> IO ( Either XmppFailure ( Either IQError IQResult ) )
pushIQ iqID to tp lang body stream = runErrorT $ do
pushIQ iqID to tp lang body stream = runErrorT $ do
pushing $ pushStanza
ErrorT $ pushStanza
( IQRequestS $ IQRequest iqID Nothing to lang tp body ) stream
( IQRequestS $ IQRequest iqID Nothing to lang tp body ) stream
res <- lift $ pullStanza stream
res <- lift $ pullStanza stream
case res of
case res of
@ -807,7 +801,7 @@ debugConduit = forever $ do
yield s
yield s
Nothing -> return ()
Nothing -> return ()
elements :: R . MonadThrow m => Conduit Event m Element
elements :: MonadError XmppFailure m => Conduit Event m Element
elements = do
elements = do
x <- await
x <- await
case x of
case x of
@ -816,11 +810,11 @@ elements = do
elements
elements
-- This might be an XML error if the end element tag is not
-- This might be an XML error if the end element tag is not
-- "</stream>". TODO: We might want to check this at a later time
-- "</stream>". TODO: We might want to check this at a later time
Just ( EventEndElement _ ) -> lif t $ R . monadT hrow StreamEnd
Just ( EventEndElement _ ) -> throwError StreamEndFailure
Just ( EventContent ( ContentText ct ) ) | Text . all isSpace ct ->
Just ( EventContent ( ContentText ct ) ) | Text . all isSpace ct ->
elements
elements
Nothing -> return ()
Nothing -> return ()
_ -> lif t $ R . monadT hrow $ InvalidXmpp Xml $ " not an element: " ++ show x
_ -> throwError $ Xmpp InvalidXml $ " not an element: " ++ show x
where
where
many' f =
many' f =
go id
go id
@ -834,8 +828,7 @@ elements = do
( y , ns ) <- many' goN
( y , ns ) <- many' goN
if y == Just ( EventEndElement n )
if y == Just ( EventEndElement n )
then return $ Element n as $ compressNodes ns
then return $ Element n as $ compressNodes ns
else lift $ R . monadThrow $ InvalidXmppXml $
else throwError . XmppInvalidXml $ " Missing close tag: " ++ show n
" Missing close tag: " ++ show n
goN = do
goN = do
x <- await
x <- await
case x of
case x of