From bcd67d306eaf0fdb35e2e64452b8afb4de200354 Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Sun, 8 Apr 2012 00:35:20 +0200
Subject: [PATCH] split Concurrent
---
src/Data/Conduit/TLS.hs | 6 +-
src/Network/XMPP/Bind.hs | 2 -
src/Network/XMPP/Concurrent.hs | 343 +------------------------
src/Network/XMPP/Concurrent/IQ.hs | 48 ++++
src/Network/XMPP/Concurrent/Monad.hs | 143 +++++++++++
src/Network/XMPP/Concurrent/Threads.hs | 147 +++++++++++
src/Network/XMPP/Concurrent/Types.hs | 40 +++
7 files changed, 388 insertions(+), 341 deletions(-)
create mode 100644 src/Network/XMPP/Concurrent/IQ.hs
create mode 100644 src/Network/XMPP/Concurrent/Monad.hs
create mode 100644 src/Network/XMPP/Concurrent/Threads.hs
create mode 100644 src/Network/XMPP/Concurrent/Types.hs
diff --git a/src/Data/Conduit/TLS.hs b/src/Data/Conduit/TLS.hs
index 141eeb0..bf2adf1 100644
--- a/src/Data/Conduit/TLS.hs
+++ b/src/Data/Conduit/TLS.hs
@@ -9,7 +9,6 @@ module Data.Conduit.TLS
import Control.Applicative
import Control.Monad.IO.Class
-import Control.Monad.Trans.Class
import Control.Monad.Trans.Resource
import Crypto.Random
@@ -22,9 +21,6 @@ import Network.TLS as TLS
import Network.TLS.Extra as TLSExtra
import System.IO(Handle)
-import System.Random
-
-import System.IO
tlsinit
:: (MonadIO m, MonadIO m1, MonadResource m1) =>
@@ -43,7 +39,7 @@ tlsinit tlsParams handle = do
let snk = sinkIO
(return clientContext)
(\_ -> return ())
- (\con bs -> sendData clientContext (BL.fromChunks [bs])
+ (\con bs -> sendData con (BL.fromChunks [bs])
>> return IOProcessing )
(\_ -> return ())
return ( src
diff --git a/src/Network/XMPP/Bind.hs b/src/Network/XMPP/Bind.hs
index 10cdf60..7f198d4 100644
--- a/src/Network/XMPP/Bind.hs
+++ b/src/Network/XMPP/Bind.hs
@@ -14,8 +14,6 @@ import Network.XMPP.Types
import Network.XMPP.Pickle
import Network.XMPP.Concurrent
-import Control.Monad.IO.Class
-
bindBody :: Maybe Text -> Element
bindBody rsrc = (pickleElem
(bindP . xpOption $ xpElemNodes "resource" (xpContent xpId))
diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs
index cd1cf74..19f4ef7 100644
--- a/src/Network/XMPP/Concurrent.hs
+++ b/src/Network/XMPP/Concurrent.hs
@@ -1,343 +1,18 @@
-{-# LANGUAGE ScopedTypeVariables #-}
-{-# LANGUAGE DeriveDataTypeable #-}
-{-# LANGUAGE OverloadedStrings #-}
-{-# LANGUAGE NoMonomorphismRestriction #-}
-
-
module Network.XMPP.Concurrent
- where
-
--- import Network.XMPP.Stream
-import Network.XMPP.Types
-
-import Control.Applicative((<$>),(<*>))
-import Control.Concurrent
-import Control.Concurrent.STM
-import qualified Control.Exception.Lifted as Ex
-import Control.Monad
-import Control.Monad.IO.Class
-import Control.Monad.Trans.Class
-import Control.Monad.Trans.Reader
-import Control.Monad.Trans.Resource
-import Control.Monad.Trans.State
-
-import qualified Data.ByteString as BS
-import Data.Conduit
-import qualified Data.Conduit.List as CL
-import Data.Default (def)
-import Data.IORef
-import qualified Data.Map as Map
-import Data.Maybe
-import qualified Data.Text as Text
-import Data.Text(Text)
-import Data.Typeable
-
-import Data.XML.Types
-
-import Network.XMPP.Monad
-import Network.XMPP.Marshal
-import Network.XMPP.Pickle
-
-import Text.XML.Stream.Elements
-import qualified Text.XML.Stream.Render as XR
-
-type IQHandlers = (Map.Map (IQType, Text) (TChan (IQ, TVar Bool))
- , Map.Map Text (TMVar IQ)
- )
-
-data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message))
- , presenceRef :: IORef (Maybe (TChan Presence))
- , mShadow :: TChan Message -- the original chan
- , pShadow :: TChan Presence -- the original chan
- , outCh :: TChan Stanza
- , iqHandlers :: TVar IQHandlers
- , writeRef :: TMVar (BS.ByteString -> IO () )
- , readerThread :: ThreadId
- , idGenerator :: IO Text
- }
-
-type XMPPThread a = ReaderT Thread IO a
-
-
-data ReaderSignal = ReaderSignal (XMPPMonad ()) deriving Typeable
-instance Show ReaderSignal where show _ = ""
-instance Ex.Exception ReaderSignal
-
-readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> ResourceT IO ()
-readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do
- sta <- pull
- case sta of
- SMessage m -> liftIO . atomically $ do
- writeTChan messageC m
- _ <- readTChan messageC -- Sic!
- return ()
- -- this may seem ridiculous, but to prevent
- -- the channel from filling up we immedtiately remove the
- -- Stanza we just put in. It will still be
- -- available in duplicates.
- SPresence p -> liftIO . atomically $ do
- writeTChan presenceC p
- _ <- readTChan presenceC
- return ()
- SIQ i -> liftIO . atomically $ do
- writeTChan iqC i
- return ()
- )
- ( \(ReaderSignal a) -> do
- ((),s') <- runStateT a s
- readWorker messageC presenceC iqC s'
- )
-
-writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO ()
-writeWorker stCh writeR = forever $ do
- (write, next) <- atomically $ (,) <$>
- takeTMVar writeR <*>
- readTChan stCh
- outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next)
- $= XR.renderBytes def $$ CL.consume
- _ <- forM outBS write
- atomically $ putTMVar writeR write
-
-handleIQs :: MonadIO m => TVar IQHandlers -> TChan IQ -> m a
-handleIQs handlers iqC = liftIO . forever . atomically $ do
- iq <- readTChan iqC
- (byNS, byID) <- readTVar handlers
- let iqNS = fromMaybe ("") (nameNamespace . elementName . iqBody $ iq)
- case () of () | (iqType iq) `elem` [Get, Set] ->
- case Map.lookup (Get, iqNS) byNS of
- Nothing -> return () -- TODO: send error stanza
- Just ch -> do
- sent <- newTVar False
- writeTChan ch (iq, sent)
- | otherwise -> case Map.updateLookupWithKey (\_ _ -> Nothing)
- (iqId iq) byID of
- (Nothing, _) -> return () -- we are not supposed
- -- to send an error
- (Just tmvar, byID') -> do
- _ <- tryPutTMVar tmvar iq -- don't block
- writeTVar handlers (byNS, byID')
-
-
-
--- Two streams: input and output. Threads read from input stream and write to output stream.
--- | Runs thread in XmppState monad
--- returns channel of incoming and outgoing stances, respectively
--- and an Action to stop the Threads and close the connection
-startThreads
- :: XMPPMonad ( TChan Message
- , TChan Presence
- , TVar IQHandlers
- , TChan Stanza, IO ()
- , TMVar (BS.ByteString -> IO ())
- , ThreadId
- )
-
-
-startThreads = do
- writeLock <- liftIO . newTMVarIO =<< gets sConPushBS
- messageC <- liftIO newTChanIO
- presenceC <- liftIO newTChanIO
- iqC <- liftIO newTChanIO
- outC <- liftIO newTChanIO
- handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty)
- lw <- liftIO . forkIO $ writeWorker outC writeLock
- cp <- liftIO . forkIO $ connPersist writeLock
- iqh <- liftIO . forkIO $ handleIQs handlers iqC
- s <- get
- rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s
- return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp, iqh], writeLock, rd)
- where
- killConnection writeLock threads = liftIO $ do
- _ <- atomically $ takeTMVar writeLock -- Should we put it back?
- _ <- forM threads killThread
- return()
-
-
--- | Register a new IQ listener. IQ matching the type and namespace will
--- be put in the channel. IQ of type Result and Error will never be put
--- into channels, even though this function won't stop you from registering
--- them
-listenIQChan :: IQType -- ^ type of IQs to receive (Get / Set)
- -> Text -- ^ namespace of the child element
- -> XMPPThread (Bool, TChan (IQ, TVar Bool))
-listenIQChan tp ns = do
- handlers <- asks iqHandlers
- liftIO . atomically $ do
- (byNS, byID) <- readTVar handlers
- iqCh <- newTChan
- let (present, byNS') = Map.insertLookupWithKey' (\_ new _ -> new)
- (tp,ns) iqCh byNS
- writeTVar handlers (byNS', byID)
- return $ case present of
- Nothing -> (True, iqCh)
- Just iqCh' -> (False, iqCh')
-
--- | Start worker threads and run action. The supplied action will run
--- in the calling thread. use 'forkXMPP' to start another thread.
-runThreaded :: XMPPThread a
- -> XMPPMonad a
-runThreaded a = do
- (mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads
- workermCh <- liftIO . newIORef $ Nothing
- workerpCh <- liftIO . newIORef $ Nothing
- idRef <- liftIO $ newTVarIO 1
- let getId = atomically $ do
- curId <- readTVar idRef
- writeTVar idRef (curId + 1 :: Integer)
- return . Text.pack $ show curId
- liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId)
-
-
-
--- | get the inbound stanza channel, duplicates from master if necessary
--- please note that once duplicated it will keep filling up, call
--- 'dropMessageChan' to allow it to be garbage collected
-getMessageChan :: XMPPThread (TChan Message)
-getMessageChan = do
- mChR <- asks messagesRef
- mCh <- liftIO $ readIORef mChR
- case mCh of
- Nothing -> do
- shadow <- asks mShadow
- mCh' <- liftIO $ atomically $ dupTChan shadow
- liftIO $ writeIORef mChR (Just mCh')
- return mCh'
- Just mCh' -> return mCh'
-
--- | see 'getMessageChan'
-getPresenceChan :: XMPPThread (TChan Presence)
-getPresenceChan = do
- pChR <- asks presenceRef
- pCh <- liftIO $ readIORef pChR
- case pCh of
- Nothing -> do
- shadow <- asks pShadow
- pCh' <- liftIO $ atomically $ dupTChan shadow
- liftIO $ writeIORef pChR (Just pCh')
- return pCh'
- Just pCh' -> return pCh'
-
--- | Drop the local end of the inbound stanza channel
--- from our context so it can be GC-ed
-dropMessageChan :: XMPPThread ()
-dropMessageChan = do
- r <- asks messagesRef
- liftIO $ writeIORef r Nothing
-
--- | see 'dropMessageChan'
-dropPresenceChan :: XMPPThread ()
-dropPresenceChan = do
- r <- asks presenceRef
- liftIO $ writeIORef r Nothing
-
--- | Read an element from the inbound stanza channel, acquiring a copy
--- of the channel as necessary
-pullMessage :: XMPPThread Message
-pullMessage = do
- c <- getMessageChan
- st <- liftIO $ atomically $ readTChan c
- return st
-
--- | Read an element from the inbound stanza channel, acquiring a copy
--- of the channel as necessary
-pullPresence :: XMPPThread Presence
-pullPresence = do
- c <- getPresenceChan
- st <- liftIO $ atomically $ readTChan c
- return st
-
-
--- | Send a stanza to the server
-sendS :: Stanza -> XMPPThread ()
-sendS a = do
- out <- asks outCh
- liftIO . atomically $ writeTChan out a
- return ()
-
--- | Fork a new thread
-forkXMPP :: XMPPThread () -> XMPPThread ThreadId
-forkXMPP a = do
- thread <- ask
- mCH' <- liftIO $ newIORef Nothing
- pCH' <- liftIO $ newIORef Nothing
- liftIO $ forkIO $ runReaderT a (thread {messagesRef = mCH'
- ,presenceRef = pCH'
- })
-
-waitForMessage :: (Message -> Bool) -> XMPPThread Message
-waitForMessage f = do
- s <- pullMessage
- if (f s) then
- return s
- else do
- waitForMessage f
+( module Network.XMPP.Concurrent.Types
+, module Network.XMPP.Concurrent.Monad
+, module Network.XMPP.Concurrent.Threads
+, module Network.XMPP.Concurrent.IQ
+) where
-waitForPresence :: (Presence -> Bool) -> XMPPThread Presence
-waitForPresence f = do
- s <- pullPresence
- if (f s) then
- return s
- else do
- waitForPresence f
+import Network.XMPP.Concurrent.Types
+import Network.XMPP.Concurrent.Monad
+import Network.XMPP.Concurrent.Threads
+import Network.XMPP.Concurrent.IQ
-connPersist :: TMVar (BS.ByteString -> IO ()) -> IO ()
-connPersist lock = forever $ do
- pushBS <- atomically $ takeTMVar lock
- pushBS " "
- atomically $ putTMVar lock pushBS
--- putStrLn ""
- threadDelay 30000000
--- | Run an XMPPMonad action in isolation.
--- Reader and writer workers will be temporarily stopped
--- and resumed with the new session details once the action returns.
--- The Action will run in the reader thread.
-singleThreaded :: XMPPMonad () -> XMPPThread ()
-singleThreaded a = do
- writeLock <- asks writeRef
- rdr <- asks readerThread
- _ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the
- -- one returned by a
- liftIO . throwTo rdr . ReaderSignal $ do
- a
- out <- gets sConPushBS
- liftIO . atomically $ putTMVar writeLock out
- return ()
--- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound
--- IQ with a matching ID that has type @result@ or @error@
-sendIQ :: Maybe JID -- ^ Recipient (to)
- -> IQType -- ^ IQ type (Get or Set)
- -> Element -- ^ The iq body (there has to be exactly one)
- -> XMPPThread (TMVar IQ)
-sendIQ to tp body = do -- TODO: add timeout
- newId <- liftIO =<< asks idGenerator
- handlers <- asks iqHandlers
- ref <- liftIO . atomically $ do
- resRef <- newEmptyTMVar
- (byNS, byId) <- readTVar handlers
- writeTVar handlers (byNS, Map.insert newId resRef byId)
- -- TODO: Check for id collisions (shouldn't happen?)
- return resRef
- sendS . SIQ $ IQ Nothing (to) newId tp body
- return ref
--- | like 'sendIQ', but waits for the answer IQ
-sendIQ' :: Maybe JID -> IQType -> Element -> XMPPThread IQ
-sendIQ' to tp body = do
- ref <- sendIQ to tp body
- liftIO . atomically $ takeTMVar ref
-answerIQ :: MonadIO m => (IQ, TVar Bool) -> Element -> ReaderT Thread m Bool
-answerIQ ((IQ from _to id _tp _bd), sentRef) body = do
- out <- asks outCh
- liftIO . atomically $ do
- sent <- readTVar sentRef
- case sent of
- False -> do
- writeTVar sentRef True
- writeTChan out . SIQ $ IQ Nothing from id Result body
- return True
- True -> return False
diff --git a/src/Network/XMPP/Concurrent/IQ.hs b/src/Network/XMPP/Concurrent/IQ.hs
new file mode 100644
index 0000000..2609a14
--- /dev/null
+++ b/src/Network/XMPP/Concurrent/IQ.hs
@@ -0,0 +1,48 @@
+module Network.XMPP.Concurrent.IQ where
+
+import Control.Concurrent.STM
+import Control.Monad.IO.Class
+import Control.Monad.Trans.Reader
+
+import Data.XML.Types
+import qualified Data.Map as Map
+
+import Network.XMPP.Concurrent.Types
+import Network.XMPP.Concurrent.Monad
+import Network.XMPP.Types
+
+-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound
+-- IQ with a matching ID that has type @result@ or @error@
+sendIQ :: Maybe JID -- ^ Recipient (to)
+ -> IQType -- ^ IQ type (Get or Set)
+ -> Element -- ^ The iq body (there has to be exactly one)
+ -> XMPPThread (TMVar IQ)
+sendIQ to tp body = do -- TODO: add timeout
+ newId <- liftIO =<< asks idGenerator
+ handlers <- asks iqHandlers
+ ref <- liftIO . atomically $ do
+ resRef <- newEmptyTMVar
+ (byNS, byId) <- readTVar handlers
+ writeTVar handlers (byNS, Map.insert newId resRef byId)
+ -- TODO: Check for id collisions (shouldn't happen?)
+ return resRef
+ sendS . SIQ $ IQ Nothing (to) newId tp body
+ return ref
+
+-- | like 'sendIQ', but waits for the answer IQ
+sendIQ' :: Maybe JID -> IQType -> Element -> XMPPThread IQ
+sendIQ' to tp body = do
+ ref <- sendIQ to tp body
+ liftIO . atomically $ takeTMVar ref
+
+answerIQ :: MonadIO m => (IQ, TVar Bool) -> Element -> ReaderT Thread m Bool
+answerIQ ((IQ from _to iqid _tp _bd), sentRef) body = do
+ out <- asks outCh
+ liftIO . atomically $ do
+ sent <- readTVar sentRef
+ case sent of
+ False -> do
+ writeTVar sentRef True
+ writeTChan out . SIQ $ IQ Nothing from iqid Result body
+ return True
+ True -> return False
diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs
new file mode 100644
index 0000000..7b09cdb
--- /dev/null
+++ b/src/Network/XMPP/Concurrent/Monad.hs
@@ -0,0 +1,143 @@
+module Network.XMPP.Concurrent.Monad where
+
+import Network.XMPP.Types
+
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Monad.IO.Class
+import Control.Monad.Trans.Reader
+import Control.Monad.Trans.State
+
+import Data.IORef
+import qualified Data.Map as Map
+import Data.Text(Text)
+
+import Network.XMPP.Concurrent.Types
+
+-- | Register a new IQ listener. IQ matching the type and namespace will
+-- be put in the channel. IQ of type Result and Error will never be put
+-- into channels, even though this function won't stop you from registering
+-- them
+listenIQChan :: IQType -- ^ type of IQs to receive (Get / Set)
+ -> Text -- ^ namespace of the child element
+ -> XMPPThread (Bool, TChan (IQ, TVar Bool))
+listenIQChan tp ns = do
+ handlers <- asks iqHandlers
+ liftIO . atomically $ do
+ (byNS, byID) <- readTVar handlers
+ iqCh <- newTChan
+ let (present, byNS') = Map.insertLookupWithKey' (\_ new _ -> new)
+ (tp,ns) iqCh byNS
+ writeTVar handlers (byNS', byID)
+ return $ case present of
+ Nothing -> (True, iqCh)
+ Just iqCh' -> (False, iqCh')
+
+-- | get the inbound stanza channel, duplicates from master if necessary
+-- please note that once duplicated it will keep filling up, call
+-- 'dropMessageChan' to allow it to be garbage collected
+getMessageChan :: XMPPThread (TChan Message)
+getMessageChan = do
+ mChR <- asks messagesRef
+ mCh <- liftIO $ readIORef mChR
+ case mCh of
+ Nothing -> do
+ shadow <- asks mShadow
+ mCh' <- liftIO $ atomically $ dupTChan shadow
+ liftIO $ writeIORef mChR (Just mCh')
+ return mCh'
+ Just mCh' -> return mCh'
+
+-- | see 'getMessageChan'
+getPresenceChan :: XMPPThread (TChan Presence)
+getPresenceChan = do
+ pChR <- asks presenceRef
+ pCh <- liftIO $ readIORef pChR
+ case pCh of
+ Nothing -> do
+ shadow <- asks pShadow
+ pCh' <- liftIO $ atomically $ dupTChan shadow
+ liftIO $ writeIORef pChR (Just pCh')
+ return pCh'
+ Just pCh' -> return pCh'
+
+-- | Drop the local end of the inbound stanza channel
+-- from our context so it can be GC-ed
+dropMessageChan :: XMPPThread ()
+dropMessageChan = do
+ r <- asks messagesRef
+ liftIO $ writeIORef r Nothing
+
+-- | see 'dropMessageChan'
+dropPresenceChan :: XMPPThread ()
+dropPresenceChan = do
+ r <- asks presenceRef
+ liftIO $ writeIORef r Nothing
+
+-- | Read an element from the inbound stanza channel, acquiring a copy
+-- of the channel as necessary
+pullMessage :: XMPPThread Message
+pullMessage = do
+ c <- getMessageChan
+ st <- liftIO $ atomically $ readTChan c
+ return st
+
+-- | Read an element from the inbound stanza channel, acquiring a copy
+-- of the channel as necessary
+pullPresence :: XMPPThread Presence
+pullPresence = do
+ c <- getPresenceChan
+ st <- liftIO $ atomically $ readTChan c
+ return st
+
+
+-- | Send a stanza to the server
+sendS :: Stanza -> XMPPThread ()
+sendS a = do
+ out <- asks outCh
+ liftIO . atomically $ writeTChan out a
+ return ()
+
+-- | Fork a new thread
+forkXMPP :: XMPPThread () -> XMPPThread ThreadId
+forkXMPP a = do
+ thread <- ask
+ mCH' <- liftIO $ newIORef Nothing
+ pCH' <- liftIO $ newIORef Nothing
+ liftIO $ forkIO $ runReaderT a (thread {messagesRef = mCH'
+ ,presenceRef = pCH'
+ })
+
+waitForMessage :: (Message -> Bool) -> XMPPThread Message
+waitForMessage f = do
+ s <- pullMessage
+ if (f s) then
+ return s
+ else do
+ waitForMessage f
+
+waitForPresence :: (Presence -> Bool) -> XMPPThread Presence
+waitForPresence f = do
+ s <- pullPresence
+ if (f s) then
+ return s
+ else do
+ waitForPresence f
+
+
+-- | Run an XMPPMonad action in isolation.
+-- Reader and writer workers will be temporarily stopped
+-- and resumed with the new session details once the action returns.
+-- The Action will run in the reader thread.
+singleThreaded :: XMPPMonad () -> XMPPThread ()
+singleThreaded a = do
+ writeLock <- asks writeRef
+ rdr <- asks readerThread
+ _ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the
+ -- one returned by a
+ liftIO . throwTo rdr . ReaderSignal $ do
+ a
+ out <- gets sConPushBS
+ liftIO . atomically $ putTMVar writeLock out
+ return ()
+
diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs
new file mode 100644
index 0000000..474b6cf
--- /dev/null
+++ b/src/Network/XMPP/Concurrent/Threads.hs
@@ -0,0 +1,147 @@
+{-# LANGUAGE OverloadedStrings #-}
+module Network.XMPP.Concurrent.Threads where
+
+import Network.XMPP.Types
+
+import Control.Applicative((<$>),(<*>))
+import Control.Concurrent
+import Control.Concurrent.STM
+import qualified Control.Exception.Lifted as Ex
+import Control.Monad
+import Control.Monad.IO.Class
+import Control.Monad.Trans.Class
+import Control.Monad.Trans.Reader
+import Control.Monad.Trans.Resource
+import Control.Monad.Trans.State
+
+import qualified Data.ByteString as BS
+import Data.Conduit
+import qualified Data.Conduit.List as CL
+import Data.Default (def)
+import Data.IORef
+import qualified Data.Map as Map
+import Data.Maybe
+import qualified Data.Text as Text
+
+import Data.XML.Types
+
+import Network.XMPP.Monad
+import Network.XMPP.Marshal
+import Network.XMPP.Pickle
+import Network.XMPP.Concurrent.Types
+
+import Text.XML.Stream.Elements
+import qualified Text.XML.Stream.Render as XR
+
+readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> ResourceT IO ()
+readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do
+ sta <- pull
+ case sta of
+ SMessage m -> liftIO . atomically $ do
+ writeTChan messageC m
+ _ <- readTChan messageC -- Sic!
+ return ()
+ -- this may seem ridiculous, but to prevent
+ -- the channel from filling up we immedtiately remove the
+ -- Stanza we just put in. It will still be
+ -- available in duplicates.
+ SPresence p -> liftIO . atomically $ do
+ writeTChan presenceC p
+ _ <- readTChan presenceC
+ return ()
+ SIQ i -> liftIO . atomically $ do
+ writeTChan iqC i
+ return ()
+ )
+ ( \(ReaderSignal a) -> do
+ ((),s') <- runStateT a s
+ readWorker messageC presenceC iqC s'
+ )
+
+writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO ()
+writeWorker stCh writeR = forever $ do
+ (write, next) <- atomically $ (,) <$>
+ takeTMVar writeR <*>
+ readTChan stCh
+ outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next)
+ $= XR.renderBytes def $$ CL.consume
+ _ <- forM outBS write
+ atomically $ putTMVar writeR write
+
+handleIQs :: MonadIO m => TVar IQHandlers -> TChan IQ -> m a
+handleIQs handlers iqC = liftIO . forever . atomically $ do
+ iq <- readTChan iqC
+ (byNS, byID) <- readTVar handlers
+ let iqNS = fromMaybe ("") (nameNamespace . elementName . iqBody $ iq)
+ case () of () | (iqType iq) `elem` [Get, Set] ->
+ case Map.lookup (Get, iqNS) byNS of
+ Nothing -> return () -- TODO: send error stanza
+ Just ch -> do
+ sent <- newTVar False
+ writeTChan ch (iq, sent)
+ | otherwise -> case Map.updateLookupWithKey (\_ _ -> Nothing)
+ (iqId iq) byID of
+ (Nothing, _) -> return () -- we are not supposed
+ -- to send an error
+ (Just tmvar, byID') -> do
+ _ <- tryPutTMVar tmvar iq -- don't block
+ writeTVar handlers (byNS, byID')
+
+
+
+-- Two streams: input and output. Threads read from input stream and write to output stream.
+-- | Runs thread in XmppState monad
+-- returns channel of incoming and outgoing stances, respectively
+-- and an Action to stop the Threads and close the connection
+startThreads
+ :: XMPPMonad ( TChan Message
+ , TChan Presence
+ , TVar IQHandlers
+ , TChan Stanza, IO ()
+ , TMVar (BS.ByteString -> IO ())
+ , ThreadId
+ )
+
+startThreads = do
+ writeLock <- liftIO . newTMVarIO =<< gets sConPushBS
+ messageC <- liftIO newTChanIO
+ presenceC <- liftIO newTChanIO
+ iqC <- liftIO newTChanIO
+ outC <- liftIO newTChanIO
+ handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty)
+ lw <- liftIO . forkIO $ writeWorker outC writeLock
+ cp <- liftIO . forkIO $ connPersist writeLock
+ iqh <- liftIO . forkIO $ handleIQs handlers iqC
+ s <- get
+ rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s
+ return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp, iqh], writeLock, rd)
+ where
+ killConnection writeLock threads = liftIO $ do
+ _ <- atomically $ takeTMVar writeLock -- Should we put it back?
+ _ <- forM threads killThread
+ return()
+
+
+-- | Start worker threads and run action. The supplied action will run
+-- in the calling thread. use 'forkXMPP' to start another thread.
+runThreaded :: XMPPThread a
+ -> XMPPMonad a
+runThreaded a = do
+ (mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads
+ workermCh <- liftIO . newIORef $ Nothing
+ workerpCh <- liftIO . newIORef $ Nothing
+ idRef <- liftIO $ newTVarIO 1
+ let getId = atomically $ do
+ curId <- readTVar idRef
+ writeTVar idRef (curId + 1 :: Integer)
+ return . Text.pack $ show curId
+ liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId)
+
+-- | Sends a blank space every 30 seconds to keep the connection alive
+connPersist :: TMVar (BS.ByteString -> IO ()) -> IO ()
+connPersist lock = forever $ do
+ pushBS <- atomically $ takeTMVar lock
+ pushBS " "
+ atomically $ putTMVar lock pushBS
+-- putStrLn ""
+ threadDelay 30000000
diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs
new file mode 100644
index 0000000..b848e43
--- /dev/null
+++ b/src/Network/XMPP/Concurrent/Types.hs
@@ -0,0 +1,40 @@
+{-# LANGUAGE DeriveDataTypeable #-}
+
+module Network.XMPP.Concurrent.Types where
+
+import qualified Control.Exception.Lifted as Ex
+import Control.Concurrent
+import Control.Concurrent.STM
+import Control.Monad.Trans.Reader
+
+import qualified Data.ByteString as BS
+import Data.IORef
+import qualified Data.Map as Map
+import Data.Text(Text)
+import Data.Typeable
+
+
+import Network.XMPP.Types
+
+
+type IQHandlers = (Map.Map (IQType, Text) (TChan (IQ, TVar Bool))
+ , Map.Map Text (TMVar IQ)
+ )
+
+data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message))
+ , presenceRef :: IORef (Maybe (TChan Presence))
+ , mShadow :: TChan Message -- the original chan
+ , pShadow :: TChan Presence -- the original chan
+ , outCh :: TChan Stanza
+ , iqHandlers :: TVar IQHandlers
+ , writeRef :: TMVar (BS.ByteString -> IO () )
+ , readerThread :: ThreadId
+ , idGenerator :: IO Text
+ }
+
+type XMPPThread a = ReaderT Thread IO a
+
+
+data ReaderSignal = ReaderSignal (XMPPMonad ()) deriving Typeable
+instance Show ReaderSignal where show _ = ""
+instance Ex.Exception ReaderSignal