Browse Source

handle timeouts in sendIQ response processing

master
Philipp Balzarek 14 years ago
parent
commit
c89cf91a70
  1. 1
      source/Network/Xmpp.hs
  2. 46
      source/Network/Xmpp/Concurrent/IQ.hs
  3. 3
      source/Network/Xmpp/Concurrent/Threads.hs
  4. 2
      source/Network/Xmpp/Concurrent/Types.hs
  5. 4
      source/Network/Xmpp/Session.hs
  6. 11
      source/Network/Xmpp/Types.hs
  7. 11
      source/Network/Xmpp/Xep/ServiceDiscovery.hs
  8. 42
      tests/Tests.hs

1
source/Network/Xmpp.hs

@ -133,6 +133,7 @@ module Network.Xmpp @@ -133,6 +133,7 @@ module Network.Xmpp
, IQRequestType(..)
, IQResult(..)
, IQError(..)
, IQResponse(..)
, sendIQ
, sendIQ'
, answerIQ

46
source/Network/Xmpp/Concurrent/IQ.hs

@ -1,6 +1,8 @@ @@ -1,6 +1,8 @@
module Network.Xmpp.Concurrent.IQ where
import Control.Concurrent.STM
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Trans.Reader
@ -13,34 +15,48 @@ import Network.Xmpp.Types @@ -13,34 +15,48 @@ import Network.Xmpp.Types
-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound
-- IQ with a matching ID that has type @result@ or @error@.
sendIQ :: Maybe Jid -- ^ Recipient (to)
sendIQ :: Maybe Int -- ^ Timeout
-> Maybe Jid -- ^ Recipient (to)
-> IQRequestType -- ^ IQ type (@Get@ or @Set@)
-> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for
-- default)
-> Element -- ^ The IQ body (there has to be exactly one)
-> Xmpp (TMVar IQResponse)
sendIQ to tp lang body = do -- TODO: Add timeout
newId <- liftIO =<< asks idGenerator
handlers <- asks iqHandlers
ref <- liftIO . atomically $ do
resRef <- newEmptyTMVar
(byNS, byId) <- readTVar handlers
writeTVar handlers (byNS, Map.insert newId resRef byId)
-- TODO: Check for id collisions (shouldn't happen?)
return resRef
sendStanza . IQRequestS $ IQRequest newId Nothing to lang tp body
return ref
-- | Like 'sendIQ', but waits for the answer IQ.
sendIQ timeOut to tp lang body = do -- TODO: Add timeout
newId <- liftIO =<< asks idGenerator
handlers <- asks iqHandlers
ref <- liftIO . atomically $ do
resRef <- newEmptyTMVar
(byNS, byId) <- readTVar handlers
writeTVar handlers (byNS, Map.insert newId resRef byId)
-- TODO: Check for id collisions (shouldn't happen?)
return resRef
sendStanza . IQRequestS $ IQRequest newId Nothing to lang tp body
case timeOut of
Nothing -> return ()
Just t -> void . liftIO . forkIO $ do
threadDelay t
doTimeOut handlers newId ref
return ref
where
doTimeOut handlers iqid var = atomically $ do
p <- tryPutTMVar var IQResponseTimeout
when p $ do
(byNS, byId) <- readTVar handlers
writeTVar handlers (byNS, Map.delete iqid byId)
return ()
-- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds
sendIQ' :: Maybe Jid
-> IQRequestType
-> Maybe LangTag
-> Element
-> Xmpp IQResponse
sendIQ' to tp lang body = do
ref <- sendIQ to tp lang body
ref <- sendIQ (Just 3000000) to tp lang body
liftIO . atomically $ takeTMVar ref
answerIQ :: IQRequestTicket
-> Either StanzaError (Maybe Element)
-> Xmpp Bool

3
source/Network/Xmpp/Concurrent/Threads.hs

@ -129,7 +129,8 @@ handleIQResponse handlers iq = do @@ -129,7 +129,8 @@ handleIQResponse handlers iq = do
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
_ <- tryPutTMVar tmvar iq -- Don't block.
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err

2
source/Network/Xmpp/Concurrent/Types.hs

@ -19,7 +19,7 @@ import Network.Xmpp.Types @@ -19,7 +19,7 @@ import Network.Xmpp.Types
-- Map between the IQ request type and the "query" namespace pair, and the TChan
-- for the IQ request and "sent" boolean pair.
type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket)
, Map.Map StanzaId (TMVar IQResponse)
, Map.Map StanzaId (TMVar (IQResponse))
)
-- Handlers to be run when the Xmpp session ends and when the Xmpp connection is

4
source/Network/Xmpp/Session.hs

@ -39,5 +39,5 @@ startSession :: Xmpp () @@ -39,5 +39,5 @@ startSession :: Xmpp ()
startSession = do
answer <- sendIQ' Nothing Set Nothing sessionXML
case answer of
Left e -> error $ show e
Right _ -> return ()
IQResponseResult _ -> return ()
e -> error $ show e

11
source/Network/Xmpp/Types.hs

@ -10,7 +10,7 @@ module Network.Xmpp.Types @@ -10,7 +10,7 @@ module Network.Xmpp.Types
( IQError(..)
, IQRequest(..)
, IQRequestType(..)
, IQResponse
, IQResponse(..)
, IQResult(..)
, IdGenerator(..)
, LangTag (..)
@ -111,9 +111,12 @@ instance Read IQRequestType where @@ -111,9 +111,12 @@ instance Read IQRequestType where
readsPrec _ "set" = [(Set, "")]
readsPrec _ _ = []
-- | A "response" Info/Query (IQ) stanza is either an 'IQError' or an IQ stanza
-- with the type "result" ('IQResult').
type IQResponse = Either IQError IQResult
-- | A "response" Info/Query (IQ) stanza is either an 'IQError', an IQ stanza
-- of type "result" ('IQResult') or a Timeout.
data IQResponse = IQResponseError IQError
| IQResponseResult IQResult
| IQResponseTimeout
deriving Show
-- | The (non-error) answer to an IQ request.
data IQResult = IQResult { iqResultID :: StanzaId

11
source/Network/Xmpp/Xep/ServiceDiscovery.hs

@ -27,6 +27,7 @@ import Network.Xmpp @@ -27,6 +27,7 @@ import Network.Xmpp
data DiscoError = DiscoNoQueryElement
| DiscoIQError IQError
| DiscoTimeout
| DiscoXMLError Element UnpickleError
deriving (Show)
@ -83,8 +84,9 @@ queryInfo :: Jid -- ^ Entity to query @@ -83,8 +84,9 @@ queryInfo :: Jid -- ^ Entity to query
queryInfo to node = do
res <- sendIQ' (Just to) Get Nothing queryBody
return $ case res of
Left e -> Left $ DiscoIQError e
Right r -> case iqResultPayload r of
IQResponseError e -> Left $ DiscoIQError e
IQResponseTimeout -> Left $ DiscoTimeout
IQResponseResult r -> case iqResultPayload r of
Nothing -> Left DiscoNoQueryElement
Just p -> case unpickleElem xpQueryInfo p of
Left e -> Left $ DiscoXMLError p e
@ -127,8 +129,9 @@ queryItems :: Jid -- ^ Entity to query @@ -127,8 +129,9 @@ queryItems :: Jid -- ^ Entity to query
queryItems to node = do
res <- sendIQ' (Just to) Get Nothing queryBody
return $ case res of
Left e -> Left $ DiscoIQError e
Right r -> case iqResultPayload r of
IQResponseError e -> Left $ DiscoIQError e
IQResponseTimeout -> Left $ DiscoTimeout
IQResponseResult r -> case iqResultPayload r of
Nothing -> Left DiscoNoQueryElement
Just p -> case unpickleElem xpQueryItems p of
Left e -> Left $ DiscoXMLError p e

42
tests/Tests.hs

@ -16,6 +16,7 @@ import Data.XML.Types @@ -16,6 +16,7 @@ import Data.XML.Types
import Network.Xmpp
import Network.Xmpp.IM.Presence
import Network.Xmpp.Pickle
import Network.Xmpp.Xep.ServiceDiscovery
import System.Environment
import Text.XML.Stream.Elements
@ -57,16 +58,17 @@ invertPayload (Payload count flag message) = Payload (count + 1) (not flag) (Tex @@ -57,16 +58,17 @@ invertPayload (Payload count flag message) = Payload (count + 1) (not flag) (Tex
iqResponder = do
chan' <- listenIQChan Get testNS
chan <- case chan' of
Nothing -> liftIO $ putStrLn "Channel was already taken"
Left _ -> liftIO $ putStrLn "Channel was already taken"
>> error "hanging up"
Just c -> return c
Right c -> return c
forever $ do
next <- liftIO . atomically $ readTChan chan
let Right payload = unpickleElem payloadP . iqRequestPayload $
iqRequestBody next
let answerPayload = invertPayload payload
let answerBody = pickleElem payloadP answerPayload
answerIQ next (Right $ Just answerBody)
unless (payloadCounter payload == 3) . void $
answerIQ next (Right $ Just answerBody)
when (payloadCounter payload == 10) $ do
liftIO $ threadDelay 1000000
endSession
@ -134,6 +136,24 @@ runMain debug number = do @@ -134,6 +136,24 @@ runMain debug number = do
sendPresence $ presenceSubscribe them
fork iqResponder
when active $ do
q <- queryInfo "species64739.dyndns.org" Nothing
case q of
Left (DiscoXMLError el e) -> do
debug' (ppElement el)
debug' (Text.unpack $ ppUnpickleError e)
debug' (show $ length $ elementNodes el)
x -> debug' $ show x
q <- queryItems "species64739.dyndns.org"
(Just "http://jabber.org/protocol/commands")
case q of
Left (DiscoXMLError el e) -> do
debug' (ppElement el)
debug' (Text.unpack $ ppUnpickleError e)
debug' (show $ length $ elementNodes el)
x -> debug' $ show x
liftIO $ threadDelay 1000000 -- Wait for the other thread to go online
void . fork $ do
forM [1..10] $ \count -> do
@ -141,11 +161,17 @@ runMain debug number = do @@ -141,11 +161,17 @@ runMain debug number = do
let payload = Payload count (even count) (Text.pack $ show count)
let body = pickleElem payloadP payload
debug' "sending"
Right answer <- sendIQ' (Just them) Get Nothing body
debug' "received"
let Right answerPayload = unpickleElem payloadP
(fromJust $ iqResultPayload answer)
expect debug' (invertPayload payload) answerPayload
answer <- sendIQ' (Just them) Get Nothing body
case answer of
IQResponseResult r -> do
debug' "received"
let Right answerPayload = unpickleElem payloadP
(fromJust $ iqResultPayload r)
expect debug' (invertPayload payload) answerPayload
IQResponseTimeout -> do
debug' $ "Timeout in packet: " ++ show count
IQResponseError e -> do
debug' $ "Error in packet: " ++ show count
liftIO $ threadDelay 100000
sendUser "All tests done"
debug' "ending session"

Loading…
Cancel
Save