From 1551e4c9841d2285fc71c448558f1b702b93b777 Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Mon, 10 Jun 2013 18:32:47 +0200
Subject: [PATCH] drop outbound stanza buffering
Stanza send functionality now directly sends the stanzas over the wire and returns appropriate notification whether the sent succeeded (as far as we can tell)
---
source/Network/Xmpp/Concurrent.hs | 123 ++++++++++-----------
source/Network/Xmpp/Concurrent/Basic.hs | 29 +++--
source/Network/Xmpp/Concurrent/IQ.hs | 44 +++++---
source/Network/Xmpp/Concurrent/Message.hs | 4 +-
source/Network/Xmpp/Concurrent/Monad.hs | 4 +-
source/Network/Xmpp/Concurrent/Presence.hs | 2 +-
source/Network/Xmpp/Concurrent/Threads.hs | 29 ++---
source/Network/Xmpp/Concurrent/Types.hs | 13 ++-
source/Network/Xmpp/IM/Roster.hs | 25 +++--
source/Network/Xmpp/Types.hs | 9 +-
10 files changed, 155 insertions(+), 127 deletions(-)
diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs
index a01cf82..059f9c6 100644
--- a/source/Network/Xmpp/Concurrent.hs
+++ b/source/Network/Xmpp/Concurrent.hs
@@ -10,17 +10,14 @@ module Network.Xmpp.Concurrent
, module Network.Xmpp.Concurrent.IQ
, StanzaHandler
, newSession
- , writeWorker
, session
, newStanzaID
) where
-import Control.Applicative((<$>),(<*>))
-import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Error
-import qualified Data.ByteString as BS
+import qualified Control.Exception as Ex
import qualified Data.Map as Map
import Data.Maybe
import Data.Text as Text
@@ -35,22 +32,20 @@ import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.IM.Roster.Types
import Network.Xmpp.IM.Roster
-import Network.Xmpp.Marshal
import Network.Xmpp.Sasl
import Network.Xmpp.Sasl.Types
import Network.Xmpp.Stream
import Network.Xmpp.Tls
import Network.Xmpp.Types
-import Network.Xmpp.Utilities
import Control.Monad.State.Strict
-runHandlers :: (TChan Stanza) -> [StanzaHandler] -> Stanza -> IO ()
+runHandlers :: WriteSemaphore -> [StanzaHandler] -> Stanza -> IO ()
runHandlers _ [] _ = return ()
-runHandlers outC (h:hands) sta = do
- res <- h outC sta
+runHandlers sem (h:hands) sta = do
+ res <- h sem sta
case res of
- True -> runHandlers outC hands sta
+ True -> runHandlers sem hands sta
False -> return ()
toChan :: TChan Stanza -> StanzaHandler
@@ -61,7 +56,7 @@ toChan stanzaC _ sta = do
handleIQ :: TVar IQHandlers
-> StanzaHandler
-handleIQ iqHands outC sta = atomically $ do
+handleIQ iqHands writeSem sta = do
case sta of
IQRequestS i -> handleIQRequest iqHands i >> return False
IQResultS i -> handleIQResponse iqHands (Right i) >> return False
@@ -69,37 +64,49 @@ handleIQ iqHands outC sta = atomically $ do
_ -> return True
where
-- If the IQ request has a namespace, send it through the appropriate channel.
- handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
+ handleIQRequest :: TVar IQHandlers -> IQRequest -> IO ()
handleIQRequest handlers iq = do
- (byNS, _) <- readTVar handlers
- let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
- case Map.lookup (iqRequestType iq, iqNS) byNS of
- Nothing -> writeTChan outC $ serviceUnavailable iq
- Just ch -> do
- sentRef <- newTVar False
- let answerT answer = do
- let IQRequest iqid from _to lang _tp bd = iq
- response = case answer of
- Left er -> IQErrorS $ IQError iqid Nothing
- from lang er
- (Just bd)
- Right res -> IQResultS $ IQResult iqid Nothing
- from lang res
- atomically $ do
- sent <- readTVar sentRef
- case sent of
- False -> do
- writeTVar sentRef True
- writeTChan outC response
- return True
- True -> return False
- writeTChan ch $ IQRequestTicket answerT iq
+ out <- atomically $ do
+ (byNS, _) <- readTVar handlers
+ let iqNS = fromMaybe "" (nameNamespace . elementName
+ $ iqRequestPayload iq)
+ case Map.lookup (iqRequestType iq, iqNS) byNS of
+ Nothing -> return . Just $ serviceUnavailable iq
+ Just ch -> do
+ sentRef <- newTMVar False
+ let answerT answer = do
+ let IQRequest iqid from _to lang _tp bd = iq
+ response = case answer of
+ Left er -> IQErrorS $ IQError iqid Nothing
+ from lang er
+ (Just bd)
+ Right res -> IQResultS $ IQResult iqid Nothing
+ from lang res
+ Ex.bracketOnError (atomically $ takeTMVar sentRef)
+ (atomically . putTMVar sentRef)
+ $ \wasSent -> do
+ case wasSent of
+ True -> do
+ atomically $ putTMVar sentRef True
+ return Nothing
+ False -> do
+ didSend <- writeStanza writeSem response
+ case didSend of
+ True -> do
+ atomically $ putTMVar sentRef True
+ return $ Just True
+ False -> do
+ atomically $ putTMVar sentRef False
+ return $ Just False
+ writeTChan ch $ IQRequestTicket answerT iq
+ return Nothing
+ maybe (return ()) (void . writeStanza writeSem) out
serviceUnavailable (IQRequest iqid from _to lang _tp bd) =
IQErrorS $ IQError iqid Nothing from lang err (Just bd)
err = StanzaError Cancel ServiceUnavailable Nothing Nothing
- handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
- handleIQResponse handlers iq = do
+ handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> IO ()
+ handleIQResponse handlers iq = atomically $ do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
@@ -114,51 +121,36 @@ handleIQ iqHands outC sta = atomically $ do
-- | Creates and initializes a new Xmpp context.
newSession :: Stream -> SessionConfiguration -> IO (Either XmppFailure Session)
newSession stream config = runErrorT $ do
- outC <- lift newTChanIO
+ write' <- liftIO $ withStream' (gets $ streamSend . streamHandle) stream
+ writeSem <- liftIO $ newTMVarIO write'
stanzaChan <- lift newTChanIO
iqHands <- lift $ newTVarIO (Map.empty, Map.empty)
eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = onConnectionClosed config }
ros <- liftIO . newTVarIO $ Roster Nothing Map.empty
let rosterH = if (enableRoster config) then handleRoster ros
else \ _ _ -> return True
- let stanzaHandler = runHandlers outC $ Prelude.concat [ [ toChan stanzaChan ]
- , extraStanzaHandlers
- config
- , [ handleIQ iqHands
- , rosterH
- ]
- ]
- (kill, wLock, streamState, reader) <- ErrorT $ startThreadsWith stanzaHandler eh stream
- writer <- lift $ forkIO $ writeWorker outC wLock
+ let stanzaHandler = runHandlers writeSem
+ $ Prelude.concat [ [ toChan stanzaChan ]
+ , extraStanzaHandlers
+ config
+ , [ handleIQ iqHands
+ , rosterH
+ ]
+ ]
+ (kill, wLock, streamState, reader) <- ErrorT $ startThreadsWith writeSem stanzaHandler eh stream
idGen <- liftIO $ sessionStanzaIDs config
return $ Session { stanzaCh = stanzaChan
- , outCh = outC
, iqHandlers = iqHands
- , writeRef = wLock
+ , writeSemaphore = wLock
, readerThread = reader
, idGenerator = idGen
, streamRef = streamState
, eventHandlers = eh
- , stopThreads = kill >> killThread writer
+ , stopThreads = kill
, conf = config
, rosterRef = ros
}
--- Worker to write stanzas to the stream concurrently.
-writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
-writeWorker stCh writeR = forever $ do
- (write, next) <- atomically $ (,) <$>
- takeTMVar writeR <*>
- readTChan stCh
- let outData = renderElement $ nsHack (pickleElem xpStanza next)
- debugOut outData
- r <- write outData
- atomically $ putTMVar writeR write
- unless r $ do
- atomically $ unGetTChan stCh next -- If the writing failed, the
- -- connection is dead.
- threadDelay 250000 -- Avoid free spinning.
-
-- | Creates a 'Session' object by setting up a connection with an XMPP server.
--
-- Will connect to the specified host with the provided configuration. If the
@@ -186,4 +178,3 @@ session realm mbSasl config = runErrorT $ do
newStanzaID :: Session -> IO StanzaID
newStanzaID = idGenerator
-
diff --git a/source/Network/Xmpp/Concurrent/Basic.hs b/source/Network/Xmpp/Concurrent/Basic.hs
index b5d24d2..a590f64 100644
--- a/source/Network/Xmpp/Concurrent/Basic.hs
+++ b/source/Network/Xmpp/Concurrent/Basic.hs
@@ -1,15 +1,30 @@
{-# OPTIONS_HADDOCK hide #-}
module Network.Xmpp.Concurrent.Basic where
-import Control.Concurrent.STM
-import Network.Xmpp.Concurrent.Types
-import Network.Xmpp.Stream
-import Network.Xmpp.Types
-import Control.Monad.State.Strict
+import Control.Concurrent.STM
+import qualified Control.Exception as Ex
+import Control.Monad.State.Strict
+import qualified Data.ByteString as BS
+import Network.Xmpp.Concurrent.Types
+import Network.Xmpp.Marshal
+import Network.Xmpp.Stream
+import Network.Xmpp.Types
+import Network.Xmpp.Utilities
+
+semWrite :: WriteSemaphore -> BS.ByteString -> IO Bool
+semWrite sem bs = Ex.bracket (atomically $ takeTMVar sem)
+ (atomically . putTMVar sem)
+ ($ bs)
+
+writeStanza :: WriteSemaphore -> Stanza -> IO Bool
+writeStanza sem a = do
+ let outData = renderElement $ nsHack (pickleElem xpStanza a)
+ semWrite sem outData
-- | Send a stanza to the server.
-sendStanza :: Stanza -> Session -> IO ()
-sendStanza a session = atomically $ writeTChan (outCh session) a
+sendStanza :: Stanza -> Session -> IO Bool
+sendStanza a session = writeStanza (writeSemaphore session) a
+
-- | Get the channel of incoming stanzas.
getStanzaChan :: Session -> TChan Stanza
diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs
index 4b6c462..17d3298 100644
--- a/source/Network/Xmpp/Concurrent/IQ.hs
+++ b/source/Network/Xmpp/Concurrent/IQ.hs
@@ -13,16 +13,20 @@ import Network.Xmpp.Concurrent.Basic
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Types
--- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound
--- IQ with a matching ID that has type @result@ or @error@.
-sendIQ :: Maybe Int -- ^ Timeout
+-- | Sends an IQ, returns Just a 'TMVar' that will be filled with the first
+-- inbound IQ with a matching ID that has type @result@ or @error@ or Nothing if
+-- the stanza could not be sent
+sendIQ :: Maybe Int -- ^ Timeout . When the timeout is reached the response
+ -- TMVar will be filled with 'IQResponseTimeout' and the id
+ -- is removed from the list of IQ handlers. 'Nothing'
+ -- deactivates the timeout
-> Maybe Jid -- ^ Recipient (to)
-> IQRequestType -- ^ IQ type (@Get@ or @Set@)
-> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for
-- default)
-> Element -- ^ The IQ body (there has to be exactly one)
-> Session
- -> IO (TMVar IQResponse)
+ -> IO (Maybe (TMVar IQResponse))
sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
newId <- idGenerator session
ref <- atomically $ do
@@ -31,13 +35,16 @@ sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId)
-- TODO: Check for id collisions (shouldn't happen?)
return resRef
- sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session
- case timeOut of
- Nothing -> return ()
- Just t -> void . forkIO $ do
- threadDelay t
- doTimeOut (iqHandlers session) newId ref
- return ref
+ res <- sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session
+ if res
+ then do
+ case timeOut of
+ Nothing -> return ()
+ Just t -> void . forkIO $ do
+ threadDelay t
+ doTimeOut (iqHandlers session) newId ref
+ return $ Just ref
+ else return Nothing
where
doTimeOut handlers iqid var = atomically $ do
p <- tryPutTMVar var IQResponseTimeout
@@ -53,10 +60,10 @@ sendIQ' :: Maybe Jid
-> Maybe LangTag
-> Element
-> Session
- -> IO IQResponse
+ -> IO (Maybe IQResponse)
sendIQ' to tp lang body session = do
ref <- sendIQ (Just 3000000) to tp lang body session
- atomically $ takeTMVar ref
+ maybe (return Nothing) (fmap Just . atomically . takeTMVar) ref
-- | Retrieves an IQ listener channel. If the namespace/'IQRequestType' is not
@@ -103,7 +110,12 @@ dropIQChan tp ns session = do
writeTVar handlers (byNS', byID)
return ()
--- | Answer an IQ request. Only the first answer ist sent and then True is
--- returned. Subsequent answers are dropped and (False is returned in that case)
-answerIQ :: IQRequestTicket -> Either StanzaError (Maybe Element) -> IO Bool
+-- | Answer an IQ request. Only the first answer ist sent and Just True is
+-- returned when the answer is sucessfully sent. If an error occured during
+-- sending Just False is returned (and another attempt can be
+-- undertaken). Subsequent answers after the first sucessful one are dropped and
+-- (False is returned in that case)
+answerIQ :: IQRequestTicket
+ -> Either StanzaError (Maybe Element)
+ -> IO (Maybe Bool)
answerIQ ticket = answerTicket ticket
diff --git a/source/Network/Xmpp/Concurrent/Message.hs b/source/Network/Xmpp/Concurrent/Message.hs
index 234484c..8413593 100644
--- a/source/Network/Xmpp/Concurrent/Message.hs
+++ b/source/Network/Xmpp/Concurrent/Message.hs
@@ -52,6 +52,6 @@ filterMessages f g session = do
Right m | g m -> return $ Right m
| otherwise -> filterMessages f g session
--- | Send a message stanza.
-sendMessage :: Message -> Session -> IO ()
+-- | Send a message stanza. Returns False when the Message could not be sentx
+sendMessage :: Message -> Session -> IO Bool
sendMessage m session = sendStanza (MessageS m) session
diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs
index 343a588..2545164 100644
--- a/source/Network/Xmpp/Concurrent/Monad.hs
+++ b/source/Network/Xmpp/Concurrent/Monad.hs
@@ -32,7 +32,7 @@ withConnection a session = do
-- fetches an updated state.
s <- Ex.catch
(atomically $ do
- _ <- takeTMVar (writeRef session)
+ _ <- takeTMVar (writeSemaphore session)
s <- takeTMVar (streamRef session)
putTMVar wait ()
return s
@@ -48,7 +48,7 @@ withConnection a session = do
(res, s') <- a s
wl <- withStream' (gets $ streamSend . streamHandle) s'
atomically $ do
- putTMVar (writeRef session) wl
+ putTMVar (writeSemaphore session) wl
putTMVar (streamRef session) s'
return $ Right res
)
diff --git a/source/Network/Xmpp/Concurrent/Presence.hs b/source/Network/Xmpp/Concurrent/Presence.hs
index cb6a502..6c08298 100644
--- a/source/Network/Xmpp/Concurrent/Presence.hs
+++ b/source/Network/Xmpp/Concurrent/Presence.hs
@@ -27,5 +27,5 @@ waitForPresence f session = do
| otherwise -> waitForPresence f session
-- | Send a presence stanza.
-sendPresence :: Presence -> Session -> IO ()
+sendPresence :: Presence -> Session -> IO Bool
sendPresence p session = sendStanza (PresenceS p) session
diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs
index 6ab5934..aa9cc50 100644
--- a/source/Network/Xmpp/Concurrent/Threads.hs
+++ b/source/Network/Xmpp/Concurrent/Threads.hs
@@ -10,7 +10,6 @@ import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Error
-import Control.Monad.State.Strict
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
@@ -90,28 +89,30 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
-startThreadsWith :: (Stanza -> IO ())
+startThreadsWith :: TMVar (BS.ByteString -> IO Bool)
+ -> (Stanza -> IO ())
-> TVar EventHandlers
-> Stream
-> IO (Either XmppFailure (IO (),
TMVar (BS.ByteString -> IO Bool),
TMVar Stream,
ThreadId))
-startThreadsWith stanzaHandler eh con = do
- read' <- withStream' (gets $ streamSend . streamHandle) con
- writeLock <- newTMVarIO read'
+startThreadsWith writeSem stanzaHandler eh con = do
+ -- read' <- withStream' (gets $ streamSend . streamHandle) con
+ -- writeSem <- newTMVarIO read'
conS <- newTMVarIO con
- -- lw <- forkIO $ writeWorker outC writeLock
- cp <- forkIO $ connPersist writeLock
+ cp <- forkIO $ connPersist writeSem
rdw <- forkIO $ readWorker stanzaHandler (noCon eh) conS
- return $ Right ( killConnection writeLock [rdw, cp]
- , writeLock
+ return $ Right ( killConnection [rdw, cp]
+ , writeSem
, conS
, rdw
)
where
- killConnection writeLock threads = liftIO $ do
- _ <- atomically $ takeTMVar writeLock -- Should we put it back?
+ killConnection threads = liftIO $ do
+ _ <- atomically $ do
+ _ <- takeTMVar writeSem
+ putTMVar writeSem $ \_ -> return False
_ <- forM threads killThread
return ()
-- Call the connection closed handlers.
@@ -124,8 +125,8 @@ startThreadsWith stanzaHandler eh con = do
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
-connPersist lock = forever $ do
- pushBS <- atomically $ takeTMVar lock
+connPersist sem = forever $ do
+ pushBS <- atomically $ takeTMVar sem
_ <- pushBS " "
- atomically $ putTMVar lock pushBS
+ atomically $ putTMVar sem pushBS
threadDelay 30000000 -- 30s
diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs
index 9491b93..d5a1c6a 100644
--- a/source/Network/Xmpp/Concurrent/Types.hs
+++ b/source/Network/Xmpp/Concurrent/Types.hs
@@ -54,21 +54,21 @@ instance Show Interrupt where show _ = ""
instance Ex.Exception Interrupt
+type WriteSemaphore = TMVar (BS.ByteString -> IO Bool)
-- | A concurrent interface to Pontarius XMPP.
data Session = Session
{ stanzaCh :: TChan Stanza -- All stanzas
- , outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any
-- given time.
-- Fields below are from Context.
- , writeRef :: TMVar (BS.ByteString -> IO Bool)
+ , writeSemaphore :: WriteSemaphore
, readerThread :: ThreadId
, idGenerator :: IO StanzaID
-- | Lock (used by withStream) to make sure that a maximum of one
-- Stream action is executed at any given time.
- , streamRef :: TMVar (Stream)
+ , streamRef :: TMVar Stream
, eventHandlers :: TVar EventHandlers
, stopThreads :: IO ()
, rosterRef :: TVar Roster
@@ -83,7 +83,12 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket)
-- | Contains whether or not a reply has been sent, and the IQ request body to
-- reply to.
+
data IQRequestTicket = IQRequestTicket
- { answerTicket :: Either StanzaError (Maybe Element) -> IO Bool
+ { answerTicket :: Either StanzaError (Maybe Element) -> IO (Maybe Bool)
+ -- ^ Return Nothing when the IQ request was already
+ -- answered before, Just True when it was sucessfully
+ -- answered and Just False when the answer was attempted,
+ -- but failed (e.g. there is a connection failure)
, iqRequestBody :: IQRequest
}
diff --git a/source/Network/Xmpp/IM/Roster.hs b/source/Network/Xmpp/IM/Roster.hs
index 2f0dacf..2e2a9a3 100644
--- a/source/Network/Xmpp/IM/Roster.hs
+++ b/source/Network/Xmpp/IM/Roster.hs
@@ -24,7 +24,7 @@ import Network.Xmpp.Types
-- | Push a roster item to the server. The values for approved and ask are
-- ignored and all values for subsciption except "remove" are ignored
-rosterPush :: Item -> Session -> IO IQResponse
+rosterPush :: Item -> Session -> IO (Maybe IQResponse)
rosterPush item session = do
let el = pickleElem xpQuery (Query Nothing [fromItem item])
sendIQ' Nothing Set Nothing el session
@@ -36,7 +36,7 @@ rosterAdd :: Jid -- ^ JID of the item
-> Maybe Text -- ^ Name alias
-> [Text] -- ^ Groups (duplicates will be removed)
-> Session
- -> IO IQResponse
+ -> IO (Maybe IQResponse)
rosterAdd j n gs session = do
let el = pickleElem xpQuery (Query Nothing
[QueryItem { qiApproved = Nothing
@@ -58,7 +58,7 @@ rosterRemove j sess = do
Just _ -> do
res <- rosterPush (Item False False j Nothing Remove []) sess
case res of
- IQResponseResult IQResult{} -> return True
+ Just (IQResponseResult IQResult{}) -> return True
_ -> return False
-- | Retrieve the current Roster state
@@ -76,8 +76,8 @@ initRoster session = do
"Server did not return a roster"
Just roster -> atomically $ writeTVar (rosterRef session) roster
-handleRoster :: TVar Roster -> TChan Stanza -> Stanza -> IO Bool
-handleRoster ref outC sta = case sta of
+handleRoster :: TVar Roster -> WriteSemaphore -> Stanza -> IO Bool
+handleRoster ref sem sta = case sta of
IQRequestS (iqr@IQRequest{iqRequestPayload =
iqb@Element{elementName = en}})
| nameNamespace en == Just "jabber:iq:roster" -> do
@@ -89,11 +89,11 @@ handleRoster ref outC sta = case sta of
, queryItems = [update]
} -> do
handleUpdate v update
- atomically . writeTChan outC $ result iqr
+ _ <- writeStanza sem $ result iqr
return False
_ -> do
errorM "Pontarius.Xmpp" "Invalid roster query"
- atomically . writeTChan outC $ badRequest iqr
+ _ <- writeStanza sem $ badRequest iqr
return False
_ -> return True
where
@@ -120,19 +120,22 @@ retrieveRoster mbOldRoster sess = do
(pickleElem xpQuery (Query version []))
sess
case res of
- IQResponseResult (IQResult{iqResultPayload = Just ros})
+ Nothing -> do
+ errorM "Pontarius.Xmpp.Roster" "getRoster: sending stanza failed"
+ return Nothing
+ Just (IQResponseResult (IQResult{iqResultPayload = Just ros}))
-> case unpickleElem xpQuery ros of
Left _e -> do
errorM "Pontarius.Xmpp.Roster" "getRoster: invalid query element"
return Nothing
Right ros' -> return . Just $ toRoster ros'
- IQResponseResult (IQResult{iqResultPayload = Nothing}) -> do
+ Just (IQResponseResult (IQResult{iqResultPayload = Nothing})) -> do
return mbOldRoster
-- sever indicated that no roster updates are necessary
- IQResponseTimeout -> do
+ Just IQResponseTimeout -> do
errorM "Pontarius.Xmpp.Roster" "getRoster: request timed out"
return Nothing
- IQResponseError e -> do
+ Just (IQResponseError e) -> do
errorM "Pontarius.Xmpp.Roster" $ "getRoster: server returned error"
++ show e
return Nothing
diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs
index 67d2343..2d93f12 100644
--- a/source/Network/Xmpp/Types.hs
+++ b/source/Network/Xmpp/Types.hs
@@ -838,7 +838,8 @@ data ConnectionState
-- | Defines operations for sending, receiving, flushing, and closing on a
-- stream.
data StreamHandle =
- StreamHandle { streamSend :: BS.ByteString -> IO Bool
+ StreamHandle { streamSend :: BS.ByteString -> IO Bool -- ^ Sends may not
+ -- interleave
, streamReceive :: Int -> IO BS.ByteString
-- This is to hold the state of the XML parser (otherwise we
-- will receive EventBeginDocument events and forget about
@@ -849,8 +850,8 @@ data StreamHandle =
data StreamState = StreamState
{ -- | State of the stream - 'Closed', 'Plain', or 'Secured'
- streamConnectionState :: !ConnectionState -- ^ State of connection
- -- | Functions to send, receive, flush, and close on the stream
+ streamConnectionState :: !ConnectionState
+ -- | Functions to send, receive, flush, and close the stream
, streamHandle :: StreamHandle
-- | Event conduit source, and its associated finalizer
, streamEventSource :: Source IO Event
@@ -1163,7 +1164,7 @@ instance Default StreamConfiguration where
}
}
-type StanzaHandler = TChan Stanza -- ^ outgoing stanza
+type StanzaHandler = TMVar (BS.ByteString -> IO Bool) -- ^ outgoing stanza
-> Stanza -- ^ stanza to handle
-> IO Bool -- ^ True when processing should continue