From c0af09d40c8332d032785bc440b3a5f7c020dc35 Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Sun, 18 Nov 2012 00:29:56 +0100
Subject: [PATCH] split off channels
---
source/Network/Xmpp.hs | 11 +-
source/Network/Xmpp/Concurrent/IQ.hs | 12 +-
source/Network/Xmpp/Concurrent/Monad.hs | 110 +++++++--------
source/Network/Xmpp/Concurrent/Threads.hs | 140 +++-----------------
source/Network/Xmpp/Concurrent/Types.hs | 25 ++--
source/Network/Xmpp/Session.hs | 16 +--
source/Network/Xmpp/Xep/ServiceDiscovery.hs | 2 +
7 files changed, 110 insertions(+), 206 deletions(-)
diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs
index fd8b19e..538fae5 100644
--- a/source/Network/Xmpp.hs
+++ b/source/Network/Xmpp.hs
@@ -29,7 +29,7 @@
module Network.Xmpp
( -- * Session management
- newSession
+ newSessionChans
, withConnection
, connect
, simpleConnect
@@ -140,7 +140,7 @@ module Network.Xmpp
, iqRequestPayload
, iqResultPayload
-- * Threads
- , forkSession
+ , forkChans
-- * Misc
, LangTag(..)
, exampleParams
@@ -152,6 +152,7 @@ import Network
import qualified Network.TLS as TLS
import Network.Xmpp.Bind
import Network.Xmpp.Concurrent
+import Network.Xmpp.Concurrent.Channels
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Marshal
import Network.Xmpp.Message
@@ -169,9 +170,9 @@ import Network.Xmpp.Types
import Control.Monad.Error
-- | Connect to host with given address.
-connect :: HostName -> Text -> XmppConMonad (Either StreamError ())
-connect address hostname = do
- xmppRawConnect address hostname
+connect :: HostName -> PortID -> Text -> XmppConMonad (Either StreamError ())
+connect address port hostname = do
+ xmppRawConnect address port hostname
result <- xmppStartStream
case result of
Left e -> do
diff --git a/source/Network/Xmpp/Concurrent/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs
index 25e0c3d..015fc3b 100644
--- a/source/Network/Xmpp/Concurrent/IQ.hs
+++ b/source/Network/Xmpp/Concurrent/IQ.hs
@@ -27,22 +27,22 @@ sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
newId <- idGenerator session
ref <- atomically $ do
resRef <- newEmptyTMVar
- (byNS, byId) <- readTVar (iqHandlers session)
- writeTVar (iqHandlers session) (byNS, Map.insert newId resRef byId)
+ (byNS, byId) <- readTVar (iqHandlers . chans $ session)
+ writeTVar (iqHandlers . chans $ session) (byNS, Map.insert newId resRef byId)
-- TODO: Check for id collisions (shouldn't happen?)
return resRef
- sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) session
+ sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) (chans session)
case timeOut of
Nothing -> return ()
Just t -> void . forkIO $ do
threadDelay t
- doTimeOut (iqHandlers session) newId ref
+ doTimeOut (iqHandlers . chans $ session) newId ref
return ref
where
doTimeOut handlers iqid var = atomically $ do
p <- tryPutTMVar var IQResponseTimeout
when p $ do
- (byNS, byId) <- readTVar (iqHandlers session)
+ (byNS, byId) <- readTVar (iqHandlers . chans $ session)
writeTVar handlers (byNS, Map.delete iqid byId)
return ()
@@ -76,6 +76,6 @@ answerIQ (IQRequestTicket
False -> do
writeTVar sentRef True
- writeTChan (outCh session) response
+ writeTChan (outCh . chans $ session) response
return True
True -> return False
diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs
index 6a3de26..9f87c03 100644
--- a/source/Network/Xmpp/Concurrent/Monad.hs
+++ b/source/Network/Xmpp/Concurrent/Monad.hs
@@ -26,10 +26,10 @@ import Network.Xmpp.Monad
-- to interfere with existing consumers.
listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@)
-> Text -- ^ Namespace of the child element
- -> Session
+ -> Chans
-> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket))
-listenIQChan tp ns session = do
- let handlers = iqHandlers session
+listenIQChan tp ns chans = do
+ let handlers = iqHandlers chans
atomically $ do
(byNS, byID) <- readTVar handlers
iqCh <- newTChan
@@ -44,110 +44,110 @@ listenIQChan tp ns session = do
Just iqCh' -> Left iqCh'
-- | Get a duplicate of the stanza channel
-getStanzaChan :: Session -> IO (TChan Stanza)
-getStanzaChan session = atomically $ dupTChan (sShadow session)
+getStanzaChan :: Chans -> IO (TChan Stanza)
+getStanzaChan chans = atomically $ dupTChan (sShadow chans)
-- | Get the inbound stanza channel, duplicates from master if necessary. Please
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to
-- allow it to be garbage collected.
-getMessageChan :: Session -> IO (TChan (Either MessageError Message))
-getMessageChan session = do
- mCh <- readIORef $ messagesRef session
+getMessageChan :: Chans -> IO (TChan (Either MessageError Message))
+getMessageChan chans = do
+ mCh <- readIORef $ messagesRef chans
case mCh of
Nothing -> do
- mCh' <- atomically $ dupTChan (mShadow session)
- writeIORef (messagesRef session) (Just mCh')
+ mCh' <- atomically $ dupTChan (mShadow chans)
+ writeIORef (messagesRef chans) (Just mCh')
return mCh'
Just mCh' -> return mCh'
-- | Analogous to 'getMessageChan'.
-getPresenceChan :: Session -> IO (TChan (Either PresenceError Presence))
-getPresenceChan session = do
- pCh <- readIORef $ presenceRef session
+getPresenceChan :: Chans -> IO (TChan (Either PresenceError Presence))
+getPresenceChan chans = do
+ pCh <- readIORef $ presenceRef chans
case pCh of
Nothing -> do
- pCh' <- atomically $ dupTChan (pShadow session)
- writeIORef (presenceRef session) (Just pCh')
+ pCh' <- atomically $ dupTChan (pShadow chans)
+ writeIORef (presenceRef chans) (Just pCh')
return pCh'
Just pCh' -> return pCh'
-- | Drop the local end of the inbound stanza channel from our context so it can
-- be GC-ed.
-dropMessageChan :: Session -> IO ()
-dropMessageChan session = writeIORef (messagesRef session) Nothing
+dropMessageChan :: Chans -> IO ()
+dropMessageChan chans = writeIORef (messagesRef chans) Nothing
-- | Analogous to 'dropMessageChan'.
-dropPresenceChan :: Session -> IO ()
-dropPresenceChan session = writeIORef (presenceRef session) Nothing
+dropPresenceChan :: Chans -> IO ()
+dropPresenceChan chans = writeIORef (presenceRef chans) Nothing
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.
-pullMessage :: Session -> IO (Either MessageError Message)
-pullMessage session = do
- c <- getMessageChan session
+pullMessage :: Chans -> IO (Either MessageError Message)
+pullMessage chans = do
+ c <- getMessageChan chans
atomically $ readTChan c
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.
-pullPresence :: Session -> IO (Either PresenceError Presence)
-pullPresence session = do
- c <- getPresenceChan session
+pullPresence :: Chans -> IO (Either PresenceError Presence)
+pullPresence chans = do
+ c <- getPresenceChan chans
atomically $ readTChan c
-- | Send a stanza to the server.
-sendStanza :: Stanza -> Session -> IO ()
-sendStanza a session = atomically $ writeTChan (outCh session) a
+sendStanza :: Stanza -> Chans -> IO ()
+sendStanza a chans = atomically $ writeTChan (outCh chans) a
--- | Create a forked session object
-forkSession :: Session -> IO Session
-forkSession session = do
+-- | Create a forked chans object
+forkChans :: Chans -> IO Chans
+forkChans chans = do
mCH' <- newIORef Nothing
pCH' <- newIORef Nothing
- return $ session {messagesRef = mCH', presenceRef = pCH'}
+ return $ chans {messagesRef = mCH', presenceRef = pCH'}
-- | Pulls a message and returns it if the given predicate returns @True@.
filterMessages :: (MessageError -> Bool)
-> (Message -> Bool)
- -> Session -> IO (Either MessageError Message)
-filterMessages f g session = do
- s <- pullMessage session
+ -> Chans -> IO (Either MessageError Message)
+filterMessages f g chans = do
+ s <- pullMessage chans
case s of
Left e | f e -> return $ Left e
- | otherwise -> filterMessages f g session
+ | otherwise -> filterMessages f g chans
Right m | g m -> return $ Right m
- | otherwise -> filterMessages f g session
+ | otherwise -> filterMessages f g chans
-- | Pulls a (non-error) message and returns it if the given predicate returns
-- @True@.
-waitForMessage :: (Message -> Bool) -> Session -> IO Message
-waitForMessage f session = do
- s <- pullMessage session
+waitForMessage :: (Message -> Bool) -> Chans -> IO Message
+waitForMessage f chans = do
+ s <- pullMessage chans
case s of
- Left _ -> waitForMessage f session
+ Left _ -> waitForMessage f chans
Right m | f m -> return m
- | otherwise -> waitForMessage f session
+ | otherwise -> waitForMessage f chans
-- | Pulls an error message and returns it if the given predicate returns @True@.
-waitForMessageError :: (MessageError -> Bool) -> Session -> IO MessageError
-waitForMessageError f session = do
- s <- pullMessage session
+waitForMessageError :: (MessageError -> Bool) -> Chans -> IO MessageError
+waitForMessageError f chans = do
+ s <- pullMessage chans
case s of
- Right _ -> waitForMessageError f session
+ Right _ -> waitForMessageError f chans
Left m | f m -> return m
- | otherwise -> waitForMessageError f session
+ | otherwise -> waitForMessageError f chans
-- | Pulls a (non-error) presence and returns it if the given predicate returns
-- @True@.
-waitForPresence :: (Presence -> Bool) -> Session -> IO Presence
-waitForPresence f session = do
- s <- pullPresence session
+waitForPresence :: (Presence -> Bool) -> Chans -> IO Presence
+waitForPresence f chans = do
+ s <- pullPresence chans
case s of
- Left _ -> waitForPresence f session
+ Left _ -> waitForPresence f chans
Right m | f m -> return m
- | otherwise -> waitForPresence f session
+ | otherwise -> waitForPresence f chans
-- TODO: Wait for presence error?
@@ -194,12 +194,12 @@ withConnection a session = do
]
-- | Send a presence stanza.
-sendPresence :: Presence -> Session -> IO ()
-sendPresence p session = sendStanza (PresenceS p) session
+sendPresence :: Presence -> Chans -> IO ()
+sendPresence p chans = sendStanza (PresenceS p) chans
-- | Send a message stanza.
-sendMessage :: Message -> Session -> IO ()
-sendMessage m session = sendStanza (MessageS m) session
+sendMessage :: Message -> Chans -> IO ()
+sendMessage m chans = sendStanza (MessageS m) chans
-- | Executes a function to update the event handlers.
diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs
index 074d455..4100588 100644
--- a/source/Network/Xmpp/Concurrent/Threads.hs
+++ b/source/Network/Xmpp/Concurrent/Threads.hs
@@ -1,32 +1,22 @@
-{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-}
+{-# LANGUAGE ScopedTypeVariables #-}
+
module Network.Xmpp.Concurrent.Threads where
import Network.Xmpp.Types
-import Control.Applicative((<$>),(<*>))
+import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
-import Control.Monad.Reader
import Control.Monad.State.Strict
import qualified Data.ByteString as BS
-import Data.IORef
-import qualified Data.Map as Map
-import Data.Maybe
-
-import Data.XML.Types
-
import Network.Xmpp.Monad
-import Network.Xmpp.Marshal
-import Network.Xmpp.Pickle
import Network.Xmpp.Concurrent.Types
-import Text.XML.Stream.Elements
-
import GHC.IO (unsafeUnmask)
-- Worker to read stanzas from the stream and concurrently distribute them to
@@ -73,55 +63,25 @@ readWorker onStanza onConnectionClosed stateRef =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
--- If the IQ request has a namespace, send it through the appropriate channel.
-handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
-handleIQRequest handlers iq = do
- (byNS, _) <- readTVar handlers
- let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
- case Map.lookup (iqRequestType iq, iqNS) byNS of
- Nothing -> return () -- TODO: send error stanza
- Just ch -> do
- sent <- newTVar False
- writeTChan ch $ IQRequestTicket sent iq
-
-handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
-handleIQResponse handlers iq = do
- (byNS, byID) <- readTVar handlers
- case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
- (Nothing, _) -> return () -- We are not supposed to send an error.
- (Just tmvar, byID') -> do
- let answer = either IQResponseError IQResponseResult iq
- _ <- tryPutTMVar tmvar answer -- Don't block.
- writeTVar handlers (byNS, byID')
- where
- iqID (Left err) = iqErrorID err
- iqID (Right iq') = iqResultID iq'
-
--- Worker to write stanzas to the stream concurrently.
-writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
-writeWorker stCh writeR = forever $ do
- (write, next) <- atomically $ (,) <$>
- takeTMVar writeR <*>
- readTChan stCh
- r <- write $ renderElement (pickleElem xpStanza next)
- atomically $ putTMVar writeR write
- unless r $ do
- atomically $ unGetTChan stCh next -- If the writing failed, the
- -- connection is dead.
- threadDelay 250000 -- Avoid free spinning.
-
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
-startThreadsWith stanzaHandler outC eh = do
+startThreadsWith :: (Stanza -> IO ())
+ -> TVar EventHandlers
+ -> IO
+ (IO (),
+ TMVar (BS.ByteString -> IO Bool),
+ TMVar XmppConnection,
+ ThreadId)
+startThreadsWith stanzaHandler eh = do
writeLock <- newTMVarIO (\_ -> return False)
conS <- newTMVarIO xmppNoConnection
- lw <- forkIO $ writeWorker outC writeLock
+-- lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS
- return ( killConnection writeLock [lw, rd, cp]
+ return ( killConnection writeLock [rd, cp]
, writeLock
, conS
, rd
@@ -131,39 +91,12 @@ startThreadsWith stanzaHandler outC eh = do
_ <- atomically $ takeTMVar writeLock -- Should we put it back?
_ <- forM threads killThread
return ()
-
--- | Creates and initializes a new concurrent session.
-newSessionChans :: IO Session
-newSessionChans = do
- messageC <- newTChanIO
- presenceC <- newTChanIO
- outC <- newTChanIO
- stanzaC <- newTChanIO
- iqHandlers <- newTVarIO (Map.empty, Map.empty)
- eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
- let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers
- (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler outC eh
- workermCh <- newIORef $ Nothing
- workerpCh <- newIORef $ Nothing
- idRef <- newTVarIO 1
- let getId = atomically $ do
- curId <- readTVar idRef
- writeTVar idRef (curId + 1 :: Integer)
- return . read. show $ curId
- return $ Session { mShadow = messageC
- , pShadow = presenceC
- , sShadow = stanzaC
- , messagesRef = workermCh
- , presenceRef = workerpCh
- , outCh = outC
- , iqHandlers = iqHandlers
- , writeRef = wLock
- , readerThread = readerThread
- , idGenerator = getId
- , conStateRef = conState
- , eventHandlers = eh
- , stopThreads = kill
- }
+ -- Call the connection closed handlers.
+ noCon :: TVar EventHandlers -> StreamError -> IO ()
+ noCon h e = do
+ hands <- atomically $ readTVar h
+ _ <- forkIO $ connectionClosedHandler hands e
+ return ()
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
@@ -173,38 +106,3 @@ connPersist lock = forever $ do
_ <- pushBS " "
atomically $ putTMVar lock pushBS
threadDelay 30000000 -- 30s
-
-
-toChans messageC presenceC stanzaC iqHands sta = atomically $ do
- writeTChan stanzaC sta
- void $ readTChan stanzaC -- sic
- case sta of
- MessageS m -> do writeTChan messageC $ Right m
- _ <- readTChan messageC -- Sic!
- return ()
- -- this may seem ridiculous, but to prevent
- -- the channel from filling up we
- -- immedtiately remove the
- -- Stanza we just put in. It will still be
- -- available in duplicates.
- MessageErrorS m -> do writeTChan messageC $ Left m
- _ <- readTChan messageC
- return ()
- PresenceS p -> do
- writeTChan presenceC $ Right p
- _ <- readTChan presenceC
- return ()
- PresenceErrorS p -> do
- writeTChan presenceC $ Left p
- _ <- readTChan presenceC
- return ()
- IQRequestS i -> handleIQRequest iqHands i
- IQResultS i -> handleIQResponse iqHands (Right i)
- IQErrorS i -> handleIQResponse iqHands (Left i)
-
--- Call the connection closed handlers.
-noCon :: TVar EventHandlers -> StreamError -> IO ()
-noCon h e = do
- hands <- atomically $ readTVar h
- _ <- forkIO $ connectionClosedHandler hands e
- return ()
\ No newline at end of file
diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs
index 60a465d..78bdfcc 100644
--- a/source/Network/Xmpp/Concurrent/Types.hs
+++ b/source/Network/Xmpp/Concurrent/Types.hs
@@ -6,7 +6,6 @@ module Network.Xmpp.Concurrent.Types where
import qualified Control.Exception.Lifted as Ex
import Control.Concurrent
import Control.Concurrent.STM
-import Control.Monad.Trans.Reader
import qualified Data.ByteString as BS
import Data.IORef
@@ -30,7 +29,20 @@ data EventHandlers = EventHandlers
-- The Session object is the Xmpp (ReaderT) state.
data Session = Session
- { -- The original master channels that the reader puts stanzas
+ { writeRef :: TMVar (BS.ByteString -> IO Bool)
+ , readerThread :: ThreadId
+ , idGenerator :: IO StanzaId
+ -- Lock (used by withConnection) to make sure that a maximum of one
+ -- XmppConMonad calculation is executed at any given time.
+ , conStateRef :: TMVar XmppConnection
+ , eventHandlers :: TVar EventHandlers
+ , stopThreads :: IO ()
+ , chans :: Chans
+ }
+
+data Chans = Chans
+ {
+ -- The original master channels that the reader puts stanzas
-- into. These are cloned by @get{STanza,Message,Presence}Chan
-- on demand when first used by the thread and are stored in the
-- {message,presence}Ref fields below.
@@ -46,17 +58,8 @@ data Session = Session
, iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any
-- given time.
- , writeRef :: TMVar (BS.ByteString -> IO Bool)
- , readerThread :: ThreadId
- , idGenerator :: IO StanzaId
- -- Lock (used by withConnection) to make sure that a maximum of one
- -- XmppConMonad calculation is executed at any given time.
- , conStateRef :: TMVar XmppConnection
- , eventHandlers :: TVar EventHandlers
- , stopThreads :: IO ()
}
-
-- Interrupt is used to signal to the reader thread that it should stop.
data Interrupt = Interrupt (TMVar ()) deriving Typeable
instance Show Interrupt where show _ = ""
diff --git a/source/Network/Xmpp/Session.hs b/source/Network/Xmpp/Session.hs
index 94e34f5..b6500f3 100644
--- a/source/Network/Xmpp/Session.hs
+++ b/source/Network/Xmpp/Session.hs
@@ -33,11 +33,11 @@ xmppStartSession = do
Left e -> error $ show e
Right _ -> return ()
--- Sends the session IQ set element and waits for an answer. Throws an error if
--- if an IQ error stanza is returned from the server.
-startSession :: Session -> IO ()
-startSession session = do
- answer <- sendIQ' Nothing Set Nothing sessionXML session
- case answer of
- IQResponseResult _ -> return ()
- e -> error $ show e
+-- -- Sends the session IQ set element and waits for an answer. Throws an error if
+-- -- if an IQ error stanza is returned from the server.
+-- startSession :: Session -> IO ()
+-- startSession session = do
+-- answer <- sendIQ' Nothing Set Nothing sessionXML session
+-- case answer of
+-- IQResponseResult _ -> return ()
+-- e -> error $ show e
diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs
index 2d45618..332a018 100644
--- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs
+++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs
@@ -28,6 +28,8 @@ import Network.Xmpp.Monad
import Network.Xmpp.Pickle
import Network.Xmpp.Types
import Network.Xmpp.Concurrent
+import Network.Xmpp.Concurrent.Types
+import Network.Xmpp.Concurrent.Channels
data DiscoError = DiscoNoQueryElement
| DiscoIQError IQError