Browse Source

enforcing single threads

master
Philipp Balzarek 14 years ago
parent
commit
d925ccddb6
  1. 4
      src/Main.hs
  2. 18
      src/Network/XMPP.hs
  3. 175
      src/Network/XMPP/Concurrent.hs
  4. 15
      src/Network/XMPP/Types.hs
  5. 1
      xmpp-lib.cabal

4
src/Main.hs

@ -56,8 +56,8 @@ main = do
-- sendS . SPresence $ -- sendS . SPresence $
-- Presence Nothing Nothing Nothing Nothing (Just Available) Nothing Nothing [] -- Presence Nothing Nothing Nothing Nothing (Just Available) Nothing Nothing []
withNewThread autoAccept forkXMPP autoAccept
withNewThread mirror forkXMPP mirror
-- withNewThread killer -- withNewThread killer
sendS . SPresence $ Presence Nothing Nothing Nothing Nothing sendS . SPresence $ Presence Nothing Nothing Nothing Nothing
(Just Available) Nothing Nothing [] (Just Available) Nothing Nothing []

18
src/Network/XMPP.hs

@ -36,9 +36,25 @@ fromHandle handle hostname username resource password a =
runThreaded a runThreaded a
return () return ()
--fromHandle :: Handle -> Text -> Text -> Maybe Text -> Text -> IO ((), XMPPState)
fromHandle' :: Handle -> Text -> Text -> Maybe Text -> Text -> XMPPThread a
-> IO ((), XMPPState)
fromHandle' handle hostname username resource password a =
xmppFromHandle handle hostname username resource $ do
xmppStartStream
runThreaded $ do
-- this will check whether the server supports tls
-- on it's own
singleThreaded $ xmppStartTLS exampleParams
singleThreaded $ xmppSASL password
singleThreaded $ xmppBind
singleThreaded $ xmppSession
a
return ()
connectXMPP :: HostName -> Text -> Text -> Maybe Text connectXMPP :: HostName -> Text -> Text -> Maybe Text
-> Text -> XMPPThread a -> IO ((), XMPPState) -> Text -> XMPPThread a -> IO ((), XMPPState)
connectXMPP host hostname username resource passwd a = do connectXMPP host hostname username resource passwd a = do
con <- connectTo host (PortNumber 5222) con <- connectTo host (PortNumber 5222)
hSetBuffering con NoBuffering hSetBuffering con NoBuffering
fromHandle con hostname username resource passwd a fromHandle' con hostname username resource passwd a

175
src/Network/XMPP/Concurrent.hs

@ -1,3 +1,4 @@
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE NoMonomorphismRestriction #-} {-# LANGUAGE NoMonomorphismRestriction #-}
@ -6,36 +7,44 @@ module Network.XMPP.Concurrent
where where
-- import Network.XMPP.Stream -- import Network.XMPP.Stream
import Network.XMPP.Types import Network.XMPP.Types
import Control.Concurrent import Control.Applicative((<$>),(<*>))
import Control.Concurrent.STM import Control.Concurrent
import Control.Concurrent.STM.TChan import Control.Concurrent.STM
import Control.Concurrent.STM.TMVar import Control.Concurrent.STM.TChan
import Control.Monad.IO.Class import Control.Concurrent.STM.TMVar
import Control.Monad import Control.Exception (throwTo)
import Control.Monad.Trans.Class import qualified Control.Exception.Lifted as Ex
import Control.Monad.Trans.Reader import Control.Monad
import Control.Monad.Trans.Resource import Control.Monad.IO.Class
import Control.Monad.Trans.State import Control.Monad.Trans.Class
import Control.Monad.Trans.Reader
import Control.Monad.Trans.Resource
import Control.Monad.Trans.State
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Default (def)
import Data.IORef
import qualified Data.Map as Map import qualified Data.Map as Map
import Data.Maybe import Data.Maybe
import Data.IORef import Data.Text(Text)
import Data.Text(Text) import Data.Typeable
import Data.XML.Types import Data.XML.Types
import Network.XMPP.Types import Network.XMPP.Types
import Network.XMPP.Monad import Network.XMPP.Monad
import Network.XMPP.Marshal import Network.XMPP.Marshal
import Network.XMPP.Pickle import Network.XMPP.Pickle
import System.IO import System.IO
import Text.XML.Stream.Elements import Text.XML.Stream.Elements
import qualified Text.XML.Stream.Render as XR
data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message))
, presenceRef :: IORef (Maybe (TChan Presence)) , presenceRef :: IORef (Maybe (TChan Presence))
@ -45,10 +54,63 @@ data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message))
, iqHandlers :: TVar ( Map.Map (IQType, Text) (TChan IQ) , iqHandlers :: TVar ( Map.Map (IQType, Text) (TChan IQ)
, Map.Map Text (TMVar IQ) , Map.Map Text (TMVar IQ)
) )
, writeRef :: TMVar (BS.ByteString -> IO () )
, readerThread :: ThreadId
} }
type XMPPThread a = ReaderT Thread IO a type XMPPThread a = ReaderT Thread IO a
data ReaderSignal = ReaderSignal (XMPPMonad ()) deriving Typeable
instance Show ReaderSignal where show _ = "<ReaderSignal>"
instance Ex.Exception ReaderSignal
readWorker :: TChan Message -> TChan Presence -> TChan IQ -> XMPPState -> ResourceT IO ()
readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do
sta <- pull
case sta of
SMessage m -> liftIO . atomically $ writeTChan messageC m
SPresence p -> liftIO . atomically $ writeTChan presenceC p
SIQ i -> liftIO . atomically $ writeTChan iqC i
)
( \(ReaderSignal a) -> do
((),s') <- runStateT a s
readWorker messageC presenceC iqC s'
)
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO ()
writeWorker stCh writeRef = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeRef <*>
readTChan stCh
outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next)
$= XR.renderBytes def $$ CL.consume
forM outBS write
atomically $ putTMVar writeRef write
handleIQs handlers iqC = liftIO . forever . atomically $ do
iq <- readTChan iqC
(byNS, byID) <- readTVar handlers
let iqNS' = nameNamespace . elementName . iqBody $ iq
case iqNS' of
Nothing -> return () -- TODO: send error stanza
Just iqNS -> case iqType iq of
Get -> case Map.lookup (Get, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> writeTChan ch iq
Set -> case Map.lookup (Set, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> writeTChan ch iq
Result -> case Map.updateLookupWithKey (\_ _ -> Nothing)
(iqId iq) byID of
(Nothing, _) -> return () -- we are not supposed
-- to send an error
(Just tmvar, byID') -> do
tryPutTMVar tmvar iq -- don't block
writeTVar handlers (byNS, byID)
-- Two streams: input and output. Threads read from input stream and write to output stream. -- Two streams: input and output. Threads read from input stream and write to output stream.
-- | Runs thread in XmppState monad -- | Runs thread in XmppState monad
-- returns channel of incoming and outgoing stances, respectively -- returns channel of incoming and outgoing stances, respectively
@ -60,9 +122,13 @@ startThreads
, Map.Map Text (TMVar IQ) , Map.Map Text (TMVar IQ)
) )
, TChan Stanza, IO () , TChan Stanza, IO ()
, TMVar (BS.ByteString -> IO ())
, ThreadId
) )
startThreads = do startThreads = do
writeLock <- liftIO $ newTMVarIO () writeLock <- liftIO . newTMVarIO =<< gets sConPushBS
messageC <- liftIO newTChanIO messageC <- liftIO newTChanIO
presenceC <- liftIO newTChanIO presenceC <- liftIO newTChanIO
iqC <- liftIO newTChanIO iqC <- liftIO newTChanIO
@ -70,43 +136,18 @@ startThreads = do
iqHandlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) iqHandlers <- liftIO $ newTVarIO ( Map.empty, Map.empty)
pushEvents <- gets sConPush pushEvents <- gets sConPush
pushBS <- gets sConPushBS pushBS <- gets sConPushBS
lw <- lift . resourceForkIO $ loopWrite writeLock pushEvents outC lw <- liftIO . forkIO $ writeWorker outC writeLock
cp <- liftIO . forkIO $ connPersist pushBS writeLock cp <- liftIO . forkIO $ connPersist writeLock
iqh <- lift . resourceForkIO $ handleIQs iqHandlers iqC iqh <- liftIO . forkIO $ handleIQs iqHandlers iqC
s <- get s <- get
rd <- lift . resourceForkIO . void . flip runStateT s . forever $ do rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s
sta <- pull return (messageC, presenceC, iqHandlers, outC, killConnection writeLock [lw, rd, cp], writeLock, rd)
case sta of
SMessage m -> liftIO . atomically $ writeTChan messageC m
SPresence p -> liftIO . atomically $ writeTChan presenceC p
SIQ i -> liftIO . atomically $ writeTChan iqC i
return (messageC, presenceC, iqHandlers, outC, killConnection writeLock [lw, rd, cp])
where where
loopWrite writeLock pushEvents out' = forever $ do loopWrite writeLock pushEvents out' = forever $ do
next <- liftIO . atomically $ ( takeTMVar writeLock next <- liftIO . atomically $ ( takeTMVar writeLock
>> readTChan out') >> readTChan out')
pushEvents . elementToEvents $ pickleElem stanzaP next pushEvents . elementToEvents $ pickleElem stanzaP next
liftIO . atomically $ putTMVar writeLock () liftIO . atomically $ putTMVar writeLock ()
handleIQs handlers iqC = liftIO . forever . atomically $ do
iq <- readTChan iqC
(byNS, byID) <- readTVar handlers
let iqNS' = nameNamespace . elementName . iqBody $ iq
case iqNS' of
Nothing -> return () -- TODO: send error stanza
Just iqNS -> case iqType iq of
Get -> case Map.lookup (Get, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> writeTChan ch iq
Set -> case Map.lookup (Set, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> writeTChan ch iq
Result -> case Map.updateLookupWithKey (\_ _ -> Nothing)
(iqId iq) byID of
(Nothing, _) -> return () -- we are not supposed
-- to send an error
(Just tmvar, byID') -> do
tryPutTMVar tmvar iq -- don't block
writeTVar handlers (byNS, byID)
killConnection writeLock threads = liftIO $ do killConnection writeLock threads = liftIO $ do
atomically $ takeTMVar writeLock atomically $ takeTMVar writeLock
@ -126,16 +167,15 @@ addIQChan tp ns = do
Nothing -> (False, iqCh) Nothing -> (False, iqCh)
Just iqCh' -> (True, iqCh') Just iqCh' -> (True, iqCh')
runThreaded :: XMPPThread a runThreaded :: XMPPThread a
-> XMPPMonad ThreadId -> XMPPMonad ThreadId
runThreaded a = do runThreaded a = do
(mC, pC, hand, outC, stopThreads) <- startThreads (mC, pC, hand, outC, stopThreads, writeR, reader ) <- startThreads
workermCh <- liftIO . newIORef $ Just mC workermCh <- liftIO . newIORef $ Just mC
workerpCh <- liftIO . newIORef $ Just pC workerpCh <- liftIO . newIORef $ Just pC
worker <- liftIO . forkIO $ do worker <- liftIO . forkIO $ do
runReaderT a (Thread workermCh workerpCh mC pC outC hand) runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR
reader)
return () return ()
return worker return worker
@ -203,8 +243,8 @@ sendS a = do
return () return ()
-- | Fork a new thread -- | Fork a new thread
withNewThread :: XMPPThread () -> XMPPThread ThreadId forkXMPP :: XMPPThread () -> XMPPThread ThreadId
withNewThread a = do forkXMPP a = do
thread <- ask thread <- ask
mCH' <- liftIO $ newIORef Nothing mCH' <- liftIO $ newIORef Nothing
pCH' <- liftIO $ newIORef Nothing pCH' <- liftIO $ newIORef Nothing
@ -229,13 +269,22 @@ waitForPresence f = do
waitForPresence f waitForPresence f
connPersist :: (BS.ByteString -> IO ()) -> TMVar () -> IO () connPersist :: TMVar (BS.ByteString -> IO ()) -> IO ()
connPersist pushBS lock = forever $ do connPersist lock = forever $ do
atomically $ takeTMVar lock pushBS <- atomically $ takeTMVar lock
pushBS " " pushBS " "
atomically $ putTMVar lock () atomically $ putTMVar lock pushBS
-- putStrLn "<space added>" -- putStrLn "<space added>"
threadDelay 30000000 threadDelay 30000000
singleThreaded a = do
writeLock <- asks writeRef
reader <- asks readerThread
liftIO . atomically $ takeTMVar writeLock
liftIO . throwTo reader . ReaderSignal $ do
a
out <- gets sConPushBS
liftIO . atomically $ putTMVar writeLock out

15
src/Network/XMPP/Types.hs

@ -1,17 +1,13 @@
module Network.XMPP.Types where module Network.XMPP.Types where
-- proudly "borrowed" from haskell-xmpp -- proudly "borrowed" from haskell-xmpp
import Control.Applicative((<$>))
import Control.Monad
import Control.Monad.Trans.State import Control.Monad.Trans.State
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Conduit import Data.Conduit
import Data.Default import Data.Default
import Data.List.Split as L import Data.List.Split as L
import Data.Maybe
import Data.Text as Text import Data.Text as Text
import Data.String as Str
import Data.XML.Types import Data.XML.Types
@ -26,9 +22,9 @@ data JID = JID { node :: Maybe Text
-- ^ Resource name -- ^ Resource name
} }
instance Show JID where instance Show JID where
show (JID nd domain res) = show (JID nd dmn res) =
maybe "" ((++ "@") . Text.unpack) nd ++ maybe "" ((++ "@") . Text.unpack) nd ++
(Text.unpack domain) ++ (Text.unpack dmn) ++
maybe "" (('/' :) . Text.unpack) res maybe "" (('/' :) . Text.unpack) res
type XMPPMonad a = StateT XMPPState (ResourceT IO) a type XMPPMonad a = StateT XMPPState (ResourceT IO) a
@ -62,14 +58,15 @@ instance Default ServerFeatures where
-- Ugh, that smells a bit. -- Ugh, that smells a bit.
parseJID :: [Char] -> JID
parseJID jid = parseJID jid =
let (jid', rst) = case L.splitOn "@" jid of let (jid', rst) = case L.splitOn "@" jid of
[rest] -> (JID Nothing, rest) [rest] -> (JID Nothing, rest)
[node,rest] -> (JID (Just (Text.pack node)), rest) [nd,rest] -> (JID (Just (Text.pack nd)), rest)
_ -> error $ "Couldn't parse JID: \"" ++ jid ++ "\"" _ -> error $ "Couldn't parse JID: \"" ++ jid ++ "\""
in case L.splitOn "/" rst of in case L.splitOn "/" rst of
[domain] -> jid' (Text.pack domain) Nothing [dmn] -> jid' (Text.pack dmn) Nothing
[domain, resource] -> jid' (Text.pack domain) (Just (Text.pack resource)) [dmn, rsrc] -> jid' (Text.pack dmn) (Just (Text.pack rsrc))
_ -> error $ "Couldn't parse JID: \"" ++ jid ++ "\"" _ -> error $ "Couldn't parse JID: \"" ++ jid ++ "\""
instance Read JID where instance Read JID where

1
xmpp-lib.cabal

@ -40,6 +40,7 @@ library
, bytestring -any , bytestring -any
, transformers -any , transformers -any
, network -any , network -any
, lifted-base -any
, split -any , split -any
, stm -any , stm -any
, xml-types -any , xml-types -any

Loading…
Cancel
Save