|
|
|
@ -142,9 +142,7 @@ startStream = runErrorT $ do |
|
|
|
) |
|
|
|
) |
|
|
|
response <- ErrorT $ runEventsSink $ runErrorT $ streamS expectedTo |
|
|
|
response <- ErrorT $ runEventsSink $ runErrorT $ streamS expectedTo |
|
|
|
case response of |
|
|
|
case response of |
|
|
|
Left e -> throwError e |
|
|
|
Right (ver, from, to, sid, lt, features) |
|
|
|
-- Successful unpickling of stream element. |
|
|
|
|
|
|
|
Right (Right (ver, from, to, sid, lt, features)) |
|
|
|
|
|
|
|
| (Text.unpack ver) /= "1.0" -> |
|
|
|
| (Text.unpack ver) /= "1.0" -> |
|
|
|
closeStreamWithError StreamUnsupportedVersion Nothing |
|
|
|
closeStreamWithError StreamUnsupportedVersion Nothing |
|
|
|
"Unknown version" |
|
|
|
"Unknown version" |
|
|
|
@ -174,7 +172,7 @@ startStream = runErrorT $ do |
|
|
|
} ) |
|
|
|
} ) |
|
|
|
return () |
|
|
|
return () |
|
|
|
-- Unpickling failed - we investigate the element. |
|
|
|
-- Unpickling failed - we investigate the element. |
|
|
|
Right (Left (Element name attrs _children)) |
|
|
|
Left (Element name attrs _children) |
|
|
|
| (nameLocalName name /= "stream") -> |
|
|
|
| (nameLocalName name /= "stream") -> |
|
|
|
closeStreamWithError StreamInvalidXml Nothing |
|
|
|
closeStreamWithError StreamInvalidXml Nothing |
|
|
|
"Root element is not stream" |
|
|
|
"Root element is not stream" |
|
|
|
@ -236,11 +234,11 @@ flattenAttrs attrs = Prelude.map (\(name, cont) -> |
|
|
|
-- and calls xmppStartStream. |
|
|
|
-- and calls xmppStartStream. |
|
|
|
restartStream :: StateT StreamState IO (Either XmppFailure ()) |
|
|
|
restartStream :: StateT StreamState IO (Either XmppFailure ()) |
|
|
|
restartStream = do |
|
|
|
restartStream = do |
|
|
|
lift $ debugM "Pontarius.XMPP" "Restarting stream..." |
|
|
|
liftIO $ debugM "Pontarius.XMPP" "Restarting stream..." |
|
|
|
raw <- gets (streamReceive . streamHandle) |
|
|
|
raw <- gets (streamReceive . streamHandle) |
|
|
|
let newSource = DCI.ResumableSource (loopRead raw $= XP.parseBytes def) |
|
|
|
let newSource =loopRead raw $= XP.parseBytes def |
|
|
|
(return ()) |
|
|
|
buffered <- liftIO . bufferSrc $ newSource |
|
|
|
modify (\s -> s{streamEventSource = newSource }) |
|
|
|
modify (\s -> s{streamEventSource = buffered }) |
|
|
|
startStream |
|
|
|
startStream |
|
|
|
where |
|
|
|
where |
|
|
|
loopRead rd = do |
|
|
|
loopRead rd = do |
|
|
|
@ -253,6 +251,29 @@ restartStream = do |
|
|
|
yield bs |
|
|
|
yield bs |
|
|
|
loopRead rd |
|
|
|
loopRead rd |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- We buffer sources because we don't want to lose data when multiple |
|
|
|
|
|
|
|
-- xml-entities are sent with the same packet and we don't want to eternally |
|
|
|
|
|
|
|
-- block the StreamState while waiting for data to arrive |
|
|
|
|
|
|
|
bufferSrc :: MonadIO m => Source IO o -> IO (ConduitM i o m ()) |
|
|
|
|
|
|
|
bufferSrc src = do |
|
|
|
|
|
|
|
ref <- newTMVarIO $ DCI.ResumableSource src (return ()) |
|
|
|
|
|
|
|
let go = do |
|
|
|
|
|
|
|
dt <- liftIO $ Ex.bracketOnError (atomically $ takeTMVar ref) |
|
|
|
|
|
|
|
(\_ -> atomically . putTMVar ref $ |
|
|
|
|
|
|
|
DCI.ResumableSource zeroSource |
|
|
|
|
|
|
|
(return ()) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
(\s -> do |
|
|
|
|
|
|
|
(s', dt) <- s $$++ CL.head |
|
|
|
|
|
|
|
atomically $ putTMVar ref s' |
|
|
|
|
|
|
|
return dt |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
case dt of |
|
|
|
|
|
|
|
Nothing -> return () |
|
|
|
|
|
|
|
Just d -> yield d >> go |
|
|
|
|
|
|
|
return go |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- Reads the (partial) stream:stream and the server features from the stream. |
|
|
|
-- Reads the (partial) stream:stream and the server features from the stream. |
|
|
|
-- Returns the (unvalidated) stream attributes, the unparsed element, or |
|
|
|
-- Returns the (unvalidated) stream attributes, the unparsed element, or |
|
|
|
-- throwError throws a `XmppOtherFailure' (if something other than an element |
|
|
|
-- throwError throws a `XmppOtherFailure' (if something other than an element |
|
|
|
@ -353,14 +374,18 @@ pushElement x = do |
|
|
|
-- HACK: We remove the "jabber:client" namespace because it is set as |
|
|
|
-- HACK: We remove the "jabber:client" namespace because it is set as |
|
|
|
-- default in the stream. This is to make isode's M-LINK server happy and |
|
|
|
-- default in the stream. This is to make isode's M-LINK server happy and |
|
|
|
-- should be removed once jabber.org accepts prefix-free canonicalization |
|
|
|
-- should be removed once jabber.org accepts prefix-free canonicalization |
|
|
|
nsHack e@(Element{elementName = n}) |
|
|
|
|
|
|
|
|
|
|
|
nsHack :: Element -> Element |
|
|
|
|
|
|
|
nsHack e@(Element{elementName = n}) |
|
|
|
| nameNamespace n == Just "jabber:client" = |
|
|
|
| nameNamespace n == Just "jabber:client" = |
|
|
|
e{ elementName = Name (nameLocalName n) Nothing Nothing |
|
|
|
e{ elementName = Name (nameLocalName n) Nothing Nothing |
|
|
|
, elementNodes = map mapNSHack $ elementNodes e |
|
|
|
, elementNodes = map mapNSHack $ elementNodes e |
|
|
|
} |
|
|
|
} |
|
|
|
| otherwise = e |
|
|
|
| otherwise = e |
|
|
|
mapNSHack (NodeElement e) = NodeElement $ nsHack e |
|
|
|
where |
|
|
|
mapNSHack n = n |
|
|
|
mapNSHack :: Node -> Node |
|
|
|
|
|
|
|
mapNSHack (NodeElement el) = NodeElement $ nsHack el |
|
|
|
|
|
|
|
mapNSHack nd = nd |
|
|
|
|
|
|
|
|
|
|
|
-- | Encode and send stanza |
|
|
|
-- | Encode and send stanza |
|
|
|
pushStanza :: Stanza -> Stream -> IO (Either XmppFailure Bool) |
|
|
|
pushStanza :: Stanza -> Stream -> IO (Either XmppFailure Bool) |
|
|
|
@ -384,23 +409,21 @@ pushOpenElement e = do |
|
|
|
|
|
|
|
|
|
|
|
-- `Connect-and-resumes' the given sink to the stream source, and pulls a |
|
|
|
-- `Connect-and-resumes' the given sink to the stream source, and pulls a |
|
|
|
-- `b' value. |
|
|
|
-- `b' value. |
|
|
|
runEventsSink :: Sink Event IO b -> StateT StreamState IO (Either XmppFailure b) |
|
|
|
runEventsSink :: Sink Event IO b -> StateT StreamState IO b |
|
|
|
runEventsSink snk = do -- TODO: Wrap exceptions? |
|
|
|
runEventsSink snk = do -- TODO: Wrap exceptions? |
|
|
|
src <- gets streamEventSource |
|
|
|
src <- gets streamEventSource |
|
|
|
(src', r) <- lift $ src $$++ snk |
|
|
|
r <- liftIO $ src $$ snk |
|
|
|
modify (\s -> s{streamEventSource = src'}) |
|
|
|
return r |
|
|
|
return $ Right r |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
pullElement :: StateT StreamState IO (Either XmppFailure Element) |
|
|
|
pullElement :: StateT StreamState IO (Either XmppFailure Element) |
|
|
|
pullElement = do |
|
|
|
pullElement = do |
|
|
|
ExL.catches (do |
|
|
|
ExL.catches (do |
|
|
|
e <- runEventsSink (elements =$ await) |
|
|
|
e <- runEventsSink (elements =$ await) |
|
|
|
case e of |
|
|
|
case e of |
|
|
|
Left f -> return $ Left f |
|
|
|
Nothing -> do |
|
|
|
Right Nothing -> do |
|
|
|
lift $ errorM "Pontarius.XMPP" "pullElement: Stream ended." |
|
|
|
lift $ errorM "Pontarius.XMPP" "pullElement: No element." |
|
|
|
|
|
|
|
return . Left $ XmppOtherFailure |
|
|
|
return . Left $ XmppOtherFailure |
|
|
|
Right (Just r) -> return $ Right r |
|
|
|
Just r -> return $ Right r |
|
|
|
) |
|
|
|
) |
|
|
|
[ ExL.Handler (\StreamEnd -> return $ Left StreamEndFailure) |
|
|
|
[ ExL.Handler (\StreamEnd -> return $ Left StreamEndFailure) |
|
|
|
, ExL.Handler (\(InvalidXmppXml s) -- Invalid XML `Event' encountered, or missing element close tag |
|
|
|
, ExL.Handler (\(InvalidXmppXml s) -- Invalid XML `Event' encountered, or missing element close tag |
|
|
|
@ -429,7 +452,7 @@ pullUnpickle p = do |
|
|
|
|
|
|
|
|
|
|
|
-- | Pulls a stanza (or stream error) from the stream. |
|
|
|
-- | Pulls a stanza (or stream error) from the stream. |
|
|
|
pullStanza :: Stream -> IO (Either XmppFailure Stanza) |
|
|
|
pullStanza :: Stream -> IO (Either XmppFailure Stanza) |
|
|
|
pullStanza = withStream $ do |
|
|
|
pullStanza = withStream' $ do |
|
|
|
res <- pullUnpickle xpStreamStanza |
|
|
|
res <- pullUnpickle xpStreamStanza |
|
|
|
case res of |
|
|
|
case res of |
|
|
|
Left e -> return $ Left e |
|
|
|
Left e -> return $ Left e |
|
|
|
@ -459,7 +482,7 @@ xmppNoStream = StreamState { |
|
|
|
, streamFlush = return () |
|
|
|
, streamFlush = return () |
|
|
|
, streamClose = return () |
|
|
|
, streamClose = return () |
|
|
|
} |
|
|
|
} |
|
|
|
, streamEventSource = DCI.ResumableSource zeroSource (return ()) |
|
|
|
, streamEventSource = zeroSource |
|
|
|
, streamFeatures = StreamFeatures Nothing [] [] |
|
|
|
, streamFeatures = StreamFeatures Nothing [] [] |
|
|
|
, streamAddress = Nothing |
|
|
|
, streamAddress = Nothing |
|
|
|
, streamFrom = Nothing |
|
|
|
, streamFrom = Nothing |
|
|
|
@ -468,10 +491,10 @@ xmppNoStream = StreamState { |
|
|
|
, streamJid = Nothing |
|
|
|
, streamJid = Nothing |
|
|
|
, streamConfiguration = def |
|
|
|
, streamConfiguration = def |
|
|
|
} |
|
|
|
} |
|
|
|
where |
|
|
|
|
|
|
|
zeroSource :: Source IO output |
|
|
|
zeroSource :: Source IO output |
|
|
|
zeroSource = liftIO $ do |
|
|
|
zeroSource = liftIO $ do |
|
|
|
errorM "Pontarius.Xmpp" "zeroSource utilized." |
|
|
|
errorM "Pontarius.Xmpp" "zeroSource" |
|
|
|
ExL.throwIO XmppOtherFailure |
|
|
|
ExL.throwIO XmppOtherFailure |
|
|
|
|
|
|
|
|
|
|
|
createStream :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO (Stream) |
|
|
|
createStream :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO (Stream) |
|
|
|
@ -482,9 +505,9 @@ createStream realm config = do |
|
|
|
debugM "Pontarius.Xmpp" "Acquired handle." |
|
|
|
debugM "Pontarius.Xmpp" "Acquired handle." |
|
|
|
debugM "Pontarius.Xmpp" "Setting NoBuffering mode on handle." |
|
|
|
debugM "Pontarius.Xmpp" "Setting NoBuffering mode on handle." |
|
|
|
hSetBuffering h NoBuffering |
|
|
|
hSetBuffering h NoBuffering |
|
|
|
let eSource = DCI.ResumableSource |
|
|
|
eSource <- liftIO . bufferSrc $ |
|
|
|
((sourceHandle h $= logConduit) $= XP.parseBytes def) |
|
|
|
(sourceHandle h $= logConduit) $= XP.parseBytes def |
|
|
|
(return ()) |
|
|
|
|
|
|
|
let hand = StreamHandle { streamSend = \d -> catchPush $ BS.hPut h d |
|
|
|
let hand = StreamHandle { streamSend = \d -> catchPush $ BS.hPut h d |
|
|
|
, streamReceive = \n -> BS.hGetSome h n |
|
|
|
, streamReceive = \n -> BS.hGetSome h n |
|
|
|
, streamFlush = hFlush h |
|
|
|
, streamFlush = hFlush h |
|
|
|
@ -791,5 +814,5 @@ withStream' action (Stream stream) = do |
|
|
|
return r |
|
|
|
return r |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
mkStream :: StreamState -> IO (Stream) |
|
|
|
mkStream :: StreamState -> IO Stream |
|
|
|
mkStream con = Stream `fmap` (atomically $ newTMVar con) |
|
|
|
mkStream con = Stream `fmap` atomically (newTMVar con) |
|
|
|
|