From 5547a7025999e10635369184ebe1ae437e770985 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Thu, 5 Apr 2012 00:31:06 +0200 Subject: [PATCH] sendIQ, unique ID generation, channel autodrop, some documentation --- src/Network/XMPP/Concurrent.hs | 83 +++++++++++++++++++++++++++------- 1 file changed, 66 insertions(+), 17 deletions(-) diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs index c67f0d2..9909069 100644 --- a/src/Network/XMPP/Concurrent.hs +++ b/src/Network/XMPP/Concurrent.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE NoMonomorphismRestriction #-} @@ -31,6 +32,7 @@ import Data.Default (def) import Data.IORef import qualified Data.Map as Map import Data.Maybe +import qualified Data.Text as Text import Data.Text(Text) import Data.Typeable @@ -56,6 +58,7 @@ data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) ) , writeRef :: TMVar (BS.ByteString -> IO () ) , readerThread :: ThreadId + , idGenerator :: IO Text } type XMPPThread a = ReaderT Thread IO a @@ -69,9 +72,22 @@ readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> Resour readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do sta <- pull case sta of - SMessage m -> liftIO . atomically $ writeTChan messageC m - SPresence p -> liftIO . atomically $ writeTChan presenceC p - SIQ i -> liftIO . atomically $ writeTChan iqC i + SMessage m -> liftIO . atomically $ do + writeTChan messageC m + _ <- readTChan messageC -- Sic! + return () + -- this may seem ridiculous, but to prevent + -- the channel from filling up we immedtiately remove the + -- Stanza we just put in. It will still be + -- available in duplicates. + SPresence p -> liftIO . atomically $ do + writeTChan presenceC p + _ <- readTChan presenceC + return () + SIQ i -> liftIO . atomically $ do + writeTChan iqC i + _ <-readTChan iqC + return () ) ( \(ReaderSignal a) -> do ((),s') <- runStateT a s @@ -154,8 +170,15 @@ startThreads = do forM threads killThread return() -addIQChan :: IQType -> Text -> XMPPThread (Bool, TChan IQ) -addIQChan tp ns = do + +-- | Register a new IQ listener. IQ matching the type and namespace will +-- be put in the channel. IQ of type Result and Error will never be put +-- into channels, even though this function won't stop you from registering +-- them +listenIQChan :: IQType -- ^ type of IQs to receive (Get / Set) + -> Text -- ^ namespace of the child element + -> XMPPThread (Bool, TChan IQ) +listenIQChan tp ns = do handlers <- asks iqHandlers liftIO . atomically $ do (byNS, byID) <- readTVar handlers @@ -167,21 +190,26 @@ addIQChan tp ns = do Nothing -> (False, iqCh) Just iqCh' -> (True, iqCh') +-- | Start worker threads and run action. The supplied action will run +-- in the calling thread. use 'forkXMPP' to start another thread. runThreaded :: XMPPThread a - -> XMPPMonad ThreadId + -> XMPPMonad a runThreaded a = do (mC, pC, hand, outC, stopThreads, writeR, reader ) <- startThreads - workermCh <- liftIO . newIORef $ Just mC - workerpCh <- liftIO . newIORef $ Just pC - worker <- liftIO . forkIO $ do - runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR - reader) - return () - return worker + workermCh <- liftIO . newIORef $ Nothing + workerpCh <- liftIO . newIORef $ Nothing + idRef <- liftIO $ newTVarIO 1 + let getId = atomically $ do + curId <- readTVar idRef + writeTVar idRef (curId + 1 :: Integer) + return . Text.pack $ show curId + liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR reader getId) + --- | get the inbound stanza channel, duplicate from master if necessary --- please note that once duplicated it will keep filling up +-- | get the inbound stanza channel, duplicates from master if necessary +-- please note that once duplicated it will keep filling up, call +-- 'dropMessageChan' to allow it to be garbage collected getMessageChan = do mChR <- asks messagesRef mCh <- liftIO $ readIORef mChR @@ -193,8 +221,7 @@ getMessageChan = do return mCh' Just mCh -> return mCh --- | get the inbound stanza channel, duplicate from master if necessary --- please note that once duplicated it will keep filling up +-- | see 'getMessageChan' getPresenceChan = do pChR <- asks presenceRef pCh <- liftIO $ readIORef pChR @@ -213,6 +240,7 @@ dropMessageChan = do r <- asks messagesRef liftIO $ writeIORef r Nothing +-- | see 'dropMessageChan' dropPresenceChan :: XMPPThread () dropPresenceChan = do r <- asks presenceRef @@ -277,6 +305,12 @@ connPersist lock = forever $ do -- putStrLn "" threadDelay 30000000 + +-- | Run an XMPPMonad action in isolation. +-- Reader and writer workers will be temporarily stopped +-- and resumed with the new session details once the action returns. +-- The Action will run in the reader thread. +singleThreaded :: XMPPMonad () -> XMPPThread () singleThreaded a = do writeLock <- asks writeRef reader <- asks readerThread @@ -285,6 +319,21 @@ singleThreaded a = do a out <- gets sConPushBS liftIO . atomically $ putTMVar writeLock out + return () +-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound +-- IQ with a matching ID that has type @result@ or @error@ +sendIQ :: JID -> IQType -> Element -> XMPPThread (TMVar IQ) +sendIQ to tp body = do -- TODO: add timeout + newId <- liftIO =<< asks idGenerator + handlers <- asks iqHandlers + ref <- liftIO . atomically $ do + resRef <- newEmptyTMVar + (byNS, byId) <- readTVar handlers + writeTVar handlers (byNS, Map.insert newId resRef byId) + -- TODO: Check for id collisions (shouldn't happen?) + return resRef + sendS . SIQ $ IQ Nothing (Just to) newId tp body + return (readTMVar ref)