Browse Source

stopp reader from reading from a dead connection

rename elementFromEvents to elements and transform it to a conduit
add proper failing case for pullElement
change pushing to dead connection to throw away element rather than throw an exception
master
Philipp Balzarek 14 years ago
parent
commit
f73eec93d9
  1. 5
      src/Network/XMPP/Concurrent/Monad.hs
  2. 10
      src/Network/XMPP/Concurrent/Threads.hs
  3. 9
      src/Network/XMPP/Monad.hs
  4. 8
      src/Network/XMPP/Stream.hs
  5. 1
      src/Network/XMPP/Types.hs
  6. 34
      src/Text/XML/Stream/Elements.hs

5
src/Network/XMPP/Concurrent/Monad.hs

@ -174,9 +174,8 @@ withConnection a = do
Ex.catch ( do Ex.catch ( do
(res, s') <- runStateT a s (res, s') <- runStateT a s
atomically $ do atomically $ do
_ <- tryPutTMVar write (sConPushBS s') putTMVar write (sConPushBS s')
_ <- tryPutTMVar stateRef s' putTMVar stateRef s'
return ()
return res return res
) )
-- we treat all Exceptions as fatal -- we treat all Exceptions as fatal

10
src/Network/XMPP/Concurrent/Threads.hs

@ -47,9 +47,13 @@ readWorker messageC presenceC handlers stateRef =
res <- liftIO $ Ex.catch ( do res <- liftIO $ Ex.catch ( do
-- we don't know whether pull will -- we don't know whether pull will
-- necessarily be interruptible -- necessarily be interruptible
s <- liftIO . atomically $ readTMVar stateRef s <- liftIO . atomically $ do
sr <- readTMVar stateRef
when (sConnectionState sr == XmppConnectionClosed)
retry
return sr
allowInterrupt allowInterrupt
Just <$> runStateT pullStanza s Just . fst <$> runStateT pullStanza s
) )
(\(Interrupt t) -> do (\(Interrupt t) -> do
void $ handleInterrupts [t] void $ handleInterrupts [t]
@ -58,7 +62,7 @@ readWorker messageC presenceC handlers stateRef =
liftIO . atomically $ do liftIO . atomically $ do
case res of case res of
Nothing -> return () Nothing -> return ()
Just (sta, _s) -> do Just sta -> do
case sta of case sta of
MessageS m -> do writeTChan messageC $ Right m MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic! _ <- readTChan messageC -- Sic!

9
src/Network/XMPP/Monad.hs

@ -13,6 +13,7 @@ import Control.Monad.State.Strict
import Data.ByteString as BS import Data.ByteString as BS
import Data.Conduit import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.BufferedSource import Data.Conduit.BufferedSource
import Data.Conduit.Binary as CB import Data.Conduit.Binary as CB
import Data.Text(Text) import Data.Text(Text)
@ -50,7 +51,11 @@ pullSink snk = do
return r return r
pullElement :: XMPPConMonad Element pullElement :: XMPPConMonad Element
pullElement = pullSink elementFromEvents pullElement = do
e <- pullSink (elements =$ CL.head)
case e of
Nothing -> liftIO $ Ex.throwIO XmppNoConnection
Just r -> return r
pullPickle :: PU [Node] a -> XMPPConMonad a pullPickle :: PU [Node] a -> XMPPConMonad a
pullPickle p = do pullPickle p = do
@ -94,7 +99,7 @@ xmppNoConnection :: XmppConnection
xmppNoConnection = XmppConnection xmppNoConnection = XmppConnection
{ sConSrc = zeroSource { sConSrc = zeroSource
, sRawSrc = zeroSource , sRawSrc = zeroSource
, sConPushBS = \_ -> Ex.throwIO $ XmppNoConnection , sConPushBS = \_ -> return ()
, sConHandle = Nothing , sConHandle = Nothing
, sFeatures = SF Nothing [] [] , sFeatures = SF Nothing [] []
, sConnectionState = XmppConnectionClosed , sConnectionState = XmppConnectionClosed

8
src/Network/XMPP/Stream.hs

@ -3,6 +3,7 @@
module Network.XMPP.Stream where module Network.XMPP.Stream where
import qualified Control.Exception as Ex
import Control.Monad.Error import Control.Monad.Error
import Control.Monad.State.Strict import Control.Monad.State.Strict
@ -82,8 +83,11 @@ xmppStreamHeader = do
xmppStreamFeatures :: StreamSink ServerFeatures xmppStreamFeatures :: StreamSink ServerFeatures
xmppStreamFeatures = streamUnpickleElem pickleStreamFeatures xmppStreamFeatures = do
=<< lift elementFromEvents e <- lift $ elements =$ CL.head
case e of
Nothing -> liftIO $ Ex.throwIO XmppNoConnection
Just r -> streamUnpickleElem pickleStreamFeatures r
-- Pickling -- Pickling

1
src/Network/XMPP/Types.hs

@ -714,6 +714,7 @@ data XmppConnectionState = XmppConnectionClosed -- ^ No connection at
| XmppConnectionSecured -- ^ Connection | XmppConnectionSecured -- ^ Connection
-- established and -- established and
-- secured via TLS -- secured via TLS
deriving (Show, Eq, Typeable)
data XmppConnection = XmppConnection data XmppConnection = XmppConnection
{ sConSrc :: Source IO Event { sConSrc :: Source IO Event
, sRawSrc :: Source IO BS.ByteString , sRawSrc :: Source IO BS.ByteString

34
src/Text/XML/Stream/Elements.hs

@ -24,11 +24,14 @@ compressNodes (NodeContent (ContentText x) : NodeContent (ContentText y) : z) =
compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z
compressNodes (x:xs) = x : compressNodes xs compressNodes (x:xs) = x : compressNodes xs
elementFromEvents :: R.MonadThrow m => C.Sink Event m Element elements :: R.MonadThrow m => C.Conduit Event m Element
elementFromEvents = do elements = do
x <- CL.peek x <- C.await
case x of case x of
Just (EventBeginElement n as) -> goE n as Just (EventBeginElement n as) -> do
goE n as >>= C.yield
elements
Nothing -> return ()
_ -> lift $ R.monadThrow $ InvalidEventStream $ "not an element: " ++ show x _ -> lift $ R.monadThrow $ InvalidEventStream $ "not an element: " ++ show x
where where
many' f = many' f =
@ -37,25 +40,22 @@ elementFromEvents = do
go front = do go front = do
x <- f x <- f
case x of case x of
Nothing -> return $ front [] Left x -> return $ (x, front [])
Just y -> go (front . (:) y) Right y -> go (front . (:) y)
dropReturn x = CL.drop 1 >> return x
goE n as = do goE n as = do
CL.drop 1 (y, ns) <- many' goN
ns <- many' goN
y <- CL.head
if y == Just (EventEndElement n) if y == Just (EventEndElement n)
then return $ Element n as $ compressNodes ns then return $ Element n as $ compressNodes ns
else lift $ R.monadThrow $ InvalidEventStream $ "Missing end element for " ++ show n ++ ", got: " ++ show y else lift $ R.monadThrow $ InvalidEventStream $ "Missing end element for " ++ show n ++ ", got: " ++ show y
goN = do goN = do
x <- CL.peek x <- await
case x of case x of
Just (EventBeginElement n as) -> (Just . NodeElement) <$> goE n as Just (EventBeginElement n as) -> (Right . NodeElement) <$> goE n as
Just (EventInstruction i) -> dropReturn $ Just $ NodeInstruction i Just (EventInstruction i) -> return $ Right $ NodeInstruction i
Just (EventContent c) -> dropReturn $ Just $ NodeContent c Just (EventContent c) -> return $ Right $ NodeContent c
Just (EventComment t) -> dropReturn $ Just $ NodeComment t Just (EventComment t) -> return $ Right $ NodeComment t
Just (EventCDATA t) -> dropReturn $ Just $ NodeContent $ ContentText t Just (EventCDATA t) -> return $ Right $ NodeContent $ ContentText t
_ -> return Nothing _ -> return $ Left x
openElementToEvents :: Element -> [Event] openElementToEvents :: Element -> [Event]

Loading…
Cancel
Save