You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
141 lines
5.9 KiB
141 lines
5.9 KiB
{-# OPTIONS_HADDOCK hide #-} |
|
{-# LANGUAGE OverloadedStrings #-} |
|
module Network.Xmpp.Concurrent.Channels |
|
( module Network.Xmpp.Concurrent.Channels.Basic |
|
, module Network.Xmpp.Concurrent.Channels.Types |
|
, module Network.Xmpp.Concurrent.Channels.Message |
|
, module Network.Xmpp.Concurrent.Channels.Presence |
|
, module Network.Xmpp.Concurrent.Channels.IQ |
|
, toChans |
|
, newContext |
|
, writeWorker |
|
) |
|
|
|
where |
|
|
|
import Control.Applicative((<$>),(<*>)) |
|
import Control.Concurrent |
|
import Control.Concurrent.STM |
|
import Control.Monad |
|
import qualified Data.ByteString as BS |
|
import Data.IORef |
|
import qualified Data.Map as Map |
|
import Data.Maybe (fromMaybe) |
|
import Data.XML.Types |
|
import Network.Xmpp.Concurrent.Channels.Basic |
|
import Network.Xmpp.Concurrent.Channels.IQ |
|
import Network.Xmpp.Concurrent.Channels.Message |
|
import Network.Xmpp.Concurrent.Channels.Presence |
|
import Network.Xmpp.Concurrent.Channels.Types |
|
import Network.Xmpp.Concurrent.Threads |
|
import Network.Xmpp.Concurrent.Types |
|
import Network.Xmpp.Marshal |
|
import Network.Xmpp.Pickle |
|
import Network.Xmpp.Types |
|
import Text.XML.Stream.Elements |
|
|
|
-- | Route a single incoming stanza to the channels and IQ handlers.
--
-- The whole dispatch runs as one STM transaction:
--
-- * every stanza is written to the stanza channel;
-- * messages and message errors are written to the message channel as
--   'Right' and 'Left' respectively;
-- * presences and presence errors are written to the presence channel in the
--   same way;
-- * IQ requests are passed to the handler channel registered for their
--   request type and payload namespace (if any);
-- * IQ results and errors are delivered to the 'TMVar' registered under
--   their stanza ID (if any).
toChans :: TChan (Either MessageError Message)
        -> TChan (Either PresenceError Presence)
        -> TChan Stanza
        -> TVar IQHandlers
        -> Stanza
        -> IO ()
toChans messageC presenceC stanzaC iqHands sta = atomically $ do
    publish stanzaC sta
    case sta of
        MessageS msg         -> publish messageC (Right msg)
        MessageErrorS msgErr -> publish messageC (Left msgErr)
        PresenceS pres       -> publish presenceC (Right pres)
        PresenceErrorS prErr -> publish presenceC (Left prErr)
        IQRequestS request   -> dispatchRequest request
        IQResultS result     -> dispatchResponse (Right result)
        IQErrorS iqErr       -> dispatchResponse (Left iqErr)
  where
    -- Write an item to a channel and read it straight back out (sic!).
    -- This may look ridiculous, but it keeps the channel itself from filling
    -- up; the item will still be available in duplicates of the channel.
    publish :: TChan a -> a -> STM ()
    publish chan item = writeTChan chan item >> void (readTChan chan)

    -- Look up the handler channel by request type and payload namespace
    -- (an absent namespace is treated as the empty string) and hand the
    -- request over, wrapped in a ticket with a fresh 'False' flag.
    dispatchRequest :: IQRequest -> STM ()
    dispatchRequest request = do
        (byNS, _) <- readTVar iqHands
        let payloadName = elementName (iqRequestPayload request)
            namespace   = fromMaybe "" (nameNamespace payloadName)
        case Map.lookup (iqRequestType request, namespace) byNS of
            Nothing -> return () -- TODO: send error stanza
            Just handlerChan -> do
                sentFlag <- newTVar False
                writeTChan handlerChan (IQRequestTicket sentFlag request)

    -- Remove the entry registered under the response's stanza ID and fill
    -- its 'TMVar' with the response. Responses with an unknown ID are
    -- dropped; we are not supposed to answer them with an error.
    dispatchResponse :: Either IQError IQResult -> STM ()
    dispatchResponse response = do
        (byNS, byID) <- readTVar iqHands
        let stanzaId = either iqErrorID iqResultID response
        case Map.updateLookupWithKey (\_ _ -> Nothing) stanzaId byID of
            (Nothing, _) -> return ()
            (Just slot, remaining) -> do
                -- tryPutTMVar instead of putTMVar: don't block.
                _ <- tryPutTMVar slot
                         (either IQResponseError IQResponseResult response)
                writeTVar iqHands (byNS, remaining)
|
|
|
|
|
-- | Creates and initializes a new Xmpp context.
--
-- Allocates the message, presence, outgoing and stanza channels along with
-- empty IQ handler maps, starts the threads through 'startThreadsWith' (using
-- 'toChans' as the stanza handler), forks a 'writeWorker' on the outgoing
-- channel and finally bundles everything into a 'Context'.
newContext :: IO Context
newContext = do
    messageC  <- newTChanIO
    presenceC <- newTChanIO
    outC      <- newTChanIO
    stanzaC   <- newTChanIO
    handlers  <- newTVarIO (Map.empty, Map.empty)
    -- By default nothing happens when the connection is closed.
    evHandlers <- newTVarIO
        EventHandlers { connectionClosedHandler = \_ -> return () }
    (kill, wLock, conState, reader) <-
        startThreadsWith (toChans messageC presenceC stanzaC handlers)
                         evHandlers
    writer    <- forkIO (writeWorker outC wLock)
    workermCh <- newIORef Nothing
    workerpCh <- newIORef Nothing
    counter   <- newTVarIO 1
    -- Hands out the current counter value and bumps the counter by one; the
    -- number is turned into the ID type by going through 'show' and 'read'.
    let nextId = atomically $ do
            current <- readTVar counter
            writeTVar counter (current + 1 :: Integer)
            return (read (show current))
        sess = Session
            { writeRef      = wLock
            , readerThread  = reader
            , idGenerator   = nextId
            , conStateRef   = conState
            , eventHandlers = evHandlers
              -- Stopping also kills the writer thread forked above.
            , stopThreads   = kill >> killThread writer
            }
    return Context
        { session     = sess
        , mShadow     = messageC
        , pShadow     = presenceC
        , sShadow     = stanzaC
        , messagesRef = workermCh
        , presenceRef = workerpCh
        , outCh       = outC
        , iqHandlers  = handlers
        }
|
|
|
-- | Worker to write stanzas to the stream concurrently.
--
-- Runs forever. Each round atomically takes the write function out of the
-- 'TMVar' and the next stanza off the channel, renders the stanza and writes
-- it, and then puts the write function back. When the write function reports
-- failure ('False') the stanza is pushed back to the front of the channel
-- and the worker pauses before the next round.
--
-- NOTE(review): nothing guards the call to the write function; should it
-- throw, the write function is not put back into the 'TMVar'.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
    (send, stanza) <- atomically $ do
        writeFn <- takeTMVar writeR
        pending <- readTChan stCh
        return (writeFn, pending)
    ok <- send (renderElement (pickleElem xpStanza stanza))
    atomically (putTMVar writeR send)
    unless ok $ do
        -- If the writing failed, the connection is dead: keep the stanza
        -- at the head of the queue for a later attempt.
        atomically (unGetTChan stCh stanza)
        threadDelay 250000 -- Avoid free spinning.
|
|
|