|
|
|
@ -29,6 +29,13 @@ import Text.XML.Stream.Elements |
|
|
|
|
|
|
|
|
|
|
|
import GHC.IO (unsafeUnmask) |
|
|
|
import GHC.IO (unsafeUnmask) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- While waiting for the first semaphore(s) to flip we might receive |
|
|
|
|
|
|
|
-- another interrupt. When that happens we add it's semaphore to the |
|
|
|
|
|
|
|
-- list and retry waiting |
|
|
|
|
|
|
|
handleInterrupts ts = |
|
|
|
|
|
|
|
Ex.catch (atomically $ forM ts takeTMVar) |
|
|
|
|
|
|
|
( \(Interrupt t) -> handleInterrupts (t:ts)) |
|
|
|
|
|
|
|
|
|
|
|
readWorker :: TChan (Either MessageError Message) |
|
|
|
readWorker :: TChan (Either MessageError Message) |
|
|
|
-> TChan (Either PresenceError Presence) |
|
|
|
-> TChan (Either PresenceError Presence) |
|
|
|
-> TVar IQHandlers |
|
|
|
-> TVar IQHandlers |
|
|
|
@ -36,22 +43,25 @@ readWorker :: TChan (Either MessageError Message) |
|
|
|
-> IO () |
|
|
|
-> IO () |
|
|
|
readWorker messageC presenceC handlers stateRef = |
|
|
|
readWorker messageC presenceC handlers stateRef = |
|
|
|
Ex.mask_ . forever $ do |
|
|
|
Ex.mask_ . forever $ do |
|
|
|
s <- liftIO . atomically $ takeTMVar stateRef |
|
|
|
res <- liftIO $ Ex.catch ( |
|
|
|
(sta', s') <- flip runStateT s $ Ex.catch ( do |
|
|
|
Ex.bracket |
|
|
|
-- we don't know whether pull will necessarily be interruptible |
|
|
|
(atomically $ takeTMVar stateRef) |
|
|
|
liftIO $ allowInterrupt |
|
|
|
(atomically . putTMVar stateRef ) |
|
|
|
Just <$> pull |
|
|
|
(\s -> do |
|
|
|
|
|
|
|
-- we don't know whether pull will |
|
|
|
|
|
|
|
-- necessarily be interruptible |
|
|
|
|
|
|
|
allowInterrupt |
|
|
|
|
|
|
|
Just <$> runStateT pull s |
|
|
|
|
|
|
|
) |
|
|
|
) |
|
|
|
) |
|
|
|
(\(Interrupt t) -> do |
|
|
|
(\(Interrupt t) -> do |
|
|
|
liftIO . atomically $ |
|
|
|
handleInterrupts [t] |
|
|
|
putTMVar stateRef s |
|
|
|
|
|
|
|
liftIO . atomically $ takeTMVar t |
|
|
|
|
|
|
|
return Nothing |
|
|
|
return Nothing |
|
|
|
) |
|
|
|
) |
|
|
|
liftIO . atomically $ do |
|
|
|
liftIO . atomically $ do |
|
|
|
case sta' of |
|
|
|
case res of |
|
|
|
Nothing -> return () |
|
|
|
Nothing -> return () |
|
|
|
Just sta -> do |
|
|
|
Just (sta, s') -> do |
|
|
|
putTMVar stateRef s' |
|
|
|
putTMVar stateRef s' |
|
|
|
case sta of |
|
|
|
case sta of |
|
|
|
MessageS m -> do writeTChan messageC $ Right m |
|
|
|
MessageS m -> do writeTChan messageC $ Right m |
|
|
|
|