diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index a01cf82..059f9c6 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -10,17 +10,14 @@ module Network.Xmpp.Concurrent , module Network.Xmpp.Concurrent.IQ , StanzaHandler , newSession - , writeWorker , session , newStanzaID ) where -import Control.Applicative((<$>),(<*>)) -import Control.Concurrent import Control.Concurrent.STM import Control.Monad import Control.Monad.Error -import qualified Data.ByteString as BS +import qualified Control.Exception as Ex import qualified Data.Map as Map import Data.Maybe import Data.Text as Text @@ -35,22 +32,20 @@ import Network.Xmpp.Concurrent.Threads import Network.Xmpp.Concurrent.Types import Network.Xmpp.IM.Roster.Types import Network.Xmpp.IM.Roster -import Network.Xmpp.Marshal import Network.Xmpp.Sasl import Network.Xmpp.Sasl.Types import Network.Xmpp.Stream import Network.Xmpp.Tls import Network.Xmpp.Types -import Network.Xmpp.Utilities import Control.Monad.State.Strict -runHandlers :: (TChan Stanza) -> [StanzaHandler] -> Stanza -> IO () +runHandlers :: WriteSemaphore -> [StanzaHandler] -> Stanza -> IO () runHandlers _ [] _ = return () -runHandlers outC (h:hands) sta = do - res <- h outC sta +runHandlers sem (h:hands) sta = do + res <- h sem sta case res of - True -> runHandlers outC hands sta + True -> runHandlers sem hands sta False -> return () toChan :: TChan Stanza -> StanzaHandler @@ -61,7 +56,7 @@ toChan stanzaC _ sta = do handleIQ :: TVar IQHandlers -> StanzaHandler -handleIQ iqHands outC sta = atomically $ do +handleIQ iqHands writeSem sta = do case sta of IQRequestS i -> handleIQRequest iqHands i >> return False IQResultS i -> handleIQResponse iqHands (Right i) >> return False @@ -69,37 +64,49 @@ handleIQ iqHands outC sta = atomically $ do _ -> return True where -- If the IQ request has a namespace, send it through the appropriate channel. - handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () + handleIQRequest :: TVar IQHandlers -> IQRequest -> IO () handleIQRequest handlers iq = do - (byNS, _) <- readTVar handlers - let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) - case Map.lookup (iqRequestType iq, iqNS) byNS of - Nothing -> writeTChan outC $ serviceUnavailable iq - Just ch -> do - sentRef <- newTVar False - let answerT answer = do - let IQRequest iqid from _to lang _tp bd = iq - response = case answer of - Left er -> IQErrorS $ IQError iqid Nothing - from lang er - (Just bd) - Right res -> IQResultS $ IQResult iqid Nothing - from lang res - atomically $ do - sent <- readTVar sentRef - case sent of - False -> do - writeTVar sentRef True - writeTChan outC response - return True - True -> return False - writeTChan ch $ IQRequestTicket answerT iq + out <- atomically $ do + (byNS, _) <- readTVar handlers + let iqNS = fromMaybe "" (nameNamespace . elementName + $ iqRequestPayload iq) + case Map.lookup (iqRequestType iq, iqNS) byNS of + Nothing -> return . Just $ serviceUnavailable iq + Just ch -> do + sentRef <- newTMVar False + let answerT answer = do + let IQRequest iqid from _to lang _tp bd = iq + response = case answer of + Left er -> IQErrorS $ IQError iqid Nothing + from lang er + (Just bd) + Right res -> IQResultS $ IQResult iqid Nothing + from lang res + Ex.bracketOnError (atomically $ takeTMVar sentRef) + (atomically . putTMVar sentRef) + $ \wasSent -> do + case wasSent of + True -> do + atomically $ putTMVar sentRef True + return Nothing + False -> do + didSend <- writeStanza writeSem response + case didSend of + True -> do + atomically $ putTMVar sentRef True + return $ Just True + False -> do + atomically $ putTMVar sentRef False + return $ Just False + writeTChan ch $ IQRequestTicket answerT iq + return Nothing + maybe (return ()) (void . writeStanza writeSem) out serviceUnavailable (IQRequest iqid from _to lang _tp bd) = IQErrorS $ IQError iqid Nothing from lang err (Just bd) err = StanzaError Cancel ServiceUnavailable Nothing Nothing - handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () - handleIQResponse handlers iq = do + handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> IO () + handleIQResponse handlers iq = atomically $ do (byNS, byID) <- readTVar handlers case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of (Nothing, _) -> return () -- We are not supposed to send an error. @@ -114,51 +121,36 @@ handleIQ iqHands outC sta = atomically $ do -- | Creates and initializes a new Xmpp context. newSession :: Stream -> SessionConfiguration -> IO (Either XmppFailure Session) newSession stream config = runErrorT $ do - outC <- lift newTChanIO + write' <- liftIO $ withStream' (gets $ streamSend . streamHandle) stream + writeSem <- liftIO $ newTMVarIO write' stanzaChan <- lift newTChanIO iqHands <- lift $ newTVarIO (Map.empty, Map.empty) eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = onConnectionClosed config } ros <- liftIO . newTVarIO $ Roster Nothing Map.empty let rosterH = if (enableRoster config) then handleRoster ros else \ _ _ -> return True - let stanzaHandler = runHandlers outC $ Prelude.concat [ [ toChan stanzaChan ] - , extraStanzaHandlers - config - , [ handleIQ iqHands - , rosterH - ] - ] - (kill, wLock, streamState, reader) <- ErrorT $ startThreadsWith stanzaHandler eh stream - writer <- lift $ forkIO $ writeWorker outC wLock + let stanzaHandler = runHandlers writeSem + $ Prelude.concat [ [ toChan stanzaChan ] + , extraStanzaHandlers + config + , [ handleIQ iqHands + , rosterH + ] + ] + (kill, wLock, streamState, reader) <- ErrorT $ startThreadsWith writeSem stanzaHandler eh stream idGen <- liftIO $ sessionStanzaIDs config return $ Session { stanzaCh = stanzaChan - , outCh = outC , iqHandlers = iqHands - , writeRef = wLock + , writeSemaphore = wLock , readerThread = reader , idGenerator = idGen , streamRef = streamState , eventHandlers = eh - , stopThreads = kill >> killThread writer + , stopThreads = kill , conf = config , rosterRef = ros } --- Worker to write stanzas to the stream concurrently. -writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () -writeWorker stCh writeR = forever $ do - (write, next) <- atomically $ (,) <$> - takeTMVar writeR <*> - readTChan stCh - let outData = renderElement $ nsHack (pickleElem xpStanza next) - debugOut outData - r <- write outData - atomically $ putTMVar writeR write - unless r $ do - atomically $ unGetTChan stCh next -- If the writing failed, the - -- connection is dead. - threadDelay 250000 -- Avoid free spinning. - -- | Creates a 'Session' object by setting up a connection with an XMPP server. -- -- Will connect to the specified host with the provided configuration. If the @@ -186,4 +178,3 @@ session realm mbSasl config = runErrorT $ do newStanzaID :: Session -> IO StanzaID newStanzaID = idGenerator - diff --git a/source/Network/Xmpp/Concurrent/Basic.hs b/source/Network/Xmpp/Concurrent/Basic.hs index b5d24d2..a590f64 100644 --- a/source/Network/Xmpp/Concurrent/Basic.hs +++ b/source/Network/Xmpp/Concurrent/Basic.hs @@ -1,15 +1,30 @@ {-# OPTIONS_HADDOCK hide #-} module Network.Xmpp.Concurrent.Basic where -import Control.Concurrent.STM -import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Stream -import Network.Xmpp.Types -import Control.Monad.State.Strict +import Control.Concurrent.STM +import qualified Control.Exception as Ex +import Control.Monad.State.Strict +import qualified Data.ByteString as BS +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Marshal +import Network.Xmpp.Stream +import Network.Xmpp.Types +import Network.Xmpp.Utilities + +semWrite :: WriteSemaphore -> BS.ByteString -> IO Bool +semWrite sem bs = Ex.bracket (atomically $ takeTMVar sem) + (atomically . putTMVar sem) + ($ bs) + +writeStanza :: WriteSemaphore -> Stanza -> IO Bool +writeStanza sem a = do + let outData = renderElement $ nsHack (pickleElem xpStanza a) + semWrite sem outData -- | Send a stanza to the server. -sendStanza :: Stanza -> Session -> IO () -sendStanza a session = atomically $ writeTChan (outCh session) a +sendStanza :: Stanza -> Session -> IO Bool +sendStanza a session = writeStanza (writeSemaphore session) a + -- | Get the channel of incoming stanzas. getStanzaChan :: Session -> TChan Stanza diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs index 4b6c462..17d3298 100644 --- a/source/Network/Xmpp/Concurrent/IQ.hs +++ b/source/Network/Xmpp/Concurrent/IQ.hs @@ -13,16 +13,20 @@ import Network.Xmpp.Concurrent.Basic import Network.Xmpp.Concurrent.Types import Network.Xmpp.Types --- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound --- IQ with a matching ID that has type @result@ or @error@. -sendIQ :: Maybe Int -- ^ Timeout +-- | Sends an IQ, returns Just a 'TMVar' that will be filled with the first +-- inbound IQ with a matching ID that has type @result@ or @error@ or Nothing if +-- the stanza could not be sent +sendIQ :: Maybe Int -- ^ Timeout . When the timeout is reached the response + -- TMVar will be filled with 'IQResponseTimeout' and the id + -- is removed from the list of IQ handlers. 'Nothing' + -- deactivates the timeout -> Maybe Jid -- ^ Recipient (to) -> IQRequestType -- ^ IQ type (@Get@ or @Set@) -> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for -- default) -> Element -- ^ The IQ body (there has to be exactly one) -> Session - -> IO (TMVar IQResponse) + -> IO (Maybe (TMVar IQResponse)) sendIQ timeOut to tp lang body session = do -- TODO: Add timeout newId <- idGenerator session ref <- atomically $ do @@ -31,13 +35,16 @@ sendIQ timeOut to tp lang body session = do -- TODO: Add timeout writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId) -- TODO: Check for id collisions (shouldn't happen?) return resRef - sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session - case timeOut of - Nothing -> return () - Just t -> void . forkIO $ do - threadDelay t - doTimeOut (iqHandlers session) newId ref - return ref + res <- sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session + if res + then do + case timeOut of + Nothing -> return () + Just t -> void . forkIO $ do + threadDelay t + doTimeOut (iqHandlers session) newId ref + return $ Just ref + else return Nothing where doTimeOut handlers iqid var = atomically $ do p <- tryPutTMVar var IQResponseTimeout @@ -53,10 +60,10 @@ sendIQ' :: Maybe Jid -> Maybe LangTag -> Element -> Session - -> IO IQResponse + -> IO (Maybe IQResponse) sendIQ' to tp lang body session = do ref <- sendIQ (Just 3000000) to tp lang body session - atomically $ takeTMVar ref + maybe (return Nothing) (fmap Just . atomically . takeTMVar) ref -- | Retrieves an IQ listener channel. If the namespace/'IQRequestType' is not @@ -103,7 +110,12 @@ dropIQChan tp ns session = do writeTVar handlers (byNS', byID) return () --- | Answer an IQ request. Only the first answer ist sent and then True is --- returned. Subsequent answers are dropped and (False is returned in that case) -answerIQ :: IQRequestTicket -> Either StanzaError (Maybe Element) -> IO Bool +-- | Answer an IQ request. Only the first answer ist sent and Just True is +-- returned when the answer is sucessfully sent. If an error occured during +-- sending Just False is returned (and another attempt can be +-- undertaken). Subsequent answers after the first sucessful one are dropped and +-- (False is returned in that case) +answerIQ :: IQRequestTicket + -> Either StanzaError (Maybe Element) + -> IO (Maybe Bool) answerIQ ticket = answerTicket ticket diff --git a/source/Network/Xmpp/Concurrent/Message.hs b/source/Network/Xmpp/Concurrent/Message.hs index 234484c..8413593 100644 --- a/source/Network/Xmpp/Concurrent/Message.hs +++ b/source/Network/Xmpp/Concurrent/Message.hs @@ -52,6 +52,6 @@ filterMessages f g session = do Right m | g m -> return $ Right m | otherwise -> filterMessages f g session --- | Send a message stanza. -sendMessage :: Message -> Session -> IO () +-- | Send a message stanza. Returns False when the Message could not be sentx +sendMessage :: Message -> Session -> IO Bool sendMessage m session = sendStanza (MessageS m) session diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 343a588..2545164 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -32,7 +32,7 @@ withConnection a session = do -- fetches an updated state. s <- Ex.catch (atomically $ do - _ <- takeTMVar (writeRef session) + _ <- takeTMVar (writeSemaphore session) s <- takeTMVar (streamRef session) putTMVar wait () return s @@ -48,7 +48,7 @@ withConnection a session = do (res, s') <- a s wl <- withStream' (gets $ streamSend . streamHandle) s' atomically $ do - putTMVar (writeRef session) wl + putTMVar (writeSemaphore session) wl putTMVar (streamRef session) s' return $ Right res ) diff --git a/source/Network/Xmpp/Concurrent/Presence.hs b/source/Network/Xmpp/Concurrent/Presence.hs index cb6a502..6c08298 100644 --- a/source/Network/Xmpp/Concurrent/Presence.hs +++ b/source/Network/Xmpp/Concurrent/Presence.hs @@ -27,5 +27,5 @@ waitForPresence f session = do | otherwise -> waitForPresence f session -- | Send a presence stanza. -sendPresence :: Presence -> Session -> IO () +sendPresence :: Presence -> Session -> IO Bool sendPresence p session = sendStanza (PresenceS p) session diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index 6ab5934..aa9cc50 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -10,7 +10,6 @@ import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex import Control.Monad import Control.Monad.Error -import Control.Monad.State.Strict import qualified Data.ByteString as BS import GHC.IO (unsafeUnmask) import Network.Xmpp.Concurrent.Types @@ -90,28 +89,30 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do -- | Runs thread in XmppState monad. Returns channel of incoming and outgoing -- stances, respectively, and an Action to stop the Threads and close the -- connection. -startThreadsWith :: (Stanza -> IO ()) +startThreadsWith :: TMVar (BS.ByteString -> IO Bool) + -> (Stanza -> IO ()) -> TVar EventHandlers -> Stream -> IO (Either XmppFailure (IO (), TMVar (BS.ByteString -> IO Bool), TMVar Stream, ThreadId)) -startThreadsWith stanzaHandler eh con = do - read' <- withStream' (gets $ streamSend . streamHandle) con - writeLock <- newTMVarIO read' +startThreadsWith writeSem stanzaHandler eh con = do + -- read' <- withStream' (gets $ streamSend . streamHandle) con + -- writeSem <- newTMVarIO read' conS <- newTMVarIO con - -- lw <- forkIO $ writeWorker outC writeLock - cp <- forkIO $ connPersist writeLock + cp <- forkIO $ connPersist writeSem rdw <- forkIO $ readWorker stanzaHandler (noCon eh) conS - return $ Right ( killConnection writeLock [rdw, cp] - , writeLock + return $ Right ( killConnection [rdw, cp] + , writeSem , conS , rdw ) where - killConnection writeLock threads = liftIO $ do - _ <- atomically $ takeTMVar writeLock -- Should we put it back? + killConnection threads = liftIO $ do + _ <- atomically $ do + _ <- takeTMVar writeSem + putTMVar writeSem $ \_ -> return False _ <- forM threads killThread return () -- Call the connection closed handlers. @@ -124,8 +125,8 @@ startThreadsWith stanzaHandler eh con = do -- Acquires the write lock, pushes a space, and releases the lock. -- | Sends a blank space every 30 seconds to keep the connection alive. connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () -connPersist lock = forever $ do - pushBS <- atomically $ takeTMVar lock +connPersist sem = forever $ do + pushBS <- atomically $ takeTMVar sem _ <- pushBS " " - atomically $ putTMVar lock pushBS + atomically $ putTMVar sem pushBS threadDelay 30000000 -- 30s diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 9491b93..d5a1c6a 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -54,21 +54,21 @@ instance Show Interrupt where show _ = "" instance Ex.Exception Interrupt +type WriteSemaphore = TMVar (BS.ByteString -> IO Bool) -- | A concurrent interface to Pontarius XMPP. data Session = Session { stanzaCh :: TChan Stanza -- All stanzas - , outCh :: TChan Stanza , iqHandlers :: TVar IQHandlers -- Writing lock, so that only one thread could write to the stream at any -- given time. -- Fields below are from Context. - , writeRef :: TMVar (BS.ByteString -> IO Bool) + , writeSemaphore :: WriteSemaphore , readerThread :: ThreadId , idGenerator :: IO StanzaID -- | Lock (used by withStream) to make sure that a maximum of one -- Stream action is executed at any given time. - , streamRef :: TMVar (Stream) + , streamRef :: TMVar Stream , eventHandlers :: TVar EventHandlers , stopThreads :: IO () , rosterRef :: TVar Roster @@ -83,7 +83,12 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) -- | Contains whether or not a reply has been sent, and the IQ request body to -- reply to. + data IQRequestTicket = IQRequestTicket - { answerTicket :: Either StanzaError (Maybe Element) -> IO Bool + { answerTicket :: Either StanzaError (Maybe Element) -> IO (Maybe Bool) + -- ^ Return Nothing when the IQ request was already + -- answered before, Just True when it was sucessfully + -- answered and Just False when the answer was attempted, + -- but failed (e.g. there is a connection failure) , iqRequestBody :: IQRequest } diff --git a/source/Network/Xmpp/IM/Roster.hs b/source/Network/Xmpp/IM/Roster.hs index 2f0dacf..2e2a9a3 100644 --- a/source/Network/Xmpp/IM/Roster.hs +++ b/source/Network/Xmpp/IM/Roster.hs @@ -24,7 +24,7 @@ import Network.Xmpp.Types -- | Push a roster item to the server. The values for approved and ask are -- ignored and all values for subsciption except "remove" are ignored -rosterPush :: Item -> Session -> IO IQResponse +rosterPush :: Item -> Session -> IO (Maybe IQResponse) rosterPush item session = do let el = pickleElem xpQuery (Query Nothing [fromItem item]) sendIQ' Nothing Set Nothing el session @@ -36,7 +36,7 @@ rosterAdd :: Jid -- ^ JID of the item -> Maybe Text -- ^ Name alias -> [Text] -- ^ Groups (duplicates will be removed) -> Session - -> IO IQResponse + -> IO (Maybe IQResponse) rosterAdd j n gs session = do let el = pickleElem xpQuery (Query Nothing [QueryItem { qiApproved = Nothing @@ -58,7 +58,7 @@ rosterRemove j sess = do Just _ -> do res <- rosterPush (Item False False j Nothing Remove []) sess case res of - IQResponseResult IQResult{} -> return True + Just (IQResponseResult IQResult{}) -> return True _ -> return False -- | Retrieve the current Roster state @@ -76,8 +76,8 @@ initRoster session = do "Server did not return a roster" Just roster -> atomically $ writeTVar (rosterRef session) roster -handleRoster :: TVar Roster -> TChan Stanza -> Stanza -> IO Bool -handleRoster ref outC sta = case sta of +handleRoster :: TVar Roster -> WriteSemaphore -> Stanza -> IO Bool +handleRoster ref sem sta = case sta of IQRequestS (iqr@IQRequest{iqRequestPayload = iqb@Element{elementName = en}}) | nameNamespace en == Just "jabber:iq:roster" -> do @@ -89,11 +89,11 @@ handleRoster ref outC sta = case sta of , queryItems = [update] } -> do handleUpdate v update - atomically . writeTChan outC $ result iqr + _ <- writeStanza sem $ result iqr return False _ -> do errorM "Pontarius.Xmpp" "Invalid roster query" - atomically . writeTChan outC $ badRequest iqr + _ <- writeStanza sem $ badRequest iqr return False _ -> return True where @@ -120,19 +120,22 @@ retrieveRoster mbOldRoster sess = do (pickleElem xpQuery (Query version [])) sess case res of - IQResponseResult (IQResult{iqResultPayload = Just ros}) + Nothing -> do + errorM "Pontarius.Xmpp.Roster" "getRoster: sending stanza failed" + return Nothing + Just (IQResponseResult (IQResult{iqResultPayload = Just ros})) -> case unpickleElem xpQuery ros of Left _e -> do errorM "Pontarius.Xmpp.Roster" "getRoster: invalid query element" return Nothing Right ros' -> return . Just $ toRoster ros' - IQResponseResult (IQResult{iqResultPayload = Nothing}) -> do + Just (IQResponseResult (IQResult{iqResultPayload = Nothing})) -> do return mbOldRoster -- sever indicated that no roster updates are necessary - IQResponseTimeout -> do + Just IQResponseTimeout -> do errorM "Pontarius.Xmpp.Roster" "getRoster: request timed out" return Nothing - IQResponseError e -> do + Just (IQResponseError e) -> do errorM "Pontarius.Xmpp.Roster" $ "getRoster: server returned error" ++ show e return Nothing diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index 67d2343..2d93f12 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -838,7 +838,8 @@ data ConnectionState -- | Defines operations for sending, receiving, flushing, and closing on a -- stream. data StreamHandle = - StreamHandle { streamSend :: BS.ByteString -> IO Bool + StreamHandle { streamSend :: BS.ByteString -> IO Bool -- ^ Sends may not + -- interleave , streamReceive :: Int -> IO BS.ByteString -- This is to hold the state of the XML parser (otherwise we -- will receive EventBeginDocument events and forget about @@ -849,8 +850,8 @@ data StreamHandle = data StreamState = StreamState { -- | State of the stream - 'Closed', 'Plain', or 'Secured' - streamConnectionState :: !ConnectionState -- ^ State of connection - -- | Functions to send, receive, flush, and close on the stream + streamConnectionState :: !ConnectionState + -- | Functions to send, receive, flush, and close the stream , streamHandle :: StreamHandle -- | Event conduit source, and its associated finalizer , streamEventSource :: Source IO Event @@ -1163,7 +1164,7 @@ instance Default StreamConfiguration where } } -type StanzaHandler = TChan Stanza -- ^ outgoing stanza +type StanzaHandler = TMVar (BS.ByteString -> IO Bool) -- ^ outgoing stanza -> Stanza -- ^ stanza to handle -> IO Bool -- ^ True when processing should continue