You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

132 lines
5.2 KiB

{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Network.Xmpp.Concurrent.Threads where
import Control.Applicative((<$>))
import Control.Concurrent
import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad
import Control.Monad.Error
import qualified Data.ByteString as BS
import GHC.IO (unsafeUnmask)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Stream
import Network.Xmpp.Types
import System.Log.Logger
-- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads.
readWorker :: (Stanza -> IO ())
-> (XmppFailure -> IO ())
-> TMVar Stream
-> IO a
readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
s' <- Ex.catches ( do
-- we don't know whether pull will
-- necessarily be interruptible
atomically $ do
s@(Stream con) <- readTMVar stateRef
scs <- streamConnectionState <$> readTMVar con
when (stateIsClosed scs)
retry
return $ Just s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
]
case s' of
Nothing -> return ()
Just s -> do
res <- Ex.catches (do
allowInterrupt
Just <$> pullStanza s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: XmppFailure) -> do
errorM "Pontarius.Xmpp" $ "Read error: "
++ show e
_ <- closeStreams s
onCClosed e
return Nothing
]
case res of
Nothing -> return () -- Caught an exception, nothing to
-- do. TODO: Can this happen?
Just (Left e) -> do
errorM "Pontarius.Xmpp" $ "Stanza error:" ++ show e
_ <- closeStreams s
onCClosed e
Just (Right sta) -> void $ onStanza sta
where
-- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
-- compatibility.
allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return ()
-- While waiting for the first semaphore(s) to flip we might receive another
-- interrupt. When that happens we add it's semaphore to the list and retry
-- waiting. We do this because we might receive another
-- interrupt while we're waiting for a mutex to unlock; if that happens, the
-- new interrupt is added to the list and is waited for as well.
handleInterrupts :: [TMVar ()] -> IO [()]
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
stateIsClosed Closed = True
stateIsClosed Finished = True
stateIsClosed _ = False
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
startThreadsWith :: TMVar (BS.ByteString -> IO Bool)
-> (Stanza -> IO ())
-> TVar EventHandlers
-> Stream
-> IO (Either XmppFailure (IO (),
TMVar (BS.ByteString -> IO Bool),
TMVar Stream,
ThreadId))
startThreadsWith writeSem stanzaHandler eh con = do
-- read' <- withStream' (gets $ streamSend . streamHandle) con
-- writeSem <- newTMVarIO read'
conS <- newTMVarIO con
cp <- forkIO $ connPersist writeSem
rdw <- forkIO $ readWorker stanzaHandler (noCon eh) conS
return $ Right ( killConnection [rdw, cp]
, writeSem
, conS
, rdw
)
where
killConnection threads = liftIO $ do
_ <- atomically $ do
_ <- takeTMVar writeSem
putTMVar writeSem $ \_ -> return False
_ <- forM threads killThread
return ()
-- Call the connection closed handlers.
noCon :: TVar EventHandlers -> XmppFailure -> IO ()
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return ()
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
connPersist :: TMVar (BS.ByteString -> IO Bool) -> IO ()
connPersist sem = forever $ do
pushBS <- atomically $ takeTMVar sem
_ <- pushBS " "
atomically $ putTMVar sem pushBS
threadDelay 30000000 -- 30s