Browse Source

allow read Worker to close streams on read failure and prevent it from stopping itself in this case

master
Philipp Balzarek 13 years ago
parent
commit
53b73bc905
  1. 73
      source/Network/Xmpp/Concurrent/Threads.hs

73
source/Network/Xmpp/Concurrent/Threads.hs

@ -23,37 +23,50 @@ import System.Log.Logger @@ -23,37 +23,50 @@ import System.Log.Logger
readWorker :: (Stanza -> IO ())
-> (XmppFailure -> IO ())
-> TMVar Stream
-> IO ()
readWorker onStanza onConnectionClosed stateRef = Ex.mask_ go
where
go = do
res <- Ex.catches ( do
-- we don't know whether pull will
-- necessarily be interruptible
s <- atomically $ do
-> IO a
readWorker onStanza onConnectionClosed stateRef = forever . Ex.mask_ $ do
s' <- Ex.catches ( do
-- we don't know whether pull will
-- necessarily be interruptible
atomically $ do
s@(Stream con) <- readTMVar stateRef
scs <- streamConnectionState <$> readTMVar con
when (scs == Closed)
when (stateIsClosed scs)
retry
return s
allowInterrupt
Just <$> pullStanza s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: XmppFailure) -> do
onConnectionClosed e
errorM "Pontarius.Xmpp" $ "Read error: " ++ show e
return Nothing
]
case res of
Nothing -> go -- Caught an exception, nothing to do. TODO: Can this happen?
Just (Left e) -> do
infoM "Pontarius.Xmpp.Reader" $
"Connection died: " ++ show e
onConnectionClosed e
Just (Right sta) -> onStanza sta >> go
return $ Just s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
]
case s' of
Nothing -> return ()
Just s -> do
res <- Ex.catches (do
allowInterrupt
Just <$> pullStanza s
)
[ Ex.Handler $ \(Interrupt t) -> do
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: XmppFailure) -> do
errorM "Pontarius.Xmpp" $ "Read error: "
++ show e
closeStreams s
onConnectionClosed e
return Nothing
]
case res of
Nothing -> return () -- Caught an exception, nothing to
-- do. TODO: Can this happen?
Just (Left e) -> do
errorM "Pontarius.Xmpp" $ "Stanza error:" ++ show e
closeStreams s
onConnectionClosed e
Just (Right sta) -> void $ onStanza sta
where
-- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
-- compatibility.
allowInterrupt :: IO ()
@ -67,6 +80,10 @@ readWorker onStanza onConnectionClosed stateRef = Ex.mask_ go @@ -67,6 +80,10 @@ readWorker onStanza onConnectionClosed stateRef = Ex.mask_ go
handleInterrupts ts =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
stateIsClosed Closed = True
stateIsClosed Finished = True
stateIsClosed _ = False
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.

Loading…
Cancel
Save