You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

181 lines
7.9 KiB

{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent
( module Network.Xmpp.Concurrent.Monad
, module Network.Xmpp.Concurrent.Threads
, module Network.Xmpp.Concurrent.Basic
, module Network.Xmpp.Concurrent.Types
, module Network.Xmpp.Concurrent.Message
, module Network.Xmpp.Concurrent.Presence
, module Network.Xmpp.Concurrent.IQ
, StanzaHandler
, newSession
, writeWorker
, session
) where
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import Control.Monad.Error
import qualified Data.ByteString as BS
import qualified Data.Map as Map
import Data.Maybe
import Data.Text as Text
import Data.XML.Types
import Network
import Network.Xmpp.Concurrent.Basic
import Network.Xmpp.Concurrent.IQ
import Network.Xmpp.Concurrent.Message
import Network.Xmpp.Concurrent.Monad
import Network.Xmpp.Concurrent.Presence
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.IM.Roster.Types
import Network.Xmpp.IM.Roster
import Network.Xmpp.Marshal
import Network.Xmpp.Sasl
import Network.Xmpp.Sasl.Types
import Network.Xmpp.Stream
import Network.Xmpp.Tls
import Network.Xmpp.Types
import Network.Xmpp.Utilities
runHandlers :: (TChan Stanza) -> [StanzaHandler] -> Stanza -> IO ()
runHandlers _ [] _ = return ()
runHandlers outC (h:hands) sta = do
res <- h outC sta
case res of
True -> runHandlers outC hands sta
False -> return ()
toChan :: TChan Stanza -> StanzaHandler
toChan stanzaC _ sta = do
atomically $ writeTChan stanzaC sta
return True
handleIQ :: TVar IQHandlers
-> StanzaHandler
handleIQ iqHands outC sta = atomically $ do
case sta of
IQRequestS i -> handleIQRequest iqHands i >> return False
IQResultS i -> handleIQResponse iqHands (Right i) >> return False
IQErrorS i -> handleIQResponse iqHands (Left i) >> return False
_ -> return True
where
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> writeTChan outC $ serviceUnavailable iq
Just ch -> do
sentRef <- newTVar False
let answerT answer = do
let IQRequest iqid from _to lang _tp bd = iq
response = case answer of
Left er -> IQErrorS $ IQError iqid Nothing
from lang er
(Just bd)
Right res -> IQResultS $ IQResult iqid Nothing
from lang res
atomically $ do
sent <- readTVar sentRef
case sent of
False -> do
writeTVar sentRef True
writeTChan outC response
return True
True -> return False
writeTChan ch $ IQRequestTicket answerT iq
serviceUnavailable (IQRequest iqid from _to lang _tp bd) =
IQErrorS $ IQError iqid Nothing from lang err (Just bd)
err = StanzaError Cancel ServiceUnavailable Nothing Nothing
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err') = iqErrorID err'
iqID (Right iq') = iqResultID iq'
-- | Creates and initializes a new Xmpp context.
newSession :: Stream -> SessionConfiguration -> IO (Either XmppFailure Session)
newSession stream config = runErrorT $ do
outC <- lift newTChanIO
stanzaChan <- lift newTChanIO
iqHands <- lift $ newTVarIO (Map.empty, Map.empty)
eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = onConnectionClosed config }
ros <- liftIO . newTVarIO $ Roster Nothing Map.empty
let rosterH = if (enableRoster config) then handleRoster ros
else \ _ _ -> return True
let stanzaHandler = runHandlers outC $ Prelude.concat [ [ toChan stanzaChan ]
, extraStanzaHandlers
config
, [ handleIQ iqHands
, rosterH
]
]
(kill, wLock, streamState, reader) <- ErrorT $ startThreadsWith stanzaHandler eh stream
writer <- lift $ forkIO $ writeWorker outC wLock
idGen <- liftIO $ sessionStanzaIDs config
return $ Session { stanzaCh = stanzaChan
, outCh = outC
, iqHandlers = iqHands
, writeRef = wLock
, readerThread = reader
, idGenerator = idGen
, streamRef = streamState
, eventHandlers = eh
, stopThreads = kill >> killThread writer
, conf = config
, rosterRef = ros
}
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
let outData = renderElement $ nsHack (pickleElem xpStanza next)
debugOut outData
r <- write outData
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.
-- | Creates a 'Session' object by setting up a connection with an XMPP server.
--
-- Will connect to the specified host with the provided configuration. If the
-- third parameter is a 'Just' value, @session@ will attempt to authenticate and
-- acquire an XMPP resource.
session :: HostName -- ^ The hostname / realm
-> Maybe ([SaslHandler], Maybe Text) -- ^ SASL handlers and the desired
-- JID resource (or Nothing to let
-- the server decide)
-> SessionConfiguration -- ^ configuration details
-> IO (Either XmppFailure Session)
session realm mbSasl config = runErrorT $ do
stream <- ErrorT $ openStream realm (sessionStreamConfiguration config)
ErrorT $ tls stream
mbAuthError <- case mbSasl of
Nothing -> return Nothing
Just (handlers, resource) -> ErrorT $ auth handlers resource stream
case mbAuthError of
Nothing -> return ()
Just _ -> throwError XmppAuthFailure
ses <- ErrorT $ newSession stream config
liftIO $ when (enableRoster config) $ initRoster ses
return ses