{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Except
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Network.Xmpp.Types
import System.Log.Logger

{- Failure approach (rationale recorded with the change that introduced it).

Assumptions and definitions:

1. XMPP failures, which can occur at the TCP, TLS, and XML/XMPP layers (as a
   stream error or forbidden input), are fatal; they disrupt the XMPP session.
2. All fatal failures should be thrown (or similar) by `session', or by any
   other function that might produce them.
3. Authentication failures that are not "XMPP failures" are not fatal. They do
   not necessarily terminate the stream. For example, the developer should be
   able to make another authentication attempt. The `Session' object returned
   by `session' might be useful even if the authentication fails.
4. We can (and should) use one single data type for fatal failures.
   (Previously, both StreamFailure and TlsFailure were used.)
5. We can catch and rethrow/wrap IO exceptions in the context of the Pontarius
   XMPP error system that we decide to use, making the error system more
   intuitive, Haskell-like, and more straightforward to implement. Calling
   `error' may only be done in the case of a program error (a bug).
6. A logging system will remove the need for many of the error types. Only
   exceptions that seem likely to affect the flow of client applications
   should be defined.
7. The authentication functions are prone to fatal XMPP failures in addition
   to non-fatal authentication conditions. (Previously, `AuthStreamFailure'
   was used to wrap these errors.)

Suggested (and implemented) changes:

`StreamFailure' and `TlsFailure' are joined into `XmppFailure'.

`pullStanza' and the other Connection functions used to throw `IOException',
`StreamFailure' and `TlsFailure' exceptions. They have been converted to
`StateT Connection IO (Either XmppFailure a)' computations. They also catch
(some) IOException errors and wrap them in the new `XmppIOException'
constructor. `newSession' is now `IO (Either XmppFailure Session)' as well
(being capable of throwing IO exceptions).

Whether to a) keep wrapping `XmppFailure' failures in an `AuthStreamFailure'
equivalent, or b) treat the authentication functions just like the other
functions that may result in failure (Either XmppFailure a), depends on how
Network.Xmpp.Connection.auth will be used. Since the latter makes `auth' more
consistent and removes the need for a wrapped (and special-case) "AuthFailure"
type, the "b" approach is being tried. (The drawback is that authentication
errors cannot be accessed through the use of ErrorT. Whether this is a
problem is not known at this point.)

As the SASL code (and SaslM) depended on `AuthStreamFailure', it remains for
internal use, at least for the time being.

`session' is now an ErrorT computation as well. Some functions have been
updated as hacks, but this will be changed if we decide to move forward with
this approach.
-}

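{- Illustrative sketch, not part of this module.

The failure approach described for this code wraps (some) IO exceptions in a
constructor of the single failure type and returns them as `Left' values
instead of letting them propagate. The sketch below shows one way that idea
can look in isolation. `Failure', `FailureIO', `FailureNoStream' and `wrapIO'
are hypothetical names invented for the example; they only stand in for
`XmppFailure' and `XmppIOException' and are not the library's definitions.

```haskell
import Control.Exception (IOException, try)

-- Hypothetical stand-in for the library's failure type.
data Failure
    = FailureIO IOException  -- plays the role of an IO-exception wrapper
    | FailureNoStream        -- plays the role of a "no stream" failure
    deriving Show

-- Run an IO action and turn any IOException into a Left value.
-- Other exception types are not caught and still propagate.
wrapIO :: IO a -> IO (Either Failure a)
wrapIO action = either (Left . FailureIO) Right <$> try action

main :: IO ()
main = do
    ok  <- wrapIO (return (42 :: Int))
    bad <- wrapIO (ioError (userError "socket closed") :: IO Int)
    print ok
    print bad
```

Callers can then pattern match on the result rather than installing exception
handlers, which is the shape `readWorker' relies on when it inspects the
result of `pullXmppElement'.
-}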
-- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads.
readWorker :: (XmppElement -> IO ())
           -> (XmppFailure -> IO ())
           -> TMVar Stream
           -> IO a
readWorker onElement onCClosed stateRef = forever . Ex.mask_ $ do
    -- Block (via 'retry') until the stream is neither closed nor finished.
    s' <- Ex.catches ( do
                        atomically $ do
                            s@(Stream con) <- readTMVar stateRef
                            scs <- streamConnectionState <$> readTMVar con
                            when (stateIsClosed scs)
                                 retry
                            return $ Just s
                   )
               [ Ex.Handler $ \(Interrupt t) -> do
                     void $ handleInterrupts [t]
                     return Nothing
               ]
    case s' of -- Maybe Stream
        Nothing -> return ()
        Just s -> do -- Stream
            res <- Ex.catches (do
                       -- we don't know whether pull will
                       -- necessarily be interruptible
                       allowInterrupt
                       res <- pullXmppElement s
                       case res of
                           Left e -> do
                               errorM "Pontarius.Xmpp" $ "Read error: "
                                   ++ show e
                               _ <- closeStreams s
                               onCClosed e
                               return Nothing
                           Right r -> return $ Just r
                   )
                   [ Ex.Handler $ \(Interrupt t) -> do
                         void $ handleInterrupts [t]
                         return Nothing
                   ]
            case res of
                Nothing -> return () -- Caught an exception, nothing to
                                     -- do. TODO: Can this happen?
                Just sta -> void $ onElement sta
  where
    -- Defining a Control.Exception.allowInterrupt equivalent for GHC 7
    -- compatibility.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()
    -- While waiting for the first semaphore(s) to flip we might receive
    -- another interrupt. When that happens we add its semaphore to the list
    -- and retry waiting.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts ts =
        Ex.catch (atomically $ forM ts takeTMVar)
            (\(Interrupt t) -> handleInterrupts (t:ts))
    stateIsClosed Closed = True
    stateIsClosed Finished = True
    stateIsClosed _ = False
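
{- Illustrative sketch, not part of this module.

`readWorker' waits for a usable stream by reading the connection state inside
an STM transaction and calling `retry' while the state is closed or finished.
The sketch below isolates that blocking pattern. `ConnState' (with its three
constructors) and `waitUntilOpen' are hypothetical names for the example, and
a plain `TVar' is assumed in place of the nested `TMVar's used above.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
import Control.Monad (when)

-- Hypothetical, reduced connection state.
data ConnState = Closed | Plain | Finished
    deriving (Eq, Show)

-- Block until the state is neither Closed nor Finished, then return it.
-- 'retry' suspends the transaction until the TVar is written again.
waitUntilOpen :: TVar ConnState -> IO ConnState
waitUntilOpen var = atomically $ do
    st <- readTVar var
    when (st == Closed || st == Finished) retry
    return st

main :: IO ()
main = do
    var <- newTVarIO Closed
    -- Another thread opens the connection a little later.
    _ <- forkIO $ threadDelay 50000 >> atomically (writeTVar var Plain)
    st <- waitUntilOpen var
    print st
```

Because `retry' only wakes the transaction when one of the variables it read
changes, the waiting thread does not spin while the connection stays closed.
-}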

-- | Starts the keep-alive thread and the reader thread for the given stream.
-- Returns an action that stops writes and kills both threads, the 'TMVar'
-- holding the stream, and the 'ThreadId' of the reader thread.
startThreadsWith :: TMVar (BS.ByteString -> IO (Either XmppFailure ()))
                 -> (XmppElement -> IO ())
                 -> TMVar EventHandlers
                 -> Stream
                 -> Maybe Int
                 -> IO (Either XmppFailure (IO (),
                                            TMVar Stream,
                                            ThreadId))
startThreadsWith writeSem stanzaHandler eh con keepAlive = do
    -- read' <- withStream' (gets $ streamSend . streamHandle) con
    -- writeSem <- newTMVarIO read'
    conS <- newTMVarIO con
    cp <- forkIO $ connPersist keepAlive writeSem
    let onConClosed failure = do
            stopWrites
            noCon eh failure
    rdw <- forkIO $ readWorker stanzaHandler onConClosed conS
    return $ Right ( killConnection [rdw, cp]
                   , conS
                   , rdw
                   )
  where
    stopWrites = atomically $ do
        _ <- takeTMVar writeSem
        putTMVar writeSem $ \_ -> return $ Left XmppNoStream
    killConnection threads = liftIO $ do
        debugM "Pontarius.Xmpp" "killing connection"
        stopWrites
        debugM "Pontarius.Xmpp" "killing threads"
        _ <- forM threads killThread
        return ()

-- Call the connection closed handlers.
noCon :: TMVar EventHandlers -> XmppFailure -> IO ()
noCon h e = do
    hands <- atomically $ readTMVar h
    _ <- forkIO $ connectionClosedHandler hands e
    return ()

-- | Sends a blank space every @delay@ seconds to keep the connection alive.
-- Each round acquires the write lock, pushes a space, and releases the lock.
connPersist :: Maybe Int -> TMVar (BS.ByteString -> IO a) -> IO ()
connPersist (Just delay) sem = forever $ do
    pushBS <- atomically $ takeTMVar sem
    _ <- pushBS " "
    atomically $ putTMVar sem pushBS
    threadDelay (delay*1000000)
connPersist Nothing _ = return ()
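
{- Illustrative sketch, not part of this module.

`connPersist' treats the `TMVar' holding the send function as a write lock:
taking the function out of the `TMVar' blocks other writers, and putting it
back releases the lock. The sketch below shows that pattern on its own.
`withWriteLock' and `keepAlive' are hypothetical names, the send function
works on `String' instead of `ByteString', and the delay is given in
microseconds to keep the demo short. Like the code above, it does not restore
the lock if the send function throws.

```haskell
import Control.Concurrent (forkIO, killThread, threadDelay)
import Control.Concurrent.STM
import Control.Monad (forever)
import Data.IORef

-- Take the send function (acquire the lock), use it, put it back (release).
withWriteLock :: TMVar (String -> IO ()) -> String -> IO ()
withWriteLock sem payload = do
    send <- atomically $ takeTMVar sem
    send payload
    atomically $ putTMVar sem send

-- Push a single space under the lock, then sleep, forever.
keepAlive :: Int -> TMVar (String -> IO ()) -> IO ()
keepAlive delayMicros sem = forever $ do
    withWriteLock sem " "
    threadDelay delayMicros

main :: IO ()
main = do
    sent <- newIORef (0 :: Int)
    -- The "send function" here only counts how often it was called.
    sem  <- newTMVarIO (\_ -> modifyIORef' sent (+ 1))
    tid  <- forkIO $ keepAlive 10000 sem
    threadDelay 100000
    killThread tid
    n <- readIORef sent
    putStrLn $ "keep-alive writes observed: " ++ show n
```

Replacing the stored function, as `stopWrites' does with one that returns
`Left XmppNoStream', disables all later writes without changing the callers.
-}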