From 34e80d8fe2cc72d2d5fba22a1359946b55b24f25 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:41:36 +0200 Subject: [PATCH 01/12] export setConnectionClosedHandler --- source/Network/Xmpp.hs | 1 + source/Network/Xmpp/Concurrent/Monad.hs | 8 +++++--- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 0c4ef44..48a7e89 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -27,6 +27,7 @@ module Network.Xmpp ( -- * Session management Session , session + , setConnectionClosedHandler , StreamConfiguration(..) , SessionConfiguration(..) , ConnectionDetails(..) diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index a7e5b19..06c0711 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -70,9 +70,11 @@ modifyHandlers f session = atomically $ modifyTVar_ (eventHandlers session) f x <- readTVar var writeTVar var (g x) --- | Sets the handler to be executed when the server connection is closed. -setConnectionClosedHandler_ :: (XmppFailure -> Session -> IO ()) -> Session -> IO () -setConnectionClosedHandler_ eh session = do +-- | Changes the handler to be executed when the server connection is closed. To +-- avoid race conditions the initial value should be set in the configuration +-- when creating the session +setConnectionClosedHandler :: (XmppFailure -> Session -> IO ()) -> Session -> IO () +setConnectionClosedHandler eh session = do modifyHandlers (\s -> s{connectionClosedHandler = \e -> eh e session}) session From 2aab7710e18e1664b433d84778617b881c2a8782 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:42:54 +0200 Subject: [PATCH 02/12] move SessionConfiguration to Network.Xmpp.Concurrent.Types --- source/Network/Xmpp/Concurrent/Types.hs | 29 ++++++++++++++++++++++++- source/Network/Xmpp/Types.hs | 26 ---------------------- 2 files changed, 28 insertions(+), 27 deletions(-) diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index c2b97b8..9491b93 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -7,14 +7,41 @@ import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex import qualified Data.ByteString as BS +import Data.Default import qualified Data.Map as Map import Data.Text (Text) +import qualified Data.Text as Text import Data.Typeable import Data.XML.Types (Element) - import Network.Xmpp.IM.Roster.Types import Network.Xmpp.Types + +-- | Configuration for the @Session@ object. +data SessionConfiguration = SessionConfiguration + { -- | Configuration for the @Stream@ object. + sessionStreamConfiguration :: StreamConfiguration + -- | Handler to be run when the session ends (for whatever reason). + , onConnectionClosed :: XmppFailure -> IO () + -- | Function to generate the stream of stanza identifiers. + , sessionStanzaIDs :: IO (IO StanzaID) + , extraStanzaHandlers :: [StanzaHandler] + , enableRoster :: Bool + } + +instance Default SessionConfiguration where + def = SessionConfiguration { sessionStreamConfiguration = def + , onConnectionClosed = \_ -> return () + , sessionStanzaIDs = do + idRef <- newTVarIO 1 + return . atomically $ do + curId <- readTVar idRef + writeTVar idRef (curId + 1 :: Integer) + return . StanzaID . Text.pack . show $ curId + , extraStanzaHandlers = [] + , enableRoster = True + } + -- | Handlers to be run when the Xmpp session ends and when the Xmpp connection is -- closed. data EventHandlers = EventHandlers diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index d93bb70..4817d0e 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -49,7 +49,6 @@ module Network.Xmpp.Types , jidFromTexts , StreamEnd(..) , InvalidXmppXml(..) - , SessionConfiguration(..) , TlsBehaviour(..) ) where @@ -1088,31 +1087,6 @@ type StanzaHandler = TChan Stanza -- ^ outgoing stanza -> Stanza -- ^ stanza to handle -> IO Bool -- ^ True when processing should continue --- | Configuration for the @Session@ object. -data SessionConfiguration = SessionConfiguration - { -- | Configuration for the @Stream@ object. - sessionStreamConfiguration :: StreamConfiguration - -- | Handler to be run when the session ends (for whatever reason). - , sessionClosedHandler :: XmppFailure -> IO () - -- | Function to generate the stream of stanza identifiers. - , sessionStanzaIDs :: IO (IO StanzaID) - , extraStanzaHandlers :: [StanzaHandler] - , enableRoster :: Bool - } - -instance Default SessionConfiguration where - def = SessionConfiguration { sessionStreamConfiguration = def - , sessionClosedHandler = \_ -> return () - , sessionStanzaIDs = do - idRef <- newTVarIO 1 - return . atomically $ do - curId <- readTVar idRef - writeTVar idRef (curId + 1 :: Integer) - return . StanzaID . Text.pack . show $ curId - , extraStanzaHandlers = [] - , enableRoster = True - } - -- | How the client should behave in regards to TLS. data TlsBehaviour = RequireTls -- ^ Require the use of TLS; disconnect if it's -- not offered. From 3ac2079f2761a80930fc235a27742c7c86c2eba0 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:45:33 +0200 Subject: [PATCH 03/12] add Finished state to ConnectionState --- source/Network/Xmpp/Stream.hs | 37 ++++++++++++++++++++--------------- source/Network/Xmpp/Tls.hs | 7 +++++-- source/Network/Xmpp/Types.hs | 1 + 3 files changed, 27 insertions(+), 18 deletions(-) diff --git a/source/Network/Xmpp/Stream.hs b/source/Network/Xmpp/Stream.hs index 6bc9832..a3df8ea 100644 --- a/source/Network/Xmpp/Stream.hs +++ b/source/Network/Xmpp/Stream.hs @@ -122,11 +122,12 @@ startStream = runErrorT $ do -- state of the stream. let expectedTo = case ( streamConnectionState st , toJid $ streamConfiguration st) of - (Plain , (Just (jid, True))) -> Just jid - (Plain , _ ) -> Nothing - (Secured, (Just (jid, _ ))) -> Just jid - (Secured, Nothing ) -> Nothing - (Closed , _ ) -> Nothing + (Plain , (Just (jid, True))) -> Just jid + (Plain , _ ) -> Nothing + (Secured , (Just (jid, _ ))) -> Just jid + (Secured , Nothing ) -> Nothing + (Closed , _ ) -> Nothing + (Finished , _ ) -> Nothing case streamAddress st of Nothing -> do lift $ lift $ errorM "Pontarius.XMPP" "Server sent no hostname." @@ -338,6 +339,7 @@ closeStreams' = do threadDelay 3000000 -- TODO: Configurable value void ((Ex.try cc) :: IO (Either Ex.SomeException ())) return () + put xmppNoStream{ streamConnectionState = Finished } collectElems [] where -- Pulls elements from the stream until the stream ends, or an error is @@ -470,18 +472,21 @@ catchPush p = ExL.catch _ -> ExL.throwIO e ) +zeroHandle :: StreamHandle +zeroHandle = StreamHandle { streamSend = \_ -> return False + , streamReceive = \_ -> do + errorM "Pontarius.XMPP" + "xmppNoStream: Stream is closed." + ExL.throwIO XmppOtherFailure + , streamFlush = return () + , streamClose = return () + } + -- Stream state used when there is no connection. xmppNoStream :: StreamState xmppNoStream = StreamState { streamConnectionState = Closed - , streamHandle = StreamHandle { streamSend = \_ -> return False - , streamReceive = \_ -> do - errorM "Pontarius.XMPP" "xmppNoStream: No stream on receive." - ExL.throwIO $ - XmppOtherFailure - , streamFlush = return () - , streamClose = return () - } + , streamHandle = zeroHandle , streamEventSource = zeroSource , streamFeatures = StreamFeatures Nothing [] [] , streamAddress = Nothing @@ -705,14 +710,14 @@ srvLookup realm resolvSeed = ErrorT $ do rest <- orderSublist sublist'' return $ ((priority, weight, port, domain):rest) --- Closes the connection and updates the XmppConMonad Stream state. --- killStream :: Stream -> IO (Either ExL.SomeException ()) +-- | Close the connection and updates the XmppConMonad Stream state. Does +-- not send the stream end tag. killStream :: Stream -> IO (Either XmppFailure ()) killStream = withStream $ do cc <- gets (streamClose . streamHandle) err <- wrapIOException cc -- (ExL.try cc :: IO (Either ExL.SomeException ())) - put xmppNoStream + put xmppNoStream{ streamConnectionState = Finished } return err -- Sends an IQ request and waits for the response. If the response ID does not diff --git a/source/Network/Xmpp/Tls.hs b/source/Network/Xmpp/Tls.hs index f3b8a4e..e477eff 100644 --- a/source/Network/Xmpp/Tls.hs +++ b/source/Network/Xmpp/Tls.hs @@ -51,10 +51,13 @@ tls con = Ex.handle (return . Left . TlsError) case sState of Plain -> return () Closed -> do - liftIO $ errorM "Pontarius.Xmpp" "startTls: The stream is closed." + liftIO $ errorM "Pontarius.Xmpp.Tls" "The stream is closed." + throwError XmppNoStream + Finished -> do + liftIO $ errorM "Pontarius.Xmpp.Tls" "The stream is finished." throwError XmppNoStream Secured -> do - liftIO $ errorM "Pontarius.Xmpp" "startTls: The stream is already secured." + liftIO $ errorM "Pontarius.Xmpp.Tls" "The stream is already secured." throwError TlsStreamSecured features <- lift $ gets streamFeatures case (tlsBehaviour conf, streamTls features) of diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index 4817d0e..f33166c 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -803,6 +803,7 @@ data ConnectionState = Closed -- ^ No stream has been established | Plain -- ^ Stream established, but not secured via TLS | Secured -- ^ Stream established and secured via TLS + | Finished -- ^ Stream is closed deriving (Show, Eq, Typeable) -- | Defines operations for sending, receiving, flushing, and closing on a From 414aa33f8664c584830a12e547f9ebb028fbc7d6 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:46:28 +0200 Subject: [PATCH 04/12] change loggers in Network.Xmpp.Tls to log to Pontarius.Xmpp.Tls (was Pontarius.Xmpp) --- source/Network/Xmpp/Tls.hs | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/source/Network/Xmpp/Tls.hs b/source/Network/Xmpp/Tls.hs index e477eff..548d07b 100644 --- a/source/Network/Xmpp/Tls.hs +++ b/source/Network/Xmpp/Tls.hs @@ -70,13 +70,13 @@ tls con = Ex.handle (return . Left . TlsError) (RefuseTls , Just True) -> throwError XmppOtherFailure (RefuseTls , _ ) -> skipTls where - skipTls = liftIO $ infoM "Pontarius.Xmpp" "Skipping TLS negotiation" + skipTls = liftIO $ infoM "Pontarius.Xmpp.Tls" "Skipping TLS negotiation" startTls = do - liftIO $ infoM "Pontarius.Xmpp" "Running StartTLS" + liftIO $ infoM "Pontarius.Xmpp.Tls" "Running StartTLS" params <- gets $ tlsParams . streamConfiguration sent <- ErrorT $ pushElement starttlsE unless sent $ do - liftIO $ errorM "Pontarius.Xmpp" "startTls: Could not sent stanza." + liftIO $ errorM "Pontarius.Xmpp.Tls" "Could not sent stanza." throwError XmppOtherFailure answer <- lift $ pullElement case answer of @@ -87,8 +87,8 @@ tls con = Ex.handle (return . Left . TlsError) liftIO $ errorM "Pontarius.Xmpp" "startTls: TLS initiation failed." throwError XmppOtherFailure Right r -> - liftIO $ errorM "Pontarius.Xmpp" $ - "startTls: Unexpected element: " ++ show r + liftIO $ errorM "Pontarius.Xmpp.Tls" $ + "Unexpected element: " ++ show r hand <- gets streamHandle (_raw, _snk, psh, recv, ctx) <- lift $ tlsinit params (mkBackend hand) let newHand = StreamHandle { streamSend = catchPush . psh @@ -97,7 +97,7 @@ tls con = Ex.handle (return . Left . TlsError) , streamClose = bye ctx >> streamClose hand } lift $ modify ( \x -> x {streamHandle = newHand}) - liftIO $ infoM "Pontarius.Xmpp" "Stream Secured." + liftIO $ infoM "Pontarius.Xmpp.Tls" "Stream Secured." either (lift . Ex.throwIO) return =<< lift restartStream modify (\s -> s{streamConnectionState = Secured}) return () @@ -119,13 +119,13 @@ tlsinit :: (MonadIO m, MonadIO m1) => , Context ) tlsinit params backend = do - liftIO $ debugM "Pontarius.Xmpp.TLS" "TLS with debug mode enabled." + liftIO $ debugM "Pontarius.Xmpp.Tls" "TLS with debug mode enabled." gen <- liftIO $ getSystemRandomGen -- TODO: Find better random source? con <- client params gen backend handshake con let src = forever $ do dt <- liftIO $ recvData con - liftIO $ debugM "Pontarius.Xmpp.TLS" ("In :" ++ BSC8.unpack dt) + liftIO $ debugM "Pontarius.Xmpp.Tls" ("In :" ++ BSC8.unpack dt) yield dt let snk = do d <- await From 24a97e84ebfa9c0cccc20dbae60458aa22ca3569 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:47:06 +0200 Subject: [PATCH 05/12] remove duplicate export of InstantMessage in Network.Xmpp.IM --- source/Network/Xmpp/IM.hs | 1 - 1 file changed, 1 deletion(-) diff --git a/source/Network/Xmpp/IM.hs b/source/Network/Xmpp/IM.hs index 2109d30..fd88e05 100644 --- a/source/Network/Xmpp/IM.hs +++ b/source/Network/Xmpp/IM.hs @@ -6,7 +6,6 @@ module Network.Xmpp.IM , MessageBody(..) , MessageThread(..) , MessageSubject(..) - , InstantMessage (..) , Subscription(..) , instantMessage , simpleIM From c5f888615237c70624698f67a998b9e934dfc454 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:48:44 +0200 Subject: [PATCH 06/12] rename sessionClosedHandler configuration option to onConnectionClosed --- source/Network/Xmpp/Concurrent.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index 24ced41..aa02fe0 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -114,7 +114,7 @@ newSession stream config = runErrorT $ do outC <- lift newTChanIO stanzaChan <- lift newTChanIO iqHands <- lift $ newTVarIO (Map.empty, Map.empty) - eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = sessionClosedHandler config } + eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = onConnectionClosed config } ros <- liftIO . newTVarIO $ Roster Nothing Map.empty let rosterH = if (enableRoster config) then handleRoster ros else \ _ _ -> return True From c56f9c88cd0015865728204cdfe6e539e3744f95 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:49:17 +0200 Subject: [PATCH 07/12] fix documentation type in Network.Xmpp.Types --- source/Network/Xmpp/Types.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index f33166c..4bb5589 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -1041,7 +1041,7 @@ data InvalidXmppXml = InvalidXmppXml String deriving (Show, Typeable) instance Exception InvalidXmppXml data ConnectionDetails = UseRealm -- ^ Use realm to resolv host - | UseSrv HostName -- ^ Use this hostname for a SRC lookup + | UseSrv HostName -- ^ Use this hostname for a SRV lookup | UseHost HostName PortID -- ^ Use specified host -- | Configuration settings related to the stream. From f0d558a5281d9808509b2cf1e942e5e482b41694 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:50:21 +0200 Subject: [PATCH 08/12] reinstate withConnection and fix closeConnection --- source/Network/Xmpp/Concurrent/Monad.hs | 105 ++++++++++++------------ 1 file changed, 52 insertions(+), 53 deletions(-) diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 06c0711..858dce8 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -2,61 +2,63 @@ {-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Concurrent.Monad where -import Network.Xmpp.Types - +import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex import Control.Monad.Reader - +import Control.Monad.State import Network.Xmpp.Concurrent.Types import Network.Xmpp.Stream - - +import Network.Xmpp.Types -- TODO: Wait for presence error? --- -- | Run an XmppConMonad action in isolation. Reader and writer workers will be --- -- temporarily stopped and resumed with the new session details once the action --- -- returns. The action will run in the calling thread. Any uncaught exceptions --- -- will be interpreted as connection failure. +-- | Run an XmppConMonad action in isolation. Reader and writer workers will be +-- temporarily stopped and resumed with the new session details once the action +-- returns. The action will run in the calling thread. Any uncaught exceptions +-- will be interpreted as connection failure. -- withConnection :: XmppConMonad a -> Context -> IO (Either StreamError a) --- withConnection a session = do --- wait <- newEmptyTMVarIO --- Ex.mask_ $ do --- -- Suspends the reader until the lock (wait) is released (set to `()'). --- throwTo (readerThread session) $ Interrupt wait --- -- We acquire the write and stateRef locks, to make sure that this is --- -- the only thread that can write to the stream and to perform a --- -- withConnection calculation. Afterwards, we release the lock and --- -- fetches an updated state. --- s <- Ex.catch --- (atomically $ do --- _ <- takeTMVar (writeRef session) --- s <- takeTMVar (conStateRef session) --- putTMVar wait () --- return s --- ) --- -- If we catch an exception, we have failed to take the MVars above. --- (\e -> atomically (putTMVar wait ()) >> --- Ex.throwIO (e :: Ex.SomeException) --- ) --- -- Run the XmppMonad action, save the (possibly updated) states, release --- -- the locks, and return the result. --- Ex.catches --- (do --- (res, s') <- runStateT a s --- atomically $ do --- putTMVar (writeRef session) (cSend . sCon $ s') --- putTMVar (conStateRef session) s' --- return $ Right res --- ) --- -- We treat all Exceptions as fatal. If we catch a StreamError, we --- -- return it. Otherwise, we throw an exception. --- [ Ex.Handler $ \e -> return $ Left (e :: StreamError) --- , Ex.Handler $ \e -> runStateT xmppKillConnection s --- >> Ex.throwIO (e :: Ex.SomeException) --- ] +withConnection :: (Stream -> IO (b, Stream)) + -> Session + -> IO (Either XmppFailure b) +withConnection a session = do + wait <- newEmptyTMVarIO + Ex.mask_ $ do + -- Suspends the reader until the lock (wait) is released (set to `()'). + throwTo (readerThread session) $ Interrupt wait + -- We acquire the write and stateRef locks, to make sure that this is + -- the only thread that can write to the stream and to perform a + -- withConnection calculation. Afterwards, we release the lock and + -- fetches an updated state. + s <- Ex.catch + (atomically $ do + _ <- takeTMVar (writeRef session) + s <- takeTMVar (streamRef session) + putTMVar wait () + return s + ) + -- If we catch an exception, we have failed to take the MVars above. + (\e -> atomically (putTMVar wait ()) >> + Ex.throwIO (e :: Ex.SomeException) + ) + -- Run the XmppMonad action, save the (possibly updated) states, release + -- the locks, and return the result. + Ex.catches + (do + (res, s') <- a s + wl <- withStream' (gets $ streamSend . streamHandle) s' + atomically $ do + putTMVar (writeRef session) wl + putTMVar (streamRef session) s' + return $ Right res + ) + -- We treat all Exceptions as fatal. If we catch a StreamError, we + -- return it. Otherwise, we throw an exception. + [ Ex.Handler $ \e -> return $ Left (e :: XmppFailure) + , Ex.Handler $ \e -> killStream s + >> Ex.throwIO (e :: Ex.SomeException) + ] -- | Executes a function to update the event handlers. modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO () @@ -86,16 +88,13 @@ runHandler h session = h =<< atomically (readTVar $ eventHandlers session) -- | End the current Xmpp session. endSession :: Session -> IO () endSession session = do -- TODO: This has to be idempotent (is it?) - closeConnection session + _ <- closeConnection session stopThreads session -- | Close the connection to the server. Closes the stream (by enforcing a -- write lock and sending a element), waits (blocks) for three -- seconds, and then closes the connection. -closeConnection :: Session -> IO () -closeConnection session = Ex.mask_ $ do - (_send, connection) <- atomically $ liftM2 (,) - (takeTMVar $ writeRef session) - (takeTMVar $ streamRef session) - _ <- closeStreams connection - return () +closeConnection :: Session -> IO (Either XmppFailure ()) +closeConnection = withConnection $ \stream -> do + _ <- closeStreams stream + return ((), stream) From 53b73bc905d268693731e1d5a4c996b00eebc89b Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 17:51:59 +0200 Subject: [PATCH 09/12] allow read Worker to close streams on read failure and prevent it from stopping itself in this case --- source/Network/Xmpp/Concurrent/Threads.hs | 73 ++++++++++++++--------- 1 file changed, 45 insertions(+), 28 deletions(-) diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index 81b3867..11db1c6 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -23,37 +23,50 @@ import System.Log.Logger readWorker :: (Stanza -> IO ()) -> (XmppFailure -> IO ()) -> TMVar Stream - -> IO () -readWorker onStanza onConnectionClosed stateRef = Ex.mask_ go - where - go = do - res <- Ex.catches ( do - -- we don't know whether pull will - -- necessarily be interruptible - s <- atomically $ do + -> IO a +readWorker onStanza onConnectionClosed stateRef = forever . Ex.mask_ $ do + + s' <- Ex.catches ( do + -- we don't know whether pull will + -- necessarily be interruptible + atomically $ do s@(Stream con) <- readTMVar stateRef scs <- streamConnectionState <$> readTMVar con - when (scs == Closed) + when (stateIsClosed scs) retry - return s - allowInterrupt - Just <$> pullStanza s - ) - [ Ex.Handler $ \(Interrupt t) -> do - void $ handleInterrupts [t] - return Nothing - , Ex.Handler $ \(e :: XmppFailure) -> do - onConnectionClosed e - errorM "Pontarius.Xmpp" $ "Read error: " ++ show e - return Nothing - ] - case res of - Nothing -> go -- Caught an exception, nothing to do. TODO: Can this happen? - Just (Left e) -> do - infoM "Pontarius.Xmpp.Reader" $ - "Connection died: " ++ show e - onConnectionClosed e - Just (Right sta) -> onStanza sta >> go + return $ Just s + ) + [ Ex.Handler $ \(Interrupt t) -> do + void $ handleInterrupts [t] + return Nothing + + ] + case s' of + Nothing -> return () + Just s -> do + res <- Ex.catches (do + allowInterrupt + Just <$> pullStanza s + ) + [ Ex.Handler $ \(Interrupt t) -> do + void $ handleInterrupts [t] + return Nothing + , Ex.Handler $ \(e :: XmppFailure) -> do + errorM "Pontarius.Xmpp" $ "Read error: " + ++ show e + closeStreams s + onConnectionClosed e + return Nothing + ] + case res of + Nothing -> return () -- Caught an exception, nothing to + -- do. TODO: Can this happen? + Just (Left e) -> do + errorM "Pontarius.Xmpp" $ "Stanza error:" ++ show e + closeStreams s + onConnectionClosed e + Just (Right sta) -> void $ onStanza sta + where -- Defining an Control.Exception.allowInterrupt equivalent for GHC 7 -- compatibility. allowInterrupt :: IO () @@ -67,6 +80,10 @@ readWorker onStanza onConnectionClosed stateRef = Ex.mask_ go handleInterrupts ts = Ex.catch (atomically $ forM ts takeTMVar) (\(Interrupt t) -> handleInterrupts (t:ts)) + stateIsClosed Closed = True + stateIsClosed Finished = True + stateIsClosed _ = False + -- Two streams: input and output. Threads read from input stream and write to -- output stream. From e0821567de6ca80d644eacc28a342e2ba325b47b Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 19:47:59 +0200 Subject: [PATCH 10/12] run connectionClosedHandler on calls of closeConnection --- source/Network/Xmpp/Concurrent/Monad.hs | 22 +++++++++++++++------- 1 file changed, 15 insertions(+), 7 deletions(-) diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 858dce8..343a588 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -2,16 +2,15 @@ {-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Concurrent.Monad where +import Control.Applicative ((<$>)) import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex -import Control.Monad.Reader import Control.Monad.State import Network.Xmpp.Concurrent.Types import Network.Xmpp.Stream import Network.Xmpp.Types - -- TODO: Wait for presence error? -- | Run an XmppConMonad action in isolation. Reader and writer workers will be @@ -80,6 +79,11 @@ setConnectionClosedHandler eh session = do modifyHandlers (\s -> s{connectionClosedHandler = \e -> eh e session}) session +runConnectionClosedHandler :: Session -> XmppFailure -> IO () +runConnectionClosedHandler session e = do + h <- connectionClosedHandler <$> atomically (readTVar $ eventHandlers session) + h e + -- | Run an event handler. runHandler :: (EventHandlers -> IO a) -> Session -> IO a runHandler h session = h =<< atomically (readTVar $ eventHandlers session) @@ -88,13 +92,17 @@ runHandler h session = h =<< atomically (readTVar $ eventHandlers session) -- | End the current Xmpp session. endSession :: Session -> IO () endSession session = do -- TODO: This has to be idempotent (is it?) - _ <- closeConnection session + _ <- flip withConnection session $ \stream -> do + _ <- closeStreams stream + return ((), stream) stopThreads session -- | Close the connection to the server. Closes the stream (by enforcing a -- write lock and sending a element), waits (blocks) for three -- seconds, and then closes the connection. -closeConnection :: Session -> IO (Either XmppFailure ()) -closeConnection = withConnection $ \stream -> do - _ <- closeStreams stream - return ((), stream) +closeConnection :: Session -> IO () +closeConnection session = do + _ <-flip withConnection session $ \stream -> do + _ <- closeStreams stream + return ((), stream) + runConnectionClosedHandler session StreamEndFailure From 6c2830f8521385f2235015263c77e107eb2ba155 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 1 Jun 2013 19:48:47 +0200 Subject: [PATCH 11/12] fix log messages in Network.Xmpp.Stream (XMPP => Xmpp) --- source/Network/Xmpp/Stream.hs | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/source/Network/Xmpp/Stream.hs b/source/Network/Xmpp/Stream.hs index a3df8ea..c158d1c 100644 --- a/source/Network/Xmpp/Stream.hs +++ b/source/Network/Xmpp/Stream.hs @@ -130,7 +130,7 @@ startStream = runErrorT $ do (Finished , _ ) -> Nothing case streamAddress st of Nothing -> do - lift $ lift $ errorM "Pontarius.XMPP" "Server sent no hostname." + lift $ lift $ errorM "Pontarius.Xmpp" "Server sent no hostname." throwError XmppOtherFailure Just address -> do pushing pushXmlDecl @@ -195,7 +195,7 @@ startStream = runErrorT $ do void . lift . pushElement . pickleElem xpStreamError $ StreamErrorInfo sec Nothing el void . lift $ closeStreams' - liftIO $ errorM "Pontarius.XMPP" $ "closeStreamWithError: " ++ msg + liftIO $ errorM "Pontarius.Xmpp" $ "closeStreamWithError: " ++ msg throwError XmppOtherFailure checkchildren children = let to' = lookup "to" children @@ -235,7 +235,7 @@ flattenAttrs attrs = Prelude.map (\(name, cont) -> -- and calls xmppStartStream. restartStream :: StateT StreamState IO (Either XmppFailure ()) restartStream = do - liftIO $ debugM "Pontarius.XMPP" "Restarting stream..." + liftIO $ debugM "Pontarius.Xmpp" "Restarting stream..." raw <- gets (streamReceive . streamHandle) let newSource =loopRead raw $= XP.parseBytes def buffered <- liftIO . bufferSrc $ newSource @@ -310,7 +310,7 @@ streamS _expectedTo = do -- TODO: check expectedTo e <- lift $ elements =$ CL.head case e of Nothing -> do - lift $ lift $ errorM "Pontarius.XMPP" "streamS: Stream ended." + lift $ lift $ errorM "Pontarius.Xmpp" "streamS: Stream ended." throwError XmppOtherFailure Just r -> streamUnpickleElem xpStreamFeatures r @@ -318,7 +318,7 @@ streamS _expectedTo = do -- TODO: check expectedTo -- realm. openStream :: HostName -> StreamConfiguration -> IO (Either XmppFailure (Stream)) openStream realm config = runErrorT $ do - lift $ debugM "Pontarius.XMPP" "Opening stream..." + lift $ debugM "Pontarius.Xmpp" "Opening stream..." stream' <- createStream realm config ErrorT . liftIO $ withStream startStream stream' return stream' @@ -331,7 +331,7 @@ closeStreams = withStream closeStreams' closeStreams' :: StateT StreamState IO (Either XmppFailure [Element]) closeStreams' = do - lift $ debugM "Pontarius.XMPP" "Closing stream..." + lift $ debugM "Pontarius.Xmpp" "Closing stream..." send <- gets (streamSend . streamHandle) cc <- gets (streamClose . streamHandle) void . liftIO $ send "" @@ -363,7 +363,7 @@ wrapIOException action = do case r of Right b -> return $ Right b Left e -> do - lift $ warningM "Pontarius.XMPP" $ "wrapIOException: Exception wrapped: " ++ (show e) + lift $ warningM "Pontarius.Xmpp" $ "wrapIOException: Exception wrapped: " ++ (show e) return $ Left $ XmppIOException e pushElement :: Element -> StateT StreamState IO (Either XmppFailure Bool) @@ -423,18 +423,18 @@ pullElement = do e <- runEventsSink (elements =$ await) case e of Nothing -> do - lift $ errorM "Pontarius.XMPP" "pullElement: Stream ended." + lift $ errorM "Pontarius.Xmpp" "pullElement: Stream ended." return . Left $ XmppOtherFailure Just r -> return $ Right r ) [ ExL.Handler (\StreamEnd -> return $ Left StreamEndFailure) , ExL.Handler (\(InvalidXmppXml s) -- Invalid XML `Event' encountered, or missing element close tag -> do - lift $ errorM "Pontarius.XMPP" $ "pullElement: Invalid XML: " ++ (show s) + lift $ errorM "Pontarius.Xmpp" $ "pullElement: Invalid XML: " ++ (show s) return . Left $ XmppOtherFailure) , ExL.Handler $ \(e :: InvalidEventStream) -> do - lift $ errorM "Pontarius.XMPP" $ "pullElement: Invalid event stream: " ++ (show e) + lift $ errorM "Pontarius.Xmpp" $ "pullElement: Invalid event stream: " ++ (show e) return . Left $ XmppOtherFailure ] @@ -448,7 +448,7 @@ pullUnpickle p = do let res = unpickleElem p elem' case res of Left e -> do - lift $ errorM "Pontarius.XMPP" $ "pullUnpickle: Unpickle failed: " ++ (ppUnpickleError e) + lift $ errorM "Pontarius.Xmpp" $ "pullUnpickle: Unpickle failed: " ++ (ppUnpickleError e) return . Left $ XmppOtherFailure Right r -> return $ Right r @@ -475,7 +475,7 @@ catchPush p = ExL.catch zeroHandle :: StreamHandle zeroHandle = StreamHandle { streamSend = \_ -> return False , streamReceive = \_ -> do - errorM "Pontarius.XMPP" + errorM "Pontarius.Xmpp" "xmppNoStream: Stream is closed." ExL.throwIO XmppOtherFailure , streamFlush = return () @@ -499,7 +499,7 @@ xmppNoStream = StreamState { zeroSource :: Source IO output zeroSource = liftIO $ do - errorM "Pontarius.Xmpp" "zeroSource" + debugM "Pontarius.Xmpp" "zeroSource" ExL.throwIO XmppOtherFailure createStream :: HostName -> StreamConfiguration -> ErrorT XmppFailure IO (Stream) @@ -739,13 +739,13 @@ pushIQ iqID to tp lang body stream = runErrorT $ do Right (IQResultS r) -> do unless (iqID == iqResultID r) $ liftIO $ do - liftIO $ errorM "Pontarius.XMPP" $ "pushIQ: ID mismatch (" ++ (show iqID) ++ " /= " ++ (show $ iqResultID r) ++ ")." + liftIO $ errorM "Pontarius.Xmpp" $ "pushIQ: ID mismatch (" ++ (show iqID) ++ " /= " ++ (show $ iqResultID r) ++ ")." liftIO $ ExL.throwIO XmppOtherFailure -- TODO: Log: ("In sendIQ' IDs don't match: " ++ show iqID ++ -- " /= " ++ show (iqResultID r) ++ " .") return $ Right r _ -> do - liftIO $ errorM "Pontarius.XMPP" $ "pushIQ: Unexpected stanza type." + liftIO $ errorM "Pontarius.Xmpp" $ "pushIQ: Unexpected stanza type." throwError XmppOtherFailure debugConduit :: (Show o, MonadIO m) => ConduitM o o m b @@ -753,7 +753,7 @@ debugConduit = forever $ do s' <- await case s' of Just s -> do - liftIO $ debugM "Pontarius.XMPP" $ "debugConduit: In: " ++ (show s) + liftIO $ debugM "Pontarius.Xmpp" $ "debugConduit: In: " ++ (show s) yield s Nothing -> return () From 5ad6c7f7c084a962b332c3b3badef6bf90f66642 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Fri, 7 Jun 2013 16:29:45 +0200 Subject: [PATCH 12/12] add a test case for closed connection --- tests/Tests.hs | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/tests/Tests.hs b/tests/Tests.hs index ba7dadb..c1fa991 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -175,7 +175,6 @@ runMain debug number multi = do sendPresence presenceOnline context thread1 <- forkIO $ autoAccept =<< dupSession context thread2 <- forkIO $ iqResponder =<< dupSession context - thread2 <- forkIO $ showPresence =<< dupSession context when active $ do liftIO $ threadDelay 1000000 -- Wait for the other thread to go online -- discoTest debug' @@ -199,3 +198,20 @@ run i multi = do main = do updateGlobalLogger "Pontarius.Xmpp" $ setLevel DEBUG run 0 True + + +connectionClosedTest = do + updateGlobalLogger "Pontarius.Xmpp" $ setLevel DEBUG + let debug' = infoM "Pontarius.Xmpp" + debug' "running" + let we = testUser1 + Right context <- session (Text.unpack $ domainpart we) + (Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) + config {onConnectionClosed = \e -> do + debug' $ "closed: " ++ show e + + } + sendPresence presenceOnline context + forkIO $ threadDelay 3000000 >> void (closeConnection context) + forever $ threadDelay 1000000 + return ()