From f73eec93d9961a763bdfd0fdac3664434a9da3cb Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Sat, 5 May 2012 11:25:23 +0200
Subject: [PATCH] stopp reader from reading from a dead connection rename
elementFromEvents to elements and transform it to a conduit add proper
failing case for pullElement change pushing to dead connection to throw away
element rather than throw an exception
---
src/Network/XMPP/Concurrent/Monad.hs | 5 ++--
src/Network/XMPP/Concurrent/Threads.hs | 10 ++++---
src/Network/XMPP/Monad.hs | 9 +++++--
src/Network/XMPP/Stream.hs | 36 ++++++++++++++------------
src/Network/XMPP/Types.hs | 1 +
src/Text/XML/Stream/Elements.hs | 34 ++++++++++++------------
6 files changed, 54 insertions(+), 41 deletions(-)
diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs
index 748ed9f..91dad05 100644
--- a/src/Network/XMPP/Concurrent/Monad.hs
+++ b/src/Network/XMPP/Concurrent/Monad.hs
@@ -174,9 +174,8 @@ withConnection a = do
Ex.catch ( do
(res, s') <- runStateT a s
atomically $ do
- _ <- tryPutTMVar write (sConPushBS s')
- _ <- tryPutTMVar stateRef s'
- return ()
+ putTMVar write (sConPushBS s')
+ putTMVar stateRef s'
return res
)
-- we treat all Exceptions as fatal
diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs
index f6e397f..aaa5d47 100644
--- a/src/Network/XMPP/Concurrent/Threads.hs
+++ b/src/Network/XMPP/Concurrent/Threads.hs
@@ -47,9 +47,13 @@ readWorker messageC presenceC handlers stateRef =
res <- liftIO $ Ex.catch ( do
-- we don't know whether pull will
-- necessarily be interruptible
- s <- liftIO . atomically $ readTMVar stateRef
+ s <- liftIO . atomically $ do
+ sr <- readTMVar stateRef
+ when (sConnectionState sr == XmppConnectionClosed)
+ retry
+ return sr
allowInterrupt
- Just <$> runStateT pullStanza s
+ Just . fst <$> runStateT pullStanza s
)
(\(Interrupt t) -> do
void $ handleInterrupts [t]
@@ -58,7 +62,7 @@ readWorker messageC presenceC handlers stateRef =
liftIO . atomically $ do
case res of
Nothing -> return ()
- Just (sta, _s) -> do
+ Just sta -> do
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
diff --git a/src/Network/XMPP/Monad.hs b/src/Network/XMPP/Monad.hs
index 6621f12..278ab56 100644
--- a/src/Network/XMPP/Monad.hs
+++ b/src/Network/XMPP/Monad.hs
@@ -13,6 +13,7 @@ import Control.Monad.State.Strict
import Data.ByteString as BS
import Data.Conduit
+import qualified Data.Conduit.List as CL
import Data.Conduit.BufferedSource
import Data.Conduit.Binary as CB
import Data.Text(Text)
@@ -50,7 +51,11 @@ pullSink snk = do
return r
pullElement :: XMPPConMonad Element
-pullElement = pullSink elementFromEvents
+pullElement = do
+ e <- pullSink (elements =$ CL.head)
+ case e of
+ Nothing -> liftIO $ Ex.throwIO XmppNoConnection
+ Just r -> return r
pullPickle :: PU [Node] a -> XMPPConMonad a
pullPickle p = do
@@ -94,7 +99,7 @@ xmppNoConnection :: XmppConnection
xmppNoConnection = XmppConnection
{ sConSrc = zeroSource
, sRawSrc = zeroSource
- , sConPushBS = \_ -> Ex.throwIO $ XmppNoConnection
+ , sConPushBS = \_ -> return ()
, sConHandle = Nothing
, sFeatures = SF Nothing [] []
, sConnectionState = XmppConnectionClosed
diff --git a/src/Network/XMPP/Stream.hs b/src/Network/XMPP/Stream.hs
index 80f3462..8c5d890 100644
--- a/src/Network/XMPP/Stream.hs
+++ b/src/Network/XMPP/Stream.hs
@@ -3,23 +3,24 @@
module Network.XMPP.Stream where
-import Control.Monad.Error
-import Control.Monad.State.Strict
+import qualified Control.Exception as Ex
+import Control.Monad.Error
+import Control.Monad.State.Strict
-import Data.Conduit
-import Data.Conduit.BufferedSource
-import Data.Conduit.List as CL
-import Data.Text as T
-import Data.XML.Pickle
-import Data.XML.Types
-import Data.Void(Void)
+import Data.Conduit
+import Data.Conduit.BufferedSource
+import Data.Conduit.List as CL
+import Data.Text as T
+import Data.XML.Pickle
+import Data.XML.Types
+import Data.Void(Void)
-import Network.XMPP.Monad
-import Network.XMPP.Pickle
-import Network.XMPP.Types
+import Network.XMPP.Monad
+import Network.XMPP.Pickle
+import Network.XMPP.Types
-import Text.XML.Stream.Elements
-import Text.XML.Stream.Parse as XP
+import Text.XML.Stream.Elements
+import Text.XML.Stream.Parse as XP
-- import Text.XML.Stream.Elements
@@ -82,8 +83,11 @@ xmppStreamHeader = do
xmppStreamFeatures :: StreamSink ServerFeatures
-xmppStreamFeatures = streamUnpickleElem pickleStreamFeatures
- =<< lift elementFromEvents
+xmppStreamFeatures = do
+ e <- lift $ elements =$ CL.head
+ case e of
+ Nothing -> liftIO $ Ex.throwIO XmppNoConnection
+ Just r -> streamUnpickleElem pickleStreamFeatures r
-- Pickling
diff --git a/src/Network/XMPP/Types.hs b/src/Network/XMPP/Types.hs
index 1c86f07..e71f9b2 100644
--- a/src/Network/XMPP/Types.hs
+++ b/src/Network/XMPP/Types.hs
@@ -714,6 +714,7 @@ data XmppConnectionState = XmppConnectionClosed -- ^ No connection at
| XmppConnectionSecured -- ^ Connection
-- established and
-- secured via TLS
+ deriving (Show, Eq, Typeable)
data XmppConnection = XmppConnection
{ sConSrc :: Source IO Event
, sRawSrc :: Source IO BS.ByteString
diff --git a/src/Text/XML/Stream/Elements.hs b/src/Text/XML/Stream/Elements.hs
index 4be9ff6..72cf910 100644
--- a/src/Text/XML/Stream/Elements.hs
+++ b/src/Text/XML/Stream/Elements.hs
@@ -24,11 +24,14 @@ compressNodes (NodeContent (ContentText x) : NodeContent (ContentText y) : z) =
compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z
compressNodes (x:xs) = x : compressNodes xs
-elementFromEvents :: R.MonadThrow m => C.Sink Event m Element
-elementFromEvents = do
- x <- CL.peek
+elements :: R.MonadThrow m => C.Conduit Event m Element
+elements = do
+ x <- C.await
case x of
- Just (EventBeginElement n as) -> goE n as
+ Just (EventBeginElement n as) -> do
+ goE n as >>= C.yield
+ elements
+ Nothing -> return ()
_ -> lift $ R.monadThrow $ InvalidEventStream $ "not an element: " ++ show x
where
many' f =
@@ -37,25 +40,22 @@ elementFromEvents = do
go front = do
x <- f
case x of
- Nothing -> return $ front []
- Just y -> go (front . (:) y)
- dropReturn x = CL.drop 1 >> return x
+ Left x -> return $ (x, front [])
+ Right y -> go (front . (:) y)
goE n as = do
- CL.drop 1
- ns <- many' goN
- y <- CL.head
+ (y, ns) <- many' goN
if y == Just (EventEndElement n)
then return $ Element n as $ compressNodes ns
else lift $ R.monadThrow $ InvalidEventStream $ "Missing end element for " ++ show n ++ ", got: " ++ show y
goN = do
- x <- CL.peek
+ x <- await
case x of
- Just (EventBeginElement n as) -> (Just . NodeElement) <$> goE n as
- Just (EventInstruction i) -> dropReturn $ Just $ NodeInstruction i
- Just (EventContent c) -> dropReturn $ Just $ NodeContent c
- Just (EventComment t) -> dropReturn $ Just $ NodeComment t
- Just (EventCDATA t) -> dropReturn $ Just $ NodeContent $ ContentText t
- _ -> return Nothing
+ Just (EventBeginElement n as) -> (Right . NodeElement) <$> goE n as
+ Just (EventInstruction i) -> return $ Right $ NodeInstruction i
+ Just (EventContent c) -> return $ Right $ NodeContent c
+ Just (EventComment t) -> return $ Right $ NodeComment t
+ Just (EventCDATA t) -> return $ Right $ NodeContent $ ContentText t
+ _ -> return $ Left x
openElementToEvents :: Element -> [Event]