Browse Source

Add connectionClosedHandler

master
Philipp Balzarek 14 years ago
parent
commit
d5cdc74f23
  1. 1
      src/Network/XMPP.hs
  2. 15
      src/Network/XMPP/Concurrent/Monad.hs
  3. 46
      src/Network/XMPP/Concurrent/Threads.hs
  4. 6
      src/Network/XMPP/Concurrent/Types.hs
  5. 38
      src/Network/XMPP/Monad.hs
  6. 2
      src/Network/XMPP/Stream.hs
  7. 4
      src/Network/XMPP/Types.hs
  8. 3
      src/Tests.hs

1
src/Network/XMPP.hs

@ -43,6 +43,7 @@ module Network.XMPP
, auth , auth
, endSession , endSession
, setSessionEndHandler , setSessionEndHandler
, setConnectionClosedHandler
-- * JID -- * JID
-- | A JID (historically: Jabber ID) is XMPPs native format -- | A JID (historically: Jabber ID) is XMPPs native format
-- for addressing entities in the network. It is somewhat similar to an -- for addressing entities in the network. It is somewhat similar to an

15
src/Network/XMPP/Concurrent/Monad.hs

@ -198,17 +198,20 @@ modifyHandlers f = do
liftIO . atomically $ writeTVar eh . f =<< readTVar eh liftIO . atomically $ writeTVar eh . f =<< readTVar eh
setSessionEndHandler :: XMPP () -> XMPP () setSessionEndHandler :: XMPP () -> XMPP ()
setSessionEndHandler eh = modifyHandlers (\s -> s{sessionEndHandler = eh}) setSessionEndHandler eh = do
r <- ask
modifyHandlers (\s -> s{sessionEndHandler = runReaderT eh r})
setConnectionClosedHandler :: XMPP () -> XMPP () setConnectionClosedHandler :: (StreamError -> XMPP ()) -> XMPP ()
setConnectionClosedHandler eh = modifyHandlers setConnectionClosedHandler eh = do
(\s -> s{connectionClosedHandler = eh}) r <- ask
modifyHandlers (\s -> s{connectionClosedHandler = \e -> runReaderT (eh e) r})
-- | run an event handler -- | run an event handler
runHandler :: (EventHandlers -> XMPP a) -> XMPP a runHandler :: (EventHandlers -> IO a) -> XMPP a
runHandler h = do runHandler h = do
eh <- liftIO . atomically . readTVar =<< asks eventHandlers eh <- liftIO . atomically . readTVar =<< asks eventHandlers
h eh liftIO $ h eh
-- | End the current xmpp session -- | End the current xmpp session
endSession :: XMPP () endSession :: XMPP ()

46
src/Network/XMPP/Concurrent/Threads.hs

@ -29,22 +29,15 @@ import Text.XML.Stream.Elements
import GHC.IO (unsafeUnmask) import GHC.IO (unsafeUnmask)
-- While waiting for the first semaphore(s) to flip we might receive
-- another interrupt. When that happens we add it's semaphore to the
-- list and retry waiting
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
( \(Interrupt t) -> handleInterrupts (t:ts))
readWorker :: TChan (Either MessageError Message) readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence) -> TChan (Either PresenceError Presence)
-> TVar IQHandlers -> TVar IQHandlers
-> TVar EventHandlers
-> TMVar XmppConnection -> TMVar XmppConnection
-> IO () -> IO ()
readWorker messageC presenceC handlers stateRef = readWorker messageC presenceC iqHands handlers stateRef =
Ex.mask_ . forever $ do Ex.mask_ . forever $ do
res <- liftIO $ Ex.catch ( do res <- liftIO $ Ex.catches ( do
-- we don't know whether pull will -- we don't know whether pull will
-- necessarily be interruptible -- necessarily be interruptible
s <- liftIO . atomically $ do s <- liftIO . atomically $ do
@ -55,10 +48,11 @@ readWorker messageC presenceC handlers stateRef =
allowInterrupt allowInterrupt
Just . fst <$> runStateT pullStanza s Just . fst <$> runStateT pullStanza s
) )
(\(Interrupt t) -> do [ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t] void $ handleInterrupts [t]
return Nothing return Nothing
) , Ex.Handler $ \e -> noCon handlers (e :: StreamError)
]
liftIO . atomically $ do liftIO . atomically $ do
case res of case res of
Nothing -> return () Nothing -> return ()
@ -84,14 +78,26 @@ readWorker messageC presenceC handlers stateRef =
_ <- readTChan presenceC _ <- readTChan presenceC
return () return ()
IQRequestS i -> handleIQRequest handlers i IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse handlers (Right i) IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse handlers (Left i) IQErrorS i -> handleIQResponse iqHands (Left i)
where where
-- Defining an Control.Exception.allowInterrupt equivalent for -- Defining an Control.Exception.allowInterrupt equivalent for
-- GHC 7 compatibility. -- GHC 7 compatibility.
allowInterrupt :: IO () allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return () allowInterrupt = unsafeUnmask $ return ()
noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a)
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return Nothing
-- While waiting for the first semaphore(s) to flip we might receive
-- another interrupt. When that happens we add it's semaphore to the
-- list and retry waiting
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
( \(Interrupt t) -> handleInterrupts (t:ts))
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do handleIQRequest handlers iq = do
@ -121,7 +127,10 @@ writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$> (write, next) <- atomically $ (,) <$>
takeTMVar writeR <*> takeTMVar writeR <*>
readTChan stCh readTChan stCh
_ <- write $ renderElement (pickleElem xpStanza next) r <- write $ renderElement (pickleElem xpStanza next)
unless r $ do
atomically $ unGetTChan stCh next -- connection is dead
threadDelay 250000 -- avoid free spinning
atomically $ putTMVar writeR write atomically $ putTMVar writeR write
-- Two streams: input and output. Threads read from input stream and write to output stream. -- Two streams: input and output. Threads read from input stream and write to output stream.
@ -150,13 +159,14 @@ startThreads = do
conS <- newTMVarIO xmppNoConnection conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC handlers conS rd <- forkIO $ readWorker messageC presenceC handlers eh conS
return (messageC, presenceC, handlers, outC return (messageC, presenceC, handlers, outC
, killConnection writeLock [lw, rd, cp] , killConnection writeLock [lw, rd, cp]
, writeLock, conS ,rd, eh) , writeLock, conS ,rd, eh)
where where
killConnection writeLock threads = liftIO $ do killConnection writeLock threads = liftIO $ do
_ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- atomically $ takeTMVar writeLock -- Should we put it back?
liftIO $ putStrLn "killing threads #"
_ <- forM threads killThread _ <- forM threads killThread
return() return()
@ -186,6 +196,6 @@ withSession = flip runReaderT
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO () connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist lock = forever $ do connPersist lock = forever $ do
pushBS <- atomically $ takeTMVar lock pushBS <- atomically $ takeTMVar lock
pushBS " " _ <- pushBS " "
atomically $ putTMVar lock pushBS atomically $ putTMVar lock pushBS
threadDelay 30000000 threadDelay 30000000

6
src/Network/XMPP/Concurrent/Types.hs

@ -23,14 +23,14 @@ type IQHandlers = (Map.Map (IQRequestType, Text) (TChan (IQRequest, TVar Bool))
) )
data EventHandlers = EventHandlers data EventHandlers = EventHandlers
{ sessionEndHandler :: XMPP () { sessionEndHandler :: IO ()
, connectionClosedHandler :: XMPP () , connectionClosedHandler :: StreamError -> IO ()
} }
zeroEventHandlers :: EventHandlers zeroEventHandlers :: EventHandlers
zeroEventHandlers = EventHandlers zeroEventHandlers = EventHandlers
{ sessionEndHandler = return () { sessionEndHandler = return ()
, connectionClosedHandler = return () , connectionClosedHandler = \_ -> return ()
} }
data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either data Session = Session { messagesRef :: IORef (Maybe ( TChan (Either

38
src/Network/XMPP/Monad.hs

@ -8,8 +8,8 @@ import Control.Monad
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Control.Monad.Trans.Class import Control.Monad.Trans.Class
--import Control.Monad.Trans.Resource --import Control.Monad.Trans.Resource
import qualified Control.Exception as Ex import qualified Control.Exception.Lifted as Ex
import qualified GHC.IO.Exception as Ex import qualified GHC.IO.Exception as GIE
import Control.Monad.State.Strict import Control.Monad.State.Strict
import Data.ByteString as BS import Data.ByteString as BS
@ -30,6 +30,7 @@ import System.IO
import Text.XML.Stream.Elements import Text.XML.Stream.Elements
import Text.XML.Stream.Parse as XP import Text.XML.Stream.Parse as XP
import Text.XML.Unresolved(InvalidEventStream(..))
pushN :: Element -> XMPPConMonad Bool pushN :: Element -> XMPPConMonad Bool
pushN x = do pushN x = do
@ -52,10 +53,14 @@ pullSink snk = do
pullElement :: XMPPConMonad Element pullElement :: XMPPConMonad Element
pullElement = do pullElement = do
Ex.catch (do
e <- pullSink (elements =$ CL.head) e <- pullSink (elements =$ CL.head)
case e of case e of
Nothing -> liftIO $ Ex.throwIO XmppNoConnection Nothing -> liftIO $ Ex.throwIO StreamConnectionError
Just r -> return r Just r -> return r
)
(\(InvalidEventStream s) -> liftIO . Ex.throwIO $ StreamXMLError s)
pullPickle :: PU [Node] a -> XMPPConMonad a pullPickle :: PU [Node] a -> XMPPConMonad a
pullPickle p = do pullPickle p = do
@ -72,34 +77,13 @@ pullStanza = do
Right r -> return r Right r -> return r
catchPush p = Ex.catch (p >> return True) catchPush p = Ex.catch (p >> return True)
(\e -> case Ex.ioe_type e of (\e -> case GIE.ioe_type e of
Ex.ResourceVanished -> return False GIE.ResourceVanished -> return False
_ -> Ex.throwIO e _ -> Ex.throwIO e
) )
xmppFromHandle :: Handle
-> Text
-> XMPPConMonad a
-> IO (a, XmppConnection)
xmppFromHandle handle hostname f = do
liftIO $ hSetBuffering handle NoBuffering
let raw = sourceHandle handle
let src = raw $= XP.parseBytes def
let st = XmppConnection
src
(raw)
(catchPush . BS.hPut handle)
(Just handle)
(SF Nothing [] [])
XmppConnectionPlain
(Just hostname)
Nothing
Nothing
(hClose handle)
runStateT f st
zeroSource :: Source IO output zeroSource :: Source IO output
zeroSource = liftIO . Ex.throwIO $ XmppNoConnection zeroSource = liftIO . Ex.throwIO $ StreamConnectionError
xmppNoConnection :: XmppConnection xmppNoConnection :: XmppConnection
xmppNoConnection = XmppConnection xmppNoConnection = XmppConnection

2
src/Network/XMPP/Stream.hs

@ -86,7 +86,7 @@ xmppStreamFeatures :: StreamSink ServerFeatures
xmppStreamFeatures = do xmppStreamFeatures = do
e <- lift $ elements =$ CL.head e <- lift $ elements =$ CL.head
case e of case e of
Nothing -> liftIO $ Ex.throwIO XmppNoConnection Nothing -> liftIO $ Ex.throwIO StreamConnectionError
Just r -> streamUnpickleElem pickleStreamFeatures r Just r -> streamUnpickleElem pickleStreamFeatures r
-- Pickling -- Pickling

4
src/Network/XMPP/Types.hs

@ -39,7 +39,6 @@ module Network.XMPP.Types
, XMPPConMonad , XMPPConMonad
, XmppConnection(..) , XmppConnection(..)
, XmppConnectionState(..) , XmppConnectionState(..)
, XmppNoConnection(..)
, XMPPT(..) , XMPPT(..)
, XmppStreamError(..) , XmppStreamError(..)
, parseLangTag , parseLangTag
@ -735,6 +734,3 @@ type XMPPConMonad a = StateT XmppConnection IO a
deriving instance (Monad m, MonadIO m) => MonadState (XmppConnection) (XMPPT m) deriving instance (Monad m, MonadIO m) => MonadState (XmppConnection) (XMPPT m)
data XmppNoConnection = XmppNoConnection deriving (Show, Typeable)
instance Exception XmppNoConnection

3
src/Tests.hs

@ -111,6 +111,9 @@ runMain debug number = do
wait <- newEmptyTMVarIO wait <- newEmptyTMVarIO
withNewSession $ do withNewSession $ do
setSessionEndHandler (liftIO . atomically $ putTMVar wait ()) setSessionEndHandler (liftIO . atomically $ putTMVar wait ())
setConnectionClosedHandler (\e -> do
liftIO (debug' $ "connection lost because " ++ show e)
endSession )
debug' "running" debug' "running"
withConnection $ do withConnection $ do
connect "localhost" "species64739.dyndns.org" connect "localhost" "species64739.dyndns.org"

Loading…
Cancel
Save