@ -10,17 +10,14 @@ module Network.Xmpp.Concurrent
, module Network.Xmpp.Concurrent.IQ
, module Network.Xmpp.Concurrent.IQ
, StanzaHandler
, StanzaHandler
, newSession
, newSession
, writeWorker
, session
, session
, newStanzaID
, newStanzaID
) where
) where
import Control.Applicative ( ( <$> ) , ( <*> ) )
import Control.Concurrent
import Control.Concurrent.STM
import Control.Concurrent.STM
import Control.Monad
import Control.Monad
import Control.Monad.Error
import Control.Monad.Error
import qualified Data.ByteString as BS
import qualified Control.Exception as Ex
import qualified Data.Map as Map
import qualified Data.Map as Map
import Data.Maybe
import Data.Maybe
import Data.Text as Text
import Data.Text as Text
@ -35,22 +32,20 @@ import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.IM.Roster.Types
import Network.Xmpp.IM.Roster.Types
import Network.Xmpp.IM.Roster
import Network.Xmpp.IM.Roster
import Network.Xmpp.Marshal
import Network.Xmpp.Sasl
import Network.Xmpp.Sasl
import Network.Xmpp.Sasl.Types
import Network.Xmpp.Sasl.Types
import Network.Xmpp.Stream
import Network.Xmpp.Stream
import Network.Xmpp.Tls
import Network.Xmpp.Tls
import Network.Xmpp.Types
import Network.Xmpp.Types
import Network.Xmpp.Utilities
import Control.Monad.State.Strict
import Control.Monad.State.Strict
runHandlers :: ( TChan Stanza ) -> [ StanzaHandler ] -> Stanza -> IO ()
runHandlers :: WriteSemaphore -> [ StanzaHandler ] -> Stanza -> IO ()
runHandlers _ [] _ = return ()
runHandlers _ [] _ = return ()
runHandlers outC ( h : hands ) sta = do
runHandlers sem ( h : hands ) sta = do
res <- h outC sta
res <- h sem sta
case res of
case res of
True -> runHandlers outC hands sta
True -> runHandlers sem hands sta
False -> return ()
False -> return ()
toChan :: TChan Stanza -> StanzaHandler
toChan :: TChan Stanza -> StanzaHandler
@ -61,7 +56,7 @@ toChan stanzaC _ sta = do
handleIQ :: TVar IQHandlers
handleIQ :: TVar IQHandlers
-> StanzaHandler
-> StanzaHandler
handleIQ iqHands outC sta = atomically $ do
handleIQ iqHands writeSem sta = do
case sta of
case sta of
IQRequestS i -> handleIQRequest iqHands i >> return False
IQRequestS i -> handleIQRequest iqHands i >> return False
IQResultS i -> handleIQResponse iqHands ( Right i ) >> return False
IQResultS i -> handleIQResponse iqHands ( Right i ) >> return False
@ -69,14 +64,16 @@ handleIQ iqHands outC sta = atomically $ do
_ -> return True
_ -> return True
where
where
-- If the IQ request has a namespace, send it through the appropriate channel.
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest :: TVar IQHandlers -> IQRequest -> IO ()
handleIQRequest handlers iq = do
handleIQRequest handlers iq = do
out <- atomically $ do
( byNS , _ ) <- readTVar handlers
( byNS , _ ) <- readTVar handlers
let iqNS = fromMaybe " " ( nameNamespace . elementName $ iqRequestPayload iq )
let iqNS = fromMaybe " " ( nameNamespace . elementName
$ iqRequestPayload iq )
case Map . lookup ( iqRequestType iq , iqNS ) byNS of
case Map . lookup ( iqRequestType iq , iqNS ) byNS of
Nothing -> writeTChan outC $ serviceUnavailable iq
Nothing -> return . Just $ serviceUnavailable iq
Just ch -> do
Just ch -> do
sentRef <- newTVar False
sentRef <- newTM Var False
let answerT answer = do
let answerT answer = do
let IQRequest iqid from _to lang _tp bd = iq
let IQRequest iqid from _to lang _tp bd = iq
response = case answer of
response = case answer of
@ -85,21 +82,31 @@ handleIQ iqHands outC sta = atomically $ do
( Just bd )
( Just bd )
Right res -> IQResultS $ IQResult iqid Nothing
Right res -> IQResultS $ IQResult iqid Nothing
from lang res
from lang res
atomically $ do
Ex . bracketOnError ( atomically $ takeTMVar sentRef )
sent <- readTVar sentRef
( atomically . putTMVar sentRef )
case sent of
$ \ wasSent -> do
case wasSent of
True -> do
atomically $ putTMVar sentRef True
return Nothing
False -> do
False -> do
writeTVar sentRef True
didSend <- writeStanza writeSem response
writeTChan outC response
case didSend of
return True
True -> do
True -> return False
atomically $ putTMVar sentRef True
return $ Just True
False -> do
atomically $ putTMVar sentRef False
return $ Just False
writeTChan ch $ IQRequestTicket answerT iq
writeTChan ch $ IQRequestTicket answerT iq
return Nothing
maybe ( return () ) ( void . writeStanza writeSem ) out
serviceUnavailable ( IQRequest iqid from _to lang _tp bd ) =
serviceUnavailable ( IQRequest iqid from _to lang _tp bd ) =
IQErrorS $ IQError iqid Nothing from lang err ( Just bd )
IQErrorS $ IQError iqid Nothing from lang err ( Just bd )
err = StanzaError Cancel ServiceUnavailable Nothing Nothing
err = StanzaError Cancel ServiceUnavailable Nothing Nothing
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> IO ()
handleIQResponse handlers iq = do
handleIQResponse handlers iq = atomically $ do
( byNS , byID ) <- readTVar handlers
( byNS , byID ) <- readTVar handlers
case Map . updateLookupWithKey ( \ _ _ -> Nothing ) ( iqID iq ) byID of
case Map . updateLookupWithKey ( \ _ _ -> Nothing ) ( iqID iq ) byID of
( Nothing , _ ) -> return () -- We are not supposed to send an error.
( Nothing , _ ) -> return () -- We are not supposed to send an error.
@ -114,51 +121,36 @@ handleIQ iqHands outC sta = atomically $ do
-- | Creates and initializes a new Xmpp context.
-- | Creates and initializes a new Xmpp context.
newSession :: Stream -> SessionConfiguration -> IO ( Either XmppFailure Session )
newSession :: Stream -> SessionConfiguration -> IO ( Either XmppFailure Session )
newSession stream config = runErrorT $ do
newSession stream config = runErrorT $ do
outC <- lift newTChanIO
write' <- liftIO $ withStream' ( gets $ streamSend . streamHandle ) stream
writeSem <- liftIO $ newTMVarIO write'
stanzaChan <- lift newTChanIO
stanzaChan <- lift newTChanIO
iqHands <- lift $ newTVarIO ( Map . empty , Map . empty )
iqHands <- lift $ newTVarIO ( Map . empty , Map . empty )
eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = onConnectionClosed config }
eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = onConnectionClosed config }
ros <- liftIO . newTVarIO $ Roster Nothing Map . empty
ros <- liftIO . newTVarIO $ Roster Nothing Map . empty
let rosterH = if ( enableRoster config ) then handleRoster ros
let rosterH = if ( enableRoster config ) then handleRoster ros
else \ _ _ -> return True
else \ _ _ -> return True
let stanzaHandler = runHandlers outC $ Prelude . concat [ [ toChan stanzaChan ]
let stanzaHandler = runHandlers writeSem
$ Prelude . concat [ [ toChan stanzaChan ]
, extraStanzaHandlers
, extraStanzaHandlers
config
config
, [ handleIQ iqHands
, [ handleIQ iqHands
, rosterH
, rosterH
]
]
]
]
( kill , wLock , streamState , reader ) <- ErrorT $ startThreadsWith stanzaHandler eh stream
( kill , wLock , streamState , reader ) <- ErrorT $ startThreadsWith writeSem stanzaHandler eh stream
writer <- lift $ forkIO $ writeWorker outC wLock
idGen <- liftIO $ sessionStanzaIDs config
idGen <- liftIO $ sessionStanzaIDs config
return $ Session { stanzaCh = stanzaChan
return $ Session { stanzaCh = stanzaChan
, outCh = outC
, iqHandlers = iqHands
, iqHandlers = iqHands
, writeRef = wLock
, writeSemaphore = wLock
, readerThread = reader
, readerThread = reader
, idGenerator = idGen
, idGenerator = idGen
, streamRef = streamState
, streamRef = streamState
, eventHandlers = eh
, eventHandlers = eh
, stopThreads = kill >> killThread writer
, stopThreads = kill
, conf = config
, conf = config
, rosterRef = ros
, rosterRef = ros
}
}
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar ( BS . ByteString -> IO Bool ) -> IO ()
writeWorker stCh writeR = forever $ do
( write , next ) <- atomically $ ( , ) <$>
takeTMVar writeR <*>
readTChan stCh
let outData = renderElement $ nsHack ( pickleElem xpStanza next )
debugOut outData
r <- write outData
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.
-- | Creates a 'Session' object by setting up a connection with an XMPP server.
-- | Creates a 'Session' object by setting up a connection with an XMPP server.
--
--
-- Will connect to the specified host with the provided configuration. If the
-- Will connect to the specified host with the provided configuration. If the
@ -186,4 +178,3 @@ session realm mbSasl config = runErrorT $ do
newStanzaID :: Session -> IO StanzaID
newStanzaID :: Session -> IO StanzaID
newStanzaID = idGenerator
newStanzaID = idGenerator