From c856d332a26c17f23d048a4d70c92a79973e154d Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Sat, 14 Apr 2012 16:18:39 +0200 Subject: [PATCH] removed dependency on ResourceT changed withConnection to run in the caller thread --- src/Data/Conduit/TLS.hs | 19 ++--- src/Network/XMPP/Concurrent/Monad.hs | 31 +++++--- src/Network/XMPP/Concurrent/Threads.hs | 101 +++++++++++++++---------- src/Network/XMPP/Concurrent/Types.hs | 8 +- src/Network/XMPP/Monad.hs | 61 +++++++++++++-- src/Network/XMPP/Stream.hs | 6 +- src/Network/XMPP/Types.hs | 8 +- src/Tests.hs | 7 +- 8 files changed, 159 insertions(+), 82 deletions(-) diff --git a/src/Data/Conduit/TLS.hs b/src/Data/Conduit/TLS.hs index bf2adf1..4a7d4f0 100644 --- a/src/Data/Conduit/TLS.hs +++ b/src/Data/Conduit/TLS.hs @@ -7,9 +7,8 @@ module Data.Conduit.TLS ) where -import Control.Applicative +import Control.Monad(liftM) import Control.Monad.IO.Class -import Control.Monad.Trans.Resource import Crypto.Random @@ -23,7 +22,7 @@ import Network.TLS.Extra as TLSExtra import System.IO(Handle) tlsinit - :: (MonadIO m, MonadIO m1, MonadResource m1) => + :: (MonadIO m, MonadIO m1) => TLSParams -> Handle -> m ( Source m1 BS.ByteString , Sink BS.ByteString m1 () @@ -32,15 +31,13 @@ tlsinit tlsParams handle = do gen <- liftIO $ (newGenIO :: IO SystemRandom) -- TODO: Find better random source? clientContext <- client tlsParams gen handle handshake clientContext - let src = sourceIO - (return clientContext) - (bye) - (\con -> IOOpen <$> recvData con) - let snk = sinkIO - (return clientContext) - (\_ -> return ()) + let src = sourceState + clientContext + (\con -> StateOpen con `liftM` recvData con) + let snk = sinkState + clientContext (\con bs -> sendData con (BL.fromChunks [bs]) - >> return IOProcessing ) + >> return (StateProcessing con)) (\_ -> return ()) return ( src , snk diff --git a/src/Network/XMPP/Concurrent/Monad.hs b/src/Network/XMPP/Concurrent/Monad.hs index 69b2f29..a92af43 100644 --- a/src/Network/XMPP/Concurrent/Monad.hs +++ b/src/Network/XMPP/Concurrent/Monad.hs @@ -4,6 +4,7 @@ import Network.XMPP.Types import Control.Concurrent import Control.Concurrent.STM +import qualified Control.Exception.Lifted as Ex import Control.Monad.IO.Class import Control.Monad.Trans.Reader import Control.Monad.Trans.State @@ -141,18 +142,26 @@ waitForPresence f = do -- | Run an XMPPMonad action in isolation. -- Reader and writer workers will be temporarily stopped -- and resumed with the new session details once the action returns. --- The Action will run in the reader thread. -withConnection :: XMPPConMonad () -> XMPPThread () +-- The Action will run in the calling thread/ +-- NB: This will /not/ catch any exceptions. If you action dies, deadlocks +-- or otherwisely exits abnormaly the XMPP session will be dead. +withConnection :: XMPPConMonad a -> XMPPThread a withConnection a = do - writeLock <- asks writeRef - rdr <- asks readerThread - _ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the - -- one returned by a - liftIO . throwTo rdr . ReaderSignal $ do - a - out <- gets sConPushBS - liftIO . atomically $ putTMVar writeLock out - return () + readerId <- asks readerThread + stateRef <- asks conStateRef + write <- asks writeRef + wait <- liftIO $ newEmptyTMVarIO + liftIO . throwTo readerId $ Interrupt wait + s <- liftIO . atomically $ do + putTMVar wait () + takeTMVar write + takeTMVar stateRef + (res, s') <- liftIO $ runStateT a s + liftIO . atomically $ do + putTMVar write (sConPushBS s') + putTMVar stateRef s' + return res + sendPresence :: Presence -> XMPPThread () sendPresence = sendS . PresenceS diff --git a/src/Network/XMPP/Concurrent/Threads.hs b/src/Network/XMPP/Concurrent/Threads.hs index ad59d02..db9b0ca 100644 --- a/src/Network/XMPP/Concurrent/Threads.hs +++ b/src/Network/XMPP/Concurrent/Threads.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE OverloadedStrings #-} module Network.XMPP.Concurrent.Threads where @@ -36,40 +37,51 @@ import qualified Text.XML.Stream.Render as XR readWorker :: TChan (Either MessageError Message) -> TChan (Either PresenceError Presence) -> TVar IQHandlers - -> XMPPConState - -> ResourceT IO () -readWorker messageC presenceC handlers s = Ex.catch - (forever . flip runStateT s $ do - sta <- pull - liftIO .atomically $ do - case sta of - MessageS m -> do writeTChan messageC $ Right m - _ <- readTChan messageC -- Sic! - return () - -- this may seem ridiculous, but to prevent - -- the channel from filling up we immedtiately remove the - -- Stanza we just put in. It will still be - -- available in duplicates. - MessageErrorS m -> do writeTChan messageC $ Left m - _ <- readTChan messageC - return () - PresenceS p -> do - writeTChan presenceC $ Right p - _ <- readTChan presenceC - return () - PresenceErrorS p -> do - writeTChan presenceC $ Left p - _ <- readTChan presenceC - return () - - IQRequestS i -> handleIQRequest handlers i - IQResultS i -> handleIQResponse handlers (Right i) - IQErrorS i -> handleIQResponse handlers (Left i) - ) - ( \(ReaderSignal a) -> do - ((),s') <- runStateT a s - readWorker messageC presenceC handlers s' - ) + -> TMVar XMPPConState + -> IO () +readWorker messageC presenceC handlers stateRef = + Ex.mask_ . forever $ do + s <- liftIO . atomically $ takeTMVar stateRef + (sta', s') <- flip runStateT s $ Ex.catch ( do + -- we don't know whether pull will necessarily be interruptible + liftIO $ Ex.allowInterrupt + Just <$> pull + ) + (\(Interrupt t) -> do + liftIO . atomically $ + putTMVar stateRef s + liftIO . atomically $ takeTMVar t + return Nothing + ) + liftIO . atomically $ do + case sta' of + Nothing -> return () + Just sta -> do + putTMVar stateRef s' + case sta of + MessageS m -> do writeTChan messageC $ Right m + _ <- readTChan messageC -- Sic! + return () + -- this may seem ridiculous, but to prevent + -- the channel from filling up we immedtiately remove the + -- Stanza we just put in. It will still be + -- available in duplicates. + MessageErrorS m -> do writeTChan messageC $ Left m + _ <- readTChan messageC + return () + PresenceS p -> do + writeTChan presenceC $ Right p + _ <- readTChan presenceC + return () + PresenceErrorS p -> do + writeTChan presenceC $ Left p + _ <- readTChan presenceC + return () + + IQRequestS i -> handleIQRequest handlers i + IQResultS i -> handleIQResponse handlers (Right i) + IQErrorS i -> handleIQResponse handlers (Left i) + handleIQRequest handlers iq = do (byNS, _) <- readTVar handlers @@ -110,8 +122,10 @@ startThreads :: XMPPConMonad ( TChan (Either MessageError Message) , TChan (Either PresenceError Presence) , TVar IQHandlers - , TChan Stanza, IO () + , TChan Stanza + , IO () , TMVar (BS.ByteString -> IO ()) + , TMVar XMPPConState , ThreadId ) @@ -122,24 +136,28 @@ startThreads = do iqC <- liftIO newTChanIO outC <- liftIO newTChanIO handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) + conS <- liftIO . newTMVarIO =<< get lw <- liftIO . forkIO $ writeWorker outC writeLock cp <- liftIO . forkIO $ connPersist writeLock s <- get - rd <- lift . resourceForkIO $ readWorker messageC presenceC handlers s - return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp], writeLock, rd) + rd <- liftIO . forkIO $ readWorker messageC presenceC handlers conS + return (messageC, presenceC, handlers, outC + , killConnection writeLock [lw, rd, cp] + , writeLock, conS ,rd) where killConnection writeLock threads = liftIO $ do _ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- forM threads killThread return() - -- | Start worker threads and run action. The supplied action will run -- in the calling thread. use 'forkXMPP' to start another thread. runThreaded :: XMPPThread a -> XMPPConMonad a runThreaded a = do - (mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads + liftIO . putStrLn $ "starting threads" + (mC, pC, hand, outC, _stopThreads, writeR, conS, rdr ) <- startThreads + liftIO . putStrLn $ "threads running" workermCh <- liftIO . newIORef $ Nothing workerpCh <- liftIO . newIORef $ Nothing idRef <- liftIO $ newTVarIO 1 @@ -147,7 +165,10 @@ runThreaded a = do curId <- readTVar idRef writeTVar idRef (curId + 1 :: Integer) return . read. show $ curId - liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId) + s <- get + liftIO . putStrLn $ "starting application" + liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId conS) + -- | Sends a blank space every 30 seconds to keep the connection alive connPersist :: TMVar (BS.ByteString -> IO ()) -> IO () diff --git a/src/Network/XMPP/Concurrent/Types.hs b/src/Network/XMPP/Concurrent/Types.hs index 889638b..14f0d04 100644 --- a/src/Network/XMPP/Concurrent/Types.hs +++ b/src/Network/XMPP/Concurrent/Types.hs @@ -38,11 +38,11 @@ data Thread = Thread { messagesRef :: IORef (Maybe ( TChan (Either , writeRef :: TMVar (BS.ByteString -> IO () ) , readerThread :: ThreadId , idGenerator :: IO StanzaId + , conStateRef :: TMVar XMPPConState } type XMPPThread a = ReaderT Thread IO a - -data ReaderSignal = ReaderSignal (XMPPConMonad ()) deriving Typeable -instance Show ReaderSignal where show _ = "" -instance Ex.Exception ReaderSignal +data Interrupt = Interrupt (TMVar ()) deriving Typeable +instance Show Interrupt where show _ = "" +instance Ex.Exception Interrupt diff --git a/src/Network/XMPP/Monad.hs b/src/Network/XMPP/Monad.hs index ff3aded..50ef734 100644 --- a/src/Network/XMPP/Monad.hs +++ b/src/Network/XMPP/Monad.hs @@ -5,7 +5,7 @@ module Network.XMPP.Monad where import Control.Applicative((<$>)) import Control.Monad.IO.Class import Control.Monad.Trans.Class -import Control.Monad.Trans.Resource +--import Control.Monad.Trans.Resource import Control.Monad.Trans.State import Data.ByteString as BS @@ -16,6 +16,7 @@ import Data.Text(Text) import Data.XML.Pickle import Data.XML.Types +import Network import Network.XMPP.Types import Network.XMPP.Marshal import Network.XMPP.Pickle @@ -41,7 +42,7 @@ pushOpen e = do lift . sink $ openElementToEvents e return () -pulls :: Sink Event (ResourceT IO) b -> XMPPConMonad b +pulls :: Sink Event IO b -> XMPPConMonad b pulls snk = do source <- gets sConSrc (src', r) <- lift $ source $$+ snk @@ -63,15 +64,15 @@ xmppFromHandle :: Handle -> Maybe Text -> XMPPConMonad a -> IO (a, XMPPConState) -xmppFromHandle handle hostname username res f = runResourceT $ do +xmppFromHandle handle hostname username res f = do liftIO $ hSetBuffering handle NoBuffering - let raw = CB.sourceHandle handle + let raw = sourceHandle' handle let src = raw $= XP.parseBytes def let st = XMPPConState src (raw) (\xs -> CL.sourceList xs - $$ XR.renderBytes def =$ CB.sinkHandle handle) + $$ XR.renderBytes def =$ sinkHandle' handle) (BS.hPut handle) (Just handle) (SF Nothing [] []) @@ -81,3 +82,53 @@ xmppFromHandle handle hostname username res f = runResourceT $ do res runStateT f st +-- TODO: Once pullrequest has been merged, switch back to upstream +sourceHandle' :: MonadIO m => Handle -> Source m BS.ByteString +sourceHandle' h = + src + where + src = PipeM pull close + + pull = do + bs <- liftIO (BS.hGetSome h 4096) + if BS.null bs + then return $ Done Nothing () + else return $ HaveOutput src close bs + + close = return () + +sinkHandle' :: MonadIO m + => Handle + -> Sink BS.ByteString m () +sinkHandle' h = + NeedInput push close + where + push input = PipeM + (liftIO (BS.hPut h input) >> return (NeedInput push close)) + (return ()) + close = return () + +xmppConnect :: HostName -> Text -> XMPPConMonad () +xmppConnect host hostname = do + uname <- gets sUsername + con <- liftIO $ do + con <- connectTo host (PortNumber 5222) + hSetBuffering con NoBuffering + return con + let raw = sourceHandle' con + let src = raw $= XP.parseBytes def + let st = XMPPConState + src + (raw) + (\xs -> CL.sourceList xs + $$ XR.renderBytes def =$ sinkHandle' con) + (BS.hPut con) + (Just con) + (SF Nothing [] []) + False + hostname + uname + Nothing + put st + return () + diff --git a/src/Network/XMPP/Stream.hs b/src/Network/XMPP/Stream.hs index a54e6ae..3bc4188 100644 --- a/src/Network/XMPP/Stream.hs +++ b/src/Network/XMPP/Stream.hs @@ -53,12 +53,12 @@ xmppRestartStream = do xmppStartStream -xmppStream :: Sink Event (ResourceT IO) ServerFeatures +xmppStream :: Sink Event IO ServerFeatures xmppStream = do xmppStreamHeader xmppStreamFeatures -xmppStreamHeader :: Sink Event (ResourceT IO) () +xmppStreamHeader :: Sink Event IO () xmppStreamHeader = do throwOutJunk (ver, _, _) <- unpickleElem pickleStream <$> openElementFromEvents @@ -66,7 +66,7 @@ xmppStreamHeader = do return() -xmppStreamFeatures :: Sink Event (ResourceT IO) ServerFeatures +xmppStreamFeatures :: Sink Event IO ServerFeatures xmppStreamFeatures = unpickleElem pickleStreamFeatures <$> elementFromEvents diff --git a/src/Network/XMPP/Types.hs b/src/Network/XMPP/Types.hs index cd3f164..49cac1f 100644 --- a/src/Network/XMPP/Types.hs +++ b/src/Network/XMPP/Types.hs @@ -608,9 +608,9 @@ data ServerFeatures = SF } deriving Show data XMPPConState = XMPPConState - { sConSrc :: Source (ResourceT IO) Event - , sRawSrc :: Source (ResourceT IO) BS.ByteString - , sConPush :: [Event] -> ResourceT IO () + { sConSrc :: Source IO Event + , sRawSrc :: Source IO BS.ByteString + , sConPush :: [Event] -> IO () , sConPushBS :: BS.ByteString -> IO () , sConHandle :: Maybe Handle , sFeatures :: ServerFeatures @@ -627,7 +627,7 @@ data XMPPConState = XMPPConState newtype XMPPT m a = XMPPT { runXMPPT :: StateT XMPPConState m a } deriving (Monad, MonadIO) -type XMPPConMonad a = StateT XMPPConState (ResourceT IO) a +type XMPPConMonad a = StateT XMPPConState IO a -- Make XMPPT derive the Monad and MonadIO instances. diff --git a/src/Tests.hs b/src/Tests.hs index 6043679..b9d553d 100644 --- a/src/Tests.hs +++ b/src/Tests.hs @@ -78,11 +78,11 @@ runMain debug number = do 1 -> (testUser1, testUser2,True) 2 -> (testUser2, testUser1,False) _ -> error "Need either 1 or 2" + let debug' = liftIO . atomically . + debug . (("Thread " ++ show number ++ ":") ++) sessionConnect "localhost" "species64739.dyndns.org" (fromJust $ node we) (resource we) $ do - let debug' = liftIO . atomically . debug . - (("Thread " ++ show number ++ ":") ++) withConnection $ xmppSASL "pwd" xmppThreadedBind (resource we) withConnection $ xmppSession @@ -90,7 +90,6 @@ runMain debug number = do forkXMPP autoAccept forkXMPP iqResponder -- sendS . SPresence $ Presence Nothing (Just them) Nothing (Just Subscribe) Nothing Nothing Nothing [] - let delay = if active then 1000000 else 5000000 when active . void . forkXMPP $ do forM [1..10] $ \count -> do let message = Text.pack . show $ node we @@ -100,7 +99,7 @@ runMain debug number = do let answerPayload = unpickleElem payloadP (fromJust $ iqResultPayload answer) expect debug' (invertPayload payload) answerPayload - liftIO $ threadDelay delay + liftIO $ threadDelay 500000 sendUser "All tests done" liftIO . forever $ threadDelay 10000000 return ()