From d5cdc74f233b48002a42e31bf4fe7d1dfd0bcb8f Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sun, 6 May 2012 12:46:07 +0200 Subject: [PATCH] Add connectionClosedHandler --- src/Network/XMPP.hs | 1 + src/Network/XMPP/Concurrent/Monad.hs | 15 +++++--- src/Network/XMPP/Concurrent/Threads.hs | 52 +++++++++++++++----------- src/Network/XMPP/Concurrent/Types.hs | 6 +-- src/Network/XMPP/Monad.hs | 44 +++++++--------------- src/Network/XMPP/Stream.hs | 2 +- src/Network/XMPP/Types.hs | 4 -- src/Tests.hs | 3 ++ 8 files changed, 62 insertions(+), 65 deletions(-) diff --git a/src/Network/XMPP.hs b/src/Network/XMPP.hs index 90a63d2..d850f15 100644 --- a/src/Network/XMPP.hs +++ b/src/Network/XMPP.hs @@ -43,6 +43,7 @@ module Network.XMPP , auth , endSession , setSessionEndHandler + , setConnectionClosedHandler -- * JID -- | A JID (historically: Jabber ID) is XMPPs native format -- for addressing entities in the network. It is somewhat similar to an diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index dc4ebdc..861d3ab 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -198,17 +198,20 @@ modifyHandlers f = do liftIO . atomically $ writeTVar eh . f =<< readTVar eh setSessionEndHandler :: XMPP () -> XMPP () -setSessionEndHandler eh = modifyHandlers (\s -> s{sessionEndHandler = eh}) +setSessionEndHandler eh = do + r <- ask + modifyHandlers (\s -> s{sessionEndHandler = runReaderT eh r}) -setConnectionClosedHandler :: XMPP () -> XMPP () -setConnectionClosedHandler eh = modifyHandlers - (\s -> s{connectionClosedHandler = eh}) +setConnectionClosedHandler :: (StreamError -> XMPP ()) -> XMPP () +setConnectionClosedHandler eh = do + r <- ask + modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r}) -- | run an event handler -runHandler :: (EventHandlers -> XMPP a) -> XMPP a +runHandler :: (EventHandlers -> IO a) -> XMPP a runHandler h = do eh <- liftIO . atomically . readTVar =<< asks eventHandlers - h eh + liftIO $ h eh -- | End the current xmpp session endSession :: XMPP () diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs index 146ff52..f2c23cc 100644 --- a/src/Network/XMPP/Concurrent/Threads.hs +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -29,22 +29,15 @@ import Text.XML.Stream.Elements import GHC.IO (unsafeUnmask) --- While waiting for the first semaphore(s) to flip we might receive --- another interrupt. When that happens we add it's semaphore to the --- list and retry waiting -handleInterrupts :: [TMVar ()] -> IO [()] -handleInterrupts ts = - Ex.catch (atomically $ forM ts takeTMVar) - ( \(Interrupt t) -> handleInterrupts (t:ts)) - readWorker :: TChan (Either MessageError Message) -> TChan (Either PresenceError Presence) -> TVar IQHandlers + -> TVar EventHandlers -> TMVar XmppConnection -> IO () -readWorker messageC presenceC handlers stateRef = +readWorker messageC presenceC iqHands handlers stateRef = Ex.mask_ . forever $ do - res <- liftIO $ Ex.catch ( do + res <- liftIO $ Ex.catches ( do -- we don't know whether pull will -- necessarily be interruptible s <- liftIO . atomically $ do @@ -54,11 +47,12 @@ readWorker messageC presenceC handlers stateRef = return sr allowInterrupt Just . fst <$> runStateT pullStanza s - ) - (\(Interrupt t) -> do - void $ handleInterrupts [t] - return Nothing - ) + ) + [ Ex.Handler $ \(Interrupt t) -> do + void $ handleInterrupts [t] + return Nothing + , Ex.Handler $ \e -> noCon handlers (e :: StreamError) + ] liftIO . atomically $ do case res of Nothing -> return () @@ -84,14 +78,26 @@ readWorker messageC presenceC handlers stateRef = _ <- readTChan presenceC return () - IQRequestS i -> handleIQRequest handlers i - IQResultS i -> handleIQResponse handlers (Right i) - IQErrorS i -> handleIQResponse handlers (Left i) + IQRequestS i -> handleIQRequest iqHands i + IQResultS i -> handleIQResponse iqHands (Right i) + IQErrorS i -> handleIQResponse iqHands (Left i) where -- Defining an Control.Exception.allowInterrupt equivalent for -- GHC 7 compatibility. allowInterrupt :: IO () allowInterrupt = unsafeUnmask $ return () + noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a) + noCon h e = do + hands <- atomically $ readTVar h + _ <- forkIO $ connectionClosedHandler hands e + return Nothing + -- While waiting for the first semaphore(s) to flip we might receive + -- another interrupt. When that happens we add it's semaphore to the + -- list and retry waiting + handleInterrupts :: [TMVar ()] -> IO [()] + handleInterrupts ts = + Ex.catch (atomically $ forM ts takeTMVar) + ( \(Interrupt t) -> handleInterrupts (t:ts)) handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () handleIQRequest handlers iq = do @@ -121,7 +127,10 @@ writeWorker stCh writeR = forever $ do (write, next) <- atomically $ (,) <$> takeTMVar writeR <*> readTChan stCh - _ <- write $ renderElement (pickleElem xpStanza next) + r <- write $ renderElement (pickleElem xpStanza next) + unless r $ do + atomically $ unGetTChan stCh next -- connection is dead + threadDelay 250000 -- avoid free spinning atomically $ putTMVar writeR write -- Two streams: input and output. Threads read from input stream and write to output stream. @@ -150,13 +159,14 @@ startThreads = do conS <- newTMVarIO xmppNoConnection lw <- forkIO $ writeWorker outC writeLock cp <- forkIO $ connPersist writeLock - rd <- forkIO $ readWorker messageC presenceC handlers conS + rd <- forkIO $ readWorker messageC presenceC handlers eh conS return (messageC, presenceC, handlers, outC , killConnection writeLock [lw, rd, cp] , writeLock, conS ,rd, eh) where killConnection writeLock threads = liftIO $ do _ <- atomically $ takeTMVar writeLock -- Should we put it back? + liftIO $ putStrLn "killing threads #" _ <- forM threads killThread return() @@ -186,6 +196,6 @@ withSession = flip runReaderT connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () connPersist lock = forever $ do pushBS <- atomically $ takeTMVar lock - pushBS " " + _ <- pushBS " " atomically $ putTMVar lock pushBS threadDelay 30000000 diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs index 0fff9c4..c9e6cab 100644 --- a/src/Network/XMPP/Concurrent/Types.hs +++ b/src/Network/XMPP/Concurrent/Types.hs @@ -23,14 +23,14 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan (IQRequest, TVar Bool)) ) data EventHandlers = EventHandlers - { sessionEndHandler :: XMPP () - , connectionClosedHandler :: XMPP () + { sessionEndHandler :: IO () + , connectionClosedHandler :: StreamError -> IO () } zeroEventHandlers :: EventHandlers zeroEventHandlers = EventHandlers { sessionEndHandler = return () - , connectionClosedHandler = return () + , connectionClosedHandler = \_ -> return () } data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either diff --git a/src/Network/XMPP/Monad.hs b/src/Network/XMPP/Monad.hs index 7206646..9b9e65c 100644 --- a/src/Network/XMPP/Monad.hs +++ b/src/Network/XMPP/Monad.hs @@ -8,8 +8,8 @@ import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Class --import Control.Monad.Trans.Resource -import qualified Control.Exception as Ex -import qualified GHC.IO.Exception as Ex +import qualified Control.Exception.Lifted as Ex +import qualified GHC.IO.Exception as GIE import Control.Monad.State.Strict import Data.ByteString as BS @@ -30,6 +30,7 @@ import System.IO import Text.XML.Stream.Elements import Text.XML.Stream.Parse as XP +import Text.XML.Unresolved(InvalidEventStream(..)) pushN :: Element -> XMPPConMonad Bool pushN x = do @@ -52,10 +53,14 @@ pullSink snk = do pullElement :: XMPPConMonad Element pullElement = do - e <- pullSink (elements =$ CL.head) - case e of - Nothing -> liftIO $ Ex.throwIO XmppNoConnection - Just r -> return r + Ex.catch (do + e <- pullSink (elements =$ CL.head) + case e of + Nothing -> liftIO $ Ex.throwIO StreamConnectionError + Just r -> return r + ) + (\(InvalidEventStream s) -> liftIO . Ex.throwIO $ StreamXMLError s) + pullPickle :: PU [Node] a -> XMPPConMonad a pullPickle p = do @@ -72,34 +77,13 @@ pullStanza = do Right r -> return r catchPush p = Ex.catch (p >> return True) - (\e -> case Ex.ioe_type e of - Ex.ResourceVanished -> return False + (\e -> case GIE.ioe_type e of + GIE.ResourceVanished -> return False _ -> Ex.throwIO e ) -xmppFromHandle :: Handle - -> Text - -> XMPPConMonad a - -> IO (a, XmppConnection) -xmppFromHandle handle hostname f = do - liftIO $ hSetBuffering handle NoBuffering - let raw = sourceHandle handle - let src = raw $= XP.parseBytes def - let st = XmppConnection - src - (raw) - (catchPush . BS.hPut handle) - (Just handle) - (SF Nothing [] []) - XmppConnectionPlain - (Just hostname) - Nothing - Nothing - (hClose handle) - runStateT f st - zeroSource :: Source IO output -zeroSource = liftIO . Ex.throwIO $ XmppNoConnection +zeroSource = liftIO . Ex.throwIO $ StreamConnectionError xmppNoConnection :: XmppConnection xmppNoConnection = XmppConnection diff --git a/src/Network/XMPP/Stream.hs b/src/Network/XMPP/Stream.hs index 8c5d890..ef96fb0 100644 --- a/src/Network/XMPP/Stream.hs +++ b/src/Network/XMPP/Stream.hs @@ -86,7 +86,7 @@ xmppStreamFeatures :: StreamSink ServerFeatures xmppStreamFeatures = do e <- lift $ elements =$ CL.head case e of - Nothing -> liftIO $ Ex.throwIO XmppNoConnection + Nothing -> liftIO $ Ex.throwIO StreamConnectionError Just r -> streamUnpickleElem pickleStreamFeatures r -- Pickling diff --git a/src/Network/XMPP/Types.hs b/src/Network/XMPP/Types.hs index 0149f53..b77fd99 100644 --- a/src/Network/XMPP/Types.hs +++ b/src/Network/XMPP/Types.hs @@ -39,7 +39,6 @@ module Network.XMPP.Types , XMPPConMonad , XmppConnection(..) , XmppConnectionState(..) - , XmppNoConnection(..) , XMPPT(..) , XmppStreamError(..) , parseLangTag @@ -735,6 +734,3 @@ type XMPPConMonad a = StateT XmppConnection IO a deriving instance (Monad m, MonadIO m) => MonadState (XmppConnection) (XMPPT m) -data XmppNoConnection = XmppNoConnection deriving (Show, Typeable) -instance Exception XmppNoConnection - diff --git a/src/Tests.hs b/src/Tests.hs index 71242dd..4ba900d 100644 --- a/src/Tests.hs +++ b/src/Tests.hs @@ -111,6 +111,9 @@ runMain debug number = do wait <- newEmptyTMVarIO withNewSession $ do setSessionEndHandler (liftIO . atomically $ putTMVar wait ()) + setConnectionClosedHandler (\e -> do + liftIO (debug' $ "connection lost because " ++ show e) + endSession ) debug' "running" withConnection $ do connect "localhost" "species64739.dyndns.org"