From 4e24d6f16a2865e58b2b8d4f8315d8c5cec74cbc Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Wed, 13 Jun 2012 18:18:06 +0200
Subject: [PATCH] do proper disconnects
---
source/Network/Xmpp.hs | 1 +
source/Network/Xmpp/Bind.hs | 9 +++++----
source/Network/Xmpp/Concurrent/Monad.hs | 13 ++++++++++++-
source/Network/Xmpp/Concurrent/Threads.hs | 17 ++++++++++++-----
source/Network/Xmpp/Marshal.hs | 2 +-
source/Network/Xmpp/Monad.hs | 13 +++++++++----
source/Network/Xmpp/Types.hs | 1 +
source/Text/XML/Stream/Elements.hs | 22 +++++++++++++++++-----
tests/Tests.hs | 14 ++++++++------
9 files changed, 66 insertions(+), 26 deletions(-)
diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs
index 4da49ce..abaad61 100644
--- a/source/Network/Xmpp.hs
+++ b/source/Network/Xmpp.hs
@@ -37,6 +37,7 @@ module Network.Xmpp
, startTLS
, simpleAuth
, auth
+ , closeConnection
, endSession
, setConnectionClosedHandler
-- * JID
diff --git a/source/Network/Xmpp/Bind.hs b/source/Network/Xmpp/Bind.hs
index 8963de5..5f9d45e 100644
--- a/source/Network/Xmpp/Bind.hs
+++ b/source/Network/Xmpp/Bind.hs
@@ -32,16 +32,17 @@ xmppBind :: Maybe Text -> XmppConMonad Jid
xmppBind rsrc = do
answer <- xmppSendIQ' "bind" Nothing Set Nothing (bindBody rsrc)
jid <- case () of () | Right IQResult{iqResultPayload = Just b} <- answer
- , Right jid <- unpickleElem jidP b
+ , Right jid <- unpickleElem xpJid b
-> return jid
| otherwise -> throw $ StreamXMLError
- "Bind couldn't unpickle JID"
+ ("Bind couldn't unpickle JID from " ++ show answer)
modify (\s -> s{sJid = Just jid})
return jid
where
-- Extracts the character data in the `jid' element.
- jidP :: PU [Node] Jid
- jidP = xpBind $ xpElemNodes "jid" (xpContent xpPrim)
+ xpJid :: PU [Node] Jid
+ xpJid = xpBind $ xpElemNodes jidName (xpContent xpPrim)
+ jidName = "{urn:ietf:params:xml:ns:xmpp-bind}jid"
-- A `bind' element pickler.
xpBind :: PU [Node] b -> PU [Node] b
diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs
index 0d6102c..2cd7652 100644
--- a/source/Network/Xmpp/Concurrent/Monad.hs
+++ b/source/Network/Xmpp/Concurrent/Monad.hs
@@ -1,7 +1,9 @@
+{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent.Monad where
import Network.Xmpp.Types
+import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
@@ -244,4 +246,13 @@ endSession = do -- TODO: This has to be idempotent (is it?)
-- | Close the connection to the server.
closeConnection :: Xmpp ()
-closeConnection = void $ withConnection xmppKillConnection
+closeConnection = Ex.mask_ $ do
+ write <- asks writeRef
+ send <- liftIO . atomically $ takeTMVar write
+ cc <- sCloseConnection <$> (liftIO . atomically . readTMVar =<< asks conStateRef)
+ liftIO . send $ ""
+ void . liftIO . forkIO $ do
+ threadDelay 3000000
+ (Ex.try cc) :: IO (Either Ex.SomeException ())
+ return ()
+ liftIO . atomically $ putTMVar write (\_ -> return False)
diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs
index 3dba714..aa1a47a 100644
--- a/source/Network/Xmpp/Concurrent/Threads.hs
+++ b/source/Network/Xmpp/Concurrent/Threads.hs
@@ -54,7 +54,10 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef =
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
- , Ex.Handler $ \(e :: StreamError) -> noCon handlers e
+ , Ex.Handler $ \(e :: StreamError) -> do
+ hands <- atomically $ readTVar handlers
+ _ <- forkIO $ connectionClosedHandler hands e
+ return Nothing
]
liftIO . atomically $ do
case res of
@@ -139,10 +142,14 @@ writeWorker stCh writeR = forever $ do
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
- unless r $ do -- If the writing failed, the connection is dead.
- atomically $ unGetTChan stCh next
+ atomically $ putTMVar writeR write
+ unless r $ do
+ atomically $ unGetTChan stCh next -- If the writing failed, the
+ -- connection is dead.
threadDelay 250000 -- Avoid free spinning.
- atomically $ putTMVar writeR write -- Put it back.
+
+
+
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
@@ -236,4 +243,4 @@ connPersist lock = forever $ do
pushBS <- atomically $ takeTMVar lock
_ <- pushBS " "
atomically $ putTMVar lock pushBS
- threadDelay 30000000
+ threadDelay 30000000 -- 30s
diff --git a/source/Network/Xmpp/Marshal.hs b/source/Network/Xmpp/Marshal.hs
index 481ad78..d7e0ae8 100644
--- a/source/Network/Xmpp/Marshal.hs
+++ b/source/Network/Xmpp/Marshal.hs
@@ -206,4 +206,4 @@ xpStreamError = xpWrap
)
(xpOption xpElemVerbatim) -- Application specific error conditions
)
- )
\ No newline at end of file
+ )
diff --git a/source/Network/Xmpp/Monad.hs b/source/Network/Xmpp/Monad.hs
index f7062f9..0f7d078 100644
--- a/source/Network/Xmpp/Monad.hs
+++ b/source/Network/Xmpp/Monad.hs
@@ -64,13 +64,16 @@ pullToSink snk = do
pullElement :: XmppConMonad Element
pullElement = do
- Ex.catch (do
+ Ex.catches (do
e <- pullToSink (elements =$ CL.head)
case e of
Nothing -> liftIO $ Ex.throwIO StreamConnectionError
Just r -> return r
)
- (\(InvalidEventStream s) -> liftIO . Ex.throwIO $ StreamXMLError s)
+ [ Ex.Handler (\StreamEnd -> Ex.throwIO StreamStreamEnd)
+ , Ex.Handler (\(InvalidEventStream s)
+ -> liftIO . Ex.throwIO $ StreamXMLError s)
+ ]
-- Pulls an element and unpickles it.
pullPickle :: PU [Node] a -> XmppConMonad a
@@ -95,6 +98,7 @@ catchPush p = Ex.catch
(p >> return True)
(\e -> case GIE.ioe_type e of
GIE.ResourceVanished -> return False
+ GIE.IllegalOperation -> return False
_ -> Ex.throwIO e
)
@@ -143,11 +147,12 @@ xmppNewSession :: XmppConMonad a -> IO (a, XmppConnection)
xmppNewSession action = runStateT action xmppNoConnection
-- Closes the connection and updates the XmppConMonad XmppConnection state.
-xmppKillConnection :: XmppConMonad ()
+xmppKillConnection :: XmppConMonad (Either Ex.SomeException ())
xmppKillConnection = do
cc <- gets sCloseConnection
- void . liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ()))
+ err <- liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ()))
put xmppNoConnection
+ return err
-- Sends an IQ request and waits for the response. If the response ID does not
-- match the outgoing ID, an error is thrown.
diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs
index 0dcd4ee..ce892f1 100644
--- a/source/Network/Xmpp/Types.hs
+++ b/source/Network/Xmpp/Types.hs
@@ -533,6 +533,7 @@ data XmppStreamError = XmppStreamError
data StreamError = StreamError XmppStreamError
| StreamWrongVersion Text
| StreamXMLError String -- If stream pickling goes wrong.
+ | StreamStreamEnd -- received closing stream tag
| StreamConnectionError
deriving (Show, Eq, Typeable)
diff --git a/source/Text/XML/Stream/Elements.hs b/source/Text/XML/Stream/Elements.hs
index 72cf910..740a2f6 100644
--- a/source/Text/XML/Stream/Elements.hs
+++ b/source/Text/XML/Stream/Elements.hs
@@ -1,22 +1,26 @@
{-# OPTIONS_HADDOCK hide #-}
+{-# LANGUAGE DeriveDataTypeable #-}
+{-# LANGUAGE OverloadedStrings #-}
module Text.XML.Stream.Elements where
import Control.Applicative ((<$>))
+import Control.Exception
import Control.Monad.Trans.Class
import Control.Monad.Trans.Resource as R
import qualified Data.ByteString as BS
+import Data.Conduit as C
+import Data.Conduit.List as CL
import qualified Data.Text as Text
import qualified Data.Text.Encoding as Text
+import Data.Typeable
import Data.XML.Types
-import qualified Text.XML.Stream.Render as TXSR
-import Text.XML.Unresolved as TXU
-
-import Data.Conduit as C
-import Data.Conduit.List as CL
import System.IO.Unsafe(unsafePerformIO)
+import qualified Text.XML.Stream.Render as TXSR
+import Text.XML.Unresolved as TXU
+
compressNodes :: [Node] -> [Node]
compressNodes [] = []
compressNodes [x] = [x]
@@ -24,6 +28,13 @@ compressNodes (NodeContent (ContentText x) : NodeContent (ContentText y) : z) =
compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z
compressNodes (x:xs) = x : compressNodes xs
+streamName :: Name
+streamName =
+ (Name "stream" (Just "http://etherx.jabber.org/streams") (Just "stream"))
+
+data StreamEnd = StreamEnd deriving (Typeable, Show)
+instance Exception StreamEnd
+
elements :: R.MonadThrow m => C.Conduit Event m Element
elements = do
x <- C.await
@@ -31,6 +42,7 @@ elements = do
Just (EventBeginElement n as) -> do
goE n as >>= C.yield
elements
+ Just (EventEndElement streamName) -> lift $ R.monadThrow StreamEnd
Nothing -> return ()
_ -> lift $ R.monadThrow $ InvalidEventStream $ "not an element: " ++ show x
where
diff --git a/tests/Tests.hs b/tests/Tests.hs
index 7f5b3c1..de3d4bb 100644
--- a/tests/Tests.hs
+++ b/tests/Tests.hs
@@ -3,6 +3,7 @@ module Example where
import Control.Concurrent
import Control.Concurrent.STM
+import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
@@ -17,7 +18,7 @@ import Network.Xmpp.IM.Presence
import Network.Xmpp.Pickle
import System.Environment
-import Text.XML.Stream.Elements
+import Text.XML.Stream.Elements
testUser1 :: Jid
testUser1 = read "testuser1@species64739.dyndns.org/bot1"
@@ -114,19 +115,20 @@ runMain debug number = do
debug . (("Thread " ++ show number ++ ":") ++)
wait <- newEmptyTMVarIO
withNewSession $ do
- setSessionEndHandler (liftIO . atomically $ putTMVar wait ())
setConnectionClosedHandler (\e -> do
liftIO (debug' $ "connection lost because " ++ show e)
endSession )
debug' "running"
- withConnection $ do
+ withConnection $ Ex.catch (do
connect "localhost" "species64739.dyndns.org"
startTLS exampleParams
- saslResponse <- auth (fromJust $ localpart we) "pwd" (resourcepart we)
+ saslResponse <- simpleAuth
+ (fromJust $ localpart we) "pwd" (resourcepart we)
case saslResponse of
Right _ -> return ()
Left e -> error $ show e
- debug' "session standing"
+ debug' "session standing")
+ (\e -> liftIO (print (e ::Ex.SomeException) >> Ex.throwIO e) )
sendPresence presenceOnline
fork autoAccept
sendPresence $ presenceSubscribe them
@@ -148,7 +150,7 @@ runMain debug number = do
sendUser "All tests done"
debug' "ending session"
liftIO . atomically $ putTMVar wait ()
- endSession
+ closeConnection
liftIO . atomically $ takeTMVar wait
return ()
return ()