From a9f691cdcc2b2e89d340c109ccc3657f420f76de Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Mon, 26 Nov 2012 13:18:08 +0100 Subject: [PATCH] factor Channels out from Concurrent interface --- examples/EchoClient.hs | 38 +++-- pontarius-xmpp.cabal | 2 +- source/Network/Xmpp.hs | 15 +- source/Network/Xmpp/Concurrent.hs | 4 +- source/Network/Xmpp/Concurrent/Channels.hs | 140 +++++++++++++++++ .../Network/Xmpp/Concurrent/Channels/Basic.hs | 21 +++ source/Network/Xmpp/Concurrent/Channels/IQ.hs | 107 +++++++++++++ .../Xmpp/Concurrent/Channels/Message.hs | 69 +++++++++ .../Xmpp/Concurrent/Channels/Presence.hs | 46 ++++++ .../Network/Xmpp/Concurrent/Channels/Types.hs | 42 +++++ source/Network/Xmpp/Concurrent/IQ.hs | 81 ---------- source/Network/Xmpp/Concurrent/Monad.hs | 137 ----------------- source/Network/Xmpp/Concurrent/Types.hs | 43 +----- source/Network/Xmpp/Xep/ServiceDiscovery.hs | 4 +- tests/Tests.hs | 145 ++++++++++-------- 15 files changed, 548 insertions(+), 346 deletions(-) create mode 100644 source/Network/Xmpp/Concurrent/Channels.hs create mode 100644 source/Network/Xmpp/Concurrent/Channels/Basic.hs create mode 100644 source/Network/Xmpp/Concurrent/Channels/IQ.hs create mode 100644 source/Network/Xmpp/Concurrent/Channels/Message.hs create mode 100644 source/Network/Xmpp/Concurrent/Channels/Presence.hs create mode 100644 source/Network/Xmpp/Concurrent/Channels/Types.hs delete mode 100644 source/Network/Xmpp/Concurrent/IQ.hs diff --git a/examples/EchoClient.hs b/examples/EchoClient.hs index ad45313..9f55e16 100644 --- a/examples/EchoClient.hs +++ b/examples/EchoClient.hs @@ -15,44 +15,56 @@ in the public domain. module Main (main) where +import Control.Concurrent import Control.Monad (forever) import Control.Monad.IO.Class (liftIO) import Data.Maybe (fromJust, isJust) +import Network import Network.Xmpp +import Network.Xmpp.Concurrent import Network.Xmpp.IM -- Server and authentication details. -hostname = "localhost" +host = "localhost" +hostname = "species64739.dyndns.org" --- portNumber = 5222 -- TODO -username = "" -password = "" +port = PortNumber 5222 +username = "echouser" +password = "pwd" resource = Nothing -- TODO: Incomplete code, needs documentation, etc. main :: IO () main = do - session <- newSession - withConnection (simpleConnect hostname username password resource) session - sendPresence presenceOnline session - echo session + csession <- newSessionChans + withConnection (simpleConnect host port hostname username password resource) + (session csession) + forkIO $ autoAccept csession + sendPresence presenceOnline csession + echo csession return () -- Pull message stanzas, verify that they originate from a `full' XMPP -- address, and, if so, `echo' the message back. -echo :: Session -> IO () -echo session = forever $ do - result <- pullMessage session +echo :: CSession -> IO () +echo csession = forever $ do + result <- pullMessage csession case result of Right message -> if (isJust $ messageFrom message) && (isFull $ fromJust $ messageFrom message) then do -- TODO: May not set from. - sendMessage (Message Nothing (messageTo message) (messageFrom message) Nothing (messageType message) (messagePayload message)) session + sendMessage (Message Nothing (messageTo message) (messageFrom message) Nothing (messageType message) (messagePayload message)) csession liftIO $ putStrLn "Message echoed!" else liftIO $ putStrLn "Message sender is not set or is bare!" - Left exception -> liftIO $ putStrLn "Error: " \ No newline at end of file + Left exception -> liftIO $ putStrLn "Error: " + +-- | Autoaccept any subscription offers (So people can see us online) +autoAccept :: CSession -> IO () +autoAccept csession = forever $ do + st <- waitForPresence isPresenceSubscribe csession + sendPresence (presenceSubscribed (fromJust $ presenceFrom st)) csession \ No newline at end of file diff --git a/pontarius-xmpp.cabal b/pontarius-xmpp.cabal index 8352050..33c9402 100644 --- a/pontarius-xmpp.cabal +++ b/pontarius-xmpp.cabal @@ -74,7 +74,7 @@ Library Other-modules: Network.Xmpp.Jid , Network.Xmpp.Concurrent.Types - , Network.Xmpp.Concurrent.IQ + , Network.Xmpp.Concurrent.Channels.IQ , Network.Xmpp.Concurrent.Threads , Network.Xmpp.Concurrent.Monad , Text.XML.Stream.Elements diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 6481216..3a06f37 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -29,7 +29,8 @@ module Network.Xmpp ( -- * Session management - newSessionChans + Session + , newSessionChans , withConnection , connect , simpleConnect @@ -140,7 +141,7 @@ module Network.Xmpp , iqRequestPayload , iqResultPayload -- * Threads - , forkChans + , forkCSession -- * Miscellaneous , LangTag(..) , exampleParams @@ -238,15 +239,17 @@ simpleAuth username passwd resource = flip auth resource $ -- -- Note that the server might assign a different resource even when we send -- a preference. -simpleConnect :: HostName -- ^ Target host name - -> PortID +simpleConnect :: HostName -- ^ Host to connect to + -> PortID -- ^ Port to connec to + -> Text -- ^ Hostname of the server (to distinguish the XMPP + -- service) -> Text -- ^ User name (authcid) -> Text -- ^ Password -> Maybe Text -- ^ Desired resource (or Nothing to let the server -- decide) -> XmppConMonad Jid -simpleConnect host port username password resource = do - connect host port username +simpleConnect host port hostname username password resource = do + connect host port hostname startTLS exampleParams saslResponse <- simpleAuth username password resource case saslResponse of diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index a797b1d..e11c5a7 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -2,10 +2,10 @@ module Network.Xmpp.Concurrent ( Session , module Network.Xmpp.Concurrent.Monad , module Network.Xmpp.Concurrent.Threads - , module Network.Xmpp.Concurrent.IQ + , module Network.Xmpp.Concurrent.Channels ) where import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Monad import Network.Xmpp.Concurrent.Threads -import Network.Xmpp.Concurrent.IQ +import Network.Xmpp.Concurrent.Channels diff --git a/source/Network/Xmpp/Concurrent/Channels.hs b/source/Network/Xmpp/Concurrent/Channels.hs new file mode 100644 index 0000000..3f23e06 --- /dev/null +++ b/source/Network/Xmpp/Concurrent/Channels.hs @@ -0,0 +1,140 @@ +{-# LANGUAGE OverloadedStrings #-} +module Network.Xmpp.Concurrent.Channels + ( module Network.Xmpp.Concurrent.Channels.Basic + , module Network.Xmpp.Concurrent.Channels.Types + , module Network.Xmpp.Concurrent.Channels.Message + , module Network.Xmpp.Concurrent.Channels.Presence + , module Network.Xmpp.Concurrent.Channels.IQ + , toChans + , newSessionChans + , writeWorker + ) + + where + +import Control.Applicative((<$>),(<*>)) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad +import qualified Data.ByteString as BS +import Data.IORef +import qualified Data.Map as Map +import Data.Maybe (fromMaybe) +import Data.XML.Types +import Network.Xmpp.Concurrent.Channels.Basic +import Network.Xmpp.Concurrent.Channels.IQ +import Network.Xmpp.Concurrent.Channels.Message +import Network.Xmpp.Concurrent.Channels.Presence +import Network.Xmpp.Concurrent.Channels.Types +import Network.Xmpp.Concurrent.Threads +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Marshal +import Network.Xmpp.Pickle +import Network.Xmpp.Types +import Text.XML.Stream.Elements + +toChans :: TChan (Either MessageError Message) + -> TChan (Either PresenceError Presence) + -> TChan Stanza + -> TVar IQHandlers + -> Stanza + -> IO () +toChans messageC presenceC stanzaC iqHands sta = atomically $ do + writeTChan stanzaC sta + void $ readTChan stanzaC -- sic + case sta of + MessageS m -> do writeTChan messageC $ Right m + _ <- readTChan messageC -- Sic! + return () + -- this may seem ridiculous, but to prevent + -- the channel from filling up we + -- immedtiately remove the + -- Stanza we just put in. It will still be + -- available in duplicates. + MessageErrorS m -> do writeTChan messageC $ Left m + _ <- readTChan messageC + return () + PresenceS p -> do + writeTChan presenceC $ Right p + _ <- readTChan presenceC + return () + PresenceErrorS p -> do + writeTChan presenceC $ Left p + _ <- readTChan presenceC + return () + IQRequestS i -> handleIQRequest iqHands i + IQResultS i -> handleIQResponse iqHands (Right i) + IQErrorS i -> handleIQResponse iqHands (Left i) + where + -- If the IQ request has a namespace, send it through the appropriate channel. + handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () + handleIQRequest handlers iq = do + (byNS, _) <- readTVar handlers + let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) + case Map.lookup (iqRequestType iq, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> do + sent <- newTVar False + writeTChan ch $ IQRequestTicket sent iq + handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () + handleIQResponse handlers iq = do + (byNS, byID) <- readTVar handlers + case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of + (Nothing, _) -> return () -- We are not supposed to send an error. + (Just tmvar, byID') -> do + let answer = either IQResponseError IQResponseResult iq + _ <- tryPutTMVar tmvar answer -- Don't block. + writeTVar handlers (byNS, byID') + where + iqID (Left err) = iqErrorID err + iqID (Right iq') = iqResultID iq' + + +-- | Creates and initializes a new concurrent session. +newSessionChans :: IO CSession +newSessionChans = do + messageC <- newTChanIO + presenceC <- newTChanIO + outC <- newTChanIO + stanzaC <- newTChanIO + iqHandlers <- newTVarIO (Map.empty, Map.empty) + eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } + let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers + (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh + writer <- forkIO $ writeWorker outC wLock + workermCh <- newIORef $ Nothing + workerpCh <- newIORef $ Nothing + idRef <- newTVarIO 1 + let getId = atomically $ do + curId <- readTVar idRef + writeTVar idRef (curId + 1 :: Integer) + return . read. show $ curId + let sess = Session { writeRef = wLock + , readerThread = readerThread + , idGenerator = getId + , conStateRef = conState + , eventHandlers = eh + , stopThreads = kill >> killThread writer + } + return $ CSession { session = sess + , mShadow = messageC + , pShadow = presenceC + , sShadow = stanzaC + , messagesRef = workermCh + , presenceRef = workerpCh + , outCh = outC + , iqHandlers = iqHandlers + } + +-- Worker to write stanzas to the stream concurrently. +writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () +writeWorker stCh writeR = forever $ do + (write, next) <- atomically $ (,) <$> + takeTMVar writeR <*> + readTChan stCh + r <- write $ renderElement (pickleElem xpStanza next) + atomically $ putTMVar writeR write + unless r $ do + atomically $ unGetTChan stCh next -- If the writing failed, the + -- connection is dead. + threadDelay 250000 -- Avoid free spinning. diff --git a/source/Network/Xmpp/Concurrent/Channels/Basic.hs b/source/Network/Xmpp/Concurrent/Channels/Basic.hs new file mode 100644 index 0000000..51f7914 --- /dev/null +++ b/source/Network/Xmpp/Concurrent/Channels/Basic.hs @@ -0,0 +1,21 @@ +module Network.Xmpp.Concurrent.Channels.Basic where + +import Control.Concurrent.STM +import Data.IORef +import Network.Xmpp.Concurrent.Channels.Types +import Network.Xmpp.Types + +-- | Get a duplicate of the stanza channel +getStanzaChan :: CSession -> IO (TChan Stanza) +getStanzaChan session = atomically $ dupTChan (sShadow session) + +-- | Send a stanza to the server. +sendStanza :: Stanza -> CSession -> IO () +sendStanza a session = atomically $ writeTChan (outCh session) a + +-- | Create a forked session object +forkCSession :: CSession -> IO CSession +forkCSession session = do + mCH' <- newIORef Nothing + pCH' <- newIORef Nothing + return $ session {messagesRef = mCH' , presenceRef = pCH'} diff --git a/source/Network/Xmpp/Concurrent/Channels/IQ.hs b/source/Network/Xmpp/Concurrent/Channels/IQ.hs new file mode 100644 index 0000000..4923756 --- /dev/null +++ b/source/Network/Xmpp/Concurrent/Channels/IQ.hs @@ -0,0 +1,107 @@ +module Network.Xmpp.Concurrent.Channels.IQ where + +import Control.Concurrent (forkIO, threadDelay) +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Reader + +import qualified Data.Map as Map +import Data.Text (Text) +import Data.XML.Types + +import Network.Xmpp.Concurrent.Channels.Basic +import Network.Xmpp.Concurrent.Channels.Types +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Types + +-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound +-- IQ with a matching ID that has type @result@ or @error@. +sendIQ :: Maybe Int -- ^ Timeout + -> Maybe Jid -- ^ Recipient (to) + -> IQRequestType -- ^ IQ type (@Get@ or @Set@) + -> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for + -- default) + -> Element -- ^ The IQ body (there has to be exactly one) + -> CSession + -> IO (TMVar IQResponse) +sendIQ timeOut to tp lang body csession = do -- TODO: Add timeout + newId <- idGenerator (session csession) + ref <- atomically $ do + resRef <- newEmptyTMVar + (byNS, byId) <- readTVar (iqHandlers csession) + writeTVar (iqHandlers csession) (byNS, Map.insert newId resRef byId) + -- TODO: Check for id collisions (shouldn't happen?) + return resRef + sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) csession + case timeOut of + Nothing -> return () + Just t -> void . forkIO $ do + threadDelay t + doTimeOut (iqHandlers csession) newId ref + return ref + where + doTimeOut handlers iqid var = atomically $ do + p <- tryPutTMVar var IQResponseTimeout + when p $ do + (byNS, byId) <- readTVar (iqHandlers csession) + writeTVar handlers (byNS, Map.delete iqid byId) + return () + + +-- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds +sendIQ' :: Maybe Jid + -> IQRequestType + -> Maybe LangTag + -> Element + -> CSession + -> IO IQResponse +sendIQ' to tp lang body csession = do + ref <- sendIQ (Just 3000000) to tp lang body csession + atomically $ takeTMVar ref + + +-- | Retrieves an IQ listener channel. If the namespace/'IQRequestType' is not +-- already handled, a new 'TChan' is created and returned as a 'Right' value. +-- Otherwise, the already existing channel will be returned wrapped in a 'Left' +-- value. Note that the 'Left' channel might need to be duplicated in order not +-- to interfere with existing consumers. +listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) + -> Text -- ^ Namespace of the child element + -> CSession + -> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket)) +listenIQChan tp ns csession = do + let handlers = (iqHandlers csession) + atomically $ do + (byNS, byID) <- readTVar handlers + iqCh <- newTChan + let (present, byNS') = Map.insertLookupWithKey' + (\_ _ old -> old) + (tp, ns) + iqCh + byNS + writeTVar handlers (byNS', byID) + return $ case present of + Nothing -> Right iqCh + Just iqCh' -> Left iqCh' + +answerIQ :: IQRequestTicket + -> Either StanzaError (Maybe Element) + -> CSession + -> IO Bool +answerIQ (IQRequestTicket + sentRef + (IQRequest iqid from _to lang _tp bd)) + answer csession = do + let response = case answer of + Left err -> IQErrorS $ IQError iqid Nothing from lang err (Just bd) + Right res -> IQResultS $ IQResult iqid Nothing from lang res + atomically $ do + sent <- readTVar sentRef + case sent of + False -> do + writeTVar sentRef True + + writeTChan (outCh csession) response + return True + True -> return False diff --git a/source/Network/Xmpp/Concurrent/Channels/Message.hs b/source/Network/Xmpp/Concurrent/Channels/Message.hs new file mode 100644 index 0000000..6f55962 --- /dev/null +++ b/source/Network/Xmpp/Concurrent/Channels/Message.hs @@ -0,0 +1,69 @@ +module Network.Xmpp.Concurrent.Channels.Message where + +import Network.Xmpp.Concurrent.Channels.Types +import Control.Concurrent.STM +import Data.IORef +import Network.Xmpp.Types +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Concurrent.Channels.Basic + +-- | Get the inbound stanza channel, duplicates from master if necessary. Please +-- note that once duplicated it will keep filling up, call 'dropMessageChan' to +-- allow it to be garbage collected. +getMessageChan :: CSession -> IO (TChan (Either MessageError Message)) +getMessageChan session = do + mCh <- readIORef . messagesRef $ session + case mCh of + Nothing -> do + mCh' <- atomically $ dupTChan (mShadow session) + writeIORef (messagesRef session) (Just mCh') + return mCh' + Just mCh' -> return mCh' + +-- | Drop the local end of the inbound stanza channel from our context so it can +-- be GC-ed. +dropMessageChan :: CSession -> IO () +dropMessageChan session = writeIORef (messagesRef session) Nothing + +-- | Read an element from the inbound stanza channel, acquiring a copy of the +-- channel as necessary. +pullMessage :: CSession -> IO (Either MessageError Message) +pullMessage session = do + c <- getMessageChan session + atomically $ readTChan c + +-- | Pulls a (non-error) message and returns it if the given predicate returns +-- @True@. +waitForMessage :: (Message -> Bool) -> CSession -> IO Message +waitForMessage f session = do + s <- pullMessage session + case s of + Left _ -> waitForMessage f session + Right m | f m -> return m + | otherwise -> waitForMessage f session + +-- | Pulls an error message and returns it if the given predicate returns @True@. +waitForMessageError :: (MessageError -> Bool) -> CSession -> IO MessageError +waitForMessageError f session = do + s <- pullMessage session + case s of + Right _ -> waitForMessageError f session + Left m | f m -> return m + | otherwise -> waitForMessageError f session + + +-- | Pulls a message and returns it if the given predicate returns @True@. +filterMessages :: (MessageError -> Bool) + -> (Message -> Bool) + -> CSession -> IO (Either MessageError Message) +filterMessages f g session = do + s <- pullMessage session + case s of + Left e | f e -> return $ Left e + | otherwise -> filterMessages f g session + Right m | g m -> return $ Right m + | otherwise -> filterMessages f g session + +-- | Send a message stanza. +sendMessage :: Message -> CSession -> IO () +sendMessage m session = sendStanza (MessageS m) session diff --git a/source/Network/Xmpp/Concurrent/Channels/Presence.hs b/source/Network/Xmpp/Concurrent/Channels/Presence.hs new file mode 100644 index 0000000..e264a14 --- /dev/null +++ b/source/Network/Xmpp/Concurrent/Channels/Presence.hs @@ -0,0 +1,46 @@ +module Network.Xmpp.Concurrent.Channels.Presence where + +import Network.Xmpp.Concurrent.Channels.Types +import Control.Concurrent.STM +import Data.IORef +import Network.Xmpp.Types +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Concurrent.Channels.Basic + +-- | Analogous to 'getMessageChan'. +getPresenceChan :: CSession -> IO (TChan (Either PresenceError Presence)) +getPresenceChan session = do + pCh <- readIORef $ (presenceRef session) + case pCh of + Nothing -> do + pCh' <- atomically $ dupTChan (pShadow session) + writeIORef (presenceRef session) (Just pCh') + return pCh' + Just pCh' -> return pCh' + + +-- | Analogous to 'dropMessageChan'. +dropPresenceChan :: CSession -> IO () +dropPresenceChan session = writeIORef (presenceRef session) Nothing + + +-- | Read an element from the inbound stanza channel, acquiring a copy of the +-- channel as necessary. +pullPresence :: CSession -> IO (Either PresenceError Presence) +pullPresence session = do + c <- getPresenceChan session + atomically $ readTChan c + +-- | Pulls a (non-error) presence and returns it if the given predicate returns +-- @True@. +waitForPresence :: (Presence -> Bool) -> CSession -> IO Presence +waitForPresence f session = do + s <- pullPresence session + case s of + Left _ -> waitForPresence f session + Right m | f m -> return m + | otherwise -> waitForPresence f session + +-- | Send a presence stanza. +sendPresence :: Presence -> CSession -> IO () +sendPresence p session = sendStanza (PresenceS p) session diff --git a/source/Network/Xmpp/Concurrent/Channels/Types.hs b/source/Network/Xmpp/Concurrent/Channels/Types.hs new file mode 100644 index 0000000..0832154 --- /dev/null +++ b/source/Network/Xmpp/Concurrent/Channels/Types.hs @@ -0,0 +1,42 @@ +module Network.Xmpp.Concurrent.Channels.Types where + +import Control.Concurrent.STM +import Data.IORef +import qualified Data.Map as Map +import Data.Text (Text) +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Types + +-- | Session with Channels +data CSession = CSession + { session :: Session + -- The original master channels that the reader puts stanzas + -- into. These are cloned by @get{STanza,Message,Presence}Chan + -- on demand when first used by the thread and are stored in the + -- {message,presence}Ref fields below. + , mShadow :: TChan (Either MessageError Message) + , pShadow :: TChan (Either PresenceError Presence) + , sShadow :: TChan Stanza -- All stanzas + -- The cloned copies of the original/shadow channels. They are + -- thread-local (as opposed to the shadow channels) and contains all + -- stanzas received after the cloning of the shadow channels. + , messagesRef :: IORef (Maybe (TChan (Either MessageError Message))) + , presenceRef :: IORef (Maybe (TChan (Either PresenceError Presence))) + , outCh :: TChan Stanza + , iqHandlers :: TVar IQHandlers + -- Writing lock, so that only one thread could write to the stream at any + -- given time. + } + +-- | IQHandlers holds the registered channels for incomming IQ requests and +-- TMVars of and TMVars for expected IQ responses +type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) + , Map.Map StanzaId (TMVar IQResponse) + ) + +-- | Contains whether or not a reply has been sent, and the IQ request body to +-- reply to. +data IQRequestTicket = IQRequestTicket + { sentRef :: (TVar Bool) + , iqRequestBody :: IQRequest + } diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs deleted file mode 100644 index 015fc3b..0000000 --- a/source/Network/Xmpp/Concurrent/IQ.hs +++ /dev/null @@ -1,81 +0,0 @@ -module Network.Xmpp.Concurrent.IQ where - -import Control.Concurrent.STM -import Control.Concurrent (forkIO, threadDelay) -import Control.Monad -import Control.Monad.IO.Class -import Control.Monad.Trans.Reader - -import Data.XML.Types -import qualified Data.Map as Map - -import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Concurrent.Monad -import Network.Xmpp.Types - --- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound --- IQ with a matching ID that has type @result@ or @error@. -sendIQ :: Maybe Int -- ^ Timeout - -> Maybe Jid -- ^ Recipient (to) - -> IQRequestType -- ^ IQ type (@Get@ or @Set@) - -> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for - -- default) - -> Element -- ^ The IQ body (there has to be exactly one) - -> Session - -> IO (TMVar IQResponse) -sendIQ timeOut to tp lang body session = do -- TODO: Add timeout - newId <- idGenerator session - ref <- atomically $ do - resRef <- newEmptyTMVar - (byNS, byId) <- readTVar (iqHandlers . chans $ session) - writeTVar (iqHandlers . chans $ session) (byNS, Map.insert newId resRef byId) - -- TODO: Check for id collisions (shouldn't happen?) - return resRef - sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) (chans session) - case timeOut of - Nothing -> return () - Just t -> void . forkIO $ do - threadDelay t - doTimeOut (iqHandlers . chans $ session) newId ref - return ref - where - doTimeOut handlers iqid var = atomically $ do - p <- tryPutTMVar var IQResponseTimeout - when p $ do - (byNS, byId) <- readTVar (iqHandlers . chans $ session) - writeTVar handlers (byNS, Map.delete iqid byId) - return () - - --- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds -sendIQ' :: Maybe Jid - -> IQRequestType - -> Maybe LangTag - -> Element - -> Session - -> IO IQResponse -sendIQ' to tp lang body session = do - ref <- sendIQ (Just 3000000) to tp lang body session - atomically $ takeTMVar ref - - -answerIQ :: IQRequestTicket - -> Either StanzaError (Maybe Element) - -> Session - -> IO Bool -answerIQ (IQRequestTicket - sentRef - (IQRequest iqid from _to lang _tp bd)) - answer session = do - let response = case answer of - Left err -> IQErrorS $ IQError iqid Nothing from lang err (Just bd) - Right res -> IQResultS $ IQResult iqid Nothing from lang res - atomically $ do - sent <- readTVar sentRef - case sent of - False -> do - writeTVar sentRef True - - writeTChan (outCh . chans $ session) response - return True - True -> return False diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 40a6b9f..96a1bc2 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -20,135 +20,7 @@ import Network.Xmpp.Concurrent.Types import Network.Xmpp.Monad --- | Retrieves an IQ listener channel. If the namespace/'IQRequestType' is not --- already handled, a new 'TChan' is created and returned as a 'Right' value. --- Otherwise, the already existing channel will be returned wrapped in a 'Left' --- value. Note that the 'Left' channel might need to be duplicated in order not --- to interfere with existing consumers. -listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) - -> Text -- ^ Namespace of the child element - -> Chans - -> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket)) -listenIQChan tp ns chans = do - let handlers = iqHandlers chans - atomically $ do - (byNS, byID) <- readTVar handlers - iqCh <- newTChan - let (present, byNS') = Map.insertLookupWithKey' - (\_ _ old -> old) - (tp, ns) - iqCh - byNS - writeTVar handlers (byNS', byID) - return $ case present of - Nothing -> Right iqCh - Just iqCh' -> Left iqCh' --- | Get a duplicate of the stanza channel -getStanzaChan :: Chans -> IO (TChan Stanza) -getStanzaChan chans = atomically $ dupTChan (sShadow chans) - --- | Get the inbound stanza channel, duplicates from master if necessary. Please --- note that once duplicated it will keep filling up, call 'dropMessageChan' to --- allow it to be garbage collected. -getMessageChan :: Chans -> IO (TChan (Either MessageError Message)) -getMessageChan chans = do - mCh <- readIORef $ messagesRef chans - case mCh of - Nothing -> do - mCh' <- atomically $ dupTChan (mShadow chans) - writeIORef (messagesRef chans) (Just mCh') - return mCh' - Just mCh' -> return mCh' - --- | Analogous to 'getMessageChan'. -getPresenceChan :: Chans -> IO (TChan (Either PresenceError Presence)) -getPresenceChan chans = do - pCh <- readIORef $ presenceRef chans - case pCh of - Nothing -> do - pCh' <- atomically $ dupTChan (pShadow chans) - writeIORef (presenceRef chans) (Just pCh') - return pCh' - Just pCh' -> return pCh' - --- | Drop the local end of the inbound stanza channel from our context so it can --- be GC-ed. -dropMessageChan :: Chans -> IO () -dropMessageChan chans = writeIORef (messagesRef chans) Nothing - --- | Analogous to 'dropMessageChan'. -dropPresenceChan :: Chans -> IO () -dropPresenceChan chans = writeIORef (presenceRef chans) Nothing - --- | Read an element from the inbound stanza channel, acquiring a copy of the --- channel as necessary. -pullMessage :: Chans -> IO (Either MessageError Message) -pullMessage chans = do - c <- getMessageChan chans - atomically $ readTChan c - --- | Read an element from the inbound stanza channel, acquiring a copy of the --- channel as necessary. -pullPresence :: Chans -> IO (Either PresenceError Presence) -pullPresence chans = do - c <- getPresenceChan chans - atomically $ readTChan c - --- | Send a stanza to the server. -sendStanza :: Stanza -> Chans -> IO () -sendStanza a chans = atomically $ writeTChan (outCh chans) a - - --- | Create a forked chans object -forkChans :: Chans -> IO Chans -forkChans chans = do - mCH' <- newIORef Nothing - pCH' <- newIORef Nothing - return $ chans {messagesRef = mCH', presenceRef = pCH'} - --- | Pulls a message and returns it if the given predicate returns @True@. -filterMessages :: (MessageError -> Bool) - -> (Message -> Bool) - -> Chans -> IO (Either MessageError Message) -filterMessages f g chans = do - s <- pullMessage chans - case s of - Left e | f e -> return $ Left e - | otherwise -> filterMessages f g chans - Right m | g m -> return $ Right m - | otherwise -> filterMessages f g chans - --- | Pulls a (non-error) message and returns it if the given predicate returns --- @True@. -waitForMessage :: (Message -> Bool) -> Chans -> IO Message -waitForMessage f chans = do - s <- pullMessage chans - case s of - Left _ -> waitForMessage f chans - Right m | f m -> return m - | otherwise -> waitForMessage f chans - - --- | Pulls an error message and returns it if the given predicate returns @True@. -waitForMessageError :: (MessageError -> Bool) -> Chans -> IO MessageError -waitForMessageError f chans = do - s <- pullMessage chans - case s of - Right _ -> waitForMessageError f chans - Left m | f m -> return m - | otherwise -> waitForMessageError f chans - - --- | Pulls a (non-error) presence and returns it if the given predicate returns --- @True@. -waitForPresence :: (Presence -> Bool) -> Chans -> IO Presence -waitForPresence f chans = do - s <- pullPresence chans - case s of - Left _ -> waitForPresence f chans - Right m | f m -> return m - | otherwise -> waitForPresence f chans -- TODO: Wait for presence error? @@ -194,15 +66,6 @@ withConnection a session = do >> Ex.throwIO (e :: Ex.SomeException) ] --- | Send a presence stanza. -sendPresence :: Presence -> Chans -> IO () -sendPresence p chans = sendStanza (PresenceS p) chans - --- | Send a message stanza. -sendMessage :: Message -> Chans -> IO () -sendMessage m chans = sendStanza (MessageS m) chans - - -- | Executes a function to update the event handlers. modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO () modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 78bdfcc..249077c 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -15,60 +15,27 @@ import Data.Typeable import Network.Xmpp.Types --- Map between the IQ request type and the "query" namespace pair, and the TChan --- for the IQ request and "sent" boolean pair. -type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) - , Map.Map StanzaId (TMVar IQResponse) - ) - --- Handlers to be run when the Xmpp session ends and when the Xmpp connection is +-- | Handlers to be run when the Xmpp session ends and when the Xmpp connection is -- closed. data EventHandlers = EventHandlers { connectionClosedHandler :: StreamError -> IO () } --- The Session object is the Xmpp (ReaderT) state. +-- | Xmpp Session object data Session = Session { writeRef :: TMVar (BS.ByteString -> IO Bool) , readerThread :: ThreadId , idGenerator :: IO StanzaId - -- Lock (used by withConnection) to make sure that a maximum of one - -- XmppConMonad calculation is executed at any given time. + -- | Lock (used by withConnection) to make sure that a maximum of one + -- XmppConMonad action is executed at any given time. , conStateRef :: TMVar XmppConnection , eventHandlers :: TVar EventHandlers , stopThreads :: IO () - , chans :: Chans } -data Chans = Chans - { - -- The original master channels that the reader puts stanzas - -- into. These are cloned by @get{STanza,Message,Presence}Chan - -- on demand when first used by the thread and are stored in the - -- {message,presence}Ref fields below. - mShadow :: TChan (Either MessageError Message) - , pShadow :: TChan (Either PresenceError Presence) - , sShadow :: TChan Stanza -- All stanzas - -- The cloned copies of the original/shadow channels. They are - -- thread-local (as opposed to the shadow channels) and contains all - -- stanzas received after the cloning of the shadow channels. - , messagesRef :: IORef (Maybe (TChan (Either MessageError Message))) - , presenceRef :: IORef (Maybe (TChan (Either PresenceError Presence))) - , outCh :: TChan Stanza - , iqHandlers :: TVar IQHandlers - -- Writing lock, so that only one thread could write to the stream at any - -- given time. - } --- Interrupt is used to signal to the reader thread that it should stop. +-- | Interrupt is used to signal to the reader thread that it should stop. Th contained semphore signals the reader to resume it's work. data Interrupt = Interrupt (TMVar ()) deriving Typeable instance Show Interrupt where show _ = "" instance Ex.Exception Interrupt - --- | Contains whether or not a reply has been sent, and the IQ request body to --- reply to. -data IQRequestTicket = IQRequestTicket - { sentRef :: (TVar Bool) - , iqRequestBody :: IQRequest - } diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs index 332a018..233f981 100644 --- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs +++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs @@ -86,7 +86,7 @@ xpQueryInfo = xpWrap (\(nd, (feats, ids)) -> QIR nd ids feats) -- | Query an entity for it's identity and features queryInfo :: Jid -- ^ Entity to query -> Maybe Text.Text -- ^ Node - -> Session + -> CSession -> IO (Either DiscoError QueryInfoResult) queryInfo to node session = do res <- sendIQ' (Just to) Get Nothing queryBody session @@ -149,7 +149,7 @@ xpQueryItems = xpElem (itemsN "query") -- | Query an entity for Items of a node queryItems :: Jid -- ^ Entity to query -> Maybe Text.Text -- ^ Node - -> Session + -> CSession -> IO (Either DiscoError (Maybe Text.Text, [Item])) queryItems to node session = do res <- sendIQ' (Just to) Get Nothing queryBody session diff --git a/tests/Tests.hs b/tests/Tests.hs index 893033d..4bcad8a 100644 --- a/tests/Tests.hs +++ b/tests/Tests.hs @@ -7,6 +7,7 @@ import qualified Control.Exception.Lifted as Ex import Control.Monad import Control.Monad.State import Control.Monad.IO.Class +import Control.Monad.Reader import Data.Maybe import Data.Text (Text) @@ -14,12 +15,14 @@ import qualified Data.Text as Text import Data.XML.Pickle import Data.XML.Types +import Network import Network.Xmpp +import Network.Xmpp.Concurrent.Channels import Network.Xmpp.IM.Presence import Network.Xmpp.Pickle import Network.Xmpp.Types -import qualified Network.Xmpp.Xep.ServiceDiscovery as Disco import qualified Network.Xmpp.Xep.InbandRegistration as IBR +import qualified Network.Xmpp.Xep.ServiceDiscovery as Disco import System.Environment import Text.XML.Stream.Elements @@ -33,17 +36,15 @@ testUser2 = read "testuser2@species64739.dyndns.org/bot2" supervisor :: Jid supervisor = read "uart14@species64739.dyndns.org" - -attXmpp :: STM a -> Xmpp a -attXmpp = liftIO . atomically - testNS :: Text testNS = "xmpp:library:test" +type Xmpp a = CSession -> IO a + data Payload = Payload - { payloadCounter ::Int - , payloadFlag :: Bool - , payloadText :: Text + { payloadCounter :: Int + , payloadFlag :: Bool + , payloadText :: Text } deriving (Eq, Show) payloadP = xpWrap (\((counter,flag) , message) -> Payload counter flag message) @@ -58,8 +59,8 @@ payloadP = xpWrap (\((counter,flag) , message) -> Payload counter flag message) invertPayload (Payload count flag message) = Payload (count + 1) (not flag) (Text.reverse message) -iqResponder = do - chan' <- listenIQChan Get testNS +iqResponder csession = do + chan' <- listenIQChan Get testNS csession chan <- case chan' of Left _ -> liftIO $ putStrLn "Channel was already taken" >> error "hanging up" @@ -71,15 +72,15 @@ iqResponder = do let answerPayload = invertPayload payload let answerBody = pickleElem payloadP answerPayload unless (payloadCounter payload == 3) . void $ - answerIQ next (Right $ Just answerBody) + answerIQ next (Right $ Just answerBody) csession when (payloadCounter payload == 10) $ do - liftIO $ threadDelay 1000000 - endSession + threadDelay 1000000 + endSession (session csession) autoAccept :: Xmpp () -autoAccept = forever $ do - st <- waitForPresence isPresenceSubscribe - sendPresence $ presenceSubscribed (fromJust $ presenceFrom st) +autoAccept csession = forever $ do + st <- waitForPresence isPresenceSubscribe csession + sendPresence (presenceSubscribed (fromJust $ presenceFrom st)) csession simpleMessage :: Jid -> Text -> Message simpleMessage to txt = message @@ -99,19 +100,19 @@ simpleMessage to txt = message , messagePayload = [] } -sendUser = sendMessage . simpleMessage supervisor . Text.pack +sendUser m csession = sendMessage (simpleMessage supervisor $ Text.pack m) csession -expect debug x y | x == y = debug "Ok." - | otherwise = do - let failMSG = "failed" ++ show x ++ " /= " ++ show y - debug failMSG - sendUser failMSG +expect debug x y csession | x == y = debug "Ok." + | otherwise = do + let failMSG = "failed" ++ show x ++ " /= " ++ show y + debug failMSG + sendUser failMSG csession wait3 :: MonadIO m => m () wait3 = liftIO $ threadDelay 1000000 -discoTest debug = do - q <- Disco.queryInfo "species64739.dyndns.org" Nothing +discoTest debug csession = do + q <- Disco.queryInfo "species64739.dyndns.org" Nothing csession case q of Left (Disco.DiscoXMLError el e) -> do debug (ppElement el) @@ -120,7 +121,7 @@ discoTest debug = do x -> debug $ show x q <- Disco.queryItems "species64739.dyndns.org" - (Just "http://jabber.org/protocol/commands") + (Just "http://jabber.org/protocol/commands") csession case q of Left (Disco.DiscoXMLError el e) -> do debug (ppElement el) @@ -128,28 +129,32 @@ discoTest debug = do debug (show $ length $ elementNodes el) x -> debug $ show x -iqTest debug we them = do +iqTest debug we them csession = do forM [1..10] $ \count -> do let message = Text.pack . show $ localpart we let payload = Payload count (even count) (Text.pack $ show count) let body = pickleElem payloadP payload debug "sending" - answer <- sendIQ' (Just them) Get Nothing body + answer <- sendIQ' (Just them) Get Nothing body csession case answer of IQResponseResult r -> do debug "received" let Right answerPayload = unpickleElem payloadP (fromJust $ iqResultPayload r) - expect debug (invertPayload payload) answerPayload + expect debug (invertPayload payload) answerPayload csession IQResponseTimeout -> do debug $ "Timeout in packet: " ++ show count IQResponseError e -> do debug $ "Error in packet: " ++ show count liftIO $ threadDelay 100000 - sendUser "All tests done" + sendUser "All tests done" csession debug "ending session" -ibrTest debug = IBR.registerWith [ (IBR.Username, "testuser2") +fork action csession = do + csession' <- forkCSession csession + forkIO $ action csession' + +ibrTest debug uname pw = IBR.registerWith [ (IBR.Username, "testuser2") , (IBR.Password, "pwd") ] >>= debug . show @@ -161,41 +166,49 @@ runMain debug number multi = do 0 -> (testUser2, testUser1,False) let debug' = liftIO . atomically . debug . (("Thread " ++ show number ++ ":") ++) - withNewSession $ do - setConnectionClosedHandler (\e -> do - liftIO (debug' $ "connection lost because " ++ show e) - endSession ) - debug' "running" - withConnection $ Ex.catch (do - connect "localhost" "species64739.dyndns.org" - startTLS exampleParams - debug' "ibr start" - ibrTest debug' - debug' "ibr end" - saslResponse <- simpleAuth - (fromJust $ localpart we) "pwd" (resourcepart we) - case saslResponse of - Right _ -> return () - Left e -> error $ show e - debug' "session standing" - features <- other `liftM` gets sFeatures - liftIO . void $ forM features $ \f -> debug' $ ppElement f - ) - (\e -> debug' $ show (e ::Ex.SomeException)) - sendPresence presenceOnline - thread1 <- fork autoAccept - sendPresence $ presenceSubscribe them - thread2 <- fork iqResponder - when active $ do - liftIO $ threadDelay 1000000 -- Wait for the other thread to go online - discoTest debug' - when multi $ iqTest debug' we them - closeConnection - liftIO $ killThread thread1 - liftIO $ killThread thread2 - return () --- liftIO . threadDelay $ 10^6 - unless multi . void . withConnection $ IBR.unregister + csession <- newSessionChans + + setConnectionClosedHandler (\e s -> do + debug' $ "connection lost because " ++ show e + endSession s) (session csession) + debug' "running" + flip withConnection (session csession) $ Ex.catch (do + connect "localhost" (PortNumber 5222) "species64739.dyndns.org" + startTLS exampleParams + -- debug' "ibr start" + -- ibrTest debug' (localpart we) "pwd" + -- debug' "ibr end" + saslResponse <- simpleAuth + (fromJust $ localpart we) "pwd" (resourcepart we) + case saslResponse of + Right _ -> return () + Left e -> error $ show e + debug' "session standing" + features <- other `liftM` gets sFeatures + liftIO . void $ forM features $ \f -> debug' $ ppElement f + ) + (\e -> debug' $ show (e ::Ex.SomeException)) + sendPresence presenceOnline csession + thread1 <- fork autoAccept csession + sendPresence (presenceSubscribe them) csession + thread2 <- fork iqResponder csession + when active $ do + liftIO $ threadDelay 1000000 -- Wait for the other thread to go online +-- discoTest debug' + when multi $ iqTest debug' we them csession + closeConnection (session csession) + killThread thread1 + killThread thread2 + return () + liftIO . threadDelay $ 10^6 +-- unless multi . void . withConnection $ IBR.unregister + unless multi . void $ fork (\s -> forever $ do + pullMessage s >>= debug' . show + putStrLn "" + putStrLn "" + ) + csession + liftIO . forever $ threadDelay 1000000 return () run i multi = do @@ -206,4 +219,4 @@ run i multi = do runMain debugOut (2 + i) multi -main = run 0 False +main = run 0 True