You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

141 lines
5.9 KiB

{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent.Channels
( module Network.Xmpp.Concurrent.Channels.Basic
, module Network.Xmpp.Concurrent.Channels.Types
, module Network.Xmpp.Concurrent.Channels.Message
, module Network.Xmpp.Concurrent.Channels.Presence
, module Network.Xmpp.Concurrent.Channels.IQ
, toChans
, newContext
, writeWorker
)
where
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.XML.Types
import Network.Xmpp.Concurrent.Channels.Basic
import Network.Xmpp.Concurrent.Channels.IQ
import Network.Xmpp.Concurrent.Channels.Message
import Network.Xmpp.Concurrent.Channels.Presence
import Network.Xmpp.Concurrent.Channels.Types
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle
import Network.Xmpp.Types
import Text.XML.Stream.Elements
toChans :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence)
-> TChan Stanza
-> TVar IQHandlers
-> Stanza
-> IO ()
toChans messageC presenceC stanzaC iqHands sta = atomically $ do
writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
return ()
-- this may seem ridiculous, but to prevent
-- the channel from filling up we
-- immedtiately remove the
-- Stanza we just put in. It will still be
-- available in duplicates.
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
where
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
-- | Creates and initializes a new Xmpp context.
newContext :: IO Context
newContext = do
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
stanzaC <- newTChanIO
iqHandlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh
writer <- forkIO $ writeWorker outC wLock
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
let sess = Session { writeRef = wLock
, readerThread = readerThread
, idGenerator = getId
, conStateRef = conState
, eventHandlers = eh
, stopThreads = kill >> killThread writer
}
return $ Context { session = sess
, mShadow = messageC
, pShadow = presenceC
, sShadow = stanzaC
, messagesRef = workermCh
, presenceRef = workerpCh
, outCh = outC
, iqHandlers = iqHandlers
}
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.