Browse Source

split readWorker

master
Philipp Balzarek 13 years ago
parent
commit
4f78596402
  1. 171
      source/Network/Xmpp/Concurrent/Threads.hs

171
source/Network/Xmpp/Concurrent/Threads.hs

@ -31,19 +31,16 @@ import GHC.IO (unsafeUnmask) @@ -31,19 +31,16 @@ import GHC.IO (unsafeUnmask)
-- Worker to read stanzas from the stream and concurrently distribute them to
-- all listener threads.
readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence)
-> TChan Stanza
-> TVar IQHandlers
-> TVar EventHandlers
readWorker :: (Stanza -> IO ())
-> (StreamError -> IO ())
-> TMVar XmppConnection
-> IO ()
readWorker messageC presenceC stanzaC iqHands handlers stateRef =
-> IO a
readWorker onStanza onConnectionClosed stateRef =
Ex.mask_ . forever $ do
res <- liftIO $ Ex.catches ( do
res <- Ex.catches ( do
-- we don't know whether pull will
-- necessarily be interruptible
s <- liftIO . atomically $ do
s <- atomically $ do
sr <- readTMVar stateRef
when (sConnectionState sr == XmppConnectionClosed)
retry
@ -55,52 +52,17 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef = @@ -55,52 +52,17 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef =
void $ handleInterrupts [t]
return Nothing
, Ex.Handler $ \(e :: StreamError) -> do
hands <- atomically $ readTVar handlers
_ <- forkIO $ connectionClosedHandler hands e
onConnectionClosed e
return Nothing
]
liftIO . atomically $ do
case res of
Nothing -> return ()
Just sta -> do
writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
return ()
-- this may seem ridiculous, but to prevent
-- the channel from filling up we
-- immedtiately remove the
-- Stanza we just put in. It will still be
-- available in duplicates.
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
Nothing -> return () -- Caught an exception, nothing to do
Just sta -> onStanza sta
where
-- Defining an Control.Exception.allowInterrupt equivalent for GHC 7
-- compatibility.
allowInterrupt :: IO ()
allowInterrupt = unsafeUnmask $ return ()
-- Call the connection closed handlers.
noCon :: TVar EventHandlers -> StreamError -> IO (Maybe a)
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return Nothing
-- While waiting for the first semaphore(s) to flip we might receive another
-- interrupt. When that happens we add it's semaphore to the list and retry
-- waiting. We do this because we might receive another
@ -111,7 +73,7 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef = @@ -111,7 +73,7 @@ readWorker messageC presenceC stanzaC iqHands handlers stateRef =
Ex.catch (atomically $ forM ts takeTMVar)
(\(Interrupt t) -> handleInterrupts (t:ts))
-- If the IQ request has a namespace, sent it through the appropriate channel.
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
@ -148,61 +110,39 @@ writeWorker stCh writeR = forever $ do @@ -148,61 +110,39 @@ writeWorker stCh writeR = forever $ do
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.
-- Two streams: input and output. Threads read from input stream and write to
-- output stream.
-- | Runs thread in XmppState monad. Returns channel of incoming and outgoing
-- stances, respectively, and an Action to stop the Threads and close the
-- connection.
startThreads :: IO ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence)
, TChan Stanza
, TVar IQHandlers
, TChan Stanza
, IO ()
, TMVar (BS.ByteString -> IO Bool)
, TMVar XmppConnection
, ThreadId
, TVar EventHandlers
)
startThreads = do
startThreadsWith stanzaHandler outC eh = do
writeLock <- newTMVarIO (\_ -> return False)
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
stanzaC <- newTChanIO
handlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO zeroEventHandlers
conS <- newTMVarIO xmppNoConnection
lw <- forkIO $ writeWorker outC writeLock
cp <- forkIO $ connPersist writeLock
rd <- forkIO $ readWorker messageC presenceC stanzaC handlers eh conS
return ( messageC
, presenceC
, stanzaC
, handlers
, outC
, killConnection writeLock [lw, rd, cp]
rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS
return ( killConnection writeLock [lw, rd, cp]
, writeLock
, conS
, rd
, eh)
)
where
killConnection writeLock threads = liftIO $ do
_ <- atomically $ takeTMVar writeLock -- Should we put it back?
_ <- forM threads killThread
return ()
zeroEventHandlers :: EventHandlers
zeroEventHandlers = EventHandlers
{ connectionClosedHandler = \_ -> return ()
}
-- | Creates and initializes a new concurrent session.
newSession :: IO Session
newSession = do
(mC, pC, sC, hand, outC, stopThreads', writeR, conS, rdr, eh) <- startThreads
newSessionChans :: IO Session
newSessionChans = do
messageC <- newTChanIO
presenceC <- newTChanIO
outC <- newTChanIO
stanzaC <- newTChanIO
iqHandlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler outC eh
workermCh <- newIORef $ Nothing
workerpCh <- newIORef $ Nothing
idRef <- newTVarIO 1
@ -210,20 +150,20 @@ newSession = do @@ -210,20 +150,20 @@ newSession = do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
return $ Session
mC
pC
sC
workermCh
workerpCh
outC
hand
writeR
rdr
getId
conS
eh
stopThreads'
return $ Session { mShadow = messageC
, pShadow = presenceC
, sShadow = stanzaC
, messagesRef = workermCh
, presenceRef = workerpCh
, outCh = outC
, iqHandlers = iqHandlers
, writeRef = wLock
, readerThread = readerThread
, idGenerator = getId
, conStateRef = conState
, eventHandlers = eh
, stopThreads = kill
}
-- Acquires the write lock, pushes a space, and releases the lock.
-- | Sends a blank space every 30 seconds to keep the connection alive.
@ -233,3 +173,38 @@ connPersist lock = forever $ do @@ -233,3 +173,38 @@ connPersist lock = forever $ do
_ <- pushBS " "
atomically $ putTMVar lock pushBS
threadDelay 30000000 -- 30s
toChans messageC presenceC stanzaC iqHands sta = atomically $ do
writeTChan stanzaC sta
void $ readTChan stanzaC -- sic
case sta of
MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic!
return ()
-- this may seem ridiculous, but to prevent
-- the channel from filling up we
-- immedtiately remove the
-- Stanza we just put in. It will still be
-- available in duplicates.
MessageErrorS m -> do writeTChan messageC $ Left m
_ <- readTChan messageC
return ()
PresenceS p -> do
writeTChan presenceC $ Right p
_ <- readTChan presenceC
return ()
PresenceErrorS p -> do
writeTChan presenceC $ Left p
_ <- readTChan presenceC
return ()
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
-- Call the connection closed handlers.
noCon :: TVar EventHandlers -> StreamError -> IO ()
noCon h e = do
hands <- atomically $ readTVar h
_ <- forkIO $ connectionClosedHandler hands e
return ()
Loading…
Cancel
Save