diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs index a1e82a4..8e518c8 100644 --- a/src/Network/XMPP/Concurrent.hs +++ b/src/Network/XMPP/Concurrent.hs @@ -42,6 +42,9 @@ data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) , mShadow :: TChan Message -- the original chan , pShadow :: TChan Presence -- the original chan , outCh :: TChan Stanza + , iqHandlers :: TVar ( Map.Map (IQType, Text) (TChan IQ) + , Map.Map Text (TMVar IQ) + ) } type XMPPThread a = ReaderT Thread IO a @@ -97,14 +100,34 @@ startThreads = do Set -> case Map.lookup (Set, iqNS) byNS of Nothing -> return () -- TODO: send error stanza Just ch -> writeTChan ch iq - Result -> case Map.lookup (iqId iq) byID of - Nothing -> return () -- ?? Should we be sending an error? - Just tmvar -> putTMVar tmvar iq + Result -> case Map.updateLookupWithKey (\_ _ -> Nothing) + (iqId iq) byID of + (Nothing, _) -> return () -- we are not supposed + -- to send an error + (Just tmvar, byID') -> do + tryPutTMVar tmvar iq -- don't block + writeTVar handlers (byNS, byID) + killConnection writeLock threads = liftIO $ do atomically $ takeTMVar writeLock forM threads killThread return() +addIQChan :: IQType -> Text -> XMPPThread (Bool, TChan IQ) +addIQChan tp ns = do + handlers <- asks iqHandlers + liftIO . atomically $ do + (byNS, byID) <- readTVar handlers + iqCh <- newTChan + let (present, byNS') = Map.insertLookupWithKey' (\_ new _ -> new) + (tp,ns) iqCh byNS + writeTVar handlers (byNS', byID) + return $ case present of + Nothing -> (False, iqCh) + Just iqCh' -> (True, iqCh') + + + runThreaded :: XMPPThread a -> XMPPMonad ThreadId runThreaded a = do @@ -112,7 +135,7 @@ runThreaded a = do workermCh <- liftIO . newIORef $ Just mC workerpCh <- liftIO . newIORef $ Just pC worker <- liftIO . forkIO $ do - runReaderT a (Thread workermCh workerpCh mC pC outC) + runReaderT a (Thread workermCh workerpCh mC pC outC hand) return () return worker @@ -215,3 +238,4 @@ connPersist pushBS lock = forever $ do threadDelay 30000000 +