|
|
|
|
@ -26,8 +26,6 @@ readWorker :: (Stanza -> IO ())
@@ -26,8 +26,6 @@ readWorker :: (Stanza -> IO ())
|
|
|
|
|
readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do |
|
|
|
|
|
|
|
|
|
s' <- Ex.catches ( do |
|
|
|
|
-- we don't know whether pull will |
|
|
|
|
-- necessarily be interruptible |
|
|
|
|
atomically $ do |
|
|
|
|
s@(Stream con) <- readTMVar stateRef |
|
|
|
|
scs <- streamConnectionState <$> readTMVar con |
|
|
|
|
@ -44,6 +42,8 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
@@ -44,6 +42,8 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
|
|
|
|
|
Nothing -> return () |
|
|
|
|
Just s -> do |
|
|
|
|
res <- Ex.catches (do |
|
|
|
|
-- we don't know whether pull will |
|
|
|
|
-- necessarily be interruptible |
|
|
|
|
allowInterrupt |
|
|
|
|
res <- pullStanza s |
|
|
|
|
case res of |
|
|
|
|
@ -70,9 +70,7 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
@@ -70,9 +70,7 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
|
|
|
|
|
allowInterrupt = unsafeUnmask $ return () |
|
|
|
|
-- While waiting for the first semaphore(s) to flip we might receive another |
|
|
|
|
-- interrupt. When that happens we add it's semaphore to the list and retry |
|
|
|
|
-- waiting. We do this because we might receive another |
|
|
|
|
-- interrupt while we're waiting for a mutex to unlock; if that happens, the |
|
|
|
|
-- new interrupt is added to the list and is waited for as well. |
|
|
|
|
-- waiting. |
|
|
|
|
handleInterrupts :: [TMVar ()] -> IO [()] |
|
|
|
|
handleInterrupts ts = |
|
|
|
|
Ex.catch (atomically $ forM ts takeTMVar) |
|
|
|
|
@ -81,9 +79,6 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
@@ -81,9 +79,6 @@ readWorker onStanza onCClosed stateRef = forever . Ex.mask_ $ do
|
|
|
|
|
stateIsClosed Finished = True |
|
|
|
|
stateIsClosed _ = False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- Two streams: input and output. Threads read from input stream and write to |
|
|
|
|
-- output stream. |
|
|
|
|
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing |
|
|
|
|
-- stances, respectively, and an Action to stop the Threads and close the |
|
|
|
|
-- connection. |
|
|
|
|
|