diff --git a/src/Data/Conduit/TLS.hs b/src/Data/Conduit/TLS.hs index 141eeb0..bf2adf1 100644 --- a/src/Data/Conduit/TLS.hs +++ b/src/Data/Conduit/TLS.hs @@ -9,7 +9,6 @@ module Data.Conduit.TLS import Control.Applicative import Control.Monad.IO.Class -import Control.Monad.Trans.Class import Control.Monad.Trans.Resource import Crypto.Random @@ -22,9 +21,6 @@ import Network.TLS as TLS import Network.TLS.Extra as TLSExtra import System.IO(Handle) -import System.Random - -import System.IO tlsinit :: (MonadIO m, MonadIO m1, MonadResource m1) => @@ -43,7 +39,7 @@ tlsinit tlsParams handle = do let snk = sinkIO (return clientContext) (\_ -> return ()) - (\con bs -> sendData clientContext (BL.fromChunks [bs]) + (\con bs -> sendData con (BL.fromChunks [bs]) >> return IOProcessing ) (\_ -> return ()) return ( src diff --git a/src/Network/XMPP/Bind.hs b/src/Network/XMPP/Bind.hs index 10cdf60..7f198d4 100644 --- a/src/Network/XMPP/Bind.hs +++ b/src/Network/XMPP/Bind.hs @@ -14,8 +14,6 @@ import Network.XMPP.Types import Network.XMPP.Pickle import Network.XMPP.Concurrent -import Control.Monad.IO.Class - bindBody :: Maybe Text -> Element bindBody rsrc = (pickleElem (bindP . xpOption $ xpElemNodes "resource" (xpContent xpId)) diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs index cd1cf74..19f4ef7 100644 --- a/src/Network/XMPP/Concurrent.hs +++ b/src/Network/XMPP/Concurrent.hs @@ -1,343 +1,18 @@ -{-# LANGUAGE ScopedTypeVariables #-} -{-# LANGUAGE DeriveDataTypeable #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE NoMonomorphismRestriction #-} - - module Network.XMPP.Concurrent - where - --- import Network.XMPP.Stream -import Network.XMPP.Types - -import Control.Applicative((<$>),(<*>)) -import Control.Concurrent -import Control.Concurrent.STM -import qualified Control.Exception.Lifted as Ex -import Control.Monad -import Control.Monad.IO.Class -import Control.Monad.Trans.Class -import Control.Monad.Trans.Reader -import Control.Monad.Trans.Resource -import Control.Monad.Trans.State - -import qualified Data.ByteString as BS -import Data.Conduit -import qualified Data.Conduit.List as CL -import Data.Default (def) -import Data.IORef -import qualified Data.Map as Map -import Data.Maybe -import qualified Data.Text as Text -import Data.Text(Text) -import Data.Typeable - -import Data.XML.Types - -import Network.XMPP.Monad -import Network.XMPP.Marshal -import Network.XMPP.Pickle - -import Text.XML.Stream.Elements -import qualified Text.XML.Stream.Render as XR - -type IQHandlers = (Map.Map (IQType, Text) (TChan (IQ, TVar Bool)) - , Map.Map Text (TMVar IQ) - ) - -data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) - , presenceRef :: IORef (Maybe (TChan Presence)) - , mShadow :: TChan Message -- the original chan - , pShadow :: TChan Presence -- the original chan - , outCh :: TChan Stanza - , iqHandlers :: TVar IQHandlers - , writeRef :: TMVar (BS.ByteString -> IO () ) - , readerThread :: ThreadId - , idGenerator :: IO Text - } - -type XMPPThread a = ReaderT Thread IO a - - -data ReaderSignal = ReaderSignal (XMPPMonad ()) deriving Typeable -instance Show ReaderSignal where show _ = "" -instance Ex.Exception ReaderSignal - -readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> ResourceT IO () -readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do - sta <- pull - case sta of - SMessage m -> liftIO . atomically $ do - writeTChan messageC m - _ <- readTChan messageC -- Sic! - return () - -- this may seem ridiculous, but to prevent - -- the channel from filling up we immedtiately remove the - -- Stanza we just put in. It will still be - -- available in duplicates. - SPresence p -> liftIO . atomically $ do - writeTChan presenceC p - _ <- readTChan presenceC - return () - SIQ i -> liftIO . atomically $ do - writeTChan iqC i - return () - ) - ( \(ReaderSignal a) -> do - ((),s') <- runStateT a s - readWorker messageC presenceC iqC s' - ) - -writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO () -writeWorker stCh writeR = forever $ do - (write, next) <- atomically $ (,) <$> - takeTMVar writeR <*> - readTChan stCh - outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next) - $= XR.renderBytes def $$ CL.consume - _ <- forM outBS write - atomically $ putTMVar writeR write - -handleIQs :: MonadIO m => TVar IQHandlers -> TChan IQ -> m a -handleIQs handlers iqC = liftIO . forever . atomically $ do - iq <- readTChan iqC - (byNS, byID) <- readTVar handlers - let iqNS = fromMaybe ("") (nameNamespace . elementName . iqBody $ iq) - case () of () | (iqType iq) `elem` [Get, Set] -> - case Map.lookup (Get, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> do - sent <- newTVar False - writeTChan ch (iq, sent) - | otherwise -> case Map.updateLookupWithKey (\_ _ -> Nothing) - (iqId iq) byID of - (Nothing, _) -> return () -- we are not supposed - -- to send an error - (Just tmvar, byID') -> do - _ <- tryPutTMVar tmvar iq -- don't block - writeTVar handlers (byNS, byID') - - - --- Two streams: input and output. Threads read from input stream and write to output stream. --- | Runs thread in XmppState monad --- returns channel of incoming and outgoing stances, respectively --- and an Action to stop the Threads and close the connection -startThreads - :: XMPPMonad ( TChan Message - , TChan Presence - , TVar IQHandlers - , TChan Stanza, IO () - , TMVar (BS.ByteString -> IO ()) - , ThreadId - ) - - -startThreads = do - writeLock <- liftIO . newTMVarIO =<< gets sConPushBS - messageC <- liftIO newTChanIO - presenceC <- liftIO newTChanIO - iqC <- liftIO newTChanIO - outC <- liftIO newTChanIO - handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) - lw <- liftIO . forkIO $ writeWorker outC writeLock - cp <- liftIO . forkIO $ connPersist writeLock - iqh <- liftIO . forkIO $ handleIQs handlers iqC - s <- get - rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s - return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp, iqh], writeLock, rd) - where - killConnection writeLock threads = liftIO $ do - _ <- atomically $ takeTMVar writeLock -- Should we put it back? - _ <- forM threads killThread - return() - - --- | Register a new IQ listener. IQ matching the type and namespace will --- be put in the channel. IQ of type Result and Error will never be put --- into channels, even though this function won't stop you from registering --- them -listenIQChan :: IQType -- ^ type of IQs to receive (Get / Set) - -> Text -- ^ namespace of the child element - -> XMPPThread (Bool, TChan (IQ, TVar Bool)) -listenIQChan tp ns = do - handlers <- asks iqHandlers - liftIO . atomically $ do - (byNS, byID) <- readTVar handlers - iqCh <- newTChan - let (present, byNS') = Map.insertLookupWithKey' (\_ new _ -> new) - (tp,ns) iqCh byNS - writeTVar handlers (byNS', byID) - return $ case present of - Nothing -> (True, iqCh) - Just iqCh' -> (False, iqCh') - --- | Start worker threads and run action. The supplied action will run --- in the calling thread. use 'forkXMPP' to start another thread. -runThreaded :: XMPPThread a - -> XMPPMonad a -runThreaded a = do - (mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads - workermCh <- liftIO . newIORef $ Nothing - workerpCh <- liftIO . newIORef $ Nothing - idRef <- liftIO $ newTVarIO 1 - let getId = atomically $ do - curId <- readTVar idRef - writeTVar idRef (curId + 1 :: Integer) - return . Text.pack $ show curId - liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId) - - - --- | get the inbound stanza channel, duplicates from master if necessary --- please note that once duplicated it will keep filling up, call --- 'dropMessageChan' to allow it to be garbage collected -getMessageChan :: XMPPThread (TChan Message) -getMessageChan = do - mChR <- asks messagesRef - mCh <- liftIO $ readIORef mChR - case mCh of - Nothing -> do - shadow <- asks mShadow - mCh' <- liftIO $ atomically $ dupTChan shadow - liftIO $ writeIORef mChR (Just mCh') - return mCh' - Just mCh' -> return mCh' - --- | see 'getMessageChan' -getPresenceChan :: XMPPThread (TChan Presence) -getPresenceChan = do - pChR <- asks presenceRef - pCh <- liftIO $ readIORef pChR - case pCh of - Nothing -> do - shadow <- asks pShadow - pCh' <- liftIO $ atomically $ dupTChan shadow - liftIO $ writeIORef pChR (Just pCh') - return pCh' - Just pCh' -> return pCh' - --- | Drop the local end of the inbound stanza channel --- from our context so it can be GC-ed -dropMessageChan :: XMPPThread () -dropMessageChan = do - r <- asks messagesRef - liftIO $ writeIORef r Nothing - --- | see 'dropMessageChan' -dropPresenceChan :: XMPPThread () -dropPresenceChan = do - r <- asks presenceRef - liftIO $ writeIORef r Nothing - --- | Read an element from the inbound stanza channel, acquiring a copy --- of the channel as necessary -pullMessage :: XMPPThread Message -pullMessage = do - c <- getMessageChan - st <- liftIO $ atomically $ readTChan c - return st - --- | Read an element from the inbound stanza channel, acquiring a copy --- of the channel as necessary -pullPresence :: XMPPThread Presence -pullPresence = do - c <- getPresenceChan - st <- liftIO $ atomically $ readTChan c - return st - - --- | Send a stanza to the server -sendS :: Stanza -> XMPPThread () -sendS a = do - out <- asks outCh - liftIO . atomically $ writeTChan out a - return () - --- | Fork a new thread -forkXMPP :: XMPPThread () -> XMPPThread ThreadId -forkXMPP a = do - thread <- ask - mCH' <- liftIO $ newIORef Nothing - pCH' <- liftIO $ newIORef Nothing - liftIO $ forkIO $ runReaderT a (thread {messagesRef = mCH' - ,presenceRef = pCH' - }) - -waitForMessage :: (Message -> Bool) -> XMPPThread Message -waitForMessage f = do - s <- pullMessage - if (f s) then - return s - else do - waitForMessage f +( module Network.XMPP.Concurrent.Types +, module Network.XMPP.Concurrent.Monad +, module Network.XMPP.Concurrent.Threads +, module Network.XMPP.Concurrent.IQ +) where -waitForPresence :: (Presence -> Bool) -> XMPPThread Presence -waitForPresence f = do - s <- pullPresence - if (f s) then - return s - else do - waitForPresence f +import Network.XMPP.Concurrent.Types +import Network.XMPP.Concurrent.Monad +import Network.XMPP.Concurrent.Threads +import Network.XMPP.Concurrent.IQ -connPersist :: TMVar (BS.ByteString -> IO ()) -> IO () -connPersist lock = forever $ do - pushBS <- atomically $ takeTMVar lock - pushBS " " - atomically $ putTMVar lock pushBS --- putStrLn "" - threadDelay 30000000 --- | Run an XMPPMonad action in isolation. --- Reader and writer workers will be temporarily stopped --- and resumed with the new session details once the action returns. --- The Action will run in the reader thread. -singleThreaded :: XMPPMonad () -> XMPPThread () -singleThreaded a = do - writeLock <- asks writeRef - rdr <- asks readerThread - _ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the - -- one returned by a - liftIO . throwTo rdr . ReaderSignal $ do - a - out <- gets sConPushBS - liftIO . atomically $ putTMVar writeLock out - return () --- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound --- IQ with a matching ID that has type @result@ or @error@ -sendIQ :: Maybe JID -- ^ Recipient (to) - -> IQType -- ^ IQ type (Get or Set) - -> Element -- ^ The iq body (there has to be exactly one) - -> XMPPThread (TMVar IQ) -sendIQ to tp body = do -- TODO: add timeout - newId <- liftIO =<< asks idGenerator - handlers <- asks iqHandlers - ref <- liftIO . atomically $ do - resRef <- newEmptyTMVar - (byNS, byId) <- readTVar handlers - writeTVar handlers (byNS, Map.insert newId resRef byId) - -- TODO: Check for id collisions (shouldn't happen?) - return resRef - sendS . SIQ $ IQ Nothing (to) newId tp body - return ref --- | like 'sendIQ', but waits for the answer IQ -sendIQ' :: Maybe JID -> IQType -> Element -> XMPPThread IQ -sendIQ' to tp body = do - ref <- sendIQ to tp body - liftIO . atomically $ takeTMVar ref -answerIQ :: MonadIO m => (IQ, TVar Bool) -> Element -> ReaderT Thread m Bool -answerIQ ((IQ from _to id _tp _bd), sentRef) body = do - out <- asks outCh - liftIO . atomically $ do - sent <- readTVar sentRef - case sent of - False -> do - writeTVar sentRef True - writeTChan out . SIQ $ IQ Nothing from id Result body - return True - True -> return False diff --git a/src/Network/XMPP/Concurrent/IQ.hs b/src/Network/XMPP/Concurrent/IQ.hs new file mode 100644 index 0000000..2609a14 --- /dev/null +++ b/src/Network/XMPP/Concurrent/IQ.hs @@ -0,0 +1,48 @@ +module Network.XMPP.Concurrent.IQ where + +import Control.Concurrent.STM +import Control.Monad.IO.Class +import Control.Monad.Trans.Reader + +import Data.XML.Types +import qualified Data.Map as Map + +import Network.XMPP.Concurrent.Types +import Network.XMPP.Concurrent.Monad +import Network.XMPP.Types + +-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound +-- IQ with a matching ID that has type @result@ or @error@ +sendIQ :: Maybe JID -- ^ Recipient (to) + -> IQType -- ^ IQ type (Get or Set) + -> Element -- ^ The iq body (there has to be exactly one) + -> XMPPThread (TMVar IQ) +sendIQ to tp body = do -- TODO: add timeout + newId <- liftIO =<< asks idGenerator + handlers <- asks iqHandlers + ref <- liftIO . atomically $ do + resRef <- newEmptyTMVar + (byNS, byId) <- readTVar handlers + writeTVar handlers (byNS, Map.insert newId resRef byId) + -- TODO: Check for id collisions (shouldn't happen?) + return resRef + sendS . SIQ $ IQ Nothing (to) newId tp body + return ref + +-- | like 'sendIQ', but waits for the answer IQ +sendIQ' :: Maybe JID -> IQType -> Element -> XMPPThread IQ +sendIQ' to tp body = do + ref <- sendIQ to tp body + liftIO . atomically $ takeTMVar ref + +answerIQ :: MonadIO m => (IQ, TVar Bool) -> Element -> ReaderT Thread m Bool +answerIQ ((IQ from _to iqid _tp _bd), sentRef) body = do + out <- asks outCh + liftIO . atomically $ do + sent <- readTVar sentRef + case sent of + False -> do + writeTVar sentRef True + writeTChan out . SIQ $ IQ Nothing from iqid Result body + return True + True -> return False diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs new file mode 100644 index 0000000..7b09cdb --- /dev/null +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -0,0 +1,143 @@ +module Network.XMPP.Concurrent.Monad where + +import Network.XMPP.Types + +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad.IO.Class +import Control.Monad.Trans.Reader +import Control.Monad.Trans.State + +import Data.IORef +import qualified Data.Map as Map +import Data.Text(Text) + +import Network.XMPP.Concurrent.Types + +-- | Register a new IQ listener. IQ matching the type and namespace will +-- be put in the channel. IQ of type Result and Error will never be put +-- into channels, even though this function won't stop you from registering +-- them +listenIQChan :: IQType -- ^ type of IQs to receive (Get / Set) + -> Text -- ^ namespace of the child element + -> XMPPThread (Bool, TChan (IQ, TVar Bool)) +listenIQChan tp ns = do + handlers <- asks iqHandlers + liftIO . atomically $ do + (byNS, byID) <- readTVar handlers + iqCh <- newTChan + let (present, byNS') = Map.insertLookupWithKey' (\_ new _ -> new) + (tp,ns) iqCh byNS + writeTVar handlers (byNS', byID) + return $ case present of + Nothing -> (True, iqCh) + Just iqCh' -> (False, iqCh') + +-- | get the inbound stanza channel, duplicates from master if necessary +-- please note that once duplicated it will keep filling up, call +-- 'dropMessageChan' to allow it to be garbage collected +getMessageChan :: XMPPThread (TChan Message) +getMessageChan = do + mChR <- asks messagesRef + mCh <- liftIO $ readIORef mChR + case mCh of + Nothing -> do + shadow <- asks mShadow + mCh' <- liftIO $ atomically $ dupTChan shadow + liftIO $ writeIORef mChR (Just mCh') + return mCh' + Just mCh' -> return mCh' + +-- | see 'getMessageChan' +getPresenceChan :: XMPPThread (TChan Presence) +getPresenceChan = do + pChR <- asks presenceRef + pCh <- liftIO $ readIORef pChR + case pCh of + Nothing -> do + shadow <- asks pShadow + pCh' <- liftIO $ atomically $ dupTChan shadow + liftIO $ writeIORef pChR (Just pCh') + return pCh' + Just pCh' -> return pCh' + +-- | Drop the local end of the inbound stanza channel +-- from our context so it can be GC-ed +dropMessageChan :: XMPPThread () +dropMessageChan = do + r <- asks messagesRef + liftIO $ writeIORef r Nothing + +-- | see 'dropMessageChan' +dropPresenceChan :: XMPPThread () +dropPresenceChan = do + r <- asks presenceRef + liftIO $ writeIORef r Nothing + +-- | Read an element from the inbound stanza channel, acquiring a copy +-- of the channel as necessary +pullMessage :: XMPPThread Message +pullMessage = do + c <- getMessageChan + st <- liftIO $ atomically $ readTChan c + return st + +-- | Read an element from the inbound stanza channel, acquiring a copy +-- of the channel as necessary +pullPresence :: XMPPThread Presence +pullPresence = do + c <- getPresenceChan + st <- liftIO $ atomically $ readTChan c + return st + + +-- | Send a stanza to the server +sendS :: Stanza -> XMPPThread () +sendS a = do + out <- asks outCh + liftIO . atomically $ writeTChan out a + return () + +-- | Fork a new thread +forkXMPP :: XMPPThread () -> XMPPThread ThreadId +forkXMPP a = do + thread <- ask + mCH' <- liftIO $ newIORef Nothing + pCH' <- liftIO $ newIORef Nothing + liftIO $ forkIO $ runReaderT a (thread {messagesRef = mCH' + ,presenceRef = pCH' + }) + +waitForMessage :: (Message -> Bool) -> XMPPThread Message +waitForMessage f = do + s <- pullMessage + if (f s) then + return s + else do + waitForMessage f + +waitForPresence :: (Presence -> Bool) -> XMPPThread Presence +waitForPresence f = do + s <- pullPresence + if (f s) then + return s + else do + waitForPresence f + + +-- | Run an XMPPMonad action in isolation. +-- Reader and writer workers will be temporarily stopped +-- and resumed with the new session details once the action returns. +-- The Action will run in the reader thread. +singleThreaded :: XMPPMonad () -> XMPPThread () +singleThreaded a = do + writeLock <- asks writeRef + rdr <- asks readerThread + _ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the + -- one returned by a + liftIO . throwTo rdr . ReaderSignal $ do + a + out <- gets sConPushBS + liftIO . atomically $ putTMVar writeLock out + return () + diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs new file mode 100644 index 0000000..474b6cf --- /dev/null +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -0,0 +1,147 @@ +{-# LANGUAGE OverloadedStrings #-} +module Network.XMPP.Concurrent.Threads where + +import Network.XMPP.Types + +import Control.Applicative((<$>),(<*>)) +import Control.Concurrent +import Control.Concurrent.STM +import qualified Control.Exception.Lifted as Ex +import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Reader +import Control.Monad.Trans.Resource +import Control.Monad.Trans.State + +import qualified Data.ByteString as BS +import Data.Conduit +import qualified Data.Conduit.List as CL +import Data.Default (def) +import Data.IORef +import qualified Data.Map as Map +import Data.Maybe +import qualified Data.Text as Text + +import Data.XML.Types + +import Network.XMPP.Monad +import Network.XMPP.Marshal +import Network.XMPP.Pickle +import Network.XMPP.Concurrent.Types + +import Text.XML.Stream.Elements +import qualified Text.XML.Stream.Render as XR + +readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> ResourceT IO () +readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do + sta <- pull + case sta of + SMessage m -> liftIO . atomically $ do + writeTChan messageC m + _ <- readTChan messageC -- Sic! + return () + -- this may seem ridiculous, but to prevent + -- the channel from filling up we immedtiately remove the + -- Stanza we just put in. It will still be + -- available in duplicates. + SPresence p -> liftIO . atomically $ do + writeTChan presenceC p + _ <- readTChan presenceC + return () + SIQ i -> liftIO . atomically $ do + writeTChan iqC i + return () + ) + ( \(ReaderSignal a) -> do + ((),s') <- runStateT a s + readWorker messageC presenceC iqC s' + ) + +writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO () +writeWorker stCh writeR = forever $ do + (write, next) <- atomically $ (,) <$> + takeTMVar writeR <*> + readTChan stCh + outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next) + $= XR.renderBytes def $$ CL.consume + _ <- forM outBS write + atomically $ putTMVar writeR write + +handleIQs :: MonadIO m => TVar IQHandlers -> TChan IQ -> m a +handleIQs handlers iqC = liftIO . forever . atomically $ do + iq <- readTChan iqC + (byNS, byID) <- readTVar handlers + let iqNS = fromMaybe ("") (nameNamespace . elementName . iqBody $ iq) + case () of () | (iqType iq) `elem` [Get, Set] -> + case Map.lookup (Get, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> do + sent <- newTVar False + writeTChan ch (iq, sent) + | otherwise -> case Map.updateLookupWithKey (\_ _ -> Nothing) + (iqId iq) byID of + (Nothing, _) -> return () -- we are not supposed + -- to send an error + (Just tmvar, byID') -> do + _ <- tryPutTMVar tmvar iq -- don't block + writeTVar handlers (byNS, byID') + + + +-- Two streams: input and output. Threads read from input stream and write to output stream. +-- | Runs thread in XmppState monad +-- returns channel of incoming and outgoing stances, respectively +-- and an Action to stop the Threads and close the connection +startThreads + :: XMPPMonad ( TChan Message + , TChan Presence + , TVar IQHandlers + , TChan Stanza, IO () + , TMVar (BS.ByteString -> IO ()) + , ThreadId + ) + +startThreads = do + writeLock <- liftIO . newTMVarIO =<< gets sConPushBS + messageC <- liftIO newTChanIO + presenceC <- liftIO newTChanIO + iqC <- liftIO newTChanIO + outC <- liftIO newTChanIO + handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) + lw <- liftIO . forkIO $ writeWorker outC writeLock + cp <- liftIO . forkIO $ connPersist writeLock + iqh <- liftIO . forkIO $ handleIQs handlers iqC + s <- get + rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s + return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp, iqh], writeLock, rd) + where + killConnection writeLock threads = liftIO $ do + _ <- atomically $ takeTMVar writeLock -- Should we put it back? + _ <- forM threads killThread + return() + + +-- | Start worker threads and run action. The supplied action will run +-- in the calling thread. use 'forkXMPP' to start another thread. +runThreaded :: XMPPThread a + -> XMPPMonad a +runThreaded a = do + (mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads + workermCh <- liftIO . newIORef $ Nothing + workerpCh <- liftIO . newIORef $ Nothing + idRef <- liftIO $ newTVarIO 1 + let getId = atomically $ do + curId <- readTVar idRef + writeTVar idRef (curId + 1 :: Integer) + return . Text.pack $ show curId + liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId) + +-- | Sends a blank space every 30 seconds to keep the connection alive +connPersist :: TMVar (BS.ByteString -> IO ()) -> IO () +connPersist lock = forever $ do + pushBS <- atomically $ takeTMVar lock + pushBS " " + atomically $ putTMVar lock pushBS +-- putStrLn "" + threadDelay 30000000 diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs new file mode 100644 index 0000000..b848e43 --- /dev/null +++ b/src/Network/XMPP/Concurrent/Types.hs @@ -0,0 +1,40 @@ +{-# LANGUAGE DeriveDataTypeable #-} + +module Network.XMPP.Concurrent.Types where + +import qualified Control.Exception.Lifted as Ex +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad.Trans.Reader + +import qualified Data.ByteString as BS +import Data.IORef +import qualified Data.Map as Map +import Data.Text(Text) +import Data.Typeable + + +import Network.XMPP.Types + + +type IQHandlers = (Map.Map (IQType, Text) (TChan (IQ, TVar Bool)) + , Map.Map Text (TMVar IQ) + ) + +data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) + , presenceRef :: IORef (Maybe (TChan Presence)) + , mShadow :: TChan Message -- the original chan + , pShadow :: TChan Presence -- the original chan + , outCh :: TChan Stanza + , iqHandlers :: TVar IQHandlers + , writeRef :: TMVar (BS.ByteString -> IO () ) + , readerThread :: ThreadId + , idGenerator :: IO Text + } + +type XMPPThread a = ReaderT Thread IO a + + +data ReaderSignal = ReaderSignal (XMPPMonad ()) deriving Typeable +instance Show ReaderSignal where show _ = "" +instance Ex.Exception ReaderSignal