Browse Source

removed dependency on ResourceT

changed withConnection to run in the caller thread
master
Philipp Balzarek 14 years ago
parent
commit
c856d332a2
  1. 19
      src/Data/Conduit/TLS.hs
  2. 31
      src/Network/XMPP/Concurrent/Monad.hs
  3. 55
      src/Network/XMPP/Concurrent/Threads.hs
  4. 8
      src/Network/XMPP/Concurrent/Types.hs
  5. 61
      src/Network/XMPP/Monad.hs
  6. 6
      src/Network/XMPP/Stream.hs
  7. 8
      src/Network/XMPP/Types.hs
  8. 7
      src/Tests.hs

19
src/Data/Conduit/TLS.hs

@ -7,9 +7,8 @@ module Data.Conduit.TLS
) )
where where
import Control.Applicative import Control.Monad(liftM)
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Control.Monad.Trans.Resource
import Crypto.Random import Crypto.Random
@ -23,7 +22,7 @@ import Network.TLS.Extra as TLSExtra
import System.IO(Handle) import System.IO(Handle)
tlsinit tlsinit
:: (MonadIO m, MonadIO m1, MonadResource m1) => :: (MonadIO m, MonadIO m1) =>
TLSParams TLSParams
-> Handle -> m ( Source m1 BS.ByteString -> Handle -> m ( Source m1 BS.ByteString
, Sink BS.ByteString m1 () , Sink BS.ByteString m1 ()
@ -32,15 +31,13 @@ tlsinit tlsParams handle = do
gen <- liftIO $ (newGenIO :: IO SystemRandom) -- TODO: Find better random source? gen <- liftIO $ (newGenIO :: IO SystemRandom) -- TODO: Find better random source?
clientContext <- client tlsParams gen handle clientContext <- client tlsParams gen handle
handshake clientContext handshake clientContext
let src = sourceIO let src = sourceState
(return clientContext) clientContext
(bye) (\con -> StateOpen con `liftM` recvData con)
(\con -> IOOpen <$> recvData con) let snk = sinkState
let snk = sinkIO clientContext
(return clientContext)
(\_ -> return ())
(\con bs -> sendData con (BL.fromChunks [bs]) (\con bs -> sendData con (BL.fromChunks [bs])
>> return IOProcessing ) >> return (StateProcessing con))
(\_ -> return ()) (\_ -> return ())
return ( src return ( src
, snk , snk

31
src/Network/XMPP/Concurrent/Monad.hs

@ -4,6 +4,7 @@ import Network.XMPP.Types
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.STM import Control.Concurrent.STM
import qualified Control.Exception.Lifted as Ex
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Control.Monad.Trans.Reader import Control.Monad.Trans.Reader
import Control.Monad.Trans.State import Control.Monad.Trans.State
@ -141,18 +142,26 @@ waitForPresence f = do
-- | Run an XMPPMonad action in isolation. -- | Run an XMPPMonad action in isolation.
-- Reader and writer workers will be temporarily stopped -- Reader and writer workers will be temporarily stopped
-- and resumed with the new session details once the action returns. -- and resumed with the new session details once the action returns.
-- The Action will run in the reader thread. -- The Action will run in the calling thread/
withConnection :: XMPPConMonad () -> XMPPThread () -- NB: This will /not/ catch any exceptions. If you action dies, deadlocks
-- or otherwisely exits abnormaly the XMPP session will be dead.
withConnection :: XMPPConMonad a -> XMPPThread a
withConnection a = do withConnection a = do
writeLock <- asks writeRef readerId <- asks readerThread
rdr <- asks readerThread stateRef <- asks conStateRef
_ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the write <- asks writeRef
-- one returned by a wait <- liftIO $ newEmptyTMVarIO
liftIO . throwTo rdr . ReaderSignal $ do liftIO . throwTo readerId $ Interrupt wait
a s <- liftIO . atomically $ do
out <- gets sConPushBS putTMVar wait ()
liftIO . atomically $ putTMVar writeLock out takeTMVar write
return () takeTMVar stateRef
(res, s') <- liftIO $ runStateT a s
liftIO . atomically $ do
putTMVar write (sConPushBS s')
putTMVar stateRef s'
return res
sendPresence :: Presence -> XMPPThread () sendPresence :: Presence -> XMPPThread ()
sendPresence = sendS . PresenceS sendPresence = sendS . PresenceS

55
src/Network/XMPP/Concurrent/Threads.hs

@ -1,3 +1,4 @@
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE OverloadedStrings #-}
module Network.XMPP.Concurrent.Threads where module Network.XMPP.Concurrent.Threads where
@ -36,12 +37,27 @@ import qualified Text.XML.Stream.Render as XR
readWorker :: TChan (Either MessageError Message) readWorker :: TChan (Either MessageError Message)
-> TChan (Either PresenceError Presence) -> TChan (Either PresenceError Presence)
-> TVar IQHandlers -> TVar IQHandlers
-> XMPPConState -> TMVar XMPPConState
-> ResourceT IO () -> IO ()
readWorker messageC presenceC handlers s = Ex.catch readWorker messageC presenceC handlers stateRef =
(forever . flip runStateT s $ do Ex.mask_ . forever $ do
sta <- pull s <- liftIO . atomically $ takeTMVar stateRef
liftIO .atomically $ do (sta', s') <- flip runStateT s $ Ex.catch ( do
-- we don't know whether pull will necessarily be interruptible
liftIO $ Ex.allowInterrupt
Just <$> pull
)
(\(Interrupt t) -> do
liftIO . atomically $
putTMVar stateRef s
liftIO . atomically $ takeTMVar t
return Nothing
)
liftIO . atomically $ do
case sta' of
Nothing -> return ()
Just sta -> do
putTMVar stateRef s'
case sta of case sta of
MessageS m -> do writeTChan messageC $ Right m MessageS m -> do writeTChan messageC $ Right m
_ <- readTChan messageC -- Sic! _ <- readTChan messageC -- Sic!
@ -65,11 +81,7 @@ readWorker messageC presenceC handlers s = Ex.catch
IQRequestS i -> handleIQRequest handlers i IQRequestS i -> handleIQRequest handlers i
IQResultS i -> handleIQResponse handlers (Right i) IQResultS i -> handleIQResponse handlers (Right i)
IQErrorS i -> handleIQResponse handlers (Left i) IQErrorS i -> handleIQResponse handlers (Left i)
)
( \(ReaderSignal a) -> do
((),s') <- runStateT a s
readWorker messageC presenceC handlers s'
)
handleIQRequest handlers iq = do handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers (byNS, _) <- readTVar handlers
@ -110,8 +122,10 @@ startThreads
:: XMPPConMonad ( TChan (Either MessageError Message) :: XMPPConMonad ( TChan (Either MessageError Message)
, TChan (Either PresenceError Presence) , TChan (Either PresenceError Presence)
, TVar IQHandlers , TVar IQHandlers
, TChan Stanza, IO () , TChan Stanza
, IO ()
, TMVar (BS.ByteString -> IO ()) , TMVar (BS.ByteString -> IO ())
, TMVar XMPPConState
, ThreadId , ThreadId
) )
@ -122,24 +136,28 @@ startThreads = do
iqC <- liftIO newTChanIO iqC <- liftIO newTChanIO
outC <- liftIO newTChanIO outC <- liftIO newTChanIO
handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty)
conS <- liftIO . newTMVarIO =<< get
lw <- liftIO . forkIO $ writeWorker outC writeLock lw <- liftIO . forkIO $ writeWorker outC writeLock
cp <- liftIO . forkIO $ connPersist writeLock cp <- liftIO . forkIO $ connPersist writeLock
s <- get s <- get
rd <- lift . resourceForkIO $ readWorker messageC presenceC handlers s rd <- liftIO . forkIO $ readWorker messageC presenceC handlers conS
return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp], writeLock, rd) return (messageC, presenceC, handlers, outC
, killConnection writeLock [lw, rd, cp]
, writeLock, conS ,rd)
where where
killConnection writeLock threads = liftIO $ do killConnection writeLock threads = liftIO $ do
_ <- atomically $ takeTMVar writeLock -- Should we put it back? _ <- atomically $ takeTMVar writeLock -- Should we put it back?
_ <- forM threads killThread _ <- forM threads killThread
return() return()
-- | Start worker threads and run action. The supplied action will run -- | Start worker threads and run action. The supplied action will run
-- in the calling thread. use 'forkXMPP' to start another thread. -- in the calling thread. use 'forkXMPP' to start another thread.
runThreaded :: XMPPThread a runThreaded :: XMPPThread a
-> XMPPConMonad a -> XMPPConMonad a
runThreaded a = do runThreaded a = do
(mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads liftIO . putStrLn $ "starting threads"
(mC, pC, hand, outC, _stopThreads, writeR, conS, rdr ) <- startThreads
liftIO . putStrLn $ "threads running"
workermCh <- liftIO . newIORef $ Nothing workermCh <- liftIO . newIORef $ Nothing
workerpCh <- liftIO . newIORef $ Nothing workerpCh <- liftIO . newIORef $ Nothing
idRef <- liftIO $ newTVarIO 1 idRef <- liftIO $ newTVarIO 1
@ -147,7 +165,10 @@ runThreaded a = do
curId <- readTVar idRef curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer) writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId return . read. show $ curId
liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId) s <- get
liftIO . putStrLn $ "starting application"
liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId conS)
-- | Sends a blank space every 30 seconds to keep the connection alive -- | Sends a blank space every 30 seconds to keep the connection alive
connPersist :: TMVar (BS.ByteString -> IO ()) -> IO () connPersist :: TMVar (BS.ByteString -> IO ()) -> IO ()

8
src/Network/XMPP/Concurrent/Types.hs

@ -38,11 +38,11 @@ data Thread = Thread { messagesRef :: IORef (Maybe ( TChan (Either
, writeRef :: TMVar (BS.ByteString -> IO () ) , writeRef :: TMVar (BS.ByteString -> IO () )
, readerThread :: ThreadId , readerThread :: ThreadId
, idGenerator :: IO StanzaId , idGenerator :: IO StanzaId
, conStateRef :: TMVar XMPPConState
} }
type XMPPThread a = ReaderT Thread IO a type XMPPThread a = ReaderT Thread IO a
data Interrupt = Interrupt (TMVar ()) deriving Typeable
data ReaderSignal = ReaderSignal (XMPPConMonad ()) deriving Typeable instance Show Interrupt where show _ = "<Interrupt>"
instance Show ReaderSignal where show _ = "<ReaderSignal>" instance Ex.Exception Interrupt
instance Ex.Exception ReaderSignal

61
src/Network/XMPP/Monad.hs

@ -5,7 +5,7 @@ module Network.XMPP.Monad where
import Control.Applicative((<$>)) import Control.Applicative((<$>))
import Control.Monad.IO.Class import Control.Monad.IO.Class
import Control.Monad.Trans.Class import Control.Monad.Trans.Class
import Control.Monad.Trans.Resource --import Control.Monad.Trans.Resource
import Control.Monad.Trans.State import Control.Monad.Trans.State
import Data.ByteString as BS import Data.ByteString as BS
@ -16,6 +16,7 @@ import Data.Text(Text)
import Data.XML.Pickle import Data.XML.Pickle
import Data.XML.Types import Data.XML.Types
import Network
import Network.XMPP.Types import Network.XMPP.Types
import Network.XMPP.Marshal import Network.XMPP.Marshal
import Network.XMPP.Pickle import Network.XMPP.Pickle
@ -41,7 +42,7 @@ pushOpen e = do
lift . sink $ openElementToEvents e lift . sink $ openElementToEvents e
return () return ()
pulls :: Sink Event (ResourceT IO) b -> XMPPConMonad b pulls :: Sink Event IO b -> XMPPConMonad b
pulls snk = do pulls snk = do
source <- gets sConSrc source <- gets sConSrc
(src', r) <- lift $ source $$+ snk (src', r) <- lift $ source $$+ snk
@ -63,15 +64,15 @@ xmppFromHandle :: Handle
-> Maybe Text -> Maybe Text
-> XMPPConMonad a -> XMPPConMonad a
-> IO (a, XMPPConState) -> IO (a, XMPPConState)
xmppFromHandle handle hostname username res f = runResourceT $ do xmppFromHandle handle hostname username res f = do
liftIO $ hSetBuffering handle NoBuffering liftIO $ hSetBuffering handle NoBuffering
let raw = CB.sourceHandle handle let raw = sourceHandle' handle
let src = raw $= XP.parseBytes def let src = raw $= XP.parseBytes def
let st = XMPPConState let st = XMPPConState
src src
(raw) (raw)
(\xs -> CL.sourceList xs (\xs -> CL.sourceList xs
$$ XR.renderBytes def =$ CB.sinkHandle handle) $$ XR.renderBytes def =$ sinkHandle' handle)
(BS.hPut handle) (BS.hPut handle)
(Just handle) (Just handle)
(SF Nothing [] []) (SF Nothing [] [])
@ -81,3 +82,53 @@ xmppFromHandle handle hostname username res f = runResourceT $ do
res res
runStateT f st runStateT f st
-- TODO: Once pullrequest has been merged, switch back to upstream
sourceHandle' :: MonadIO m => Handle -> Source m BS.ByteString
sourceHandle' h =
src
where
src = PipeM pull close
pull = do
bs <- liftIO (BS.hGetSome h 4096)
if BS.null bs
then return $ Done Nothing ()
else return $ HaveOutput src close bs
close = return ()
sinkHandle' :: MonadIO m
=> Handle
-> Sink BS.ByteString m ()
sinkHandle' h =
NeedInput push close
where
push input = PipeM
(liftIO (BS.hPut h input) >> return (NeedInput push close))
(return ())
close = return ()
xmppConnect :: HostName -> Text -> XMPPConMonad ()
xmppConnect host hostname = do
uname <- gets sUsername
con <- liftIO $ do
con <- connectTo host (PortNumber 5222)
hSetBuffering con NoBuffering
return con
let raw = sourceHandle' con
let src = raw $= XP.parseBytes def
let st = XMPPConState
src
(raw)
(\xs -> CL.sourceList xs
$$ XR.renderBytes def =$ sinkHandle' con)
(BS.hPut con)
(Just con)
(SF Nothing [] [])
False
hostname
uname
Nothing
put st
return ()

6
src/Network/XMPP/Stream.hs

@ -53,12 +53,12 @@ xmppRestartStream = do
xmppStartStream xmppStartStream
xmppStream :: Sink Event (ResourceT IO) ServerFeatures xmppStream :: Sink Event IO ServerFeatures
xmppStream = do xmppStream = do
xmppStreamHeader xmppStreamHeader
xmppStreamFeatures xmppStreamFeatures
xmppStreamHeader :: Sink Event (ResourceT IO) () xmppStreamHeader :: Sink Event IO ()
xmppStreamHeader = do xmppStreamHeader = do
throwOutJunk throwOutJunk
(ver, _, _) <- unpickleElem pickleStream <$> openElementFromEvents (ver, _, _) <- unpickleElem pickleStream <$> openElementFromEvents
@ -66,7 +66,7 @@ xmppStreamHeader = do
return() return()
xmppStreamFeatures :: Sink Event (ResourceT IO) ServerFeatures xmppStreamFeatures :: Sink Event IO ServerFeatures
xmppStreamFeatures = unpickleElem pickleStreamFeatures <$> elementFromEvents xmppStreamFeatures = unpickleElem pickleStreamFeatures <$> elementFromEvents

8
src/Network/XMPP/Types.hs

@ -608,9 +608,9 @@ data ServerFeatures = SF
} deriving Show } deriving Show
data XMPPConState = XMPPConState data XMPPConState = XMPPConState
{ sConSrc :: Source (ResourceT IO) Event { sConSrc :: Source IO Event
, sRawSrc :: Source (ResourceT IO) BS.ByteString , sRawSrc :: Source IO BS.ByteString
, sConPush :: [Event] -> ResourceT IO () , sConPush :: [Event] -> IO ()
, sConPushBS :: BS.ByteString -> IO () , sConPushBS :: BS.ByteString -> IO ()
, sConHandle :: Maybe Handle , sConHandle :: Maybe Handle
, sFeatures :: ServerFeatures , sFeatures :: ServerFeatures
@ -627,7 +627,7 @@ data XMPPConState = XMPPConState
newtype XMPPT m a = XMPPT { runXMPPT :: StateT XMPPConState m a } deriving (Monad, MonadIO) newtype XMPPT m a = XMPPT { runXMPPT :: StateT XMPPConState m a } deriving (Monad, MonadIO)
type XMPPConMonad a = StateT XMPPConState (ResourceT IO) a type XMPPConMonad a = StateT XMPPConState IO a
-- Make XMPPT derive the Monad and MonadIO instances. -- Make XMPPT derive the Monad and MonadIO instances.

7
src/Tests.hs

@ -78,11 +78,11 @@ runMain debug number = do
1 -> (testUser1, testUser2,True) 1 -> (testUser1, testUser2,True)
2 -> (testUser2, testUser1,False) 2 -> (testUser2, testUser1,False)
_ -> error "Need either 1 or 2" _ -> error "Need either 1 or 2"
let debug' = liftIO . atomically .
debug . (("Thread " ++ show number ++ ":") ++)
sessionConnect "localhost" sessionConnect "localhost"
"species64739.dyndns.org" "species64739.dyndns.org"
(fromJust $ node we) (resource we) $ do (fromJust $ node we) (resource we) $ do
let debug' = liftIO . atomically . debug .
(("Thread " ++ show number ++ ":") ++)
withConnection $ xmppSASL "pwd" withConnection $ xmppSASL "pwd"
xmppThreadedBind (resource we) xmppThreadedBind (resource we)
withConnection $ xmppSession withConnection $ xmppSession
@ -90,7 +90,6 @@ runMain debug number = do
forkXMPP autoAccept forkXMPP autoAccept
forkXMPP iqResponder forkXMPP iqResponder
-- sendS . SPresence $ Presence Nothing (Just them) Nothing (Just Subscribe) Nothing Nothing Nothing [] -- sendS . SPresence $ Presence Nothing (Just them) Nothing (Just Subscribe) Nothing Nothing Nothing []
let delay = if active then 1000000 else 5000000
when active . void . forkXMPP $ do when active . void . forkXMPP $ do
forM [1..10] $ \count -> do forM [1..10] $ \count -> do
let message = Text.pack . show $ node we let message = Text.pack . show $ node we
@ -100,7 +99,7 @@ runMain debug number = do
let answerPayload = unpickleElem payloadP let answerPayload = unpickleElem payloadP
(fromJust $ iqResultPayload answer) (fromJust $ iqResultPayload answer)
expect debug' (invertPayload payload) answerPayload expect debug' (invertPayload payload) answerPayload
liftIO $ threadDelay delay liftIO $ threadDelay 500000
sendUser "All tests done" sendUser "All tests done"
liftIO . forever $ threadDelay 10000000 liftIO . forever $ threadDelay 10000000
return () return ()

Loading…
Cancel
Save