Browse Source

extended event system (management of hooks); now properly deals with network (asynchronous) exceptions and connects to the provided server

master
Jon Kristensen 14 years ago
parent
commit
aea0556efd
  1. 2
      Examples/IBR.hs
  2. 114
      Network/XMPP/Session.hs

2
Examples/IBR.hs

@ -46,7 +46,7 @@ main = create $ do @@ -46,7 +46,7 @@ main = create $ do
-- When the opening of the streams fails, print the error and
-- shut down the XMPP session.
onConnectedEvent (Just e) = do
onStreamsOpened (Just e) = do
liftIO $ putStrLn $ "Could not open the streams due to the following error: " ++ (show e)
destroy
return True

114
Network/XMPP/Session.hs

@ -68,7 +68,7 @@ create main = do @@ -68,7 +68,7 @@ create main = do
stateLoop
data HookId = HookId String
data HookId = HookId String deriving (Eq)
-- We need a channel because multiple threads needs to append events,
@ -76,7 +76,12 @@ data HookId = HookId String @@ -76,7 +76,12 @@ data HookId = HookId String
data State m = State { evtChan :: Chan (InternalEvent m)
, hookIdGenerator :: IdGenerator
, streamsOpenedHooks :: [(HookId, (Maybe OpenStreamsFailureReason -> XMPPT m Bool, Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)))] }
, hooks :: [Hook m] }
data HookPayload m = StreamsOpenedHook (Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)) (Maybe OpenStreamsFailureReason -> XMPPT m Bool)
type Hook m = (HookId, HookPayload m)
-- Internal events - events to be processed within Pontarius.
@ -86,7 +91,7 @@ data State m = State { evtChan :: Chan (InternalEvent m) @@ -86,7 +91,7 @@ data State m = State { evtChan :: Chan (InternalEvent m)
data InternalEvent m
= OpenStreamsEvent HostName PortNumber
-- | DisconnectEvent
| RegisterStreamsOpenedHook (Maybe OpenStreamsFailureReason -> XMPPT m Bool) (Maybe (OpenStreamsFailureReason -> Bool))
| RegisterStreamsOpenedHook (Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)) (Maybe OpenStreamsFailureReason -> XMPPT m Bool)
-- | IEEE EnumeratorEvent
instance Show (InternalEvent m) where
@ -117,7 +122,7 @@ data Event = -- ConnectedEvent (Either IntFailureReason Resource) @@ -117,7 +122,7 @@ data Event = -- ConnectedEvent (Either IntFailureReason Resource)
-- TODO: Possible ways opening a stream can fail.
data OpenStreamsFailureReason = OpenStreamFailureReason deriving (Show)
data OpenStreamsFailureReason = OpenStreamsFailureReason deriving (Show)
-- data TLSSecureFailureReason = TLSSecureFailureReason
@ -155,30 +160,32 @@ stateLoop = do @@ -155,30 +160,32 @@ stateLoop = do
-- Processes an internal event and generates a list of impure actions.
processEvent :: MonadIO m => InternalEvent m -> XMPPT m [XMPPT m (IO ())]
processEvent :: MonadIO m => InternalEvent m -> XMPPT m [XMPPT m ()]
processEvent (OpenStreamsEvent h p) = return [openStreamAction h p]
where
openStreamAction :: MonadIO m => HostName -> PortNumber -> XMPPT m (IO ())
openStreamAction h p = do
-- CEB.assert (stateConnectionState state == Disconnected) (return ())
let p' = fromIntegral p
handle <- liftIO $ {- CE.try $ -} N.connectTo h (N.PortNumber p')
return $ liftIO $ do -- $ case result of
-- Right handle -> do
hSetBuffering handle NoBuffering
hPutStr handle $ encodeString "<?xml version='1.0'?><stream:stream to='" ++ h ++ "' version='1.0' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>"
hFlush handle
return ()
-- -- threadID <- lift $ liftIO $ forkIO $ xmlEnumerator (stateChannel state) (Left handle)
-- -- lift $ liftIO $ putMVar (stateThreadID state) threadID
-- Left error -> do
-- -- let clientState = stateClientState state
-- -- ((), clientState') <- lift $ runStateT (callback OpenStreamFailure) clientState
-- -- put $ state { stateShouldExit = True }
-- -- return $ Just e
-- return $ Just error
openStreamAction :: MonadIO m => HostName -> PortNumber -> XMPPT m ()
openStreamAction h p = let p' = fromIntegral p
computation = do
handle <- N.connectTo h (N.PortNumber p')
hSetBuffering handle NoBuffering
hPutStr handle $ encodeString "<?xml version='1.0'?><stream:stream to='" ++ h ++ "' version='1.0' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>"
hFlush handle
in do
result <- liftIO $ CE.try computation
case result of
Right () -> do
fireStreamsOpenedEvent Nothing
-- -- threadID <- lift $ liftIO $ forkIO $ xmlEnumerator (stateChannel state) (Left handle)
-- -- lift $ liftIO $ putMVar (stateThreadID state) threadID
Left (CE.SomeException e) -> do -- TODO: Safe to do this?
fireStreamsOpenedEvent $ Just OpenStreamsFailureReason
-- Left error -> do
-- -- let clientState = stateClientState state
-- -- ((), clientState') <- lift $ runStateT (callback OpenStreamFailure) clientState
-- -- put $ state { stateShouldExit = True }
-- -- return $ Just e
-- return $ Just error
-- hookConnectedEvent :: MonadIO m => (ConnectedEvent -> XMPPT m Bool) -> (Maybe (ConnectedEvent -> Bool)) -> XMPPT m ()
@ -188,13 +195,15 @@ processEvent (OpenStreamsEvent h p) = return [openStreamAction h p] @@ -188,13 +195,15 @@ processEvent (OpenStreamsEvent h p) = return [openStreamAction h p]
-- | Hook the provided callback and (optional) predicate to the
-- "Streams Opened" event.
--
-- The "Streams Opened" event will be fired when the stream:features element has been successfully received or an error has occurred.
hookStreamsOpenedEvent :: MonadIO m => (Maybe OpenStreamsFailureReason -> XMPPT m Bool) -> (Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)) -> XMPPT m HookId
hookStreamsOpenedEvent cb pred = do
rs <- get
hookId <- liftIO $ nextId $ hookIdGenerator rs
put $ rs { streamsOpenedHooks = (HookId hookId, (cb, pred)):streamsOpenedHooks rs }
put $ rs { hooks = (HookId hookId, StreamsOpenedHook pred cb):hooks rs }
return $ HookId hookId
@ -229,9 +238,62 @@ openStreams :: MonadIO m => HostName -> PortNumber -> XMPPT m () @@ -229,9 +238,62 @@ openStreams :: MonadIO m => HostName -> PortNumber -> XMPPT m ()
openStreams h p = get >>= \rs -> liftIO $ writeChan (evtChan rs) (OpenStreamsEvent h p)
-- Like any other fire*Event function, it queries the hooks, filters
-- out the ones that are relevant, prepares them to be used with
-- processHook, and processes them.
fireStreamsOpenedEvent :: MonadIO m => Maybe OpenStreamsFailureReason -> XMPPT m ()
fireStreamsOpenedEvent r = do
rs <- get
let hooks' = filterStreamsOpenedHooks $ hooks rs
sequence_ $ map (\(hookId, pred, cb) -> processHook hookId pred cb) $ map prepareStreamsOpenedHooks hooks'
return ()
where
prepareStreamsOpenedHooks :: MonadIO m => Hook m -> (HookId, Maybe (XMPPT m Bool), XMPPT m Bool)
prepareStreamsOpenedHooks (hookId, StreamsOpenedHook pred cb) =
let pred' = case pred of
Nothing -> Nothing
Just pred'' -> Just $ pred'' r
cb' = cb r in (hookId, pred', cb')
-- Takes an optional predicate and a callback function, and excecutes
-- the callback function if the predicate does not exist, or exists
-- and is true, and returns True if the hook should be removed.
processHook :: MonadIO m => HookId -> Maybe (XMPPT m Bool) -> XMPPT m Bool -> XMPPT m ()
processHook id pred cb = do
remove <- processHook'
if remove then do
rs <- get
put $ rs { hooks = removeHook id (hooks rs) }
else return ()
where
processHook' = case pred of
Just pred' -> do
result <- pred'
if result then cb else return False
Nothing -> cb
destroy = destroy
filterStreamsOpenedHooks :: MonadIO m => [Hook m] -> [Hook m]
filterStreamsOpenedHooks h = filter pred h
where
pred (_, StreamsOpenedHook _ _) = True
pred _ = False
removeHook :: MonadIO m => HookId -> [Hook m] -> [Hook m]
removeHook id h = filter (\(id', _) -> id' /= id) h
-- tlsSecure = tlsSecure
-- authenticate = authenticate

Loading…
Cancel
Save