You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

113 lines
4.3 KiB

{-# OPTIONS_HADDOCK hide #-}
14 years ago
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
14 years ago
import Network.Xmpp.Types
14 years ago
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
14 years ago
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.IO.Class
import Control.Monad.State.Strict
14 years ago
import qualified Data.ByteString as BS
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Connection
14 years ago
import GHC.IO (unsafeUnmask)
-- | Reader loop: repeatedly pulls a stanza from the current connection and
-- hands it to @onStanza@. This action never returns.
--
-- The whole loop runs with asynchronous exceptions masked ('Ex.mask_'); they
-- can only arrive at the explicit 'allowInterrupt' point or inside
-- interruptible (blocking) operations.
--
-- * An 'Interrupt' exception suspends reading until the 'TMVar' it carries
--   (and the 'TMVar's of any interrupts received while waiting) can be taken.
--
-- * A 'StreamError' is passed to @onConnectionClosed@.
--
-- In both cases the current round produces no stanza and the loop continues.
readWorker :: (Stanza -> IO ())
           -> (StreamError -> IO ())
           -> TMVar Connection
           -> IO a
readWorker onStanza onConnectionClosed stateRef = Ex.mask_ (forever iteration)
  where
    -- One round: try to obtain a stanza and dispatch it if there is one.
    -- The stanza callback runs outside of the exception handlers.
    iteration =
        Ex.catches (Just <$> nextStanza) handlers
            >>= maybe (return ()) onStanza

    nextStanza = do
        -- Block until the connection is not marked as closed.
        conn <- atomically openConnection
        -- We don't know whether pullStanza is necessarily interruptible, so
        -- open an explicit window for pending asynchronous exceptions first.
        allowInterrupt
        pullStanza conn

    -- Yields the current connection, retrying the transaction for as long as
    -- its state is XmppConnectionClosed.
    openConnection = do
        conn@(Connection inner) <- readTMVar stateRef
        status <- sConnectionState <$> readTMVar inner
        if status == XmppConnectionClosed
            then retry
            else return conn

    -- Both handlers swallow the exception and report "no stanza this round".
    handlers =
        [ Ex.Handler $ \(Interrupt sem) ->
              awaitInterrupts [sem] >> return Nothing
        , Ex.Handler $ \(err :: StreamError) ->
              onConnectionClosed err >> return Nothing
        ]

    -- Equivalent of Control.Exception.allowInterrupt, defined locally for
    -- GHC 7 compatibility.
    allowInterrupt :: IO ()
    allowInterrupt = unsafeUnmask (return ())

    -- Waits until every semaphore in the list can be taken (all in a single
    -- transaction). Another interrupt may arrive while we are blocked; its
    -- semaphore is then added to the list and the wait starts over.
    awaitInterrupts :: [TMVar ()] -> IO ()
    awaitInterrupts sems =
        Ex.catch (atomically (mapM_ takeTMVar sems))
                 (\(Interrupt sem) -> awaitInterrupts (sem : sems))
14 years ago
-- | Starts the background threads for a connection: the keep-alive thread
-- ('connPersist') and the reader thread ('readWorker').
--
-- The result is a 4-tuple of
--
-- 1. an action that takes the write lock (without putting it back) and then
--    kills the reader and keep-alive threads,
--
-- 2. the write lock, a 'TMVar' holding the connection's send function,
--
-- 3. a 'TMVar' holding the connection itself, and
--
-- 4. the 'ThreadId' of the reader thread.
--
-- Incoming stanzas are passed to @stanzaHandler@. When the reader reports a
-- 'StreamError', the 'connectionClosedHandler' currently stored in @eh@ is run
-- in a freshly forked thread.
startThreadsWith :: (Stanza -> IO ())
                 -> TVar EventHandlers
                 -> Connection
                 -> IO
                 (IO (),
                  TMVar (BS.ByteString -> IO Bool),
                  TMVar Connection,
                  ThreadId)
startThreadsWith stanzaHandler eh con = do
    -- The send function is read out of the connection state once and then
    -- shared through the write lock.
    sendBS <- withConnection' (gets $ cSend . cHand) con
    writeLock <- newTMVarIO sendBS
    conS <- newTMVarIO con
    keepAlive <- forkIO (connPersist writeLock)
    reader <- forkIO (readWorker stanzaHandler (noCon eh) conS)
    return
        ( killConnection writeLock [reader, keepAlive]
        , writeLock
        , conS
        , reader
        )
  where
    -- Take the write lock for good (it is not put back), then kill the given
    -- threads in list order.
    killConnection writeLock threads = do
        void (atomically (takeTMVar writeLock))
        mapM_ killThread threads

    -- Call the connection closed handler in its own thread.
    noCon :: TVar EventHandlers -> StreamError -> IO ()
    noCon h e = do
        hands <- atomically (readTVar h)
        void (forkIO (connectionClosedHandler hands e))
-- | Sends a blank space every 30 seconds to keep the connection alive.
--
-- Each round takes the send function out of the write lock, pushes @" "@ with
-- it, and puts the function back. The take/put pair is wrapped in
-- 'Ex.bracket', so the lock is restored even if the send action throws or this
-- thread receives an asynchronous exception during the send; without that the
-- 'TMVar' would stay empty and every other user of the write lock would block
-- forever. The 'Bool' returned by the send function is ignored.
--
-- The loop never terminates normally; an exception raised by the send action
-- propagates (after the lock has been released) and ends the thread.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist lock = forever $ do
    Ex.bracket (atomically $ takeTMVar lock)   -- acquire the write lock
               (atomically . putTMVar lock)    -- always hand it back
               (\pushBS -> void $ pushBS " ")
    threadDelay 30000000 -- 30s