diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index fd8b19e..538fae5 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -29,7 +29,7 @@ module Network.Xmpp ( -- * Session management - newSession + newSessionChans , withConnection , connect , simpleConnect @@ -140,7 +140,7 @@ module Network.Xmpp , iqRequestPayload , iqResultPayload -- * Threads - , forkSession + , forkChans -- * Misc , LangTag(..) , exampleParams @@ -152,6 +152,7 @@ import Network import qualified Network.TLS as TLS import Network.Xmpp.Bind import Network.Xmpp.Concurrent +import Network.Xmpp.Concurrent.Channels import Network.Xmpp.Concurrent.Types import Network.Xmpp.Marshal import Network.Xmpp.Message @@ -169,9 +170,9 @@ import Network.Xmpp.Types import Control.Monad.Error -- | Connect to host with given address. -connect :: HostName -> Text -> XmppConMonad (Either StreamError ()) -connect address hostname = do - xmppRawConnect address hostname +connect :: HostName -> PortID -> Text -> XmppConMonad (Either StreamError ()) +connect address port hostname = do + xmppRawConnect address port hostname result <- xmppStartStream case result of Left e -> do diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs index 25e0c3d..015fc3b 100644 --- a/source/Network/Xmpp/Concurrent/IQ.hs +++ b/source/Network/Xmpp/Concurrent/IQ.hs @@ -27,22 +27,22 @@ sendIQ timeOut to tp lang body session = do -- TODO: Add timeout newId <- idGenerator session ref <- atomically $ do resRef <- newEmptyTMVar - (byNS, byId) <- readTVar (iqHandlers session) - writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId) + (byNS, byId) <- readTVar (iqHandlers . chans $ session) + writeTVar (iqHandlers . chans $ session) (byNS, Map.insert newId resRef byId) -- TODO: Check for id collisions (shouldn't happen?) return resRef - sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session + sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) (chans session) case timeOut of Nothing -> return () Just t -> void . forkIO $ do threadDelay t - doTimeOut (iqHandlers session) newId ref + doTimeOut (iqHandlers . chans $ session) newId ref return ref where doTimeOut handlers iqid var = atomically $ do p <- tryPutTMVar var IQResponseTimeout when p $ do - (byNS, byId) <- readTVar (iqHandlers session) + (byNS, byId) <- readTVar (iqHandlers . chans $ session) writeTVar handlers (byNS, Map.delete iqid byId) return () @@ -76,6 +76,6 @@ answerIQ (IQRequestTicket False -> do writeTVar sentRef True - writeTChan (outCh session) response + writeTChan (outCh . chans $ session) response return True True -> return False diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index 6a3de26..9f87c03 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -26,10 +26,10 @@ import Network.Xmpp.Monad -- to interfere with existing consumers. listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) -> Text -- ^ Namespace of the child element - -> Session + -> Chans -> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket)) -listenIQChan tp ns session = do - let handlers = iqHandlers session +listenIQChan tp ns chans = do + let handlers = iqHandlers chans atomically $ do (byNS, byID) <- readTVar handlers iqCh <- newTChan @@ -44,110 +44,110 @@ listenIQChan tp ns session = do Just iqCh' -> Left iqCh' -- | Get a duplicate of the stanza channel -getStanzaChan :: Session -> IO (TChan Stanza) -getStanzaChan session = atomically $ dupTChan (sShadow session) +getStanzaChan :: Chans -> IO (TChan Stanza) +getStanzaChan chans = atomically $ dupTChan (sShadow chans) -- | Get the inbound stanza channel, duplicates from master if necessary. Please -- note that once duplicated it will keep filling up, call 'dropMessageChan' to -- allow it to be garbage collected. -getMessageChan :: Session -> IO (TChan (Either MessageError Message)) -getMessageChan session = do - mCh <- readIORef $ messagesRef session +getMessageChan :: Chans -> IO (TChan (Either MessageError Message)) +getMessageChan chans = do + mCh <- readIORef $ messagesRef chans case mCh of Nothing -> do - mCh' <- atomically $ dupTChan (mShadow session) - writeIORef (messagesRef session) (Just mCh') + mCh' <- atomically $ dupTChan (mShadow chans) + writeIORef (messagesRef chans) (Just mCh') return mCh' Just mCh' -> return mCh' -- | Analogous to 'getMessageChan'. -getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence)) -getPresenceChan session = do - pCh <- readIORef $ presenceRef session +getPresenceChan :: Chans -> IO (TChan (Either PresenceError Presence)) +getPresenceChan chans = do + pCh <- readIORef $ presenceRef chans case pCh of Nothing -> do - pCh' <- atomically $ dupTChan (pShadow session) - writeIORef (presenceRef session) (Just pCh') + pCh' <- atomically $ dupTChan (pShadow chans) + writeIORef (presenceRef chans) (Just pCh') return pCh' Just pCh' -> return pCh' -- | Drop the local end of the inbound stanza channel from our context so it can -- be GC-ed. -dropMessageChan :: Session -> IO () -dropMessageChan session = writeIORef (messagesRef session) Nothing +dropMessageChan :: Chans -> IO () +dropMessageChan chans = writeIORef (messagesRef chans) Nothing -- | Analogous to 'dropMessageChan'. -dropPresenceChan :: Session -> IO () -dropPresenceChan session = writeIORef (presenceRef session) Nothing +dropPresenceChan :: Chans -> IO () +dropPresenceChan chans = writeIORef (presenceRef chans) Nothing -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. -pullMessage :: Session -> IO (Either MessageError Message) -pullMessage session = do - c <- getMessageChan session +pullMessage :: Chans -> IO (Either MessageError Message) +pullMessage chans = do + c <- getMessageChan chans atomically $ readTChan c -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. -pullPresence :: Session -> IO (Either PresenceError Presence) -pullPresence session = do - c <- getPresenceChan session +pullPresence :: Chans -> IO (Either PresenceError Presence) +pullPresence chans = do + c <- getPresenceChan chans atomically $ readTChan c -- | Send a stanza to the server. -sendStanza :: Stanza -> Session -> IO () -sendStanza a session = atomically $ writeTChan (outCh session) a +sendStanza :: Stanza -> Chans -> IO () +sendStanza a chans = atomically $ writeTChan (outCh chans) a --- | Create a forked session object -forkSession :: Session -> IO Session -forkSession session = do +-- | Create a forked chans object +forkChans :: Chans -> IO Chans +forkChans chans = do mCH' <- newIORef Nothing pCH' <- newIORef Nothing - return $ session {messagesRef = mCH', presenceRef = pCH'} + return $ chans {messagesRef = mCH', presenceRef = pCH'} -- | Pulls a message and returns it if the given predicate returns @True@. filterMessages :: (MessageError -> Bool) -> (Message -> Bool) - -> Session -> IO (Either MessageError Message) -filterMessages f g session = do - s <- pullMessage session + -> Chans -> IO (Either MessageError Message) +filterMessages f g chans = do + s <- pullMessage chans case s of Left e | f e -> return $ Left e - | otherwise -> filterMessages f g session + | otherwise -> filterMessages f g chans Right m | g m -> return $ Right m - | otherwise -> filterMessages f g session + | otherwise -> filterMessages f g chans -- | Pulls a (non-error) message and returns it if the given predicate returns -- @True@. -waitForMessage :: (Message -> Bool) -> Session -> IO Message -waitForMessage f session = do - s <- pullMessage session +waitForMessage :: (Message -> Bool) -> Chans -> IO Message +waitForMessage f chans = do + s <- pullMessage chans case s of - Left _ -> waitForMessage f session + Left _ -> waitForMessage f chans Right m | f m -> return m - | otherwise -> waitForMessage f session + | otherwise -> waitForMessage f chans -- | Pulls an error message and returns it if the given predicate returns @True@. -waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError -waitForMessageError f session = do - s <- pullMessage session +waitForMessageError :: (MessageError -> Bool) -> Chans -> IO MessageError +waitForMessageError f chans = do + s <- pullMessage chans case s of - Right _ -> waitForMessageError f session + Right _ -> waitForMessageError f chans Left m | f m -> return m - | otherwise -> waitForMessageError f session + | otherwise -> waitForMessageError f chans -- | Pulls a (non-error) presence and returns it if the given predicate returns -- @True@. -waitForPresence :: (Presence -> Bool) -> Session -> IO Presence -waitForPresence f session = do - s <- pullPresence session +waitForPresence :: (Presence -> Bool) -> Chans -> IO Presence +waitForPresence f chans = do + s <- pullPresence chans case s of - Left _ -> waitForPresence f session + Left _ -> waitForPresence f chans Right m | f m -> return m - | otherwise -> waitForPresence f session + | otherwise -> waitForPresence f chans -- TODO: Wait for presence error? @@ -194,12 +194,12 @@ withConnection a session = do ] -- | Send a presence stanza. -sendPresence :: Presence -> Session -> IO () -sendPresence p session = sendStanza (PresenceS p) session +sendPresence :: Presence -> Chans -> IO () +sendPresence p chans = sendStanza (PresenceS p) chans -- | Send a message stanza. -sendMessage :: Message -> Session -> IO () -sendMessage m session = sendStanza (MessageS m) session +sendMessage :: Message -> Chans -> IO () +sendMessage m chans = sendStanza (MessageS m) chans -- | Executes a function to update the event handlers. diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index 074d455..4100588 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -1,32 +1,22 @@ -{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE ScopedTypeVariables #-} + module Network.Xmpp.Concurrent.Threads where import Network.Xmpp.Types -import Control.Applicative((<$>),(<*>)) +import Control.Applicative((<$>)) import Control.Concurrent import Control.Concurrent.STM import qualified Control.Exception.Lifted as Ex import Control.Monad import Control.Monad.IO.Class -import Control.Monad.Reader import Control.Monad.State.Strict import qualified Data.ByteString as BS -import Data.IORef -import qualified Data.Map as Map -import Data.Maybe - -import Data.XML.Types - import Network.Xmpp.Monad -import Network.Xmpp.Marshal -import Network.Xmpp.Pickle import Network.Xmpp.Concurrent.Types -import Text.XML.Stream.Elements - import GHC.IO (unsafeUnmask) -- Worker to read stanzas from the stream and concurrently distribute them to @@ -73,55 +63,25 @@ readWorker onStanza onConnectionClosed stateRef = Ex.catch (atomically $ forM ts takeTMVar) (\(Interrupt t) -> handleInterrupts (t:ts)) --- If the IQ request has a namespace, send it through the appropriate channel. -handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () -handleIQRequest handlers iq = do - (byNS, _) <- readTVar handlers - let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) - case Map.lookup (iqRequestType iq, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> do - sent <- newTVar False - writeTChan ch $ IQRequestTicket sent iq - -handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () -handleIQResponse handlers iq = do - (byNS, byID) <- readTVar handlers - case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of - (Nothing, _) -> return () -- We are not supposed to send an error. - (Just tmvar, byID') -> do - let answer = either IQResponseError IQResponseResult iq - _ <- tryPutTMVar tmvar answer -- Don't block. - writeTVar handlers (byNS, byID') - where - iqID (Left err) = iqErrorID err - iqID (Right iq') = iqResultID iq' - --- Worker to write stanzas to the stream concurrently. -writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () -writeWorker stCh writeR = forever $ do - (write, next) <- atomically $ (,) <$> - takeTMVar writeR <*> - readTChan stCh - r <- write $ renderElement (pickleElem xpStanza next) - atomically $ putTMVar writeR write - unless r $ do - atomically $ unGetTChan stCh next -- If the writing failed, the - -- connection is dead. - threadDelay 250000 -- Avoid free spinning. - -- Two streams: input and output. Threads read from input stream and write to -- output stream. -- | Runs thread in XmppState monad. Returns channel of incoming and outgoing -- stances, respectively, and an Action to stop the Threads and close the -- connection. -startThreadsWith stanzaHandler outC eh = do +startThreadsWith :: (Stanza -> IO ()) + -> TVar EventHandlers + -> IO + (IO (), + TMVar (BS.ByteString -> IO Bool), + TMVar XmppConnection, + ThreadId) +startThreadsWith stanzaHandler eh = do writeLock <- newTMVarIO (\_ -> return False) conS <- newTMVarIO xmppNoConnection - lw <- forkIO $ writeWorker outC writeLock +-- lw <- forkIO $ writeWorker outC writeLock cp <- forkIO $ connPersist writeLock rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS - return ( killConnection writeLock [lw, rd, cp] + return ( killConnection writeLock [rd, cp] , writeLock , conS , rd @@ -131,39 +91,12 @@ startThreadsWith stanzaHandler outC eh = do _ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- forM threads killThread return () - --- | Creates and initializes a new concurrent session. -newSessionChans :: IO Session -newSessionChans = do - messageC <- newTChanIO - presenceC <- newTChanIO - outC <- newTChanIO - stanzaC <- newTChanIO - iqHandlers <- newTVarIO (Map.empty, Map.empty) - eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } - let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers - (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler outC eh - workermCh <- newIORef $ Nothing - workerpCh <- newIORef $ Nothing - idRef <- newTVarIO 1 - let getId = atomically $ do - curId <- readTVar idRef - writeTVar idRef (curId + 1 :: Integer) - return . read. show $ curId - return $ Session { mShadow = messageC - , pShadow = presenceC - , sShadow = stanzaC - , messagesRef = workermCh - , presenceRef = workerpCh - , outCh = outC - , iqHandlers = iqHandlers - , writeRef = wLock - , readerThread = readerThread - , idGenerator = getId - , conStateRef = conState - , eventHandlers = eh - , stopThreads = kill - } + -- Call the connection closed handlers. + noCon :: TVar EventHandlers -> StreamError -> IO () + noCon h e = do + hands <- atomically $ readTVar h + _ <- forkIO $ connectionClosedHandler hands e + return () -- Acquires the write lock, pushes a space, and releases the lock. -- | Sends a blank space every 30 seconds to keep the connection alive. @@ -173,38 +106,3 @@ connPersist lock = forever $ do _ <- pushBS " " atomically $ putTMVar lock pushBS threadDelay 30000000 -- 30s - - -toChans messageC presenceC stanzaC iqHands sta = atomically $ do - writeTChan stanzaC sta - void $ readTChan stanzaC -- sic - case sta of - MessageS m -> do writeTChan messageC $ Right m - _ <- readTChan messageC -- Sic! - return () - -- this may seem ridiculous, but to prevent - -- the channel from filling up we - -- immedtiately remove the - -- Stanza we just put in. It will still be - -- available in duplicates. - MessageErrorS m -> do writeTChan messageC $ Left m - _ <- readTChan messageC - return () - PresenceS p -> do - writeTChan presenceC $ Right p - _ <- readTChan presenceC - return () - PresenceErrorS p -> do - writeTChan presenceC $ Left p - _ <- readTChan presenceC - return () - IQRequestS i -> handleIQRequest iqHands i - IQResultS i -> handleIQResponse iqHands (Right i) - IQErrorS i -> handleIQResponse iqHands (Left i) - --- Call the connection closed handlers. -noCon :: TVar EventHandlers -> StreamError -> IO () -noCon h e = do - hands <- atomically $ readTVar h - _ <- forkIO $ connectionClosedHandler hands e - return () \ No newline at end of file diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 60a465d..78bdfcc 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -6,7 +6,6 @@ module Network.Xmpp.Concurrent.Types where import qualified Control.Exception.Lifted as Ex import Control.Concurrent import Control.Concurrent.STM -import Control.Monad.Trans.Reader import qualified Data.ByteString as BS import Data.IORef @@ -30,7 +29,20 @@ data EventHandlers = EventHandlers -- The Session object is the Xmpp (ReaderT) state. data Session = Session - { -- The original master channels that the reader puts stanzas + { writeRef :: TMVar (BS.ByteString -> IO Bool) + , readerThread :: ThreadId + , idGenerator :: IO StanzaId + -- Lock (used by withConnection) to make sure that a maximum of one + -- XmppConMonad calculation is executed at any given time. + , conStateRef :: TMVar XmppConnection + , eventHandlers :: TVar EventHandlers + , stopThreads :: IO () + , chans :: Chans + } + +data Chans = Chans + { + -- The original master channels that the reader puts stanzas -- into. These are cloned by @get{STanza,Message,Presence}Chan -- on demand when first used by the thread and are stored in the -- {message,presence}Ref fields below. @@ -46,17 +58,8 @@ data Session = Session , iqHandlers :: TVar IQHandlers -- Writing lock, so that only one thread could write to the stream at any -- given time. - , writeRef :: TMVar (BS.ByteString -> IO Bool) - , readerThread :: ThreadId - , idGenerator :: IO StanzaId - -- Lock (used by withConnection) to make sure that a maximum of one - -- XmppConMonad calculation is executed at any given time. - , conStateRef :: TMVar XmppConnection - , eventHandlers :: TVar EventHandlers - , stopThreads :: IO () } - -- Interrupt is used to signal to the reader thread that it should stop. data Interrupt = Interrupt (TMVar ()) deriving Typeable instance Show Interrupt where show _ = "" diff --git a/source/Network/Xmpp/Session.hs b/source/Network/Xmpp/Session.hs index 94e34f5..b6500f3 100644 --- a/source/Network/Xmpp/Session.hs +++ b/source/Network/Xmpp/Session.hs @@ -33,11 +33,11 @@ xmppStartSession = do Left e -> error $ show e Right _ -> return () --- Sends the session IQ set element and waits for an answer. Throws an error if --- if an IQ error stanza is returned from the server. -startSession :: Session -> IO () -startSession session = do - answer <- sendIQ' Nothing Set Nothing sessionXML session - case answer of - IQResponseResult _ -> return () - e -> error $ show e +-- -- Sends the session IQ set element and waits for an answer. Throws an error if +-- -- if an IQ error stanza is returned from the server. +-- startSession :: Session -> IO () +-- startSession session = do +-- answer <- sendIQ' Nothing Set Nothing sessionXML session +-- case answer of +-- IQResponseResult _ -> return () +-- e -> error $ show e diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs index 2d45618..332a018 100644 --- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs +++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs @@ -28,6 +28,8 @@ import Network.Xmpp.Monad import Network.Xmpp.Pickle import Network.Xmpp.Types import Network.Xmpp.Concurrent +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Concurrent.Channels data DiscoError = DiscoNoQueryElement | DiscoIQError IQError