diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index 8788996..074d455 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -31,19 +31,16 @@ import GHC.IO (unsafeUnmask) -- Worker to read stanzas from the stream and concurrently distribute them to -- all listener threads. -readWorker :: TChan (Either MessageError Message) - -> TChan (Either PresenceError Presence) - -> TChan Stanza - -> TVar IQHandlers - -> TVar EventHandlers +readWorker :: (Stanza -> IO ()) + -> (StreamError -> IO ()) -> TMVar XmppConnection - -> IO () -readWorker messageC presenceC stanzaC iqHands handlers stateRef = + -> IO a +readWorker onStanza onConnectionClosed stateRef = Ex.mask_ . forever $ do - res <- liftIO $ Ex.catches ( do + res <- Ex.catches ( do -- we don't know whether pull will -- necessarily be interruptible - s <- liftIO . atomically $ do + s <- atomically $ do sr <- readTMVar stateRef when (sConnectionState sr == XmppConnectionClosed) retry @@ -55,52 +52,17 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef = void $ handleInterrupts [t] return Nothing , Ex.Handler $ \(e :: StreamError) -> do - hands <- atomically $ readTVar handlers - _ <- forkIO $ connectionClosedHandler hands e + onConnectionClosed e return Nothing ] - liftIO . atomically $ do - case res of - Nothing -> return () - Just sta -> do - writeTChan stanzaC sta - void $ readTChan stanzaC -- sic - case sta of - MessageS m -> do writeTChan messageC $ Right m - _ <- readTChan messageC -- Sic! - return () - -- this may seem ridiculous, but to prevent - -- the channel from filling up we - -- immedtiately remove the - -- Stanza we just put in. It will still be - -- available in duplicates. - MessageErrorS m -> do writeTChan messageC $ Left m - _ <- readTChan messageC - return () - PresenceS p -> do - writeTChan presenceC $ Right p - _ <- readTChan presenceC - return () - PresenceErrorS p -> do - writeTChan presenceC $ Left p - _ <- readTChan presenceC - return () - - IQRequestS i -> handleIQRequest iqHands i - IQResultS i -> handleIQResponse iqHands (Right i) - IQErrorS i -> handleIQResponse iqHands (Left i) - + case res of + Nothing -> return () -- Caught an exception, nothing to do + Just sta -> onStanza sta where -- Defining an Control.Exception.allowInterrupt equivalent for GHC 7 -- compatibility. allowInterrupt :: IO () allowInterrupt = unsafeUnmask $ return () - -- Call the connection closed handlers. - noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a) - noCon h e = do - hands <- atomically $ readTVar h - _ <- forkIO $ connectionClosedHandler hands e - return Nothing -- While waiting for the first semaphore(s) to flip we might receive another -- interrupt. When that happens we add it's semaphore to the list and retry -- waiting. We do this because we might receive another @@ -111,7 +73,7 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef = Ex.catch (atomically $ forM ts takeTMVar) (\(Interrupt t) -> handleInterrupts (t:ts)) --- If the IQ request has a namespace, sent it through the appropriate channel. +-- If the IQ request has a namespace, send it through the appropriate channel. handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () handleIQRequest handlers iq = do (byNS, _) <- readTVar handlers @@ -148,61 +110,39 @@ writeWorker stCh writeR = forever $ do -- connection is dead. threadDelay 250000 -- Avoid free spinning. - - - -- Two streams: input and output. Threads read from input stream and write to -- output stream. -- | Runs thread in XmppState monad. Returns channel of incoming and outgoing -- stances, respectively, and an Action to stop the Threads and close the -- connection. -startThreads :: IO ( TChan (Either MessageError Message) - , TChan (Either PresenceError Presence) - , TChan Stanza - , TVar IQHandlers - , TChan Stanza - , IO () - , TMVar (BS.ByteString -> IO Bool) - , TMVar XmppConnection - , ThreadId - , TVar EventHandlers - ) -startThreads = do +startThreadsWith stanzaHandler outC eh = do writeLock <- newTMVarIO (\_ -> return False) - messageC <- newTChanIO - presenceC <- newTChanIO - outC <- newTChanIO - stanzaC <- newTChanIO - handlers <- newTVarIO (Map.empty, Map.empty) - eh <- newTVarIO zeroEventHandlers conS <- newTMVarIO xmppNoConnection lw <- forkIO $ writeWorker outC writeLock cp <- forkIO $ connPersist writeLock - rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS - return ( messageC - , presenceC - , stanzaC - , handlers - , outC - , killConnection writeLock [lw, rd, cp] + rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS + return ( killConnection writeLock [lw, rd, cp] , writeLock , conS , rd - , eh) + ) where killConnection writeLock threads = liftIO $ do _ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- forM threads killThread return () - zeroEventHandlers :: EventHandlers - zeroEventHandlers = EventHandlers - { connectionClosedHandler = \_ -> return () - } -- | Creates and initializes a new concurrent session. -newSession :: IO Session -newSession = do - (mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads +newSessionChans :: IO Session +newSessionChans = do + messageC <- newTChanIO + presenceC <- newTChanIO + outC <- newTChanIO + stanzaC <- newTChanIO + iqHandlers <- newTVarIO (Map.empty, Map.empty) + eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } + let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers + (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler outC eh workermCh <- newIORef $ Nothing workerpCh <- newIORef $ Nothing idRef <- newTVarIO 1 @@ -210,20 +150,20 @@ newSession = do curId <- readTVar idRef writeTVar idRef (curId + 1 :: Integer) return . read. show $ curId - return $ Session - mC - pC - sC - workermCh - workerpCh - outC - hand - writeR - rdr - getId - conS - eh - stopThreads' + return $ Session { mShadow = messageC + , pShadow = presenceC + , sShadow = stanzaC + , messagesRef = workermCh + , presenceRef = workerpCh + , outCh = outC + , iqHandlers = iqHandlers + , writeRef = wLock + , readerThread = readerThread + , idGenerator = getId + , conStateRef = conState + , eventHandlers = eh + , stopThreads = kill + } -- Acquires the write lock, pushes a space, and releases the lock. -- | Sends a blank space every 30 seconds to keep the connection alive. @@ -233,3 +173,38 @@ connPersist lock = forever $ do _ <- pushBS " " atomically $ putTMVar lock pushBS threadDelay 30000000 -- 30s + + +toChans messageC presenceC stanzaC iqHands sta = atomically $ do + writeTChan stanzaC sta + void $ readTChan stanzaC -- sic + case sta of + MessageS m -> do writeTChan messageC $ Right m + _ <- readTChan messageC -- Sic! + return () + -- this may seem ridiculous, but to prevent + -- the channel from filling up we + -- immedtiately remove the + -- Stanza we just put in. It will still be + -- available in duplicates. + MessageErrorS m -> do writeTChan messageC $ Left m + _ <- readTChan messageC + return () + PresenceS p -> do + writeTChan presenceC $ Right p + _ <- readTChan presenceC + return () + PresenceErrorS p -> do + writeTChan presenceC $ Left p + _ <- readTChan presenceC + return () + IQRequestS i -> handleIQRequest iqHands i + IQResultS i -> handleIQResponse iqHands (Right i) + IQErrorS i -> handleIQResponse iqHands (Left i) + +-- Call the connection closed handlers. +noCon :: TVar EventHandlers -> StreamError -> IO () +noCon h e = do + hands <- atomically $ readTVar h + _ <- forkIO $ connectionClosedHandler hands e + return () \ No newline at end of file