From c89cf91a7093135cca9ba1664c09aac2d1a3b6e8 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Mon, 16 Jul 2012 00:08:53 +0200 Subject: [PATCH] handle timeouts in sendIQ response processing --- source/Network/Xmpp.hs | 1 + source/Network/Xmpp/Concurrent/IQ.hs | 46 ++++++++++++++------- source/Network/Xmpp/Concurrent/Threads.hs | 3 +- source/Network/Xmpp/Concurrent/Types.hs | 2 +- source/Network/Xmpp/Session.hs | 4 +- source/Network/Xmpp/Types.hs | 11 +++-- source/Network/Xmpp/Xep/ServiceDiscovery.hs | 11 +++-- tests/Tests.hs | 42 +++++++++++++++---- 8 files changed, 85 insertions(+), 35 deletions(-) diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 7c7ee8b..2c61395 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -133,6 +133,7 @@ module Network.Xmpp , IQRequestType(..) , IQResult(..) , IQError(..) + , IQResponse(..) , sendIQ , sendIQ' , answerIQ diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs index 0c2ccc4..c77bb2b 100644 --- a/source/Network/Xmpp/Concurrent/IQ.hs +++ b/source/Network/Xmpp/Concurrent/IQ.hs @@ -1,6 +1,8 @@ module Network.Xmpp.Concurrent.IQ where import Control.Concurrent.STM +import Control.Concurrent (forkIO, threadDelay) +import Control.Monad import Control.Monad.IO.Class import Control.Monad.Trans.Reader @@ -13,34 +15,48 @@ import Network.Xmpp.Types -- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound -- IQ with a matching ID that has type @result@ or @error@. -sendIQ :: Maybe Jid -- ^ Recipient (to) +sendIQ :: Maybe Int -- ^ Timeout + -> Maybe Jid -- ^ Recipient (to) -> IQRequestType -- ^ IQ type (@Get@ or @Set@) -> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for -- default) -> Element -- ^ The IQ body (there has to be exactly one) -> Xmpp (TMVar IQResponse) -sendIQ to tp lang body = do -- TODO: Add timeout - newId <- liftIO =<< asks idGenerator - handlers <- asks iqHandlers - ref <- liftIO . atomically $ do - resRef <- newEmptyTMVar - (byNS, byId) <- readTVar handlers - writeTVar handlers (byNS, Map.insert newId resRef byId) - -- TODO: Check for id collisions (shouldn't happen?) - return resRef - sendStanza . IQRequestS $ IQRequest newId Nothing to lang tp body - return ref - --- | Like 'sendIQ', but waits for the answer IQ. +sendIQ timeOut to tp lang body = do -- TODO: Add timeout + newId <- liftIO =<< asks idGenerator + handlers <- asks iqHandlers + ref <- liftIO . atomically $ do + resRef <- newEmptyTMVar + (byNS, byId) <- readTVar handlers + writeTVar handlers (byNS, Map.insert newId resRef byId) + -- TODO: Check for id collisions (shouldn't happen?) + return resRef + sendStanza . IQRequestS $ IQRequest newId Nothing to lang tp body + case timeOut of + Nothing -> return () + Just t -> void . liftIO . forkIO $ do + threadDelay t + doTimeOut handlers newId ref + return ref + where + doTimeOut handlers iqid var = atomically $ do + p <- tryPutTMVar var IQResponseTimeout + when p $ do + (byNS, byId) <- readTVar handlers + writeTVar handlers (byNS, Map.delete iqid byId) + return () + +-- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds sendIQ' :: Maybe Jid -> IQRequestType -> Maybe LangTag -> Element -> Xmpp IQResponse sendIQ' to tp lang body = do - ref <- sendIQ to tp lang body + ref <- sendIQ (Just 3000000) to tp lang body liftIO . atomically $ takeTMVar ref + answerIQ :: IQRequestTicket -> Either StanzaError (Maybe Element) -> Xmpp Bool diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index aa1a47a..06559a4 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -129,7 +129,8 @@ handleIQResponse handlers iq = do case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of (Nothing, _) -> return () -- We are not supposed to send an error. (Just tmvar, byID') -> do - _ <- tryPutTMVar tmvar iq -- Don't block. + let answer = either IQResponseError IQResponseResult iq + _ <- tryPutTMVar tmvar answer -- Don't block. writeTVar handlers (byNS, byID') where iqID (Left err) = iqErrorID err diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 6772df1..457dc33 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -19,7 +19,7 @@ import Network.Xmpp.Types -- Map between the IQ request type and the "query" namespace pair, and the TChan -- for the IQ request and "sent" boolean pair. type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) - , Map.Map StanzaId (TMVar IQResponse) + , Map.Map StanzaId (TMVar (IQResponse)) ) -- Handlers to be run when the Xmpp session ends and when the Xmpp connection is diff --git a/source/Network/Xmpp/Session.hs b/source/Network/Xmpp/Session.hs index c3ba459..1c783d8 100644 --- a/source/Network/Xmpp/Session.hs +++ b/source/Network/Xmpp/Session.hs @@ -39,5 +39,5 @@ startSession :: Xmpp () startSession = do answer <- sendIQ' Nothing Set Nothing sessionXML case answer of - Left e -> error $ show e - Right _ -> return () + IQResponseResult _ -> return () + e -> error $ show e diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index 76d7d36..cd7158d 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -10,7 +10,7 @@ module Network.Xmpp.Types ( IQError(..) , IQRequest(..) , IQRequestType(..) - , IQResponse + , IQResponse(..) , IQResult(..) , IdGenerator(..) , LangTag (..) @@ -111,9 +111,12 @@ instance Read IQRequestType where readsPrec _ "set" = [(Set, "")] readsPrec _ _ = [] --- | A "response" Info/Query (IQ) stanza is either an 'IQError' or an IQ stanza --- with the type "result" ('IQResult'). -type IQResponse = Either IQError IQResult +-- | A "response" Info/Query (IQ) stanza is either an 'IQError', an IQ stanza +-- of type "result" ('IQResult') or a Timeout. +data IQResponse = IQResponseError IQError + | IQResponseResult IQResult + | IQResponseTimeout + deriving Show -- | The (non-error) answer to an IQ request. data IQResult = IQResult { iqResultID :: StanzaId diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs index 2c2eb18..53572a2 100644 --- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs +++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs @@ -27,6 +27,7 @@ import Network.Xmpp data DiscoError = DiscoNoQueryElement | DiscoIQError IQError + | DiscoTimeout | DiscoXMLError Element UnpickleError deriving (Show) @@ -83,8 +84,9 @@ queryInfo :: Jid -- ^ Entity to query queryInfo to node = do res <- sendIQ' (Just to) Get Nothing queryBody return $ case res of - Left e -> Left $ DiscoIQError e - Right r -> case iqResultPayload r of + IQResponseError e -> Left $ DiscoIQError e + IQResponseTimeout -> Left $ DiscoTimeout + IQResponseResult r -> case iqResultPayload r of Nothing -> Left DiscoNoQueryElement Just p -> case unpickleElem xpQueryInfo p of Left e -> Left $ DiscoXMLError p e @@ -127,8 +129,9 @@ queryItems :: Jid -- ^ Entity to query queryItems to node = do res <- sendIQ' (Just to) Get Nothing queryBody return $ case res of - Left e -> Left $ DiscoIQError e - Right r -> case iqResultPayload r of + IQResponseError e -> Left $ DiscoIQError e + IQResponseTimeout -> Left $ DiscoTimeout + IQResponseResult r -> case iqResultPayload r of Nothing -> Left DiscoNoQueryElement Just p -> case unpickleElem xpQueryItems p of Left e -> Left $ DiscoXMLError p e diff --git a/tests/Tests.hs b/tests/Tests.hs index 7929ed0..f507fa0 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -16,6 +16,7 @@ import Data.XML.Types import Network.Xmpp import Network.Xmpp.IM.Presence import Network.Xmpp.Pickle +import Network.Xmpp.Xep.ServiceDiscovery import System.Environment import Text.XML.Stream.Elements @@ -57,16 +58,17 @@ invertPayload (Payload count flag message) = Payload (count + 1) (not flag) (Tex iqResponder = do chan' <- listenIQChan Get testNS chan <- case chan' of - Nothing -> liftIO $ putStrLn "Channel was already taken" + Left _ -> liftIO $ putStrLn "Channel was already taken" >> error "hanging up" - Just c -> return c + Right c -> return c forever $ do next <- liftIO . atomically $ readTChan chan let Right payload = unpickleElem payloadP . iqRequestPayload $ iqRequestBody next let answerPayload = invertPayload payload let answerBody = pickleElem payloadP answerPayload - answerIQ next (Right $ Just answerBody) + unless (payloadCounter payload == 3) . void $ + answerIQ next (Right $ Just answerBody) when (payloadCounter payload == 10) $ do liftIO $ threadDelay 1000000 endSession @@ -134,6 +136,24 @@ runMain debug number = do sendPresence $ presenceSubscribe them fork iqResponder when active $ do + q <- queryInfo "species64739.dyndns.org" Nothing + case q of + Left (DiscoXMLError el e) -> do + debug' (ppElement el) + debug' (Text.unpack $ ppUnpickleError e) + debug' (show $ length $ elementNodes el) + x -> debug' $ show x + + q <- queryItems "species64739.dyndns.org" + (Just "http://jabber.org/protocol/commands") + case q of + Left (DiscoXMLError el e) -> do + debug' (ppElement el) + debug' (Text.unpack $ ppUnpickleError e) + debug' (show $ length $ elementNodes el) + x -> debug' $ show x + + liftIO $ threadDelay 1000000 -- Wait for the other thread to go online void . fork $ do forM [1..10] $ \count -> do @@ -141,11 +161,17 @@ runMain debug number = do let payload = Payload count (even count) (Text.pack $ show count) let body = pickleElem payloadP payload debug' "sending" - Right answer <- sendIQ' (Just them) Get Nothing body - debug' "received" - let Right answerPayload = unpickleElem payloadP - (fromJust $ iqResultPayload answer) - expect debug' (invertPayload payload) answerPayload + answer <- sendIQ' (Just them) Get Nothing body + case answer of + IQResponseResult r -> do + debug' "received" + let Right answerPayload = unpickleElem payloadP + (fromJust $ iqResultPayload r) + expect debug' (invertPayload payload) answerPayload + IQResponseTimeout -> do + debug' $ "Timeout in packet: " ++ show count + IQResponseError e -> do + debug' $ "Error in packet: " ++ show count liftIO $ threadDelay 100000 sendUser "All tests done" debug' "ending session"