You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
241 lines
9.0 KiB
241 lines
9.0 KiB
|
14 years ago
|
{-# LANGUAGE ScopedTypeVariables #-}
|
||
|
14 years ago
|
{-# LANGUAGE OverloadedStrings #-}
|
||
|
14 years ago
|
module Network.Xmpp.Concurrent.Threads where
|
||
|
14 years ago
|
|
||
|
14 years ago
|
import Network.Xmpp.Types
|
||
|
14 years ago
|
|
||
|
|
import Control.Applicative((<$>),(<*>))
|
||
|
|
import Control.Concurrent
|
||
|
|
import Control.Concurrent.STM
|
||
|
|
import qualified Control.Exception.Lifted as Ex
|
||
|
|
import Control.Monad
|
||
|
|
import Control.Monad.IO.Class
|
||
|
14 years ago
|
import Control.Monad.Reader
|
||
|
|
import Control.Monad.State.Strict
|
||
|
14 years ago
|
|
||
|
|
import qualified Data.ByteString as BS
|
||
|
|
import Data.IORef
|
||
|
|
import qualified Data.Map as Map
|
||
|
|
import Data.Maybe
|
||
|
|
|
||
|
|
import Data.XML.Types
|
||
|
|
|
||
|
14 years ago
|
import Network.Xmpp.Monad
|
||
|
|
import Network.Xmpp.Marshal
|
||
|
|
import Network.Xmpp.Pickle
|
||
|
|
import Network.Xmpp.Concurrent.Types
|
||
|
14 years ago
|
|
||
|
|
import Text.XML.Stream.Elements
|
||
|
|
|
||
|
14 years ago
|
import GHC.IO (unsafeUnmask)
|
||
|
|
|
||
|
14 years ago
|
-- | Worker that pulls stanzas from the stream and distributes them to the
-- listener channels.
--
-- The loop runs forever with asynchronous exceptions masked. Each iteration
-- blocks (via STM 'retry') while the connection state held in @stateRef@ is
-- 'XmppConnectionClosed', briefly unmasks so that pending interrupts can be
-- delivered, and then pulls one stanza. Every stanza is written to @stanzaC@;
-- messages and presences (and their error variants) are additionally written
-- to @messageC@ / @presenceC@, and IQ stanzas are handed to the handlers
-- registered in @iqHands@.
--
-- An 'Interrupt' exception suspends the worker until all semaphores it has
-- been handed can be taken. A 'StreamError' forks the connection-closed
-- handler found in @handlers@. In both cases nothing is dispatched for that
-- iteration.
readWorker :: TChan (Either MessageError Message)
           -> TChan (Either PresenceError Presence)
           -> TChan Stanza
           -> TVar IQHandlers
           -> TVar EventHandlers
           -> TMVar XmppConnection
           -> IO ()
readWorker messageC presenceC stanzaC iqHands handlers stateRef =
    Ex.mask_ . forever $ do
        pulled <- liftIO $ Ex.catches pullNext
            [ Ex.Handler $ \(Interrupt sem) ->
                  handleInterrupts [sem] >> return Nothing
            , Ex.Handler $ \err -> noCon handlers (err :: StreamError)
            ]
        liftIO . atomically $ maybe (return ()) dispatch pulled
  where
    -- Wait until the connection is not closed, then pull a single stanza
    -- using the connection state that was observed.
    pullNext :: IO (Maybe Stanza)
    pullNext = do
        conn <- atomically $ do
            st <- readTMVar stateRef
            when (sConnectionState st == XmppConnectionClosed) retry
            return st
        -- We don't know whether pull will necessarily be interruptible, so
        -- give pending interrupts a chance to arrive first.
        allowInterrupt
        Just . fst <$> runStateT pullStanza conn

    -- Hand a stanza to the general stanza channel and then to the channel or
    -- handler that matches its kind.
    dispatch :: Stanza -> STM ()
    dispatch sta = do
        publish stanzaC sta
        case sta of
            MessageS       m -> publish messageC (Right m)
            MessageErrorS  m -> publish messageC (Left m)
            PresenceS      p -> publish presenceC (Right p)
            PresenceErrorS p -> publish presenceC (Left p)
            IQRequestS     i -> handleIQRequest iqHands i
            IQResultS      i -> handleIQResponse iqHands (Right i)
            IQErrorS       i -> handleIQResponse iqHands (Left i)

    -- Write a value to the channel and immediately read one value back out
    -- (sic). This may seem ridiculous, but to prevent the channel from
    -- filling up we immediately remove the stanza we just put in. It will
    -- still be available in duplicates of the channel.
    publish :: TChan a -> a -> STM ()
    publish ch x = writeTChan ch x >> void (readTChan ch)

    -- Defining a Control.Exception.allowInterrupt equivalent for GHC 7
    -- compatibility.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask $ return ()

    -- Call the connection closed handler in a freshly forked thread.
    noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a)
    noCon evs err = do
        hands <- atomically (readTVar evs)
        void . forkIO $ connectionClosedHandler hands err
        return Nothing

    -- While waiting for the first semaphore(s) to flip we might receive
    -- another interrupt. When that happens we add its semaphore to the list
    -- and retry waiting, so that the new interrupt is waited for as well.
    handleInterrupts :: [TMVar ()] -> IO [()]
    handleInterrupts sems =
        Ex.catch (atomically (mapM takeTMVar sems))
                 (\(Interrupt sem) -> handleInterrupts (sem : sems))
|
||
|
14 years ago
|
|
||
|
14 years ago
|
-- | Send an incoming IQ request through the channel registered for its
-- request type and payload namespace.
--
-- A payload element without a namespace is looked up under the empty
-- namespace @""@. When no channel is registered for the resulting key the
-- request is dropped.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
    (byNS, _) <- readTVar handlers
    -- TODO: send error stanza when there is no matching handler.
    maybe (return ()) deliver $ Map.lookup (iqRequestType iq, payloadNS) byNS
  where
    payloadNS = fromMaybe "" . nameNamespace . elementName $ iqRequestPayload iq

    -- Wrap the request in a ticket carrying a fresh flag (initially 'False')
    -- and pass it to the handler's channel.
    deliver ch = do
        sent <- newTVar False
        writeTChan ch $ IQRequestTicket sent iq
|
||
|
14 years ago
|
|
||
|
14 years ago
|
-- | Deliver an IQ result or IQ error to whoever is waiting for a response
-- with the same ID.
--
-- If a 'TMVar' is registered under the response's ID, the response is offered
-- to it without blocking and the registration is removed from the handler
-- table. Responses with an unknown ID are ignored.
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
    (byNS, byID) <- readTVar handlers
    case Map.lookup responseID byID of
        -- We are not supposed to send an error.
        Nothing -> return ()
        Just tmvar -> do
            -- Don't block if the variable has already been filled.
            _ <- tryPutTMVar tmvar iq
            writeTVar handlers (byNS, Map.delete responseID byID)
  where
    responseID = either iqErrorID iqResultID iq
|
||
|
14 years ago
|
|
||
|
14 years ago
|
-- | Worker that writes outgoing stanzas to the stream.
--
-- Each iteration atomically takes the write function out of @writeR@ (the
-- 'TMVar' doubles as the write lock) together with the next stanza from
-- @stCh@, so the lock is only taken once there is something to send. The
-- stanza is rendered and passed to the write function. If the write function
-- returns 'False' the connection is considered dead: the stanza is pushed
-- back onto @stCh@ and the worker sleeps for 250 ms to avoid free spinning.
--
-- The write function is put back into @writeR@ even if writing throws an
-- exception, so a failing write cannot leave the lock empty and block every
-- other thread that waits for it.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ Ex.bracket acquire release send
  where
    -- Take the lock and the next stanza in a single transaction.
    acquire = atomically $ (,) <$> takeTMVar writeR <*> readTChan stCh

    -- Put the write function back, whatever happened while sending.
    release (write, _) = atomically $ putTMVar writeR write

    send (write, next) = do
        ok <- write $ renderElement (pickleElem xpStanza next)
        unless ok $ do -- If the writing failed, the connection is dead.
            atomically $ unGetTChan stCh next
            threadDelay 250000 -- Avoid free spinning.
|
||
|
|
|
||
|
|
-- Two streams: input and output. Threads read from the input stream and write
-- to the output stream.
-- | Creates the shared channels and state, and forks the writer, keep-alive
-- and reader threads.
--
-- The returned tuple contains, in order:
--
-- 1. the channel that incoming messages and message errors are written to,
-- 2. the channel that incoming presences and presence errors are written to,
-- 3. the channel that every incoming stanza is written to,
-- 4. the IQ handler tables (initially empty),
-- 5. the channel of outgoing stanzas consumed by the writer thread,
-- 6. an action that takes the write lock and kills the three threads,
-- 7. the write lock, initially holding a write function that always
--    returns 'False',
-- 8. the connection state, initially 'xmppNoConnection',
-- 9. the 'ThreadId' of the reader thread,
-- 10. the event handlers, initially doing nothing.
startThreads :: IO ( TChan (Either MessageError Message)
                   , TChan (Either PresenceError Presence)
                   , TChan Stanza
                   , TVar IQHandlers
                   , TChan Stanza
                   , IO ()
                   , TMVar (BS.ByteString -> IO Bool)
                   , TMVar XmppConnection
                   , ThreadId
                   , TVar EventHandlers
                   )
startThreads = do
    writeLock <- newTMVarIO (const $ return False)
    messageC <- newTChanIO
    presenceC <- newTChanIO
    outC <- newTChanIO
    stanzaC <- newTChanIO
    iqHandlers <- newTVarIO (Map.empty, Map.empty)
    eventHandlers <- newTVarIO zeroEventHandlers
    conState <- newTMVarIO xmppNoConnection
    writer <- forkIO $ writeWorker outC writeLock
    keepAlive <- forkIO $ connPersist writeLock
    reader <- forkIO $
        readWorker messageC presenceC stanzaC iqHandlers eventHandlers conState
    return ( messageC
           , presenceC
           , stanzaC
           , iqHandlers
           , outC
           , killConnection writeLock [writer, reader, keepAlive]
           , writeLock
           , conState
           , reader
           , eventHandlers
           )
  where
    -- Take the write lock so that nothing is being written, then kill the
    -- given threads. The lock is not put back.
    killConnection lock threads = do
        _ <- atomically $ takeTMVar lock -- Should we put it back?
        mapM_ killThread threads

    -- Event handlers that do nothing.
    zeroEventHandlers :: EventHandlers
    zeroEventHandlers = EventHandlers
        { sessionEndHandler = return ()
        , connectionClosedHandler = \_ -> return ()
        }
|
||
|
14 years ago
|
|
||
|
14 years ago
|
-- | Creates and initializes a new Xmpp session.
--
-- Starts the worker threads via 'startThreads', creates two references that
-- initially hold 'Nothing', and sets up an ID generator backed by a counter
-- that starts at 1 and is incremented atomically on every use.
newSession :: IO Session
newSession = do
    (mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
    workermCh <- newIORef Nothing
    workerpCh <- newIORef Nothing
    idRef <- newTVarIO (1 :: Integer)
    let getId = atomically $ do
            curId <- readTVar idRef
            writeTVar idRef (curId + 1)
            -- Convert the counter value to the ID type by reading back its
            -- textual representation.
            return (read (show curId))
    return $ Session mC
                     pC
                     sC
                     workermCh
                     workerpCh
                     outC
                     hand
                     writeR
                     rdr
                     getId
                     conS
                     eh
                     stopThreads'
|
||
|
|
|
||
|
14 years ago
|
-- | Creates a new session, runs the given Xmpp computation in it, and returns
-- the session together with the computation's result.
withNewSession :: Xmpp b -> IO (Session, b)
withNewSession a =
    newSession >>= \sess -> (,) sess <$> runReaderT a sess
|
||
|
14 years ago
|
|
||
|
14 years ago
|
-- | Runs the given Xmpp computation with the given session as its
-- environment.
withSession :: Session -> Xmpp a -> IO a
withSession sess action = runReaderT action sess
|
||
|
14 years ago
|
|
||
|
14 years ago
|
-- | Sends a blank space every 30 seconds to keep the connection alive.
--
-- Each round takes the write function out of @lock@ (acquiring the write
-- lock), pushes a single space with it, and puts the function back. The
-- result of the push is ignored. The function is put back even if pushing
-- throws an exception, so a failing push cannot leave the lock empty and
-- block every other thread that waits for it.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist lock = forever $ do
    Ex.bracket (atomically $ takeTMVar lock)
               (atomically . putTMVar lock)
               (\pushBS -> void $ pushBS " ")
    threadDelay 30000000
|