diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 4da49ce..abaad61 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -37,6 +37,7 @@ module Network.Xmpp , startTLS , simpleAuth , auth + , closeConnection , endSession , setConnectionClosedHandler -- * JID diff --git a/source/Network/Xmpp/Bind.hs b/source/Network/Xmpp/Bind.hs index 8963de5..5f9d45e 100644 --- a/source/Network/Xmpp/Bind.hs +++ b/source/Network/Xmpp/Bind.hs @@ -32,16 +32,17 @@ xmppBind :: Maybe Text -> XmppConMonad Jid xmppBind rsrc = do answer <- xmppSendIQ' "bind" Nothing Set Nothing (bindBody rsrc) jid <- case () of () | Right IQResult{iqResultPayload = Just b} <- answer - , Right jid <- unpickleElem jidP b + , Right jid <- unpickleElem xpJid b -> return jid | otherwise -> throw $ StreamXMLError - "Bind couldn't unpickle JID" + ("Bind couldn't unpickle JID from " ++ show answer) modify (\s -> s{sJid = Just jid}) return jid where -- Extracts the character data in the `jid' element. - jidP :: PU [Node] Jid - jidP = xpBind $ xpElemNodes "jid" (xpContent xpPrim) + xpJid :: PU [Node] Jid + xpJid = xpBind $ xpElemNodes jidName (xpContent xpPrim) + jidName = "{urn:ietf:params:xml:ns:xmpp-bind}jid" -- A `bind' element pickler. xpBind :: PU [Node] b -> PU [Node] b diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 0d6102c..2cd7652 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -1,7 +1,9 @@ +{-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Concurrent.Monad where import Network.Xmpp.Types +import Control.Applicative((<$>)) import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex @@ -244,4 +246,13 @@ endSession = do -- TODO: This has to be idempotent (is it?) -- | Close the connection to the server. closeConnection :: Xmpp () -closeConnection = void $ withConnection xmppKillConnection +closeConnection = Ex.mask_ $ do + write <- asks writeRef + send <- liftIO . atomically $ takeTMVar write + cc <- sCloseConnection <$> (liftIO . atomically . readTMVar =<< asks conStateRef) + liftIO . send $ "" + void . liftIO . forkIO $ do + threadDelay 3000000 + (Ex.try cc) :: IO (Either Ex.SomeException ()) + return () + liftIO . atomically $ putTMVar write (\_ -> return False) diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index 3dba714..aa1a47a 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -54,7 +54,10 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef = [ Ex.Handler $ \(Interrupt t) -> do void $ handleInterrupts [t] return Nothing - , Ex.Handler $ \(e :: StreamError) -> noCon handlers e + , Ex.Handler $ \(e :: StreamError) -> do + hands <- atomically $ readTVar handlers + _ <- forkIO $ connectionClosedHandler hands e + return Nothing ] liftIO . atomically $ do case res of @@ -139,10 +142,14 @@ writeWorker stCh writeR = forever $ do takeTMVar writeR <*> readTChan stCh r <- write $ renderElement (pickleElem xpStanza next) - unless r $ do -- If the writing failed, the connection is dead. - atomically $ unGetTChan stCh next + atomically $ putTMVar writeR write + unless r $ do + atomically $ unGetTChan stCh next -- If the writing failed, the + -- connection is dead. threadDelay 250000 -- Avoid free spinning. - atomically $ putTMVar writeR write -- Put it back. + + + -- Two streams: input and output. Threads read from input stream and write to -- output stream. @@ -236,4 +243,4 @@ connPersist lock = forever $ do pushBS <- atomically $ takeTMVar lock _ <- pushBS " " atomically $ putTMVar lock pushBS - threadDelay 30000000 + threadDelay 30000000 -- 30s diff --git a/source/Network/Xmpp/Marshal.hs b/source/Network/Xmpp/Marshal.hs index 481ad78..d7e0ae8 100644 --- a/source/Network/Xmpp/Marshal.hs +++ b/source/Network/Xmpp/Marshal.hs @@ -206,4 +206,4 @@ xpStreamError = xpWrap ) (xpOption xpElemVerbatim) -- Application specific error conditions ) - ) \ No newline at end of file + ) diff --git a/source/Network/Xmpp/Monad.hs b/source/Network/Xmpp/Monad.hs index f7062f9..0f7d078 100644 --- a/source/Network/Xmpp/Monad.hs +++ b/source/Network/Xmpp/Monad.hs @@ -64,13 +64,16 @@ pullToSink snk = do pullElement :: XmppConMonad Element pullElement = do - Ex.catch (do + Ex.catches (do e <- pullToSink (elements =$ CL.head) case e of Nothing -> liftIO $ Ex.throwIO StreamConnectionError Just r -> return r ) - (\(InvalidEventStream s) -> liftIO . Ex.throwIO $ StreamXMLError s) + [ Ex.Handler (\StreamEnd -> Ex.throwIO StreamStreamEnd) + , Ex.Handler (\(InvalidEventStream s) + -> liftIO . Ex.throwIO $ StreamXMLError s) + ] -- Pulls an element and unpickles it. pullPickle :: PU [Node] a -> XmppConMonad a @@ -95,6 +98,7 @@ catchPush p = Ex.catch (p >> return True) (\e -> case GIE.ioe_type e of GIE.ResourceVanished -> return False + GIE.IllegalOperation -> return False _ -> Ex.throwIO e ) @@ -143,11 +147,12 @@ xmppNewSession :: XmppConMonad a -> IO (a, XmppConnection) xmppNewSession action = runStateT action xmppNoConnection -- Closes the connection and updates the XmppConMonad XmppConnection state. -xmppKillConnection :: XmppConMonad () +xmppKillConnection :: XmppConMonad (Either Ex.SomeException ()) xmppKillConnection = do cc <- gets sCloseConnection - void . liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ())) + err <- liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ())) put xmppNoConnection + return err -- Sends an IQ request and waits for the response. If the response ID does not -- match the outgoing ID, an error is thrown. diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index 0dcd4ee..ce892f1 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -533,6 +533,7 @@ data XmppStreamError = XmppStreamError data StreamError = StreamError XmppStreamError | StreamWrongVersion Text | StreamXMLError String -- If stream pickling goes wrong. + | StreamStreamEnd -- received closing stream tag | StreamConnectionError deriving (Show, Eq, Typeable) diff --git a/source/Text/XML/Stream/Elements.hs b/source/Text/XML/Stream/Elements.hs index 72cf910..740a2f6 100644 --- a/source/Text/XML/Stream/Elements.hs +++ b/source/Text/XML/Stream/Elements.hs @@ -1,22 +1,26 @@ {-# OPTIONS_HADDOCK hide #-} +{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE OverloadedStrings #-} module Text.XML.Stream.Elements where import Control.Applicative ((<$>)) +import Control.Exception import Control.Monad.Trans.Class import Control.Monad.Trans.Resource as R import qualified Data.ByteString as BS +import Data.Conduit as C +import Data.Conduit.List as CL import qualified Data.Text as Text import qualified Data.Text.Encoding as Text +import Data.Typeable import Data.XML.Types -import qualified Text.XML.Stream.Render as TXSR -import Text.XML.Unresolved as TXU - -import Data.Conduit as C -import Data.Conduit.List as CL import System.IO.Unsafe(unsafePerformIO) +import qualified Text.XML.Stream.Render as TXSR +import Text.XML.Unresolved as TXU + compressNodes :: [Node] -> [Node] compressNodes [] = [] compressNodes [x] = [x] @@ -24,6 +28,13 @@ compressNodes (NodeContent (ContentText x) : NodeContent (ContentText y) : z) = compressNodes $ NodeContent (ContentText $ x `Text.append` y) : z compressNodes (x:xs) = x : compressNodes xs +streamName :: Name +streamName = + (Name "stream" (Just "http://etherx.jabber.org/streams") (Just "stream")) + +data StreamEnd = StreamEnd deriving (Typeable, Show) +instance Exception StreamEnd + elements :: R.MonadThrow m => C.Conduit Event m Element elements = do x <- C.await @@ -31,6 +42,7 @@ elements = do Just (EventBeginElement n as) -> do goE n as >>= C.yield elements + Just (EventEndElement streamName) -> lift $ R.monadThrow StreamEnd Nothing -> return () _ -> lift $ R.monadThrow $ InvalidEventStream $ "not an element: " ++ show x where diff --git a/tests/Tests.hs b/tests/Tests.hs index 7f5b3c1..de3d4bb 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -3,6 +3,7 @@ module Example where import Control.Concurrent import Control.Concurrent.STM +import qualified Control.Exception.Lifted as Ex import Control.Monad import Control.Monad.IO.Class @@ -17,7 +18,7 @@ import Network.Xmpp.IM.Presence import Network.Xmpp.Pickle import System.Environment -import Text.XML.Stream.Elements +import Text.XML.Stream.Elements testUser1 :: Jid testUser1 = read "testuser1@species64739.dyndns.org/bot1" @@ -114,19 +115,20 @@ runMain debug number = do debug . (("Thread " ++ show number ++ ":") ++) wait <- newEmptyTMVarIO withNewSession $ do - setSessionEndHandler (liftIO . atomically $ putTMVar wait ()) setConnectionClosedHandler (\e -> do liftIO (debug' $ "connection lost because " ++ show e) endSession ) debug' "running" - withConnection $ do + withConnection $ Ex.catch (do connect "localhost" "species64739.dyndns.org" startTLS exampleParams - saslResponse <- auth (fromJust $ localpart we) "pwd" (resourcepart we) + saslResponse <- simpleAuth + (fromJust $ localpart we) "pwd" (resourcepart we) case saslResponse of Right _ -> return () Left e -> error $ show e - debug' "session standing" + debug' "session standing") + (\e -> liftIO (print (e ::Ex.SomeException) >> Ex.throwIO e) ) sendPresence presenceOnline fork autoAccept sendPresence $ presenceSubscribe them @@ -148,7 +150,7 @@ runMain debug number = do sendUser "All tests done" debug' "ending session" liftIO . atomically $ putTMVar wait () - endSession + closeConnection liftIO . atomically $ takeTMVar wait return () return ()