15 changed files with 548 additions and 346 deletions
@ -0,0 +1,140 @@
@@ -0,0 +1,140 @@
|
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
module Network.Xmpp.Concurrent.Channels |
||||
( module Network.Xmpp.Concurrent.Channels.Basic |
||||
, module Network.Xmpp.Concurrent.Channels.Types |
||||
, module Network.Xmpp.Concurrent.Channels.Message |
||||
, module Network.Xmpp.Concurrent.Channels.Presence |
||||
, module Network.Xmpp.Concurrent.Channels.IQ |
||||
, toChans |
||||
, newSessionChans |
||||
, writeWorker |
||||
) |
||||
|
||||
where |
||||
|
||||
import Control.Applicative((<$>),(<*>)) |
||||
import Control.Concurrent |
||||
import Control.Concurrent.STM |
||||
import Control.Monad |
||||
import qualified Data.ByteString as BS |
||||
import Data.IORef |
||||
import qualified Data.Map as Map |
||||
import Data.Maybe (fromMaybe) |
||||
import Data.XML.Types |
||||
import Network.Xmpp.Concurrent.Channels.Basic |
||||
import Network.Xmpp.Concurrent.Channels.IQ |
||||
import Network.Xmpp.Concurrent.Channels.Message |
||||
import Network.Xmpp.Concurrent.Channels.Presence |
||||
import Network.Xmpp.Concurrent.Channels.Types |
||||
import Network.Xmpp.Concurrent.Threads |
||||
import Network.Xmpp.Concurrent.Types |
||||
import Network.Xmpp.Marshal |
||||
import Network.Xmpp.Pickle |
||||
import Network.Xmpp.Types |
||||
import Text.XML.Stream.Elements |
||||
|
||||
toChans :: TChan (Either MessageError Message) |
||||
-> TChan (Either PresenceError Presence) |
||||
-> TChan Stanza |
||||
-> TVar IQHandlers |
||||
-> Stanza |
||||
-> IO () |
||||
toChans messageC presenceC stanzaC iqHands sta = atomically $ do |
||||
writeTChan stanzaC sta |
||||
void $ readTChan stanzaC -- sic |
||||
case sta of |
||||
MessageS m -> do writeTChan messageC $ Right m |
||||
_ <- readTChan messageC -- Sic! |
||||
return () |
||||
-- this may seem ridiculous, but to prevent |
||||
-- the channel from filling up we |
||||
-- immedtiately remove the |
||||
-- Stanza we just put in. It will still be |
||||
-- available in duplicates. |
||||
MessageErrorS m -> do writeTChan messageC $ Left m |
||||
_ <- readTChan messageC |
||||
return () |
||||
PresenceS p -> do |
||||
writeTChan presenceC $ Right p |
||||
_ <- readTChan presenceC |
||||
return () |
||||
PresenceErrorS p -> do |
||||
writeTChan presenceC $ Left p |
||||
_ <- readTChan presenceC |
||||
return () |
||||
IQRequestS i -> handleIQRequest iqHands i |
||||
IQResultS i -> handleIQResponse iqHands (Right i) |
||||
IQErrorS i -> handleIQResponse iqHands (Left i) |
||||
where |
||||
-- If the IQ request has a namespace, send it through the appropriate channel. |
||||
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () |
||||
handleIQRequest handlers iq = do |
||||
(byNS, _) <- readTVar handlers |
||||
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) |
||||
case Map.lookup (iqRequestType iq, iqNS) byNS of |
||||
Nothing -> return () -- TODO: send error stanza |
||||
Just ch -> do |
||||
sent <- newTVar False |
||||
writeTChan ch $ IQRequestTicket sent iq |
||||
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () |
||||
handleIQResponse handlers iq = do |
||||
(byNS, byID) <- readTVar handlers |
||||
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of |
||||
(Nothing, _) -> return () -- We are not supposed to send an error. |
||||
(Just tmvar, byID') -> do |
||||
let answer = either IQResponseError IQResponseResult iq |
||||
_ <- tryPutTMVar tmvar answer -- Don't block. |
||||
writeTVar handlers (byNS, byID') |
||||
where |
||||
iqID (Left err) = iqErrorID err |
||||
iqID (Right iq') = iqResultID iq' |
||||
|
||||
|
||||
-- | Creates and initializes a new concurrent session. |
||||
newSessionChans :: IO CSession |
||||
newSessionChans = do |
||||
messageC <- newTChanIO |
||||
presenceC <- newTChanIO |
||||
outC <- newTChanIO |
||||
stanzaC <- newTChanIO |
||||
iqHandlers <- newTVarIO (Map.empty, Map.empty) |
||||
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } |
||||
let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers |
||||
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh |
||||
writer <- forkIO $ writeWorker outC wLock |
||||
workermCh <- newIORef $ Nothing |
||||
workerpCh <- newIORef $ Nothing |
||||
idRef <- newTVarIO 1 |
||||
let getId = atomically $ do |
||||
curId <- readTVar idRef |
||||
writeTVar idRef (curId + 1 :: Integer) |
||||
return . read. show $ curId |
||||
let sess = Session { writeRef = wLock |
||||
, readerThread = readerThread |
||||
, idGenerator = getId |
||||
, conStateRef = conState |
||||
, eventHandlers = eh |
||||
, stopThreads = kill >> killThread writer |
||||
} |
||||
return $ CSession { session = sess |
||||
, mShadow = messageC |
||||
, pShadow = presenceC |
||||
, sShadow = stanzaC |
||||
, messagesRef = workermCh |
||||
, presenceRef = workerpCh |
||||
, outCh = outC |
||||
, iqHandlers = iqHandlers |
||||
} |
||||
|
||||
-- Worker to write stanzas to the stream concurrently. |
||||
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () |
||||
writeWorker stCh writeR = forever $ do |
||||
(write, next) <- atomically $ (,) <$> |
||||
takeTMVar writeR <*> |
||||
readTChan stCh |
||||
r <- write $ renderElement (pickleElem xpStanza next) |
||||
atomically $ putTMVar writeR write |
||||
unless r $ do |
||||
atomically $ unGetTChan stCh next -- If the writing failed, the |
||||
-- connection is dead. |
||||
threadDelay 250000 -- Avoid free spinning. |
||||
@ -0,0 +1,21 @@
@@ -0,0 +1,21 @@
|
||||
module Network.Xmpp.Concurrent.Channels.Basic where |
||||
|
||||
import Control.Concurrent.STM |
||||
import Data.IORef |
||||
import Network.Xmpp.Concurrent.Channels.Types |
||||
import Network.Xmpp.Types |
||||
|
||||
-- | Get a duplicate of the stanza channel |
||||
getStanzaChan :: CSession -> IO (TChan Stanza) |
||||
getStanzaChan session = atomically $ dupTChan (sShadow session) |
||||
|
||||
-- | Send a stanza to the server. |
||||
sendStanza :: Stanza -> CSession -> IO () |
||||
sendStanza a session = atomically $ writeTChan (outCh session) a |
||||
|
||||
-- | Create a forked session object |
||||
forkCSession :: CSession -> IO CSession |
||||
forkCSession session = do |
||||
mCH' <- newIORef Nothing |
||||
pCH' <- newIORef Nothing |
||||
return $ session {messagesRef = mCH' , presenceRef = pCH'} |
||||
@ -0,0 +1,107 @@
@@ -0,0 +1,107 @@
|
||||
module Network.Xmpp.Concurrent.Channels.IQ where |
||||
|
||||
import Control.Concurrent (forkIO, threadDelay) |
||||
import Control.Concurrent.STM |
||||
import Control.Monad |
||||
import Control.Monad.IO.Class |
||||
import Control.Monad.Trans.Reader |
||||
|
||||
import qualified Data.Map as Map |
||||
import Data.Text (Text) |
||||
import Data.XML.Types |
||||
|
||||
import Network.Xmpp.Concurrent.Channels.Basic |
||||
import Network.Xmpp.Concurrent.Channels.Types |
||||
import Network.Xmpp.Concurrent.Types |
||||
import Network.Xmpp.Types |
||||
|
||||
-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound |
||||
-- IQ with a matching ID that has type @result@ or @error@. |
||||
sendIQ :: Maybe Int -- ^ Timeout |
||||
-> Maybe Jid -- ^ Recipient (to) |
||||
-> IQRequestType -- ^ IQ type (@Get@ or @Set@) |
||||
-> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for |
||||
-- default) |
||||
-> Element -- ^ The IQ body (there has to be exactly one) |
||||
-> CSession |
||||
-> IO (TMVar IQResponse) |
||||
sendIQ timeOut to tp lang body csession = do -- TODO: Add timeout |
||||
newId <- idGenerator (session csession) |
||||
ref <- atomically $ do |
||||
resRef <- newEmptyTMVar |
||||
(byNS, byId) <- readTVar (iqHandlers csession) |
||||
writeTVar (iqHandlers csession) (byNS, Map.insert newId resRef byId) |
||||
-- TODO: Check for id collisions (shouldn't happen?) |
||||
return resRef |
||||
sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) csession |
||||
case timeOut of |
||||
Nothing -> return () |
||||
Just t -> void . forkIO $ do |
||||
threadDelay t |
||||
doTimeOut (iqHandlers csession) newId ref |
||||
return ref |
||||
where |
||||
doTimeOut handlers iqid var = atomically $ do |
||||
p <- tryPutTMVar var IQResponseTimeout |
||||
when p $ do |
||||
(byNS, byId) <- readTVar (iqHandlers csession) |
||||
writeTVar handlers (byNS, Map.delete iqid byId) |
||||
return () |
||||
|
||||
|
||||
-- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds |
||||
sendIQ' :: Maybe Jid |
||||
-> IQRequestType |
||||
-> Maybe LangTag |
||||
-> Element |
||||
-> CSession |
||||
-> IO IQResponse |
||||
sendIQ' to tp lang body csession = do |
||||
ref <- sendIQ (Just 3000000) to tp lang body csession |
||||
atomically $ takeTMVar ref |
||||
|
||||
|
||||
-- | Retrieves an IQ listener channel. If the namespace/'IQRequestType' is not |
||||
-- already handled, a new 'TChan' is created and returned as a 'Right' value. |
||||
-- Otherwise, the already existing channel will be returned wrapped in a 'Left' |
||||
-- value. Note that the 'Left' channel might need to be duplicated in order not |
||||
-- to interfere with existing consumers. |
||||
listenIQChan :: IQRequestType -- ^ Type of IQs to receive (@Get@ or @Set@) |
||||
-> Text -- ^ Namespace of the child element |
||||
-> CSession |
||||
-> IO (Either (TChan IQRequestTicket) (TChan IQRequestTicket)) |
||||
listenIQChan tp ns csession = do |
||||
let handlers = (iqHandlers csession) |
||||
atomically $ do |
||||
(byNS, byID) <- readTVar handlers |
||||
iqCh <- newTChan |
||||
let (present, byNS') = Map.insertLookupWithKey' |
||||
(\_ _ old -> old) |
||||
(tp, ns) |
||||
iqCh |
||||
byNS |
||||
writeTVar handlers (byNS', byID) |
||||
return $ case present of |
||||
Nothing -> Right iqCh |
||||
Just iqCh' -> Left iqCh' |
||||
|
||||
answerIQ :: IQRequestTicket |
||||
-> Either StanzaError (Maybe Element) |
||||
-> CSession |
||||
-> IO Bool |
||||
answerIQ (IQRequestTicket |
||||
sentRef |
||||
(IQRequest iqid from _to lang _tp bd)) |
||||
answer csession = do |
||||
let response = case answer of |
||||
Left err -> IQErrorS $ IQError iqid Nothing from lang err (Just bd) |
||||
Right res -> IQResultS $ IQResult iqid Nothing from lang res |
||||
atomically $ do |
||||
sent <- readTVar sentRef |
||||
case sent of |
||||
False -> do |
||||
writeTVar sentRef True |
||||
|
||||
writeTChan (outCh csession) response |
||||
return True |
||||
True -> return False |
||||
@ -0,0 +1,69 @@
@@ -0,0 +1,69 @@
|
||||
module Network.Xmpp.Concurrent.Channels.Message where |
||||
|
||||
import Network.Xmpp.Concurrent.Channels.Types |
||||
import Control.Concurrent.STM |
||||
import Data.IORef |
||||
import Network.Xmpp.Types |
||||
import Network.Xmpp.Concurrent.Types |
||||
import Network.Xmpp.Concurrent.Channels.Basic |
||||
|
||||
-- | Get the inbound stanza channel, duplicates from master if necessary. Please |
||||
-- note that once duplicated it will keep filling up, call 'dropMessageChan' to |
||||
-- allow it to be garbage collected. |
||||
getMessageChan :: CSession -> IO (TChan (Either MessageError Message)) |
||||
getMessageChan session = do |
||||
mCh <- readIORef . messagesRef $ session |
||||
case mCh of |
||||
Nothing -> do |
||||
mCh' <- atomically $ dupTChan (mShadow session) |
||||
writeIORef (messagesRef session) (Just mCh') |
||||
return mCh' |
||||
Just mCh' -> return mCh' |
||||
|
||||
-- | Drop the local end of the inbound stanza channel from our context so it can |
||||
-- be GC-ed. |
||||
dropMessageChan :: CSession -> IO () |
||||
dropMessageChan session = writeIORef (messagesRef session) Nothing |
||||
|
||||
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
||||
-- channel as necessary. |
||||
pullMessage :: CSession -> IO (Either MessageError Message) |
||||
pullMessage session = do |
||||
c <- getMessageChan session |
||||
atomically $ readTChan c |
||||
|
||||
-- | Pulls a (non-error) message and returns it if the given predicate returns |
||||
-- @True@. |
||||
waitForMessage :: (Message -> Bool) -> CSession -> IO Message |
||||
waitForMessage f session = do |
||||
s <- pullMessage session |
||||
case s of |
||||
Left _ -> waitForMessage f session |
||||
Right m | f m -> return m |
||||
| otherwise -> waitForMessage f session |
||||
|
||||
-- | Pulls an error message and returns it if the given predicate returns @True@. |
||||
waitForMessageError :: (MessageError -> Bool) -> CSession -> IO MessageError |
||||
waitForMessageError f session = do |
||||
s <- pullMessage session |
||||
case s of |
||||
Right _ -> waitForMessageError f session |
||||
Left m | f m -> return m |
||||
| otherwise -> waitForMessageError f session |
||||
|
||||
|
||||
-- | Pulls a message and returns it if the given predicate returns @True@. |
||||
filterMessages :: (MessageError -> Bool) |
||||
-> (Message -> Bool) |
||||
-> CSession -> IO (Either MessageError Message) |
||||
filterMessages f g session = do |
||||
s <- pullMessage session |
||||
case s of |
||||
Left e | f e -> return $ Left e |
||||
| otherwise -> filterMessages f g session |
||||
Right m | g m -> return $ Right m |
||||
| otherwise -> filterMessages f g session |
||||
|
||||
-- | Send a message stanza. |
||||
sendMessage :: Message -> CSession -> IO () |
||||
sendMessage m session = sendStanza (MessageS m) session |
||||
@ -0,0 +1,46 @@
@@ -0,0 +1,46 @@
|
||||
module Network.Xmpp.Concurrent.Channels.Presence where |
||||
|
||||
import Network.Xmpp.Concurrent.Channels.Types |
||||
import Control.Concurrent.STM |
||||
import Data.IORef |
||||
import Network.Xmpp.Types |
||||
import Network.Xmpp.Concurrent.Types |
||||
import Network.Xmpp.Concurrent.Channels.Basic |
||||
|
||||
-- | Analogous to 'getMessageChan'. |
||||
getPresenceChan :: CSession -> IO (TChan (Either PresenceError Presence)) |
||||
getPresenceChan session = do |
||||
pCh <- readIORef $ (presenceRef session) |
||||
case pCh of |
||||
Nothing -> do |
||||
pCh' <- atomically $ dupTChan (pShadow session) |
||||
writeIORef (presenceRef session) (Just pCh') |
||||
return pCh' |
||||
Just pCh' -> return pCh' |
||||
|
||||
|
||||
-- | Analogous to 'dropMessageChan'. |
||||
dropPresenceChan :: CSession -> IO () |
||||
dropPresenceChan session = writeIORef (presenceRef session) Nothing |
||||
|
||||
|
||||
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
||||
-- channel as necessary. |
||||
pullPresence :: CSession -> IO (Either PresenceError Presence) |
||||
pullPresence session = do |
||||
c <- getPresenceChan session |
||||
atomically $ readTChan c |
||||
|
||||
-- | Pulls a (non-error) presence and returns it if the given predicate returns |
||||
-- @True@. |
||||
waitForPresence :: (Presence -> Bool) -> CSession -> IO Presence |
||||
waitForPresence f session = do |
||||
s <- pullPresence session |
||||
case s of |
||||
Left _ -> waitForPresence f session |
||||
Right m | f m -> return m |
||||
| otherwise -> waitForPresence f session |
||||
|
||||
-- | Send a presence stanza. |
||||
sendPresence :: Presence -> CSession -> IO () |
||||
sendPresence p session = sendStanza (PresenceS p) session |
||||
@ -0,0 +1,42 @@
@@ -0,0 +1,42 @@
|
||||
module Network.Xmpp.Concurrent.Channels.Types where |
||||
|
||||
import Control.Concurrent.STM |
||||
import Data.IORef |
||||
import qualified Data.Map as Map |
||||
import Data.Text (Text) |
||||
import Network.Xmpp.Concurrent.Types |
||||
import Network.Xmpp.Types |
||||
|
||||
-- | Session with Channels |
||||
data CSession = CSession |
||||
{ session :: Session |
||||
-- The original master channels that the reader puts stanzas |
||||
-- into. These are cloned by @get{STanza,Message,Presence}Chan |
||||
-- on demand when first used by the thread and are stored in the |
||||
-- {message,presence}Ref fields below. |
||||
, mShadow :: TChan (Either MessageError Message) |
||||
, pShadow :: TChan (Either PresenceError Presence) |
||||
, sShadow :: TChan Stanza -- All stanzas |
||||
-- The cloned copies of the original/shadow channels. They are |
||||
-- thread-local (as opposed to the shadow channels) and contains all |
||||
-- stanzas received after the cloning of the shadow channels. |
||||
, messagesRef :: IORef (Maybe (TChan (Either MessageError Message))) |
||||
, presenceRef :: IORef (Maybe (TChan (Either PresenceError Presence))) |
||||
, outCh :: TChan Stanza |
||||
, iqHandlers :: TVar IQHandlers |
||||
-- Writing lock, so that only one thread could write to the stream at any |
||||
-- given time. |
||||
} |
||||
|
||||
-- | IQHandlers holds the registered channels for incomming IQ requests and |
||||
-- TMVars of and TMVars for expected IQ responses |
||||
type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) |
||||
, Map.Map StanzaId (TMVar IQResponse) |
||||
) |
||||
|
||||
-- | Contains whether or not a reply has been sent, and the IQ request body to |
||||
-- reply to. |
||||
data IQRequestTicket = IQRequestTicket |
||||
{ sentRef :: (TVar Bool) |
||||
, iqRequestBody :: IQRequest |
||||
} |
||||
@ -1,81 +0,0 @@
@@ -1,81 +0,0 @@
|
||||
module Network.Xmpp.Concurrent.IQ where |
||||
|
||||
import Control.Concurrent.STM |
||||
import Control.Concurrent (forkIO, threadDelay) |
||||
import Control.Monad |
||||
import Control.Monad.IO.Class |
||||
import Control.Monad.Trans.Reader |
||||
|
||||
import Data.XML.Types |
||||
import qualified Data.Map as Map |
||||
|
||||
import Network.Xmpp.Concurrent.Types |
||||
import Network.Xmpp.Concurrent.Monad |
||||
import Network.Xmpp.Types |
||||
|
||||
-- | Sends an IQ, returns a 'TMVar' that will be filled with the first inbound |
||||
-- IQ with a matching ID that has type @result@ or @error@. |
||||
sendIQ :: Maybe Int -- ^ Timeout |
||||
-> Maybe Jid -- ^ Recipient (to) |
||||
-> IQRequestType -- ^ IQ type (@Get@ or @Set@) |
||||
-> Maybe LangTag -- ^ Language tag of the payload (@Nothing@ for |
||||
-- default) |
||||
-> Element -- ^ The IQ body (there has to be exactly one) |
||||
-> Session |
||||
-> IO (TMVar IQResponse) |
||||
sendIQ timeOut to tp lang body session = do -- TODO: Add timeout |
||||
newId <- idGenerator session |
||||
ref <- atomically $ do |
||||
resRef <- newEmptyTMVar |
||||
(byNS, byId) <- readTVar (iqHandlers . chans $ session) |
||||
writeTVar (iqHandlers . chans $ session) (byNS, Map.insert newId resRef byId) |
||||
-- TODO: Check for id collisions (shouldn't happen?) |
||||
return resRef |
||||
sendStanza (IQRequestS $ IQRequest newId Nothing to lang tp body) (chans session) |
||||
case timeOut of |
||||
Nothing -> return () |
||||
Just t -> void . forkIO $ do |
||||
threadDelay t |
||||
doTimeOut (iqHandlers . chans $ session) newId ref |
||||
return ref |
||||
where |
||||
doTimeOut handlers iqid var = atomically $ do |
||||
p <- tryPutTMVar var IQResponseTimeout |
||||
when p $ do |
||||
(byNS, byId) <- readTVar (iqHandlers . chans $ session) |
||||
writeTVar handlers (byNS, Map.delete iqid byId) |
||||
return () |
||||
|
||||
|
||||
-- | Like 'sendIQ', but waits for the answer IQ. Times out after 3 seconds |
||||
sendIQ' :: Maybe Jid |
||||
-> IQRequestType |
||||
-> Maybe LangTag |
||||
-> Element |
||||
-> Session |
||||
-> IO IQResponse |
||||
sendIQ' to tp lang body session = do |
||||
ref <- sendIQ (Just 3000000) to tp lang body session |
||||
atomically $ takeTMVar ref |
||||
|
||||
|
||||
answerIQ :: IQRequestTicket |
||||
-> Either StanzaError (Maybe Element) |
||||
-> Session |
||||
-> IO Bool |
||||
answerIQ (IQRequestTicket |
||||
sentRef |
||||
(IQRequest iqid from _to lang _tp bd)) |
||||
answer session = do |
||||
let response = case answer of |
||||
Left err -> IQErrorS $ IQError iqid Nothing from lang err (Just bd) |
||||
Right res -> IQResultS $ IQResult iqid Nothing from lang res |
||||
atomically $ do |
||||
sent <- readTVar sentRef |
||||
case sent of |
||||
False -> do |
||||
writeTVar sentRef True |
||||
|
||||
writeTChan (outCh . chans $ session) response |
||||
return True |
||||
True -> return False |
||||
Loading…
Reference in new issue