From d925ccddb69c9d34468aa5d7a26bc4b3e53d0757 Mon Sep 17 00:00:00 2001 From: Philipp Balzarek Date: Wed, 4 Apr 2012 23:22:06 +0200 Subject: [PATCH] enforcing single threads --- src/Main.hs | 4 +- src/Network/XMPP.hs | 18 +++- src/Network/XMPP/Concurrent.hs | 175 +++++++++++++++++++++------------ src/Network/XMPP/Types.hs | 15 ++- xmpp-lib.cabal | 1 + 5 files changed, 138 insertions(+), 75 deletions(-) diff --git a/src/Main.hs b/src/Main.hs index 71ca0b2..b69dcd3 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -56,8 +56,8 @@ main = do -- sendS . SPresence $ -- Presence Nothing Nothing Nothing Nothing (Just Available) Nothing Nothing [] - withNewThread autoAccept - withNewThread mirror + forkXMPP autoAccept + forkXMPP mirror -- withNewThread killer sendS . SPresence $ Presence Nothing Nothing Nothing Nothing (Just Available) Nothing Nothing [] diff --git a/src/Network/XMPP.hs b/src/Network/XMPP.hs index 25f54b6..40152b2 100644 --- a/src/Network/XMPP.hs +++ b/src/Network/XMPP.hs @@ -36,9 +36,25 @@ fromHandle handle hostname username resource password a = runThreaded a return () +--fromHandle :: Handle -> Text -> Text -> Maybe Text -> Text -> IO ((), XMPPState) +fromHandle' :: Handle -> Text -> Text -> Maybe Text -> Text -> XMPPThread a + -> IO ((), XMPPState) +fromHandle' handle hostname username resource password a = + xmppFromHandle handle hostname username resource $ do + xmppStartStream + runThreaded $ do + -- this will check whether the server supports tls + -- on it's own + singleThreaded $ xmppStartTLS exampleParams + singleThreaded $ xmppSASL password + singleThreaded $ xmppBind + singleThreaded $ xmppSession + a + return () + connectXMPP :: HostName -> Text -> Text -> Maybe Text -> Text -> XMPPThread a -> IO ((), XMPPState) connectXMPP host hostname username resource passwd a = do con <- connectTo host (PortNumber 5222) hSetBuffering con NoBuffering - fromHandle con hostname username resource passwd a + fromHandle' con hostname username resource passwd a diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs index 8e518c8..c67f0d2 100644 --- a/src/Network/XMPP/Concurrent.hs +++ b/src/Network/XMPP/Concurrent.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE NoMonomorphismRestriction #-} @@ -6,36 +7,44 @@ module Network.XMPP.Concurrent where -- import Network.XMPP.Stream -import Network.XMPP.Types - -import Control.Concurrent -import Control.Concurrent.STM -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TMVar -import Control.Monad.IO.Class -import Control.Monad -import Control.Monad.Trans.Class -import Control.Monad.Trans.Reader -import Control.Monad.Trans.Resource -import Control.Monad.Trans.State +import Network.XMPP.Types + +import Control.Applicative((<$>),(<*>)) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Concurrent.STM.TChan +import Control.Concurrent.STM.TMVar +import Control.Exception (throwTo) +import qualified Control.Exception.Lifted as Ex +import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Class +import Control.Monad.Trans.Reader +import Control.Monad.Trans.Resource +import Control.Monad.Trans.State import qualified Data.ByteString as BS +import Data.Conduit +import qualified Data.Conduit.List as CL +import Data.Default (def) +import Data.IORef import qualified Data.Map as Map -import Data.Maybe -import Data.IORef -import Data.Text(Text) +import Data.Maybe +import Data.Text(Text) +import Data.Typeable -import Data.XML.Types +import Data.XML.Types -import Network.XMPP.Types -import Network.XMPP.Monad -import Network.XMPP.Marshal -import Network.XMPP.Pickle +import Network.XMPP.Types +import Network.XMPP.Monad +import Network.XMPP.Marshal +import Network.XMPP.Pickle -import System.IO +import System.IO -import Text.XML.Stream.Elements +import Text.XML.Stream.Elements +import qualified Text.XML.Stream.Render as XR data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) , presenceRef :: IORef (Maybe (TChan Presence)) @@ -45,10 +54,63 @@ data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) , iqHandlers :: TVar ( Map.Map (IQType, Text) (TChan IQ) , Map.Map Text (TMVar IQ) ) + , writeRef :: TMVar (BS.ByteString -> IO () ) + , readerThread :: ThreadId } type XMPPThread a = ReaderT Thread IO a + +data ReaderSignal = ReaderSignal (XMPPMonad ()) deriving Typeable +instance Show ReaderSignal where show _ = "" +instance Ex.Exception ReaderSignal + +readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> ResourceT IO () +readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do + sta <- pull + case sta of + SMessage m -> liftIO . atomically $ writeTChan messageC m + SPresence p -> liftIO . atomically $ writeTChan presenceC p + SIQ i -> liftIO . atomically $ writeTChan iqC i + ) + ( \(ReaderSignal a) -> do + ((),s') <- runStateT a s + readWorker messageC presenceC iqC s' + ) + +writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO () +writeWorker stCh writeRef = forever $ do + (write, next) <- atomically $ (,) <$> + takeTMVar writeRef <*> + readTChan stCh + outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next) + $= XR.renderBytes def $$ CL.consume + forM outBS write + atomically $ putTMVar writeRef write + + +handleIQs handlers iqC = liftIO . forever . atomically $ do + iq <- readTChan iqC + (byNS, byID) <- readTVar handlers + let iqNS' = nameNamespace . elementName . iqBody $ iq + case iqNS' of + Nothing -> return () -- TODO: send error stanza + Just iqNS -> case iqType iq of + Get -> case Map.lookup (Get, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> writeTChan ch iq + Set -> case Map.lookup (Set, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> writeTChan ch iq + Result -> case Map.updateLookupWithKey (\_ _ -> Nothing) + (iqId iq) byID of + (Nothing, _) -> return () -- we are not supposed + -- to send an error + (Just tmvar, byID') -> do + tryPutTMVar tmvar iq -- don't block + writeTVar handlers (byNS, byID) + + -- Two streams: input and output. Threads read from input stream and write to output stream. -- | Runs thread in XmppState monad -- returns channel of incoming and outgoing stances, respectively @@ -60,9 +122,13 @@ startThreads , Map.Map Text (TMVar IQ) ) , TChan Stanza, IO () + , TMVar (BS.ByteString -> IO ()) + , ThreadId ) + + startThreads = do - writeLock <- liftIO $ newTMVarIO () + writeLock <- liftIO . newTMVarIO =<< gets sConPushBS messageC <- liftIO newTChanIO presenceC <- liftIO newTChanIO iqC <- liftIO newTChanIO @@ -70,43 +136,18 @@ startThreads = do iqHandlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) pushEvents <- gets sConPush pushBS <- gets sConPushBS - lw <- lift . resourceForkIO $ loopWrite writeLock pushEvents outC - cp <- liftIO . forkIO $ connPersist pushBS writeLock - iqh <- lift . resourceForkIO $ handleIQs iqHandlers iqC + lw <- liftIO . forkIO $ writeWorker outC writeLock + cp <- liftIO . forkIO $ connPersist writeLock + iqh <- liftIO . forkIO $ handleIQs iqHandlers iqC s <- get - rd <- lift . resourceForkIO . void . flip runStateT s . forever $ do - sta <- pull - case sta of - SMessage m -> liftIO . atomically $ writeTChan messageC m - SPresence p -> liftIO . atomically $ writeTChan presenceC p - SIQ i -> liftIO . atomically $ writeTChan iqC i - return (messageC, presenceC, iqHandlers, outC, killConnection writeLock [lw, rd, cp]) + rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s + return (messageC, presenceC, iqHandlers, outC, killConnection writeLock [lw, rd, cp], writeLock, rd) where loopWrite writeLock pushEvents out' = forever $ do next <- liftIO . atomically $ ( takeTMVar writeLock >> readTChan out') pushEvents . elementToEvents $ pickleElem stanzaP next liftIO . atomically $ putTMVar writeLock () - handleIQs handlers iqC = liftIO . forever . atomically $ do - iq <- readTChan iqC - (byNS, byID) <- readTVar handlers - let iqNS' = nameNamespace . elementName . iqBody $ iq - case iqNS' of - Nothing -> return () -- TODO: send error stanza - Just iqNS -> case iqType iq of - Get -> case Map.lookup (Get, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> writeTChan ch iq - Set -> case Map.lookup (Set, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> writeTChan ch iq - Result -> case Map.updateLookupWithKey (\_ _ -> Nothing) - (iqId iq) byID of - (Nothing, _) -> return () -- we are not supposed - -- to send an error - (Just tmvar, byID') -> do - tryPutTMVar tmvar iq -- don't block - writeTVar handlers (byNS, byID) killConnection writeLock threads = liftIO $ do atomically $ takeTMVar writeLock @@ -126,16 +167,15 @@ addIQChan tp ns = do Nothing -> (False, iqCh) Just iqCh' -> (True, iqCh') - - runThreaded :: XMPPThread a -> XMPPMonad ThreadId runThreaded a = do - (mC, pC, hand, outC, stopThreads) <- startThreads + (mC, pC, hand, outC, stopThreads, writeR, reader ) <- startThreads workermCh <- liftIO . newIORef $ Just mC workerpCh <- liftIO . newIORef $ Just pC worker <- liftIO . forkIO $ do - runReaderT a (Thread workermCh workerpCh mC pC outC hand) + runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR + reader) return () return worker @@ -203,8 +243,8 @@ sendS a = do return () -- | Fork a new thread -withNewThread :: XMPPThread () -> XMPPThread ThreadId -withNewThread a = do +forkXMPP :: XMPPThread () -> XMPPThread ThreadId +forkXMPP a = do thread <- ask mCH' <- liftIO $ newIORef Nothing pCH' <- liftIO $ newIORef Nothing @@ -229,13 +269,22 @@ waitForPresence f = do waitForPresence f -connPersist :: (BS.ByteString -> IO ()) -> TMVar () -> IO () -connPersist pushBS lock = forever $ do - atomically $ takeTMVar lock +connPersist :: TMVar (BS.ByteString -> IO ()) -> IO () +connPersist lock = forever $ do + pushBS <- atomically $ takeTMVar lock pushBS " " - atomically $ putTMVar lock () + atomically $ putTMVar lock pushBS -- putStrLn "" threadDelay 30000000 +singleThreaded a = do + writeLock <- asks writeRef + reader <- asks readerThread + liftIO . atomically $ takeTMVar writeLock + liftIO . throwTo reader . ReaderSignal $ do + a + out <- gets sConPushBS + liftIO . atomically $ putTMVar writeLock out + diff --git a/src/Network/XMPP/Types.hs b/src/Network/XMPP/Types.hs index 47fb28a..f4a5eeb 100644 --- a/src/Network/XMPP/Types.hs +++ b/src/Network/XMPP/Types.hs @@ -1,17 +1,13 @@ module Network.XMPP.Types where -- proudly "borrowed" from haskell-xmpp -import Control.Applicative((<$>)) -import Control.Monad import Control.Monad.Trans.State import qualified Data.ByteString as BS import Data.Conduit import Data.Default import Data.List.Split as L -import Data.Maybe import Data.Text as Text -import Data.String as Str import Data.XML.Types @@ -26,9 +22,9 @@ data JID = JID { node :: Maybe Text -- ^ Resource name } instance Show JID where - show (JID nd domain res) = + show (JID nd dmn res) = maybe "" ((++ "@") . Text.unpack) nd ++ - (Text.unpack domain) ++ + (Text.unpack dmn) ++ maybe "" (('/' :) . Text.unpack) res type XMPPMonad a = StateT XMPPState (ResourceT IO) a @@ -62,14 +58,15 @@ instance Default ServerFeatures where -- Ugh, that smells a bit. +parseJID :: [Char] -> JID parseJID jid = let (jid', rst) = case L.splitOn "@" jid of [rest] -> (JID Nothing, rest) - [node,rest] -> (JID (Just (Text.pack node)), rest) + [nd,rest] -> (JID (Just (Text.pack nd)), rest) _ -> error $ "Couldn't parse JID: \"" ++ jid ++ "\"" in case L.splitOn "/" rst of - [domain] -> jid' (Text.pack domain) Nothing - [domain, resource] -> jid' (Text.pack domain) (Just (Text.pack resource)) + [dmn] -> jid' (Text.pack dmn) Nothing + [dmn, rsrc] -> jid' (Text.pack dmn) (Just (Text.pack rsrc)) _ -> error $ "Couldn't parse JID: \"" ++ jid ++ "\"" instance Read JID where diff --git a/xmpp-lib.cabal b/xmpp-lib.cabal index 6f0f043..8422964 100644 --- a/xmpp-lib.cabal +++ b/xmpp-lib.cabal @@ -40,6 +40,7 @@ library , bytestring -any , transformers -any , network -any + , lifted-base -any , split -any , stm -any , xml-types -any