From 1ce5aaf222525b8383d7f4f64c82b7a8ca94cf60 Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Thu, 22 Aug 2013 19:33:11 +0200
Subject: [PATCH] improve reconnect
rename reconnect to reconnectNow
* reconnect immediately
* function returns failure mode on error
add reconnect function
* retries when reconnect fails
* waits an exponentially increasing number of seconds between attempts
* returns last encountered error when all attempts fail
---
source/Network/Xmpp.hs | 1 +
source/Network/Xmpp/Concurrent.hs | 54 ++++++++++++++++++++++---
source/Network/Xmpp/Concurrent/Types.hs | 1 +
source/Network/Xmpp/Stream.hs | 22 +++++-----
source/Network/Xmpp/Utilities.hs | 18 ++++-----
tests/Tests.hs | 19 ++++-----
6 files changed, 81 insertions(+), 34 deletions(-)
diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs
index 6ceb0e4..25648a5 100644
--- a/source/Network/Xmpp.hs
+++ b/source/Network/Xmpp.hs
@@ -29,6 +29,7 @@ module Network.Xmpp
, session
, setConnectionClosedHandler
, reconnect
+ , reconnectNow
, StreamConfiguration(..)
, SessionConfiguration(..)
, ConnectionDetails(..)
diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs
index df4ba04..6c28571 100644
--- a/source/Network/Xmpp/Concurrent.hs
+++ b/source/Network/Xmpp/Concurrent.hs
@@ -15,6 +15,7 @@ module Network.Xmpp.Concurrent
, reconnect
) where
+import Control.Concurrent (threadDelay)
import Control.Concurrent.STM
import qualified Control.Exception as Ex
import Control.Monad
@@ -39,6 +40,7 @@ import Network.Xmpp.Stream
import Network.Xmpp.Tls
import Network.Xmpp.Types
import System.Log.Logger
+import System.Random (randomRIO)
import Control.Monad.State.Strict
@@ -133,6 +135,7 @@ newSession stream config realm mbSasl = runErrorT $ do
iqHands <- lift $ newTVarIO (Map.empty, Map.empty)
eh <- lift $ newEmptyTMVarIO
ros <- liftIO . newTVarIO $ Roster Nothing Map.empty
+ rew <- lift $ newTVarIO 3
let rosterH = if (enableRoster config) then handleRoster ros
else \ _ _ -> return True
let stanzaHandler = runHandlers writeSem
@@ -157,6 +160,7 @@ newSession stream config realm mbSasl = runErrorT $ do
, rosterRef = ros
, sRealm = realm
, sSaslCredentials = mbSasl
+ , reconnectWait = rew
}
liftIO . atomically $ putTMVar eh $ EventHandlers { connectionClosedHandler =
onConnectionClosed config sess }
@@ -187,10 +191,13 @@ session realm mbSasl config = runErrorT $ do
liftIO $ when (enableRoster config) $ initRoster ses
return ses
-reconnect :: Session -> IO ()
-reconnect sess@Session{conf = config} = do
+-- | Reconnect immediately with the stored settings. Returns Just the error when
+-- the reconnect attempt fails and Nothing when no failure was encountered
+reconnectNow :: Session -- ^ session to reconnect
+ -> IO (Maybe XmppFailure)
+reconnectNow sess@Session{conf = config, reconnectWait = rw} = do
debugM "Pontarius.Xmpp" "reconnecting"
- _ <- flip withConnection sess $ \oldStream -> do
+ res <- flip withConnection sess $ \oldStream -> do
s <- runErrorT $ do
liftIO $ debugM "Pontarius.Xmpp" "reconnect: closing stream"
_ <- liftIO $ closeStreams oldStream
@@ -213,9 +220,46 @@ reconnect sess@Session{conf = config} = do
Left e -> do
errorM "Pontarius.Xmpp" $ "reconnect failed" ++ show e
return (Left e , oldStream )
- Right r -> return (Right () , r )
- when (enableRoster config) $ initRoster sess
+ Right r -> return (Right () , r )
+ case res of
+ Left e -> return $ Just e
+ Right (Left e) -> return $ Just e
+ Right (Right ()) -> do
+ atomically $ writeTVar rw 3
+ when (enableRoster config) $ initRoster sess
+ return Nothing
+
+-- | Reconnect with the stored settings, sleeping a randomised, growing delay
+-- before every attempt. Returns Nothing on success, else Just the last error
+reconnect :: Maybe Int -- ^ maximum number of attempts (Nothing for
+ -- unbounded). Values of 1 or less perform exactly
+ -- one attempt
+ -> Session -- ^ session to reconnect
+ -> IO (Maybe XmppFailure) -- ^ the failure of the final attempt, if it failed
+reconnect maxTries sess@Session{reconnectWait = rw} = go maxTries
+ where
+ go Nothing = do
+ res <- doRetry
+ case res of
+ Nothing -> return Nothing
+ Just _e -> go Nothing -- unbounded: drop the error and try again
+ go (Just t) = do
+ res <- doRetry
+ case res of
+ Nothing -> return Nothing
+ Just e -> if (t > 1) then go (Just $ t - 1) -- attempts left: retry
+ else return $ Just e -- budget used up: report e
+ doRetry = do
+ wait <- atomically $ do
+ wt <- readTVar rw -- wait (in seconds) to use for this attempt
+ writeTVar rw $ min 300 (2 * wt + 5) -- next wait grows, capped at 300
+ return wt
+ t <- randomRIO (wait `div` 2, wait) -- jitter: between half and full wait
+ debugM "Pontarius.Xmpp" $
+ "Waiting " ++ show t ++ " seconds before reconnecting"
+ threadDelay $ t * 10^(6 :: Int) -- threadDelay takes microseconds
+ reconnectNow sess
newStanzaID :: Session -> IO StanzaID
diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs
index 30d460a..3c99f0b 100644
--- a/source/Network/Xmpp/Concurrent/Types.hs
+++ b/source/Network/Xmpp/Concurrent/Types.hs
@@ -77,6 +77,7 @@ data Session = Session
, conf :: SessionConfiguration
, sRealm :: HostName
, sSaslCredentials :: Maybe (ConnectionState -> [SaslHandler] , Maybe Text)
+ , reconnectWait :: TVar Int
}
-- | IQHandlers holds the registered channels for incomming IQ requests and
diff --git a/source/Network/Xmpp/Stream.hs b/source/Network/Xmpp/Stream.hs
index c011582..26f95d1 100644
--- a/source/Network/Xmpp/Stream.hs
+++ b/source/Network/Xmpp/Stream.hs
@@ -346,20 +346,20 @@ closeStreams' = do
void ((Ex.try cc) :: IO (Either Ex.SomeException ()))
return ()
put xmppNoStream{ streamConnectionState = Finished }
- lift $ debugM "Pontarius.Xmpp" "Collecting remaining elements"
+-- lift $ debugM "Pontarius.Xmpp" "Collecting remaining elements"
-- es <- collectElems []
-- lift $ debugM "Pontarius.Xmpp" "Stream sucessfully closed"
-- return es
- where
- -- Pulls elements from the stream until the stream ends, or an error is
- -- raised.
- collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element])
- collectElems es = do
- result <- pullElement
- case result of
- Left StreamEndFailure -> return $ Right es
- Left e -> return $ Left $ StreamCloseError (es, e)
- Right e -> collectElems (e:es)
+ -- where
+ -- -- Pulls elements from the stream until the stream ends, or an error is
+ -- -- raised.
+ -- collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element])
+ -- collectElems es = do
+ -- result <- pullElement
+ -- case result of
+ -- Left StreamEndFailure -> return $ Right es
+ -- Left e -> return $ Left $ StreamCloseError (es, e)
+ -- Right e -> collectElems (e:es)
-- TODO: Can the TLS send/recv functions throw something other than an IO error?
debugOut :: MonadIO m => ByteString -> m ()
diff --git a/source/Network/Xmpp/Utilities.hs b/source/Network/Xmpp/Utilities.hs
index c383715..a75608d 100644
--- a/source/Network/Xmpp/Utilities.hs
+++ b/source/Network/Xmpp/Utilities.hs
@@ -94,12 +94,12 @@ hostnameP = do
then fail "Hostname too long."
else return $ Text.concat [label, Text.pack ".", r]
--- The number of seconds to wait before reconnection attempts in accordance with
--- the truncated binary exponential backoff algorithm.
-waitingTimes :: IO [Int]
-waitingTimes = do
- wait <- randomRIO (1, 59)
- waits <- Prelude.mapM (\n -> randomRIO (0, wait * n)) slotTimes
- return (wait:waits)
- where
- slotTimes = [1, 3, 8, 15, 24, 35, 48, 63, 80, 99, 99, 99, 99, 99, 99]
+-- -- The number of seconds to wait before reconnection attempts in accordance with
+-- -- the truncated binary exponential backoff algorithm.
+-- waitingTimes :: IO [Int]
+-- waitingTimes = do
+-- wait <- randomRIO (1, 59)
+-- waits <- Prelude.mapM (\n -> randomRIO (0, wait * n)) slotTimes
+-- return (wait:waits)
+-- where
+-- slotTimes = [1, 3, 8, 15, 24, 35, 48, 63, 80, 99, 99, 99, 99, 99, 99]
diff --git a/tests/Tests.hs b/tests/Tests.hs
index 263c522..8614b70 100644
--- a/tests/Tests.hs
+++ b/tests/Tests.hs
@@ -143,14 +143,15 @@ iqTest debug we them context = do
debug "sending"
answer <- sendIQ' (Just them) Set Nothing body context
case answer of
- IQResponseResult r -> do
+ Nothing -> debug "Connection Down"
+ Just (IQResponseResult r) -> do
debug "received"
let Right answerPayload = unpickleElem payloadP
(fromJust $ iqResultPayload r)
expect debug (invertPayload payload) answerPayload context
- IQResponseTimeout -> do
+ Just IQResponseTimeout -> do
debug $ "Timeout in packet: " ++ show count
- IQResponseError e -> do
+ Just (IQResponseError e) -> do
debug $ "Error in packet: " ++ show count
liftIO $ threadDelay 100000
-- sendUser "All tests done" context
@@ -170,7 +171,7 @@ runMain debug number multi = do
debug . (("Thread " ++ show number ++ ":") ++)
debug' "running"
Right context <- session (Text.unpack $ domainpart we)
- (Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we))
+ (Just (\_ -> [scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we))
config
sendPresence presenceOnline context
thread1 <- forkIO $ autoAccept =<< dupSession context
@@ -206,12 +207,12 @@ connectionClosedTest = do
debug' "running"
let we = testUser1
Right context <- session (Text.unpack $ domainpart we)
- (Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we))
- config {onConnectionClosed = \e -> do
- debug' $ "closed: " ++ show e
-
+ (Just (\_ -> [scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we))
+ config {onConnectionClosed = \s e -> do
+ liftIO $ reconnect Nothing s
+ liftIO $ sendPresence presenceOnline s
+ return ()
}
sendPresence presenceOnline context
- forkIO $ threadDelay 3000000 >> void (closeConnection context)
forever $ threadDelay 1000000
return ()