Browse Source

Merge branch 'master' into upstream

master
Philipp Balzarek 12 years ago
parent
commit
30b9910a39
  1. 1
      source/Network/Xmpp.hs
  2. 54
      source/Network/Xmpp/Concurrent.hs
  3. 1
      source/Network/Xmpp/Concurrent/Types.hs
  4. 22
      source/Network/Xmpp/Stream.hs
  5. 18
      source/Network/Xmpp/Utilities.hs
  6. 19
      tests/Tests.hs

1
source/Network/Xmpp.hs

@ -29,6 +29,7 @@ module Network.Xmpp
, session , session
, setConnectionClosedHandler , setConnectionClosedHandler
, reconnect , reconnect
, reconnectNow
, StreamConfiguration(..) , StreamConfiguration(..)
, SessionConfiguration(..) , SessionConfiguration(..)
, ConnectionDetails(..) , ConnectionDetails(..)

54
source/Network/Xmpp/Concurrent.hs

@ -15,6 +15,7 @@ module Network.Xmpp.Concurrent
, reconnect , reconnect
) where ) where
import Control.Concurrent (threadDelay)
import Control.Concurrent.STM import Control.Concurrent.STM
import qualified Control.Exception as Ex import qualified Control.Exception as Ex
import Control.Monad import Control.Monad
@ -39,6 +40,7 @@ import Network.Xmpp.Stream
import Network.Xmpp.Tls import Network.Xmpp.Tls
import Network.Xmpp.Types import Network.Xmpp.Types
import System.Log.Logger import System.Log.Logger
import System.Random (randomRIO)
import Control.Monad.State.Strict import Control.Monad.State.Strict
@ -133,6 +135,7 @@ newSession stream config realm mbSasl = runErrorT $ do
iqHands <- lift $ newTVarIO (Map.empty, Map.empty) iqHands <- lift $ newTVarIO (Map.empty, Map.empty)
eh <- lift $ newEmptyTMVarIO eh <- lift $ newEmptyTMVarIO
ros <- liftIO . newTVarIO $ Roster Nothing Map.empty ros <- liftIO . newTVarIO $ Roster Nothing Map.empty
rew <- lift $ newTVarIO 3
let rosterH = if (enableRoster config) then handleRoster ros let rosterH = if (enableRoster config) then handleRoster ros
else \ _ _ -> return True else \ _ _ -> return True
let stanzaHandler = runHandlers writeSem let stanzaHandler = runHandlers writeSem
@ -157,6 +160,7 @@ newSession stream config realm mbSasl = runErrorT $ do
, rosterRef = ros , rosterRef = ros
, sRealm = realm , sRealm = realm
, sSaslCredentials = mbSasl , sSaslCredentials = mbSasl
, reconnectWait = rew
} }
liftIO . atomically $ putTMVar eh $ EventHandlers { connectionClosedHandler = liftIO . atomically $ putTMVar eh $ EventHandlers { connectionClosedHandler =
onConnectionClosed config sess } onConnectionClosed config sess }
@ -187,10 +191,13 @@ session realm mbSasl config = runErrorT $ do
liftIO $ when (enableRoster config) $ initRoster ses liftIO $ when (enableRoster config) $ initRoster ses
return ses return ses
reconnect :: Session -> IO () -- | Reconnect immediately with the stored settings. Returns Just the error when
reconnect sess@Session{conf = config} = do -- the reconnect attempt fails and Nothing when no failure was encountered
reconnectNow :: Session -- ^ session to reconnect
-> IO (Maybe XmppFailure)
reconnectNow sess@Session{conf = config, reconnectWait = rw} = do
debugM "Pontarius.Xmpp" "reconnecting" debugM "Pontarius.Xmpp" "reconnecting"
_ <- flip withConnection sess $ \oldStream -> do res <- flip withConnection sess $ \oldStream -> do
s <- runErrorT $ do s <- runErrorT $ do
liftIO $ debugM "Pontarius.Xmpp" "reconnect: closing stream" liftIO $ debugM "Pontarius.Xmpp" "reconnect: closing stream"
_ <- liftIO $ closeStreams oldStream _ <- liftIO $ closeStreams oldStream
@ -213,9 +220,46 @@ reconnect sess@Session{conf = config} = do
Left e -> do Left e -> do
errorM "Pontarius.Xmpp" $ "reconnect failed" ++ show e errorM "Pontarius.Xmpp" $ "reconnect failed" ++ show e
return (Left e , oldStream ) return (Left e , oldStream )
Right r -> return (Right () , r ) Right r -> return (Right () , r )
when (enableRoster config) $ initRoster sess case res of
Left e -> return $ Just e
Right (Left e) -> return $ Just e
Right (Right ()) -> do
atomically $ writeTVar rw 3
when (enableRoster config) $ initRoster sess
return Nothing
-- | Reconnect with the stored settings. Returns Just the error when the
-- reconnect attempt fails and Nothing when no failure was encountered
reconnect :: Maybe Int -- ^ maximum number of retries (Nothing for
-- unbounded). Numbers of 1 or less will perform exactly
-- one retry
-> Session -- ^ session to reconnect
-> IO (Maybe XmppFailure) -- ^ The failure mode of the last retry
reconnect maxTries sess@Session{reconnectWait = rw} = go maxTries
where
go Nothing = do
res <- doRetry
case res of
Nothing -> return Nothing
Just _e -> go Nothing
go (Just t) = do
res <- doRetry
case res of
Nothing -> return Nothing
Just e -> if (t > 1) then go (Just $ t - 1)
else return $ Just e
doRetry = do
wait <- atomically $ do
wt <- readTVar rw
writeTVar rw $ min 300 (2 * wt + 5)
return wt
t <- randomRIO (wait `div` 2, wait)
debugM "Pontarius.Xmpp" $
"Waiting " ++ show t ++ " seconds before reconnecting"
threadDelay $ t * 10^(6 :: Int)
reconnectNow sess
newStanzaID :: Session -> IO StanzaID newStanzaID :: Session -> IO StanzaID

1
source/Network/Xmpp/Concurrent/Types.hs

@ -77,6 +77,7 @@ data Session = Session
, conf :: SessionConfiguration , conf :: SessionConfiguration
, sRealm :: HostName , sRealm :: HostName
, sSaslCredentials :: Maybe (ConnectionState -> [SaslHandler] , Maybe Text) , sSaslCredentials :: Maybe (ConnectionState -> [SaslHandler] , Maybe Text)
, reconnectWait :: TVar Int
} }
-- | IQHandlers holds the registered channels for incomming IQ requests and -- | IQHandlers holds the registered channels for incomming IQ requests and

22
source/Network/Xmpp/Stream.hs

@ -346,20 +346,20 @@ closeStreams' = do
void ((Ex.try cc) :: IO (Either Ex.SomeException ())) void ((Ex.try cc) :: IO (Either Ex.SomeException ()))
return () return ()
put xmppNoStream{ streamConnectionState = Finished } put xmppNoStream{ streamConnectionState = Finished }
lift $ debugM "Pontarius.Xmpp" "Collecting remaining elements" -- lift $ debugM "Pontarius.Xmpp" "Collecting remaining elements"
-- es <- collectElems [] -- es <- collectElems []
-- lift $ debugM "Pontarius.Xmpp" "Stream sucessfully closed" -- lift $ debugM "Pontarius.Xmpp" "Stream sucessfully closed"
-- return es -- return es
where -- where
-- Pulls elements from the stream until the stream ends, or an error is -- -- Pulls elements from the stream until the stream ends, or an error is
-- raised. -- -- raised.
collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element]) -- collectElems :: [Element] -> StateT StreamState IO (Either XmppFailure [Element])
collectElems es = do -- collectElems es = do
result <- pullElement -- result <- pullElement
case result of -- case result of
Left StreamEndFailure -> return $ Right es -- Left StreamEndFailure -> return $ Right es
Left e -> return $ Left $ StreamCloseError (es, e) -- Left e -> return $ Left $ StreamCloseError (es, e)
Right e -> collectElems (e:es) -- Right e -> collectElems (e:es)
-- TODO: Can the TLS send/recv functions throw something other than an IO error? -- TODO: Can the TLS send/recv functions throw something other than an IO error?
debugOut :: MonadIO m => ByteString -> m () debugOut :: MonadIO m => ByteString -> m ()

18
source/Network/Xmpp/Utilities.hs

@ -94,12 +94,12 @@ hostnameP = do
then fail "Hostname too long." then fail "Hostname too long."
else return $ Text.concat [label, Text.pack ".", r] else return $ Text.concat [label, Text.pack ".", r]
-- The number of seconds to wait before reconnection attempts in accordance with -- -- The number of seconds to wait before reconnection attempts in accordance with
-- the truncated binary exponential backoff algorithm. -- -- the truncated binary exponential backoff algorithm.
waitingTimes :: IO [Int] -- waitingTimes :: IO [Int]
waitingTimes = do -- waitingTimes = do
wait <- randomRIO (1, 59) -- wait <- randomRIO (1, 59)
waits <- Prelude.mapM (\n -> randomRIO (0, wait * n)) slotTimes -- waits <- Prelude.mapM (\n -> randomRIO (0, wait * n)) slotTimes
return (wait:waits) -- return (wait:waits)
where -- where
slotTimes = [1, 3, 8, 15, 24, 35, 48, 63, 80, 99, 99, 99, 99, 99, 99] -- slotTimes = [1, 3, 8, 15, 24, 35, 48, 63, 80, 99, 99, 99, 99, 99, 99]

19
tests/Tests.hs

@ -143,14 +143,15 @@ iqTest debug we them context = do
debug "sending" debug "sending"
answer <- sendIQ' (Just them) Set Nothing body context answer <- sendIQ' (Just them) Set Nothing body context
case answer of case answer of
IQResponseResult r -> do Nothing -> debug "Connection Down"
Just (IQResponseResult r) -> do
debug "received" debug "received"
let Right answerPayload = unpickleElem payloadP let Right answerPayload = unpickleElem payloadP
(fromJust $ iqResultPayload r) (fromJust $ iqResultPayload r)
expect debug (invertPayload payload) answerPayload context expect debug (invertPayload payload) answerPayload context
IQResponseTimeout -> do Just IQResponseTimeout -> do
debug $ "Timeout in packet: " ++ show count debug $ "Timeout in packet: " ++ show count
IQResponseError e -> do Just (IQResponseError e) -> do
debug $ "Error in packet: " ++ show count debug $ "Error in packet: " ++ show count
liftIO $ threadDelay 100000 liftIO $ threadDelay 100000
-- sendUser "All tests done" context -- sendUser "All tests done" context
@ -170,7 +171,7 @@ runMain debug number multi = do
debug . (("Thread " ++ show number ++ ":") ++) debug . (("Thread " ++ show number ++ ":") ++)
debug' "running" debug' "running"
Right context <- session (Text.unpack $ domainpart we) Right context <- session (Text.unpack $ domainpart we)
(Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) (Just (\_ -> [scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we))
config config
sendPresence presenceOnline context sendPresence presenceOnline context
thread1 <- forkIO $ autoAccept =<< dupSession context thread1 <- forkIO $ autoAccept =<< dupSession context
@ -206,12 +207,12 @@ connectionClosedTest = do
debug' "running" debug' "running"
let we = testUser1 let we = testUser1
Right context <- session (Text.unpack $ domainpart we) Right context <- session (Text.unpack $ domainpart we)
(Just ([scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we)) (Just (\_ -> [scramSha1 (fromJust $ localpart we) Nothing "pwd"], resourcepart we))
config {onConnectionClosed = \e -> do config {onConnectionClosed = \s e -> do
debug' $ "closed: " ++ show e liftIO $ reconnect Nothing s
liftIO $ sendPresence presenceOnline s
return ()
} }
sendPresence presenceOnline context sendPresence presenceOnline context
forkIO $ threadDelay 3000000 >> void (closeConnection context)
forever $ threadDelay 1000000 forever $ threadDelay 1000000
return () return ()

Loading…
Cancel
Save