From 1ce5aaf222525b8383d7f4f64c82b7a8ca94cf60 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Thu, 22 Aug 2013 19:33:11 +0200 Subject: [PATCH] improve reconnect rename reconnect to reconnectNow * reconnect immediately * function returns failure mode on error add reconnect function * retries when reconnect fails * waits an exponentially increasing number of seconds between attempts * returns last encountered error when all attempts fail --- source/Network/Xmpp.hs | 1 + source/Network/Xmpp/Concurrent.hs | 54 ++++++++++++++++++++++--- source/Network/Xmpp/Concurrent/Types.hs | 1 + source/Network/Xmpp/Stream.hs | 22 +++++----- source/Network/Xmpp/Utilities.hs | 18 ++++----- tests/Tests.hs | 19 ++++----- 6 files changed, 81 insertions(+), 34 deletions(-) diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 6ceb0e4..25648a5 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -29,6 +29,7 @@ module Network.Xmpp , session , setConnectionClosedHandler , reconnect + , reconnectNow , StreamConfiguration(..) , SessionConfiguration(..) , ConnectionDetails(..) diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index df4ba04..6c28571 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -15,6 +15,7 @@ module Network.Xmpp.Concurrent , reconnect ) where +import Control.Concurrent (threadDelay) import Control.Concurrent.STM import qualified Control.Exception as Ex import Control.Monad @@ -39,6 +40,7 @@ import Network.Xmpp.Stream import Network.Xmpp.Tls import Network.Xmpp.Types import System.Log.Logger +import System.Random (randomRIO) import Control.Monad.State.Strict @@ -133,6 +135,7 @@ newSession stream config realm mbSasl = runErrorT $ do iqHands <- lift $ newTVarIO (Map.empty, Map.empty) eh <- lift $ newEmptyTMVarIO ros <- liftIO . 
newTVarIO $ Roster Nothing Map.empty + rew <- lift $ newTVarIO 3 let rosterH = if (enableRoster config) then handleRoster ros else \ _ _ -> return True let stanzaHandler = runHandlers writeSem @@ -157,6 +160,7 @@ newSession stream config realm mbSasl = runErrorT $ do , rosterRef = ros , sRealm = realm , sSaslCredentials = mbSasl + , reconnectWait = rew } liftIO . atomically $ putTMVar eh $ EventHandlers { connectionClosedHandler = onConnectionClosed config sess } @@ -187,10 +191,13 @@ session realm mbSasl config = runErrorT $ do liftIO $ when (enableRoster config) $ initRoster ses return ses -reconnect :: Session -> IO () -reconnect sess@Session{conf = config} = do +-- | Reconnect immediately with the stored settings. Returns Just the error when +-- the reconnect attempt fails and Nothing when no failure was encountered +reconnectNow :: Session -- ^ session to reconnect + -> IO (Maybe XmppFailure) +reconnectNow sess@Session{conf = config, reconnectWait = rw} = do debugM "Pontarius.Xmpp" "reconnecting" - _ <- flip withConnection sess $ \oldStream -> do + res <- flip withConnection sess $ \oldStream -> do s <- runErrorT $ do liftIO $ debugM "Pontarius.Xmpp" "reconnect: closing stream" _ <- liftIO $ closeStreams oldStream @@ -213,9 +220,46 @@ reconnect sess@Session{conf = config} = do Left e -> do errorM "Pontarius.Xmpp" $ "reconnect failed" ++ show e return (Left e , oldStream ) - Right r -> return (Right () , r ) - when (enableRoster config) $ initRoster sess + Right r -> return (Right () , r ) + case res of + Left e -> return $ Just e + Right (Left e) -> return $ Just e + Right (Right ()) -> do + atomically $ writeTVar rw 3 + when (enableRoster config) $ initRoster sess + return Nothing + +-- | Reconnect with the stored settings. Returns Just the error when the +-- reconnect attempt fails and Nothing when no failure was encountered +reconnect :: Maybe Int -- ^ maximum number of retries (Nothing for + -- unbounded). 
Numbers of 1 or less will perform exactly + -- one retry + -> Session -- ^ session to reconnect + -> IO (Maybe XmppFailure) -- ^ The failure mode of the last retry +reconnect maxTries sess@Session{reconnectWait = rw} = go maxTries + where + go Nothing = do + res <- doRetry + case res of + Nothing -> return Nothing + Just _e -> go Nothing + go (Just t) = do + res <- doRetry + case res of + Nothing -> return Nothing + Just e -> if (t > 1) then go (Just $ t - 1) + else return $ Just e + doRetry = do + wait <- atomically $ do + wt <- readTVar rw + writeTVar rw $ min 300 (2 * wt + 5) + return wt + t <- randomRIO (wait `div` 2, wait) + debugM "Pontarius.Xmpp" $ + "Waiting " ++ show t ++ " seconds before reconnecting" + threadDelay $ t * 10^(6 :: Int) + reconnectNow sess newStanzaID :: Session -> IO StanzaID diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 30d460a..3c99f0b 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -77,6 +77,7 @@ data Session = Session , conf :: SessionConfiguration , sRealm :: HostName , sSaslCredentials :: Maybe (ConnectionState -> [SaslHandler] , Maybe Text) + , reconnectWait :: TVar Int } -- | IQHandlers holds the registered channels for incomming IQ requests and diff --git a/source/Network/Xmpp/Stream.hs b/source/Network/Xmpp/Stream.hs index c011582..26f95d1 100644 --- a/source/Network/Xmpp/Stream.hs +++ b/source/Network/Xmpp/Stream.hs @@ -346,20 +346,20 @@ closeStreams' = do void ((Ex.try cc) :: IO (Either Ex.SomeException ())) return () put xmppNoStream{ streamConnectionState = Finished } - lift $ debugM "Pontarius.Xmpp" "Collecting remaining elements" +-- lift $ debugM "Pontarius.Xmpp" "Collecting remaining elements" -- es <- collectElems [] -- lift $ debugM "Pontarius.Xmpp" "Stream sucessfully closed" -- return es - where - -- Pulls elements from the stream until the stream ends, or an error is - -- raised. 
- collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element]) - collectElems es = do - result <- pullElement - case result of - Left StreamEndFailure -> return $ Right es - Left e -> return $ Left $ StreamCloseError (es, e) - Right e -> collectElems (e:es) + -- where + -- -- Pulls elements from the stream until the stream ends, or an error is + -- -- raised. + -- collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element]) + -- collectElems es = do + -- result <- pullElement + -- case result of + -- Left StreamEndFailure -> return $ Right es + -- Left e -> return $ Left $ StreamCloseError (es, e) + -- Right e -> collectElems (e:es) -- TODO: Can the TLS send/recv functions throw something other than an IO error? debugOut :: MonadIO m => ByteString -> m () diff --git a/source/Network/Xmpp/Utilities.hs b/source/Network/Xmpp/Utilities.hs index c383715..a75608d 100644 --- a/source/Network/Xmpp/Utilities.hs +++ b/source/Network/Xmpp/Utilities.hs @@ -94,12 +94,12 @@ hostnameP = do then fail "Hostname too long." else return $ Text.concat [label, Text.pack ".", r] --- The number of seconds to wait before reconnection attempts in accordance with --- the truncated binary exponential backoff algorithm. -waitingTimes :: IO [Int] -waitingTimes = do - wait <- randomRIO (1, 59) - waits <- Prelude.mapM (\n -> randomRIO (0, wait * n)) slotTimes - return (wait:waits) - where - slotTimes = [1, 3, 8, 15, 24, 35, 48, 63, 80, 99, 99, 99, 99, 99, 99] +-- -- The number of seconds to wait before reconnection attempts in accordance with +-- -- the truncated binary exponential backoff algorithm. 
+-- waitingTimes :: IO [Int] +-- waitingTimes = do +-- wait <- randomRIO (1, 59) +-- waits <- Prelude.mapM (\n -> randomRIO (0, wait * n)) slotTimes +-- return (wait:waits) +-- where +-- slotTimes = [1, 3, 8, 15, 24, 35, 48, 63, 80, 99, 99, 99, 99, 99, 99] diff --git a/tests/Tests.hs b/tests/Tests.hs index 263c522..8614b70 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -143,14 +143,15 @@ iqTest debug we them context = do debug "sending" answer <- sendIQ' (Just them) Set Nothing body context case answer of - IQResponseResult r -> do + Nothing -> debug "Connection Down" + Just (IQResponseResult r) -> do debug "received" let Right answerPayload = unpickleElem payloadP (fromJust $ iqResultPayload r) expect debug (invertPayload payload) answerPayload context - IQResponseTimeout -> do + Just IQResponseTimeout -> do debug $ "Timeout in packet: " ++ show count - IQResponseError e -> do + Just (IQResponseError e) -> do debug $ "Error in packet: " ++ show count liftIO $ threadDelay 100000 -- sendUser "All tests done" context @@ -170,7 +171,7 @@ runMain debug number multi = do debug . 
(("Thread " ++ show number ++ ":") ++) debug' "running" Right context <- session (Text.unpack $ domainpart we) - (Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) + (Just (\_ -> [scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) config sendPresence presenceOnline context thread1 <- forkIO $ autoAccept =<< dupSession context @@ -206,12 +207,12 @@ connectionClosedTest = do debug' "running" let we = testUser1 Right context <- session (Text.unpack $ domainpart we) - (Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) - config {onConnectionClosed = \e -> do - debug' $ "closed: " ++ show e - + (Just (\_ -> [scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) + config {onConnectionClosed = \s e -> do + liftIO $ reconnect Nothing s + liftIO $ sendPresence presenceOnline s + return () } sendPresence presenceOnline context - forkIO $ threadDelay 3000000 >> void (closeConnection context) forever $ threadDelay 1000000 return ()