diff --git a/pontarius-xmpp.cabal b/pontarius-xmpp.cabal index ea1735b..c54a7f3 100644 --- a/pontarius-xmpp.cabal +++ b/pontarius-xmpp.cabal @@ -82,7 +82,6 @@ Library , Network.Xmpp.Concurrent.Threads , Network.Xmpp.Concurrent.Monad , Text.XML.Stream.Elements - , Data.Conduit.BufferedSource , Data.Conduit.TLS , Network.Xmpp.Sasl.Common , Network.Xmpp.Sasl.StringPrep diff --git a/source/Data/Conduit/BufferedSource.hs b/source/Data/Conduit/BufferedSource.hs deleted file mode 100644 index efe6a76..0000000 --- a/source/Data/Conduit/BufferedSource.hs +++ /dev/null @@ -1,32 +0,0 @@ -{-# LANGUAGE DeriveDataTypeable #-} -module Data.Conduit.BufferedSource where - -import Control.Monad.IO.Class -import Control.Monad.Trans.Class -import Control.Exception -import Data.IORef -import Data.Conduit -import Data.Typeable(Typeable) -import qualified Data.Conduit.Internal as DCI -import qualified Data.Conduit.List as CL - -data SourceClosed = SourceClosed deriving (Show, Typeable) - -instance Exception SourceClosed - -newtype BufferedSource m o = BufferedSource - { bs :: IORef (ResumableSource m o) - } - - --- | Buffered source from conduit 0.3 -bufferSource :: Monad m => Source m o -> IO (BufferedSource m o) -bufferSource s = do - srcRef <- newIORef $ DCI.ResumableSource s (return ()) - return $ BufferedSource srcRef - -(.$$+) (BufferedSource bs) snk = do - src <- liftIO $ readIORef bs - (src', r) <- src $$++ snk - liftIO $ writeIORef bs src' - return r diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index efc07bd..03faca8 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -31,7 +31,7 @@ module Network.Xmpp Context , newContext , withConnection - , connect + , connectTcp , simpleConnect , startTLS , simpleAuth @@ -93,6 +93,7 @@ module Network.Xmpp , sendMessage -- *** Receiving , pullMessage + , getMessage , waitForMessage , waitForMessageError , filterMessages @@ -147,10 +148,13 @@ module Network.Xmpp -- * Miscellaneous , LangTag(..) , exampleParams + , PortID(..) + ) where import Data.XML.Types (Element) +import Network import Network.Xmpp.Bind import Network.Xmpp.Concurrent import Network.Xmpp.Concurrent.Channels diff --git a/source/Network/Xmpp/Basic.hs b/source/Network/Xmpp/Basic.hs index be2de9d..5cb3fb7 100644 --- a/source/Network/Xmpp/Basic.hs +++ b/source/Network/Xmpp/Basic.hs @@ -1,8 +1,7 @@ module Network.Xmpp.Basic - ( XmppConMonad - , XmppConnection(..) + ( Connection(..) , XmppConnectionState(..) - , connect + , connectTcp , simpleConnect , startTLS , simpleAuth diff --git a/source/Network/Xmpp/Bind.hs b/source/Network/Xmpp/Bind.hs index ef9c624..50e9fe7 100644 --- a/source/Network/Xmpp/Bind.hs +++ b/source/Network/Xmpp/Bind.hs @@ -28,15 +28,15 @@ bindBody = pickleElem $ -- Sends a (synchronous) IQ set request for a (`Just') given or server-generated -- resource and extract the JID from the non-error response. -xmppBind :: Maybe Text -> XmppConMonad Jid -xmppBind rsrc = do - answer <- xmppSendIQ' "bind" Nothing Set Nothing (bindBody rsrc) +xmppBind :: Maybe Text -> Connection -> IO Jid +xmppBind rsrc c = do + answer <- pushIQ' "bind" Nothing Set Nothing (bindBody rsrc) c jid <- case () of () | Right IQResult{iqResultPayload = Just b} <- answer , Right jid <- unpickleElem xpJid b -> return jid | otherwise -> throw $ StreamXMLError ("Bind couldn't unpickle JID from " ++ show answer) - modify (\s -> s{sJid = Just jid}) + withConnection (modify $ \s -> s{sJid = Just jid}) c return jid where -- Extracts the character data in the `jid' element. diff --git a/source/Network/Xmpp/Concurrent/Channels.hs b/source/Network/Xmpp/Concurrent/Channels.hs index 32d6ae9..cf89faf 100644 --- a/source/Network/Xmpp/Concurrent/Channels.hs +++ b/source/Network/Xmpp/Concurrent/Channels.hs @@ -92,8 +92,8 @@ toChans messageC presenceC stanzaC iqHands sta = atomically $ do -- | Creates and initializes a new Xmpp context. -newContext :: IO Context -newContext = do +newContext :: Connection -> IO Context +newContext con = do messageC <- newTChanIO presenceC <- newTChanIO outC <- newTChanIO @@ -101,7 +101,7 @@ newContext = do iqHandlers <- newTVarIO (Map.empty, Map.empty) eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } let stanzaHandler = toChans messageC presenceC stanzaC iqHandlers - (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh + (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con writer <- forkIO $ writeWorker outC wLock workermCh <- newIORef $ Nothing workerpCh <- newIORef $ Nothing @@ -113,7 +113,7 @@ newContext = do let sess = Session { writeRef = wLock , readerThread = readerThread , idGenerator = getId - , conStateRef = conState + , conRef = conState , eventHandlers = eh , stopThreads = kill >> killThread writer } diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index b52571c..a48eb51 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -4,18 +4,9 @@ module Network.Xmpp.Concurrent.Monad where import Network.Xmpp.Types -import Control.Applicative((<$>)) -import Control.Concurrent import Control.Concurrent.STM -import Control.Concurrent.STM.TVar (TVar, readTVar, writeTVar) import qualified Control.Exception.Lifted as Ex -import Control.Monad.IO.Class import Control.Monad.Reader -import Control.Monad.State.Strict - -import Data.IORef -import qualified Data.Map as Map -import Data.Text(Text) import Network.Xmpp.Concurrent.Types import Network.Xmpp.Connection @@ -25,47 +16,47 @@ import Network.Xmpp.Connection -- TODO: Wait for presence error? --- | Run an XmppConMonad action in isolation. Reader and writer workers will be --- temporarily stopped and resumed with the new session details once the action --- returns. The action will run in the calling thread. Any uncaught exceptions --- will be interpreted as connection failure. -withConnection :: XmppConMonad a -> Session -> IO (Either StreamError a) -withConnection a session = do - wait <- newEmptyTMVarIO - Ex.mask_ $ do - -- Suspends the reader until the lock (wait) is released (set to `()'). - throwTo (readerThread session) $ Interrupt wait - -- We acquire the write and stateRef locks, to make sure that this is - -- the only thread that can write to the stream and to perform a - -- withConnection calculation. Afterwards, we release the lock and - -- fetches an updated state. - s <- Ex.catch - (atomically $ do - _ <- takeTMVar (writeRef session) - s <- takeTMVar (conStateRef session) - putTMVar wait () - return s - ) - -- If we catch an exception, we have failed to take the MVars above. - (\e -> atomically (putTMVar wait ()) >> - Ex.throwIO (e :: Ex.SomeException) - ) - -- Run the XmppMonad action, save the (possibly updated) states, release - -- the locks, and return the result. - Ex.catches - (do - (res, s') <- runStateT a s - atomically $ do - putTMVar (writeRef session) (cSend . sCon $ s') - putTMVar (conStateRef session) s' - return $ Right res - ) - -- We treat all Exceptions as fatal. If we catch a StreamError, we - -- return it. Otherwise, we throw an exception. - [ Ex.Handler $ \e -> return $ Left (e :: StreamError) - , Ex.Handler $ \e -> runStateT xmppKillConnection s - >> Ex.throwIO (e :: Ex.SomeException) - ] +-- -- | Run an XmppConMonad action in isolation. Reader and writer workers will be +-- -- temporarily stopped and resumed with the new session details once the action +-- -- returns. The action will run in the calling thread. Any uncaught exceptions +-- -- will be interpreted as connection failure. +-- withConnection :: XmppConMonad a -> Session -> IO (Either StreamError a) +-- withConnection a session = do +-- wait <- newEmptyTMVarIO +-- Ex.mask_ $ do +-- -- Suspends the reader until the lock (wait) is released (set to `()'). +-- throwTo (readerThread session) $ Interrupt wait +-- -- We acquire the write and stateRef locks, to make sure that this is +-- -- the only thread that can write to the stream and to perform a +-- -- withConnection calculation. Afterwards, we release the lock and +-- -- fetches an updated state. +-- s <- Ex.catch +-- (atomically $ do +-- _ <- takeTMVar (writeRef session) +-- s <- takeTMVar (conStateRef session) +-- putTMVar wait () +-- return s +-- ) +-- -- If we catch an exception, we have failed to take the MVars above. +-- (\e -> atomically (putTMVar wait ()) >> +-- Ex.throwIO (e :: Ex.SomeException) +-- ) +-- -- Run the XmppMonad action, save the (possibly updated) states, release +-- -- the locks, and return the result. +-- Ex.catches +-- (do +-- (res, s') <- runStateT a s +-- atomically $ do +-- putTMVar (writeRef session) (cSend . sCon $ s') +-- putTMVar (conStateRef session) s' +-- return $ Right res +-- ) +-- -- We treat all Exceptions as fatal. If we catch a StreamError, we +-- -- return it. Otherwise, we throw an exception. +-- [ Ex.Handler $ \e -> return $ Left (e :: StreamError) +-- , Ex.Handler $ \e -> runStateT xmppKillConnection s +-- >> Ex.throwIO (e :: Ex.SomeException) +-- ] -- | Executes a function to update the event handlers. modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO () @@ -93,7 +84,7 @@ runHandler h session = h =<< atomically (readTVar $ eventHandlers session) -- | End the current Xmpp session. endSession :: Session -> IO () endSession session = do -- TODO: This has to be idempotent (is it?) - void $ withConnection xmppKillConnection session + closeConnection session stopThreads session -- | Close the connection to the server. Closes the stream (by enforcing a @@ -101,14 +92,8 @@ endSession session = do -- TODO: This has to be idempotent (is it?) -- seconds, and then closes the connection. closeConnection :: Session -> IO () closeConnection session = Ex.mask_ $ do - send <- atomically $ takeTMVar (writeRef session) - cc <- cClose . sCon <$> ( atomically $ readTMVar (conStateRef session)) - send "" - void . forkIO $ do - threadDelay 3000000 - -- When we close the connection, we close the handle that was used in the - -- sCloseConnection above. So even if a new connection has been - -- established at this point, it will not be affected by this action. - (Ex.try cc) :: IO (Either Ex.SomeException ()) - return () - atomically $ putTMVar (writeRef session) (\_ -> return False) + (_send, connection) <- atomically $ liftM2 (,) + (takeTMVar $ writeRef session) + (takeTMVar $ conRef session) + _ <- closeStreams connection + return () diff --git a/source/Network/Xmpp/Concurrent/Threads.hs b/source/Network/Xmpp/Concurrent/Threads.hs index 6d8534e..e1f43e5 100644 --- a/source/Network/Xmpp/Concurrent/Threads.hs +++ b/source/Network/Xmpp/Concurrent/Threads.hs @@ -24,7 +24,7 @@ import GHC.IO (unsafeUnmask) -- all listener threads. readWorker :: (Stanza -> IO ()) -> (StreamError -> IO ()) - -> TMVar XmppConnection + -> TMVar Connection -> IO a readWorker onStanza onConnectionClosed stateRef = Ex.mask_ . forever $ do @@ -32,12 +32,13 @@ readWorker onStanza onConnectionClosed stateRef = -- we don't know whether pull will -- necessarily be interruptible s <- atomically $ do - sr <- readTMVar stateRef - when (sConnectionState sr == XmppConnectionClosed) + con@(Connection con_) <- readTMVar stateRef + state <- sConnectionState <$> readTMVar con_ + when (state == XmppConnectionClosed) retry - return sr + return con allowInterrupt - Just . fst <$> runStateT pullStanza s + Just <$> pullStanza s ) [ Ex.Handler $ \(Interrupt t) -> do void $ handleInterrupts [t] @@ -71,14 +72,16 @@ readWorker onStanza onConnectionClosed stateRef = -- connection. startThreadsWith :: (Stanza -> IO ()) -> TVar EventHandlers + -> Connection -> IO (IO (), TMVar (BS.ByteString -> IO Bool), - TMVar XmppConnection, + TMVar Connection, ThreadId) -startThreadsWith stanzaHandler eh = do - writeLock <- newTMVarIO (\_ -> return False) - conS <- newTMVarIO xmppNoConnection +startThreadsWith stanzaHandler eh con = do + read <- withConnection' (gets $ cSend. cHand) con + writeLock <- newTMVarIO read + conS <- newTMVarIO con -- lw <- forkIO $ writeWorker outC writeLock cp <- forkIO $ connPersist writeLock rd <- forkIO $ readWorker stanzaHandler (noCon eh) conS diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 249077c..d71a9d7 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -8,9 +8,6 @@ import Control.Concurrent import Control.Concurrent.STM import qualified Data.ByteString as BS -import Data.IORef -import qualified Data.Map as Map -import Data.Text(Text) import Data.Typeable import Network.Xmpp.Types @@ -28,7 +25,7 @@ data Session = Session , idGenerator :: IO StanzaId -- | Lock (used by withConnection) to make sure that a maximum of one -- XmppConMonad action is executed at any given time. - , conStateRef :: TMVar XmppConnection + , conRef :: TMVar Connection , eventHandlers :: TVar EventHandlers , stopThreads :: IO () } diff --git a/source/Network/Xmpp/Connection.hs b/source/Network/Xmpp/Connection.hs index d96aae5..de4bd52 100644 --- a/source/Network/Xmpp/Connection.hs +++ b/source/Network/Xmpp/Connection.hs @@ -17,7 +17,7 @@ import Control.Monad.State.Strict import Data.ByteString as BS import Data.Conduit import Data.Conduit.Binary as CB -import Data.Conduit.BufferedSource +import Data.Conduit.Internal as DCI import qualified Data.Conduit.List as CL import Data.IORef import Data.Text(Text) @@ -41,41 +41,42 @@ import Text.XML.Unresolved(InvalidEventStream(..)) debug :: Bool debug = False -pushElement :: Element -> XmppConMonad Bool +pushElement :: Element -> StateT Connection_ IO Bool pushElement x = do - send <- gets (cSend . sCon) + send <- gets (cSend . cHand) liftIO . send $ renderElement x -- | Encode and send stanza -pushStanza :: Stanza -> XmppConMonad Bool -pushStanza = pushElement . pickleElem xpStanza +pushStanza :: Stanza -> Connection -> IO Bool +pushStanza s = withConnection' . pushElement $ pickleElem xpStanza s -- XML documents and XMPP streams SHOULD be preceeded by an XML declaration. -- UTF-8 is the only supported XMPP encoding. The standalone document -- declaration (matching "SDDecl" in the XML standard) MUST NOT be included in -- XMPP streams. RFC 6120 defines XMPP only in terms of XML 1.0. -pushXmlDecl :: XmppConMonad Bool +pushXmlDecl :: StateT Connection_ IO Bool pushXmlDecl = do - con <- gets sCon + con <- gets cHand liftIO $ (cSend con) "" -pushOpenElement :: Element -> XmppConMonad Bool +pushOpenElement :: Element -> StateT Connection_ IO Bool pushOpenElement e = do - sink <- gets (cSend . sCon ) + sink <- gets (cSend . cHand ) liftIO . sink $ renderOpenElement e -- `Connect-and-resumes' the given sink to the connection source, and pulls a -- `b' value. -pullToSinkEvents :: Sink Event IO b -> XmppConMonad b -pullToSinkEvents snk = do - source <- gets (cEventSource . sCon) - r <- lift $ source .$$+ snk +runEventsSink :: Sink Event IO b -> StateT Connection_ IO b +runEventsSink snk = do + source <- gets cEventSource + (src', r) <- lift $ source $$++ snk + modify (\s -> s{cEventSource = src'}) return r -pullElement :: XmppConMonad Element +pullElement :: StateT Connection_ IO Element pullElement = do Ex.catches (do - e <- pullToSinkEvents (elements =$ await) + e <- runEventsSink (elements =$ await) case e of Nothing -> liftIO $ Ex.throwIO StreamConnectionError Just r -> return r @@ -85,12 +86,11 @@ pullElement = do -> liftIO . Ex.throwIO $ StreamXMLError s) , Ex.Handler $ \(e :: InvalidEventStream) -> liftIO . Ex.throwIO $ StreamXMLError (show e) - ] -- Pulls an element and unpickles it. -pullPickle :: PU [Node] a -> XmppConMonad a -pullPickle p = do +pullUnpickle :: PU [Node] a -> StateT Connection_ IO a +pullUnpickle p = do res <- unpickleElem p <$> pullElement case res of Left e -> liftIO . Ex.throwIO $ StreamXMLError (show e) @@ -98,17 +98,17 @@ pullPickle p = do -- | Pulls a stanza (or stream error) from the stream. Throws an error on a stream -- error. -pullStanza :: XmppConMonad Stanza -pullStanza = do - res <- pullPickle xpStreamStanza +pullStanza :: Connection -> IO Stanza +pullStanza = withConnection' $ do + res <- pullUnpickle xpStreamStanza case res of Left e -> liftIO . Ex.throwIO $ StreamError e Right r -> return r -- Performs the given IO operation, catches any errors and re-throws everything -- except 'ResourceVanished' and IllegalOperation, in which case it will return False instead -catchSend :: IO () -> IO Bool -catchSend p = Ex.catch +catchPush :: IO () -> IO Bool +catchPush p = Ex.catch (p >> return True) (\e -> case GIE.ioe_type e of GIE.ResourceVanished -> return False @@ -116,16 +116,16 @@ catchSend p = Ex.catch _ -> Ex.throwIO e ) --- -- XmppConnection state used when there is no connection. -xmppNoConnection :: XmppConnection -xmppNoConnection = XmppConnection - { sCon = Connection { cSend = \_ -> return False - , cRecv = \_ -> Ex.throwIO - $ StreamConnectionError - , cEventSource = undefined - , cFlush = return () - , cClose = return () - } +-- -- Connection_ state used when there is no connection. +xmppNoConnection :: Connection_ +xmppNoConnection = Connection_ + { cHand = Hand { cSend = \_ -> return False + , cRecv = \_ -> Ex.throwIO + $ StreamConnectionError + , cFlush = return () + , cClose = return () + } + , cEventSource = DCI.ResumableSource zeroSource (return ()) , sFeatures = SF Nothing [] [] , sConnectionState = XmppConnectionClosed , sHostname = Nothing @@ -142,31 +142,30 @@ xmppNoConnection = XmppConnection zeroSource = liftIO . Ex.throwIO $ StreamConnectionError -- Connects to the given hostname on port 5222 (TODO: Make this dynamic) and --- updates the XmppConMonad XmppConnection state. -xmppConnectTCP :: HostName -> PortID -> Text -> XmppConMonad () -xmppConnectTCP host port hostname = do - hand <- liftIO $ do - h <- connectTo host port - hSetBuffering h NoBuffering - return h - eSource <- liftIO . bufferSource $ (sourceHandle hand) $= XP.parseBytes def - let con = Connection { cSend = if debug - then \d -> do - BS.putStrLn (BS.append "out: " d) - catchSend $ BS.hPut hand d - else catchSend . BS.hPut hand - , cRecv = if debug then - \n -> do - bs <- BS.hGetSome hand n - BS.putStrLn bs - return bs - else BS.hGetSome hand - , cEventSource = eSource - , cFlush = hFlush hand - , cClose = hClose hand - } - let st = XmppConnection - { sCon = con +-- updates the XmppConMonad Connection_ state. +connectTcpRaw :: HostName -> PortID -> Text -> IO Connection +connectTcpRaw host port hostname = do + h <- connectTo host port + hSetBuffering h NoBuffering + let eSource = DCI.ResumableSource (sourceHandle h $= XP.parseBytes def) + (return ()) + let hand = Hand { cSend = if debug + then \d -> do + BS.putStrLn (BS.append "out: " d) + catchPush $ BS.hPut h d + else catchPush . BS.hPut h + , cRecv = if debug then + \n -> do + bs <- BS.hGetSome h n + BS.putStrLn bs + return bs + else BS.hGetSome h + , cFlush = hFlush h + , cClose = hClose h + } + let con = Connection_ + { cHand = hand + , cEventSource = eSource , sFeatures = (SF Nothing [] []) , sConnectionState = XmppConnectionPlain , sHostname = (Just hostname) @@ -178,55 +177,48 @@ xmppConnectTCP host port hostname = do , sJidWhenPlain = False -- TODO: Allow user to set , sFrom = Nothing } - put st + mkConnection con --- Execute a XmppConMonad computation. -xmppNewSession :: XmppConMonad a -> IO (a, XmppConnection) -xmppNewSession action = runStateT action xmppNoConnection --- Closes the connection and updates the XmppConMonad XmppConnection state. -xmppKillConnection :: XmppConMonad (Either Ex.SomeException ()) -xmppKillConnection = do - cc <- gets (cClose . sCon) +-- Closes the connection and updates the XmppConMonad Connection_ state. +killConnection :: Connection -> IO (Either Ex.SomeException ()) +killConnection = withConnection $ do + cc <- gets (cClose . cHand) err <- liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ())) put xmppNoConnection return err -xmppReplaceConnection :: XmppConnection -> XmppConMonad (Either Ex.SomeException ()) -xmppReplaceConnection newCon = do - cc <- gets (cClose . sCon) - err <- liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ())) - put newCon - return err - -- Sends an IQ request and waits for the response. If the response ID does not -- match the outgoing ID, an error is thrown. -xmppSendIQ' :: StanzaId +pushIQ' :: StanzaId -> Maybe Jid -> IQRequestType -> Maybe LangTag -> Element - -> XmppConMonad (Either IQError IQResult) -xmppSendIQ' iqID to tp lang body = do - pushStanza . IQRequestS $ IQRequest iqID Nothing to lang tp body - res <- pullPickle $ xpEither xpIQError xpIQResult + -> Connection + -> IO (Either IQError IQResult) +pushIQ' iqID to tp lang body con = do + pushStanza (IQRequestS $ IQRequest iqID Nothing to lang tp body) con + res <- pullStanza con case res of - Left e -> return $ Left e - Right iq' -> do + IQErrorS e -> return $ Left e + IQResultS r -> do unless - (iqID == iqResultID iq') . liftIO . Ex.throwIO $ + (iqID == iqResultID r) . liftIO . Ex.throwIO $ StreamXMLError - ("In xmppSendIQ' IDs don't match: " ++ show iqID ++ " /= " ++ - show (iqResultID iq') ++ " .") - return $ Right iq' + ("In sendIQ' IDs don't match: " ++ show iqID ++ " /= " ++ + show (iqResultID r) ++ " .") + return $ Right r + _ -> liftIO . Ex.throwIO . StreamXMLError $ + "sendIQ': unexpected stanza type " -- | Send "" and wait for the server to finish processing and to -- close the connection. Any remaining elements from the server and whether or -- not we received a element from the server is returned. -xmppCloseStreams :: XmppConMonad ([Element], Bool) -xmppCloseStreams = do - send <- gets (cSend . sCon) - cc <- gets (cClose . sCon) +closeStreams :: Connection -> IO ([Element], Bool) +closeStreams = withConnection $ do + send <- gets (cSend . cHand) + cc <- gets (cClose . cHand) liftIO $ send "" void $ liftIO $ forkIO $ do threadDelay 3000000 @@ -236,18 +228,18 @@ xmppCloseStreams = do where -- Pulls elements from the stream until the stream ends, or an error is -- raised. - collectElems :: [Element] -> XmppConMonad ([Element], Bool) - collectElems elems = do + collectElems :: [Element] -> StateT Connection_ IO ([Element], Bool) + collectElems es = do result <- Ex.try pullElement case result of - Left StreamStreamEnd -> return (elems, True) - Left _ -> return (elems, False) - Right elem -> collectElems (elem:elems) + Left StreamStreamEnd -> return (es, True) + Left _ -> return (es, False) + Right e -> collectElems (e:es) debugConduit :: Pipe l ByteString ByteString u IO b debugConduit = forever $ do - s <- await - case s of + s' <- await + case s' of Just s -> do liftIO $ BS.putStrLn (BS.append "in: " s) yield s diff --git a/source/Network/Xmpp/Sasl.hs b/source/Network/Xmpp/Sasl.hs index bc809c5..5ad5d36 100644 --- a/source/Network/Xmpp/Sasl.hs +++ b/source/Network/Xmpp/Sasl.hs @@ -30,7 +30,6 @@ import Data.Text (Text) import qualified Data.Text.Encoding as Text import Network.Xmpp.Connection -import Network.Xmpp.Pickle import Network.Xmpp.Stream import Network.Xmpp.Types @@ -44,8 +43,9 @@ import Network.Xmpp.Sasl.Mechanisms -- success. xmppSasl :: [SaslHandler] -- ^ Acceptable authentication mechanisms and their -- corresponding handlers - -> XmppConMonad (Either AuthError ()) -xmppSasl handlers = do + -> Connection + -> IO (Either AuthError ()) +xmppSasl handlers = withConnection $ do -- Chooses the first mechanism that is acceptable by both the client and the -- server. mechanisms <- gets $ saslMechanisms . sFeatures @@ -57,5 +57,5 @@ xmppSasl handlers = do XmppConnectionClosed -> throwError AuthConnectionError _ -> do r <- handler - _ <- ErrorT $ left AuthStreamError <$> xmppRestartStream + _ <- ErrorT $ left AuthStreamError <$> restartStream return r diff --git a/source/Network/Xmpp/Sasl/Common.hs b/source/Network/Xmpp/Sasl/Common.hs index 7b71630..5d4164f 100644 --- a/source/Network/Xmpp/Sasl/Common.hs +++ b/source/Network/Xmpp/Sasl/Common.hs @@ -113,7 +113,7 @@ saslInit mechanism payload = lift . pushElement . saslInitE mechanism $ -- | Pull the next element. pullSaslElement :: SaslM SaslElement pullSaslElement = do - el <- lift $ pullPickle (xpEither xpFailure xpSaslElement) + el <- lift $ pullUnpickle (xpEither xpFailure xpSaslElement) case el of Left e ->throwError $ AuthSaslFailure e Right r -> return r diff --git a/source/Network/Xmpp/Sasl/Mechanisms/DigestMd5.hs b/source/Network/Xmpp/Sasl/Mechanisms/DigestMd5.hs index 806b13a..55bce2c 100644 --- a/source/Network/Xmpp/Sasl/Mechanisms/DigestMd5.hs +++ b/source/Network/Xmpp/Sasl/Mechanisms/DigestMd5.hs @@ -35,8 +35,6 @@ import Network.Xmpp.Connection import Network.Xmpp.Pickle import Network.Xmpp.Stream import Network.Xmpp.Types - - import Network.Xmpp.Sasl.Common import Network.Xmpp.Sasl.StringPrep import Network.Xmpp.Sasl.Types diff --git a/source/Network/Xmpp/Sasl/Mechanisms/Plain.hs b/source/Network/Xmpp/Sasl/Mechanisms/Plain.hs index 8ac5484..33a0170 100644 --- a/source/Network/Xmpp/Sasl/Mechanisms/Plain.hs +++ b/source/Network/Xmpp/Sasl/Mechanisms/Plain.hs @@ -36,9 +36,9 @@ import qualified Data.ByteString as BS import Data.XML.Types import Network.Xmpp.Connection -import Network.Xmpp.Pickle import Network.Xmpp.Stream import Network.Xmpp.Types +import Network.Xmpp.Pickle import qualified System.Random as Random diff --git a/source/Network/Xmpp/Sasl/Types.hs b/source/Network/Xmpp/Sasl/Types.hs index daa13ec..a11f9ef 100644 --- a/source/Network/Xmpp/Sasl/Types.hs +++ b/source/Network/Xmpp/Sasl/Types.hs @@ -29,7 +29,7 @@ data SaslElement = SaslSuccess (Maybe Text.Text) -- | SASL mechanism XmppConnection computation, with the possibility of throwing -- an authentication error. -type SaslM a = ErrorT AuthError (StateT XmppConnection IO) a +type SaslM a = ErrorT AuthError (StateT Connection_ IO) a type Pairs = [(ByteString, ByteString)] diff --git a/source/Network/Xmpp/Session.hs b/source/Network/Xmpp/Session.hs index 39c0fa5..70cb2c3 100644 --- a/source/Network/Xmpp/Session.hs +++ b/source/Network/Xmpp/Session.hs @@ -2,6 +2,7 @@ {-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Session where +import qualified Control.Exception as Ex import Control.Monad.Error import Data.Text as Text import Data.XML.Pickle @@ -10,6 +11,7 @@ import Network import qualified Network.TLS as TLS import Network.Xmpp.Bind import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Concurrent.Channels import Network.Xmpp.Connection import Network.Xmpp.Marshal import Network.Xmpp.Pickle @@ -45,28 +47,31 @@ simpleConnect :: HostName -- ^ Host to connect to -> Text -- ^ Password -> Maybe Text -- ^ Desired resource (or Nothing to let the server -- decide) - -> XmppConMonad Jid + -> IO Context simpleConnect host port hostname username password resource = do - connect host port hostname - startTLS exampleParams - saslResponse <- simpleAuth username password resource + con' <- connectTcp host port hostname + con <- case con' of + Left e -> Ex.throwIO e + Right r -> return r + startTLS exampleParams con + saslResponse <- simpleAuth username password resource con case saslResponse of - Right jid -> return jid + Right jid -> newContext con Left e -> error $ show e -- | Connect to host with given address. -connect :: HostName -> PortID -> Text -> XmppConMonad (Either StreamError ()) -connect address port hostname = do - xmppConnectTCP address port hostname - result <- xmppStartStream +connectTcp :: HostName -> PortID -> Text -> IO (Either StreamError Connection) +connectTcp address port hostname = do + con <- connectTcpRaw address port hostname + result <- withConnection startStream con case result of Left e -> do - pushElement . pickleElem xpStreamError $ toError e - xmppCloseStreams - return () - Right () -> return () - return result + withConnection (pushElement . pickleElem xpStreamError $ toError e) + con + closeStreams con + return $ Left e + Right () -> return $ Right con where -- TODO: Descriptive texts in stream errors? toError (StreamNotStreamElement _name) = @@ -100,9 +105,9 @@ sessionIQ = IQRequestS $ IQRequest { iqRequestID = "sess" -- Sends the session IQ set element and waits for an answer. Throws an error if -- if an IQ error stanza is returned from the server. -xmppStartSession :: XmppConMonad () -xmppStartSession = do - answer <- xmppSendIQ' "session" Nothing Set Nothing sessionXML +startSession :: Connection -> IO () +startSession con = do + answer <- pushIQ' "session" Nothing Set Nothing sessionXML con case answer of Left e -> error $ show e Right _ -> return () @@ -111,11 +116,12 @@ xmppStartSession = do -- resource. auth :: [SaslHandler] -> Maybe Text - -> XmppConMonad (Either AuthError Jid) -auth mechanisms resource = runErrorT $ do - ErrorT $ xmppSasl mechanisms - jid <- lift $ xmppBind resource - lift $ xmppStartSession + -> Connection + -> IO (Either AuthError Jid) +auth mechanisms resource con = runErrorT $ do + ErrorT $ xmppSasl mechanisms con + jid <- lift $ xmppBind resource con + lift $ startSession con return jid -- | Authenticate to the server with the given username and password @@ -126,7 +132,8 @@ simpleAuth :: Text.Text -- ^ The username -> Text.Text -- ^ The password -> Maybe Text -- ^ The desired resource or 'Nothing' to let the -- server assign one - -> XmppConMonad (Either AuthError Jid) + -> Connection + -> IO (Either AuthError Jid) simpleAuth username passwd resource = flip auth resource $ [ -- TODO: scramSha1Plus scramSha1 username Nothing passwd diff --git a/source/Network/Xmpp/Stream.hs b/source/Network/Xmpp/Stream.hs index 1e70320..303db8c 100644 --- a/source/Network/Xmpp/Stream.hs +++ b/source/Network/Xmpp/Stream.hs @@ -7,11 +7,12 @@ module Network.Xmpp.Stream where import Control.Applicative ((<$>), (<*>)) import qualified Control.Exception as Ex import Control.Monad.Error +import Control.Monad.Reader import Control.Monad.State.Strict import qualified Data.ByteString as BS import Data.Conduit -import Data.Conduit.BufferedSource +import qualified Data.Conduit.Internal as DCI import Data.Conduit.List as CL import Data.Maybe (fromJust, isJust, isNothing) import Data.Text as Text @@ -61,8 +62,8 @@ openElementFromEvents = do _ -> throwError $ StreamConnectionError -- Sends the initial stream:stream element and pulls the server features. -xmppStartStream :: XmppConMonad (Either StreamError ()) -xmppStartStream = runErrorT $ do +startStream :: StateT Connection_ IO (Either StreamError ()) +startStream = runErrorT $ do state <- get -- Set the `to' attribute depending on the state of the connection. let from = case sConnectionState state of @@ -80,24 +81,24 @@ xmppStartStream = runErrorT $ do , Nothing , sPreferredLang state ) - (lt, from, id, features) <- ErrorT . pullToSinkEvents $ runErrorT $ - xmppStream from - modify (\s -> s { sFeatures = features - , sStreamLang = Just lt - , sStreamId = id - , sFrom = from - } - ) + (lt, from, id, features) <- ErrorT . runEventsSink $ runErrorT $ + streamS from + modify (\s -> s{ sFeatures = features + , sStreamLang = Just lt + , sStreamId = id + , sFrom = from + } ) return () -- Sets a new Event source using the raw source (of bytes) -- and calls xmppStartStream. -xmppRestartStream :: XmppConMonad (Either StreamError ()) -xmppRestartStream = do - raw <- gets (cRecv . sCon) - newSrc <- liftIO . bufferSource $ loopRead raw $= XP.parseBytes def - modify (\s -> s{sCon = (sCon s){cEventSource = newSrc}}) - xmppStartStream +restartStream :: StateT Connection_ IO (Either StreamError ()) +restartStream = do + raw <- gets (cRecv . cHand) + let newSource = DCI.ResumableSource (loopRead raw $= XP.parseBytes def) + (return ()) + modify (\s -> s{cEventSource = newSource }) + startStream where loopRead read = do bs <- liftIO (read 4096) @@ -109,11 +110,11 @@ xmppRestartStream = do -- Also validates the stream element's attributes and throws an error if -- appropriate. -- TODO: from. -xmppStream :: Maybe Jid -> StreamSink ( LangTag +streamS :: Maybe Jid -> StreamSink ( LangTag , Maybe Jid , Maybe Text , ServerFeatures) -xmppStream expectedTo = do +streamS expectedTo = do (from, to, id, langTag) <- xmppStreamHeader features <- xmppStreamFeatures return (langTag, from, id, features) diff --git a/source/Network/Xmpp/TLS.hs b/source/Network/Xmpp/TLS.hs index 161dc90..6ffa991 100644 --- a/source/Network/Xmpp/TLS.hs +++ b/source/Network/Xmpp/TLS.hs @@ -86,15 +86,17 @@ instance Error XmppTLSError where -- Pushes ", waits for "", performs the TLS handshake, and -- restarts the stream. May throw errors. -startTLS :: TLS.TLSParams -> XmppConMonad (Either XmppTLSError ()) -startTLS params = Ex.handle (return . Left . TLSError) . runErrorT $ do +startTLS :: TLS.TLSParams -> Connection -> IO (Either XmppTLSError ()) +startTLS params con = Ex.handle (return . Left . TLSError) + . flip withConnection con + . runErrorT $ do features <- lift $ gets sFeatures state <- gets sConnectionState case state of XmppConnectionPlain -> return () XmppConnectionClosed -> throwError TLSNoConnection XmppConnectionSecured -> throwError TLSConnectionSecured - con <- lift $ gets sCon + con <- lift $ gets cHand when (stls features == Nothing) $ throwError TLSNoServerSupport lift $ pushElement starttlsE answer <- lift $ pullElement @@ -105,15 +107,13 @@ startTLS params = Ex.handle (return . Left . TLSError) . runErrorT $ do -- TODO: find something more suitable e -> lift . Ex.throwIO . StreamXMLError $ "Unexpected element: " ++ ppElement e - liftIO $ putStrLn "#" (raw, _snk, psh, read, ctx) <- lift $ TLS.tlsinit debug params (mkBackend con) - liftIO $ putStrLn "*" - let newCon = Connection { cSend = catchSend . psh - , cRecv = read - , cFlush = contextFlush ctx - , cClose = bye ctx >> cClose con - } - lift $ modify ( \x -> x {sCon = newCon}) - either (lift . Ex.throwIO) return =<< lift xmppRestartStream + let newHand = Hand { cSend = catchPush . psh + , cRecv = read + , cFlush = contextFlush ctx + , cClose = bye ctx >> cClose con + } + lift $ modify ( \x -> x {cHand = newHand}) + either (lift . Ex.throwIO) return =<< lift restartStream modify (\s -> s{sConnectionState = XmppConnectionSecured}) return () diff --git a/source/Network/Xmpp/Types.hs b/source/Network/Xmpp/Types.hs index 3806477..6ad94b9 100644 --- a/source/Network/Xmpp/Types.hs +++ b/source/Network/Xmpp/Types.hs @@ -31,11 +31,13 @@ module Network.Xmpp.Types , StreamError(..) , StreamErrorCondition(..) , Version(..) - , XmppConMonad + , HandleLike(..) , Connection(..) - , XmppConnection(..) + , Connection_(..) + , withConnection + , withConnection' + , mkConnection , XmppConnectionState(..) - , XmppT(..) , XmppStreamError(..) , langTag , module Network.Xmpp.Jid @@ -43,15 +45,15 @@ module Network.Xmpp.Types where import Control.Applicative ((<$>), many) +import Control.Concurrent.STM import Control.Exception +import Control.Monad.Error import Control.Monad.IO.Class import Control.Monad.State.Strict -import Control.Monad.Error import qualified Data.Attoparsec.Text as AP import qualified Data.ByteString as BS import Data.Conduit -import Data.Conduit.BufferedSource import Data.IORef import Data.Maybe (fromJust, fromMaybe, maybeToList) import Data.String(IsString(..)) @@ -60,6 +62,7 @@ import qualified Data.Text as Text import Data.Typeable(Typeable) import Data.XML.Types + import qualified Network as N import Network.Xmpp.Jid @@ -743,54 +746,68 @@ data XmppConnectionState | XmppConnectionSecured -- ^ Connection established and secured via TLS. deriving (Show, Eq, Typeable) -data Connection = Connection { cSend :: BS.ByteString -> IO Bool - , cRecv :: Int -> IO BS.ByteString - -- This is to hold the state of the XML parser - -- (otherwise we will receive lot's of EvenBegin - -- Document and forger about name prefixes) - , cEventSource :: BufferedSource IO Event - - , cFlush :: IO () - , cClose :: IO () - } - -data XmppConnection = XmppConnection - { sCon :: Connection - , sFeatures :: !ServerFeatures -- ^ Features the server - -- advertised - , sConnectionState :: !XmppConnectionState -- ^ State of connection - , sHostname :: !(Maybe Text) -- ^ Hostname of the server - , sJid :: !(Maybe Jid) -- ^ Our JID - , sPreferredLang :: !(Maybe LangTag) -- ^ Default language when - -- no explicit language - -- tag is set - , sStreamLang :: !(Maybe LangTag) -- ^ Will be a `Just' value - -- once connected to the - -- server. - , sStreamId :: !(Maybe Text) -- ^ Stream ID as specified by - -- the server. - , sToJid :: !(Maybe Jid) -- ^ JID to include in the - -- stream element's `to' - -- attribute when the - -- connection is secured. See - -- also below. - , sJidWhenPlain :: !Bool -- ^ Whether or not to also include the - -- Jid when the connection is plain. - , sFrom :: !(Maybe Jid) -- ^ From as specified by the - -- server in the stream - -- element's `from' - -- attribute. - } - --- | --- The Xmpp monad transformer. Contains internal state in order to --- work with Pontarius. Pontarius clients needs to operate in this --- context. -newtype XmppT m a = XmppT { runXmppT :: StateT XmppConnection m a } deriving (Monad, MonadIO) - --- | Low-level and single-threaded Xmpp monad. See @Xmpp@ for a concurrent --- implementation. -type XmppConMonad a = StateT XmppConnection IO a - --- Make XmppT derive the Monad and MonadIO instances. -deriving instance (Monad m, MonadIO m) => MonadState (XmppConnection) (XmppT m) +data HandleLike = Hand { cSend :: BS.ByteString -> IO Bool + , cRecv :: Int -> IO BS.ByteString + -- This is to hold the state of the XML parser + -- (otherwise we will receive lot's of EvenBegin + -- Document and forger about name prefixes) + , cFlush :: IO () + , cClose :: IO () + } + +data Connection_ = Connection_ + { sConnectionState :: !XmppConnectionState -- ^ State of + -- connection + , cHand :: HandleLike + , cEventSource :: ResumableSource IO Event + , sFeatures :: !ServerFeatures -- ^ Features the server + -- advertised + , sHostname :: !(Maybe Text) -- ^ Hostname of the + -- server + , sJid :: !(Maybe Jid) -- ^ Our JID + , sPreferredLang :: !(Maybe LangTag) -- ^ Default language + -- when no explicit + -- language tag is set + , sStreamLang :: !(Maybe LangTag) -- ^ Will be a `Just' + -- value once connected + -- to the server. + , sStreamId :: !(Maybe Text) -- ^ Stream ID as + -- specified by the + -- server. + , sToJid :: !(Maybe Jid) -- ^ JID to include in the + -- stream element's `to' + -- attribute when the + -- connection is + -- secured. See also below. + , sJidWhenPlain :: !Bool -- ^ Whether or not to also + -- include the Jid when the + -- connection is plain. + , sFrom :: !(Maybe Jid) -- ^ From as specified by + -- the server in the + -- stream element's `from' + -- attribute. + } + + +newtype Connection = Connection {unConnection :: TMVar Connection_} + +withConnection :: StateT Connection_ IO c -> Connection -> IO c +withConnection action (Connection con) = bracketOnError + (atomically $ takeTMVar con) + (atomically . putTMVar con ) + (\c -> do + (r, c') <- runStateT action c + atomically $ putTMVar con c' + return r + ) + +-- nonblocking version. Changes to the connection are ignored! +withConnection' :: StateT Connection_ IO b -> Connection -> IO b +withConnection' action (Connection con) = do + con_ <- atomically $ readTMVar con + (r, _) <- runStateT action con_ + return r + + +mkConnection :: Connection_ -> IO Connection +mkConnection con = Connection `fmap` (atomically $ newTMVar con) diff --git a/source/Network/Xmpp/Xep/InbandRegistration.hs b/source/Network/Xmpp/Xep/InbandRegistration.hs index 4641f37..7f433ef 100644 --- a/source/Network/Xmpp/Xep/InbandRegistration.hs +++ b/source/Network/Xmpp/Xep/InbandRegistration.hs @@ -47,33 +47,33 @@ data Query = Query { instructions :: Maybe Text.Text emptyQuery = Query Nothing False False [] -supported :: XmppConMonad (Either IbrError Bool) -supported = runErrorT $ fromFeatures <+> fromDisco - where - fromFeatures = do - fs <- other <$> gets sFeatures - let fe = XML.Element - "{http://jabber.org/features/iq-register}register" - [] - [] - return $ fe `elem` fs - fromDisco = do - hn' <- gets sHostname - hn <- case hn' of - Just h -> return (Jid Nothing h Nothing) - Nothing -> throwError IbrNoConnection - qi <- lift $ xmppQueryInfo Nothing Nothing - case qi of - Left e -> return False - Right qir -> return $ "jabber:iq:register" `elem` qiFeatures qir - f <+> g = do - r <- f - if r then return True else g - - -query :: IQRequestType -> Query -> XmppConMonad (Either IbrError Query) -query queryType x = do - answer <- xmppSendIQ' "ibr" Nothing queryType Nothing (pickleElem xpQuery x) +-- supported :: XmppConMonad (Either IbrError Bool) +-- supported = runErrorT $ fromFeatures <+> fromDisco +-- where +-- fromFeatures = do +-- fs <- other <$> gets sFeatures +-- let fe = XML.Element +-- "{http://jabber.org/features/iq-register}register" +-- [] +-- [] +-- return $ fe `elem` fs +-- fromDisco = do +-- hn' <- gets sHostname +-- hn <- case hn' of +-- Just h -> return (Jid Nothing h Nothing) +-- Nothing -> throwError IbrNoConnection +-- qi <- lift $ xmppQueryInfo Nothing Nothing +-- case qi of +-- Left e -> return False +-- Right qir -> return $ "jabber:iq:register" `elem` qiFeatures qir +-- f <+> g = do +-- r <- f +-- if r then return True else g + + +query :: IQRequestType -> Query -> Connection -> IO (Either IbrError Query) +query queryType x con = do + answer <- pushIQ' "ibr" Nothing queryType Nothing (pickleElem xpQuery x) con case answer of Right IQResult{iqResultPayload = Just b} -> case unpickleElem xpQuery b of @@ -96,9 +96,11 @@ mapError f = mapErrorT (liftM $ left f) -- | Retrieve the necessary fields and fill them in to register an account with -- the server -registerWith :: [(Field, Text.Text)] -> XmppConMonad (Either RegisterError Query) -registerWith givenFields = runErrorT $ do - fs <- mapError IbrError $ ErrorT requestFields +registerWith :: [(Field, Text.Text)] + -> Connection + -> IO (Either RegisterError Query) +registerWith givenFields con = runErrorT $ do + fs <- mapError IbrError . ErrorT $ requestFields con when (registered fs) . throwError $ AlreadyRegistered let res = flip map (fields fs) $ \(field,_) -> case lookup field givenFields of @@ -107,18 +109,18 @@ registerWith givenFields = runErrorT $ do fields <- case partitionEithers res of ([],fs) -> return fs (fs,_) -> throwError $ MissingFields fs - result <- mapError IbrError . ErrorT . query Set $ emptyQuery {fields} + result <- mapError IbrError . ErrorT $ query Set (emptyQuery {fields}) con return result -- | Terminate your account on the server. You have to be logged in for this to -- work. You connection will most likely be terminated after unregistering. -unregister :: XmppConMonad (Either IbrError Query) +unregister :: Connection -> IO (Either IbrError Query) unregister = query Set $ emptyQuery {remove = True} -requestFields = runErrorT $ do +requestFields con = runErrorT $ do -- supp <- ErrorT supported -- unless supp $ throwError $ IbrNotSupported - qr <- ErrorT $ query Get emptyQuery + qr <- ErrorT $ query Get emptyQuery con return $ qr xpQuery :: PU [XML.Node] Query diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs index a85fee4..29fcf83 100644 --- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs +++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs @@ -89,8 +89,8 @@ queryInfo :: Jid -- ^ Entity to query -> Maybe Text.Text -- ^ Node -> Context -> IO (Either DiscoError QueryInfoResult) -queryInfo to node session = do - res <- sendIQ' (Just to) Get Nothing queryBody session +queryInfo to node context = do + res <- sendIQ' (Just to) Get Nothing queryBody context return $ case res of IQResponseError e -> Left $ DiscoIQError e IQResponseTimeout -> Left $ DiscoTimeout @@ -105,9 +105,10 @@ queryInfo to node session = do xmppQueryInfo :: Maybe Jid -> Maybe Text.Text - -> XmppConMonad (Either DiscoError QueryInfoResult) -xmppQueryInfo to node = do - res <- xmppSendIQ' "info" to Get Nothing queryBody + -> Connection + -> IO (Either DiscoError QueryInfoResult) +xmppQueryInfo to node con = do + res <- pushIQ' "info" to Get Nothing queryBody con return $ case res of Left e -> Left $ DiscoIQError e Right r -> case iqResultPayload r of