You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

247 lines
9.2 KiB

{-# LANGUAGE ScopedTypeVariables #-}
14 years ago
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent.Threads where
14 years ago
import Network.Xmpp.Types
14 years ago
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.Reader
import Control.Monad.State.Strict
14 years ago
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe
import Data.XML.Types
import Network.Xmpp.Monad
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle
import Network.Xmpp.Concurrent.Types
14 years ago
import Text.XML.Stream.Elements
import GHC.IO (unsafeUnmask)
-- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads.
readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence)
-> TChan Stanza
-> TVar IQHandlers
-> TVar EventHandlers
-> TMVar XmppConnection
-> IO ()
readWorker messageC presenceC stanzaC iqHands handlers stateRef =
Ex.mask_ . forever $ do
res <- liftIO $ Ex.catches ( do
-- we don't know whether pull will
-- necessarily be interruptible
s <- liftIO . atomically $ do
sr <- readTMVar stateRef
when (sConnectionState sr == XmppConnectionClosed)
retry
return sr
allowInterrupt
Just . fst <$> runStateT pullStanza s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: StreamError) -> do
hands <- atomically $ readTVar handlers
_ <- forkIO $ connectionClosedHandler hands e
return Nothing
]
liftIO . atomically $ do
case res of
Nothing -> return ()
Just sta -> do
writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
return ()
-- this may seem ridiculous, but to prevent
14 years ago
-- the channel from filling up we
-- immedtiately remove the
-- Stanza we just put in. It will still be
-- available in duplicates.
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
where
-- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
-- compatibility.
allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return ()
-- Call the connection closed handlers.
noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a)
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return Nothing
-- While waiting for the first semaphore(s) to flip we might receive another
-- interrupt. When that happens we add it's semaphore to the list and retry
13 years ago
-- waiting. We do this because we might receive another
-- interrupt while we're waiting for a mutex to unlock; if that happens, the
-- new interrupt is added to the list and is waited for as well.
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
14 years ago
-- If the IQ request has a namespace, sent it through the appropriate channel.
14 years ago
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
14 years ago
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
14 years ago
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
startThreads :: IO ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence)
, TChan Stanza
, TVar IQHandlers
, TChan Stanza
, IO ()
, TMVar (BS.ByteString -> IO Bool)
, TMVar XmppConnection
, ThreadId
, TVar EventHandlers
)
14 years ago
startThreads = do
writeLock <- newTMVarIO (\_ -> return False)
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
stanzaC <- newTChanIO
handlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS
return ( messageC
, presenceC
, stanzaC
, handlers
, outC
, killConnection writeLock [lw, rd, cp]
, writeLock
, conS
, rd
, eh)
14 years ago
where
killConnection writeLock threads = liftIO $ do
14 years ago
_ <- atomically $ takeTMVar writeLock -- Should we put it back?
_ <- forM threads killThread
return ()
zeroEventHandlers :: EventHandlers
zeroEventHandlers = EventHandlers
{ connectionClosedHandler = \_ -> return ()
}
14 years ago
-- | Creates and initializes a new concurrent session.
newSession :: IO Session
newSession = do
(mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
14 years ago
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
return $ Session
mC
pC
sC
workermCh
workerpCh
outC
hand
writeR
rdr
getId
conS
eh
stopThreads'
-- | Creates a new session and runs the given Xmpp computation.
withNewSession :: Xmpp b -> IO (Session, b)
withNewSession a = do
sess <- newSession
ret <- runReaderT a sess
return (sess, ret)
-- | Runs the given Xmpp computation in the given session.
withSession :: Session -> Xmpp a -> IO a
withSession = flip runReaderT
14 years ago
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
14 years ago
connPersist lock = forever $ do
pushBS <- atomically $ takeTMVar lock
_ <- pushBS " "
atomically $ putTMVar lock pushBS
threadDelay 30000000 -- 30s