From aea0556efda293d59c5cdfceede3eb34de7e12e0 Mon Sep 17 00:00:00 2001 From: Jon Kristensen Date: Mon, 2 Apr 2012 14:56:38 +0200 Subject: [PATCH] extended event system (management of hooks); now properly deals with network (asynchronous) exceptions and connects to the provided server --- Examples/IBR.hs | 2 +- Network/XMPP/Session.hs | 114 +++++++++++++++++++++++++++++++--------- 2 files changed, 89 insertions(+), 27 deletions(-) diff --git a/Examples/IBR.hs b/Examples/IBR.hs index c8064a1..e1e6a47 100644 --- a/Examples/IBR.hs +++ b/Examples/IBR.hs @@ -46,7 +46,7 @@ main = create $ do -- When the opening of the streams fails, print the error and -- shut down the XMPP session. - onConnectedEvent (Just e) = do + onStreamsOpened (Just e) = do liftIO $ putStrLn $ "Could not open the streams due to the following error: " ++ (show e) destroy return True diff --git a/Network/XMPP/Session.hs b/Network/XMPP/Session.hs index a49648b..84b38e1 100644 --- a/Network/XMPP/Session.hs +++ b/Network/XMPP/Session.hs @@ -68,7 +68,7 @@ create main = do stateLoop -data HookId = HookId String +data HookId = HookId String deriving (Eq) -- We need a channel because multiple threads needs to append events, @@ -76,7 +76,12 @@ data HookId = HookId String data State m = State { evtChan :: Chan (InternalEvent m) , hookIdGenerator :: IdGenerator - , streamsOpenedHooks :: [(HookId, (Maybe OpenStreamsFailureReason -> XMPPT m Bool, Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)))] } + , hooks :: [Hook m] } + + +data HookPayload m = StreamsOpenedHook (Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)) (Maybe OpenStreamsFailureReason -> XMPPT m Bool) + +type Hook m = (HookId, HookPayload m) -- Internal events - events to be processed within Pontarius. @@ -86,7 +91,7 @@ data State m = State { evtChan :: Chan (InternalEvent m) data InternalEvent m = OpenStreamsEvent HostName PortNumber -- | DisconnectEvent - | RegisterStreamsOpenedHook (Maybe OpenStreamsFailureReason -> XMPPT m Bool) (Maybe (OpenStreamsFailureReason -> Bool)) + | RegisterStreamsOpenedHook (Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)) (Maybe OpenStreamsFailureReason -> XMPPT m Bool) -- | IEEE EnumeratorEvent instance Show (InternalEvent m) where @@ -117,7 +122,7 @@ data Event = -- ConnectedEvent (Either IntFailureReason Resource) -- TODO: Possible ways opening a stream can fail. -data OpenStreamsFailureReason = OpenStreamFailureReason deriving (Show) +data OpenStreamsFailureReason = OpenStreamsFailureReason deriving (Show) -- data TLSSecureFailureReason = TLSSecureFailureReason @@ -155,30 +160,32 @@ stateLoop = do -- Processes an internal event and generates a list of impure actions. -processEvent :: MonadIO m => InternalEvent m -> XMPPT m [XMPPT m (IO ())] +processEvent :: MonadIO m => InternalEvent m -> XMPPT m [XMPPT m ()] processEvent (OpenStreamsEvent h p) = return [openStreamAction h p] where - openStreamAction :: MonadIO m => HostName -> PortNumber -> XMPPT m (IO ()) - openStreamAction h p = do - -- CEB.assert (stateConnectionState state == Disconnected) (return ()) - let p' = fromIntegral p - handle <- liftIO $ {- CE.try $ -} N.connectTo h (N.PortNumber p') - return $ liftIO $ do -- $ case result of - -- Right handle -> do - hSetBuffering handle NoBuffering - hPutStr handle $ encodeString "" - hFlush handle - return () - -- -- threadID <- lift $ liftIO $ forkIO $ xmlEnumerator (stateChannel state) (Left handle) - -- -- lift $ liftIO $ putMVar (stateThreadID state) threadID - -- Left error -> do - -- -- let clientState = stateClientState state - -- -- ((), clientState') <- lift $ runStateT (callback OpenStreamFailure) clientState - -- -- put $ state { stateShouldExit = True } - -- -- return $ Just e - -- return $ Just error - + openStreamAction :: MonadIO m => HostName -> PortNumber -> XMPPT m () + openStreamAction h p = let p' = fromIntegral p + computation = do + handle <- N.connectTo h (N.PortNumber p') + hSetBuffering handle NoBuffering + hPutStr handle $ encodeString "" + hFlush handle + in do + result <- liftIO $ CE.try computation + case result of + Right () -> do + fireStreamsOpenedEvent Nothing + -- -- threadID <- lift $ liftIO $ forkIO $ xmlEnumerator (stateChannel state) (Left handle) + -- -- lift $ liftIO $ putMVar (stateThreadID state) threadID + Left (CE.SomeException e) -> do -- TODO: Safe to do this? + fireStreamsOpenedEvent $ Just OpenStreamsFailureReason + -- Left error -> do + -- -- let clientState = stateClientState state + -- -- ((), clientState') <- lift $ runStateT (callback OpenStreamFailure) clientState + -- -- put $ state { stateShouldExit = True } + -- -- return $ Just e + -- return $ Just error -- hookConnectedEvent :: MonadIO m => (ConnectedEvent -> XMPPT m Bool) -> (Maybe (ConnectedEvent -> Bool)) -> XMPPT m () @@ -188,13 +195,15 @@ processEvent (OpenStreamsEvent h p) = return [openStreamAction h p] -- | Hook the provided callback and (optional) predicate to the -- "Streams Opened" event. +-- +-- The "Streams Opened" event will be fired when the stream:features element has been successfully received or an error has occurred. hookStreamsOpenedEvent :: MonadIO m => (Maybe OpenStreamsFailureReason -> XMPPT m Bool) -> (Maybe (Maybe OpenStreamsFailureReason -> XMPPT m Bool)) -> XMPPT m HookId hookStreamsOpenedEvent cb pred = do rs <- get hookId <- liftIO $ nextId $ hookIdGenerator rs - put $ rs { streamsOpenedHooks = (HookId hookId, (cb, pred)):streamsOpenedHooks rs } + put $ rs { hooks = (HookId hookId, StreamsOpenedHook pred cb):hooks rs } return $ HookId hookId @@ -229,9 +238,62 @@ openStreams :: MonadIO m => HostName -> PortNumber -> XMPPT m () openStreams h p = get >>= \rs -> liftIO $ writeChan (evtChan rs) (OpenStreamsEvent h p) +-- Like any other fire*Event function, it queries the hooks, filters +-- out the ones that are relevant, prepares them to be used with +-- processHook, and processes them. + +fireStreamsOpenedEvent :: MonadIO m => Maybe OpenStreamsFailureReason -> XMPPT m () + +fireStreamsOpenedEvent r = do + rs <- get + let hooks' = filterStreamsOpenedHooks $ hooks rs + sequence_ $ map (\(hookId, pred, cb) -> processHook hookId pred cb) $ map prepareStreamsOpenedHooks hooks' + return () + where + prepareStreamsOpenedHooks :: MonadIO m => Hook m -> (HookId, Maybe (XMPPT m Bool), XMPPT m Bool) + prepareStreamsOpenedHooks (hookId, StreamsOpenedHook pred cb) = + let pred' = case pred of + Nothing -> Nothing + Just pred'' -> Just $ pred'' r + cb' = cb r in (hookId, pred', cb') + + +-- Takes an optional predicate and a callback function, and excecutes +-- the callback function if the predicate does not exist, or exists +-- and is true, and returns True if the hook should be removed. + +processHook :: MonadIO m => HookId -> Maybe (XMPPT m Bool) -> XMPPT m Bool -> XMPPT m () + +processHook id pred cb = do + remove <- processHook' + if remove then do + rs <- get + put $ rs { hooks = removeHook id (hooks rs) } + else return () + where + processHook' = case pred of + Just pred' -> do + result <- pred' + if result then cb else return False + Nothing -> cb + + destroy = destroy +filterStreamsOpenedHooks :: MonadIO m => [Hook m] -> [Hook m] + +filterStreamsOpenedHooks h = filter pred h + where + pred (_, StreamsOpenedHook _ _) = True + pred _ = False + + +removeHook :: MonadIO m => HookId -> [Hook m] -> [Hook m] + +removeHook id h = filter (\(id', _) -> id' /= id) h + + -- tlsSecure = tlsSecure -- authenticate = authenticate