From 4f785964026b576043cde32c1e49036a95a4fbd3 Mon Sep 17 00:00:00 2001
From: Philipp Balzarek
Date: Fri, 16 Nov 2012 13:50:48 +0100
Subject: [PATCH] split readWorker
---
source/Network/Xmpp/Concurrent/Threads.hs | 173 +++++++++-------------
1 file changed, 74 insertions(+), 99 deletions(-)
diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs
index 8788996..074d455 100644
--- a/source/Network/Xmpp/Concurrent/Threads.hs
+++ b/source/Network/Xmpp/Concurrent/Threads.hs
@@ -31,19 +31,16 @@ import GHC.IO (unsafeUnmask)
-- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads.
-readWorker :: TChan (Either MessageError Message)
- -> TChan (Either PresenceError Presence)
- -> TChan Stanza
- -> TVar IQHandlers
- -> TVar EventHandlers
+readWorker :: (Stanza -> IO ())
+ -> (StreamError -> IO ())
-> TMVar XmppConnection
- -> IO ()
-readWorker messageC presenceC stanzaC iqHands handlers stateRef =
+ -> IO a
+readWorker onStanza onConnectionClosed stateRef =
Ex.mask_ . forever $ do
- res <- liftIO $ Ex.catches ( do
+ res <- Ex.catches ( do
-- we don't know whether pull will
-- necessarily be interruptible
- s <- liftIO . atomically $ do
+ s <- atomically $ do
sr <- readTMVar stateRef
when (sConnectionState sr == XmppConnectionClosed)
retry
@@ -55,52 +52,17 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef =
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: StreamError) -> do
- hands <- atomically $ readTVar handlers
- _ <- forkIO $ connectionClosedHandler hands e
+ onConnectionClosed e
return Nothing
]
- liftIO . atomically $ do
- case res of
- Nothing -> return ()
- Just sta -> do
- writeTChan stanzaC sta
- void $ readTChan stanzaC -- sic
- case sta of
- MessageS m -> do writeTChan messageC $ Right m
- _ <- readTChan messageC -- Sic!
- return ()
- -- this may seem ridiculous, but to prevent
- -- the channel from filling up we
- -- immedtiately remove the
- -- Stanza we just put in. It will still be
- -- available in duplicates.
- MessageErrorS m -> do writeTChan messageC $ Left m
- _ <- readTChan messageC
- return ()
- PresenceS p -> do
- writeTChan presenceC $ Right p
- _ <- readTChan presenceC
- return ()
- PresenceErrorS p -> do
- writeTChan presenceC $ Left p
- _ <- readTChan presenceC
- return ()
-
- IQRequestS i -> handleIQRequest iqHands i
- IQResultS i -> handleIQResponse iqHands (Right i)
- IQErrorS i -> handleIQResponse iqHands (Left i)
-
+ case res of
+ Nothing -> return () -- Caught an exception, nothing to do
+ Just sta -> onStanza sta
where
-- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
-- compatibility.
allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return ()
- -- Call the connection closed handlers.
- noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a)
- noCon h e = do
- hands <- atomically $ readTVar h
- _ <- forkIO $ connectionClosedHandler hands e
- return Nothing
-- While waiting for the first semaphore(s) to flip we might receive another
-- interrupt. When that happens we add it's semaphore to the list and retry
-- waiting. We do this because we might receive another
@@ -111,7 +73,7 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
--- If the IQ request has a namespace, sent it through the appropriate channel.
+-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
@@ -148,61 +110,39 @@ writeWorker stCh writeR = forever $ do
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.
-
-
-
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
-startThreads :: IO ( TChan (Either MessageError Message)
- , TChan (Either PresenceError Presence)
- , TChan Stanza
- , TVar IQHandlers
- , TChan Stanza
- , IO ()
- , TMVar (BS.ByteString -> IO Bool)
- , TMVar XmppConnection
- , ThreadId
- , TVar EventHandlers
- )
-startThreads = do
+startThreadsWith stanzaHandler outC eh = do
writeLock <- newTMVarIO (\_ -> return False)
- messageC <- newTChanIO
- presenceC <- newTChanIO
- outC <- newTChanIO
- stanzaC <- newTChanIO
- handlers <- newTVarIO (Map.empty, Map.empty)
- eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
- rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS
- return ( messageC
- , presenceC
- , stanzaC
- , handlers
- , outC
- , killConnection writeLock [lw, rd, cp]
+ rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS
+ return ( killConnection writeLock [lw, rd, cp]
, writeLock
, conS
, rd
- , eh)
+ )
where
killConnection writeLock threads = liftIO $ do
_ <- atomically $ takeTMVar writeLock -- Should we put it back?
_ <- forM threads killThread
return ()
- zeroEventHandlers :: EventHandlers
- zeroEventHandlers = EventHandlers
- { connectionClosedHandler = \_ -> return ()
- }
-- | Creates and initializes a new concurrent session.
-newSession :: IO Session
-newSession = do
- (mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
+newSessionChans :: IO Session
+newSessionChans = do
+ messageC <- newTChanIO
+ presenceC <- newTChanIO
+ outC <- newTChanIO
+ stanzaC <- newTChanIO
+ iqHandlers <- newTVarIO (Map.empty, Map.empty)
+ eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
+ let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers
+ (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler outC eh
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
@@ -210,20 +150,20 @@ newSession = do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
- return $ Session
- mC
- pC
- sC
- workermCh
- workerpCh
- outC
- hand
- writeR
- rdr
- getId
- conS
- eh
- stopThreads'
+ return $ Session { mShadow = messageC
+ , pShadow = presenceC
+ , sShadow = stanzaC
+ , messagesRef = workermCh
+ , presenceRef = workerpCh
+ , outCh = outC
+ , iqHandlers = iqHandlers
+ , writeRef = wLock
+ , readerThread = readerThread
+ , idGenerator = getId
+ , conStateRef = conState
+ , eventHandlers = eh
+ , stopThreads = kill
+ }
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
@@ -233,3 +173,38 @@ connPersist lock = forever $ do
_ <- pushBS " "
atomically $ putTMVar lock pushBS
threadDelay 30000000 -- 30s
+
+
+toChans messageC presenceC stanzaC iqHands sta = atomically $ do
+ writeTChan stanzaC sta
+ void $ readTChan stanzaC -- sic
+ case sta of
+ MessageS m -> do writeTChan messageC $ Right m
+ _ <- readTChan messageC -- Sic!
+ return ()
+ -- this may seem ridiculous, but to prevent
+ -- the channel from filling up we
+ -- immedtiately remove the
+ -- Stanza we just put in. It will still be
+ -- available in duplicates.
+ MessageErrorS m -> do writeTChan messageC $ Left m
+ _ <- readTChan messageC
+ return ()
+ PresenceS p -> do
+ writeTChan presenceC $ Right p
+ _ <- readTChan presenceC
+ return ()
+ PresenceErrorS p -> do
+ writeTChan presenceC $ Left p
+ _ <- readTChan presenceC
+ return ()
+ IQRequestS i -> handleIQRequest iqHands i
+ IQResultS i -> handleIQResponse iqHands (Right i)
+ IQErrorS i -> handleIQResponse iqHands (Left i)
+
+-- Call the connection closed handlers.
+noCon :: TVar EventHandlers -> StreamError -> IO ()
+noCon h e = do
+ hands <- atomically $ readTVar h
+ _ <- forkIO $ connectionClosedHandler hands e
+ return ()
\ No newline at end of file