Browse Source
Conflicts: pontarius-xmpp.cabal source/Network/Xmpp/Basic.hs source/Network/Xmpp/Concurrent/Channels/Types.hs source/Network/Xmpp/Connection.hs source/Network/Xmpp/Errors.hs source/Network/Xmpp/Marshal.hs source/Network/Xmpp/Pickle.hs source/Network/Xmpp/Session.hs source/Network/Xmpp/Stream.hs source/Network/Xmpp/Tls.hs source/Network/Xmpp/Types.hs source/Network/Xmpp/Xep/InbandRegistration.hsmaster
34 changed files with 982 additions and 862 deletions
@ -1,2 +1,2 @@ |
|||||||
Pontarius is an active work in progress to build a Haskell XMPP library that |
Pontarius XMPP is an active work in progress to build a Haskell XMPP library |
||||||
implements the client capabilities of RFC 6120 ("XMPP Core"). |
that implements the client capabilities of RFC 6120 ("XMPP Core"). |
||||||
|
|||||||
|
After Width: | Height: | Size: 197 KiB |
@ -1,12 +1,113 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
{-# OPTIONS_HADDOCK hide #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
module Network.Xmpp.Concurrent |
module Network.Xmpp.Concurrent |
||||||
( Context |
( module Network.Xmpp.Concurrent.Monad |
||||||
, module Network.Xmpp.Concurrent.Monad |
|
||||||
, module Network.Xmpp.Concurrent.Threads |
, module Network.Xmpp.Concurrent.Threads |
||||||
, module Network.Xmpp.Concurrent.Channels |
, module Network.Xmpp.Concurrent.Basic |
||||||
|
, module Network.Xmpp.Concurrent.Types |
||||||
|
, module Network.Xmpp.Concurrent.Message |
||||||
|
, module Network.Xmpp.Concurrent.Presence |
||||||
|
, module Network.Xmpp.Concurrent.IQ |
||||||
|
, toChans |
||||||
|
, newSession |
||||||
|
, writeWorker |
||||||
) where |
) where |
||||||
|
|
||||||
import Network.Xmpp.Concurrent.Types |
|
||||||
import Network.Xmpp.Concurrent.Monad |
import Network.Xmpp.Concurrent.Monad |
||||||
import Network.Xmpp.Concurrent.Threads |
import Network.Xmpp.Concurrent.Threads |
||||||
import Network.Xmpp.Concurrent.Channels |
import Control.Applicative((<$>),(<*>)) |
||||||
|
import Control.Concurrent |
||||||
|
import Control.Concurrent.STM |
||||||
|
import Control.Monad |
||||||
|
import qualified Data.ByteString as BS |
||||||
|
import Data.IORef |
||||||
|
import qualified Data.Map as Map |
||||||
|
import Data.Maybe (fromMaybe) |
||||||
|
import Data.XML.Types |
||||||
|
import Network.Xmpp.Concurrent.Basic |
||||||
|
import Network.Xmpp.Concurrent.IQ |
||||||
|
import Network.Xmpp.Concurrent.Message |
||||||
|
import Network.Xmpp.Concurrent.Presence |
||||||
|
import Network.Xmpp.Concurrent.Types |
||||||
|
import Network.Xmpp.Concurrent.Threads |
||||||
|
import Network.Xmpp.Marshal |
||||||
|
import Network.Xmpp.Pickle |
||||||
|
import Network.Xmpp.Types |
||||||
|
import Text.Xml.Stream.Elements |
||||||
|
|
||||||
|
import Control.Monad.Error |
||||||
|
|
||||||
|
toChans :: TChan Stanza |
||||||
|
-> TVar IQHandlers |
||||||
|
-> Stanza |
||||||
|
-> IO () |
||||||
|
toChans stanzaC iqHands sta = atomically $ do |
||||||
|
writeTChan stanzaC sta |
||||||
|
case sta of |
||||||
|
IQRequestS i -> handleIQRequest iqHands i |
||||||
|
IQResultS i -> handleIQResponse iqHands (Right i) |
||||||
|
IQErrorS i -> handleIQResponse iqHands (Left i) |
||||||
|
_ -> return () |
||||||
|
where |
||||||
|
-- If the IQ request has a namespace, send it through the appropriate channel. |
||||||
|
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () |
||||||
|
handleIQRequest handlers iq = do |
||||||
|
(byNS, _) <- readTVar handlers |
||||||
|
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) |
||||||
|
case Map.lookup (iqRequestType iq, iqNS) byNS of |
||||||
|
Nothing -> return () -- TODO: send error stanza |
||||||
|
Just ch -> do |
||||||
|
sent <- newTVar False |
||||||
|
writeTChan ch $ IQRequestTicket sent iq |
||||||
|
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () |
||||||
|
handleIQResponse handlers iq = do |
||||||
|
(byNS, byID) <- readTVar handlers |
||||||
|
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of |
||||||
|
(Nothing, _) -> return () -- We are not supposed to send an error. |
||||||
|
(Just tmvar, byID') -> do |
||||||
|
let answer = either IQResponseError IQResponseResult iq |
||||||
|
_ <- tryPutTMVar tmvar answer -- Don't block. |
||||||
|
writeTVar handlers (byNS, byID') |
||||||
|
where |
||||||
|
iqID (Left err) = iqErrorID err |
||||||
|
iqID (Right iq') = iqResultID iq' |
||||||
|
|
||||||
|
|
||||||
|
-- | Creates and initializes a new Xmpp context. |
||||||
|
newSession :: TMVar Connection -> IO (Either XmppFailure Session) |
||||||
|
newSession con = runErrorT $ do |
||||||
|
outC <- lift newTChanIO |
||||||
|
stanzaChan <- lift newTChanIO |
||||||
|
iqHandlers <- lift $ newTVarIO (Map.empty, Map.empty) |
||||||
|
eh <- lift $ newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } |
||||||
|
let stanzaHandler = toChans stanzaChan iqHandlers |
||||||
|
(kill, wLock, conState, readerThread) <- ErrorT $ startThreadsWith stanzaHandler eh con |
||||||
|
writer <- lift $ forkIO $ writeWorker outC wLock |
||||||
|
idRef <- lift $ newTVarIO 1 |
||||||
|
let getId = atomically $ do |
||||||
|
curId <- readTVar idRef |
||||||
|
writeTVar idRef (curId + 1 :: Integer) |
||||||
|
return . read. show $ curId |
||||||
|
return $ Session { stanzaCh = stanzaChan |
||||||
|
, outCh = outC |
||||||
|
, iqHandlers = iqHandlers |
||||||
|
, writeRef = wLock |
||||||
|
, readerThread = readerThread |
||||||
|
, idGenerator = getId |
||||||
|
, conRef = conState |
||||||
|
, eventHandlers = eh |
||||||
|
, stopThreads = kill >> killThread writer |
||||||
|
} |
||||||
|
|
||||||
|
-- Worker to write stanzas to the stream concurrently. |
||||||
|
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () |
||||||
|
writeWorker stCh writeR = forever $ do |
||||||
|
(write, next) <- atomically $ (,) <$> |
||||||
|
takeTMVar writeR <*> |
||||||
|
readTChan stCh |
||||||
|
r <- write $ renderElement (pickleElem xpStanza next) |
||||||
|
atomically $ putTMVar writeR write |
||||||
|
unless r $ do |
||||||
|
atomically $ unGetTChan stCh next -- If the writing failed, the |
||||||
|
-- connection is dead. |
||||||
|
threadDelay 250000 -- Avoid free spinning. |
||||||
|
|||||||
@ -1,8 +1,8 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
{-# OPTIONS_HADDOCK hide #-} |
||||||
module Network.Xmpp.Concurrent.Channels.Basic where |
module Network.Xmpp.Concurrent.Basic where |
||||||
|
|
||||||
import Control.Concurrent.STM |
import Control.Concurrent.STM |
||||||
import Network.Xmpp.Concurrent.Channels.Types |
import Network.Xmpp.Concurrent.Types |
||||||
import Network.Xmpp.Types |
import Network.Xmpp.Types |
||||||
|
|
||||||
-- | Send a stanza to the server. |
-- | Send a stanza to the server. |
||||||
@ -1,112 +0,0 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
module Network.Xmpp.Concurrent.Channels |
|
||||||
( module Network.Xmpp.Concurrent.Channels.Basic |
|
||||||
, module Network.Xmpp.Concurrent.Channels.Types |
|
||||||
, module Network.Xmpp.Concurrent.Channels.Message |
|
||||||
, module Network.Xmpp.Concurrent.Channels.Presence |
|
||||||
, module Network.Xmpp.Concurrent.Channels.IQ |
|
||||||
, toChans |
|
||||||
, newSession |
|
||||||
, writeWorker |
|
||||||
) |
|
||||||
|
|
||||||
where |
|
||||||
|
|
||||||
import Control.Applicative((<$>),(<*>)) |
|
||||||
import Control.Concurrent |
|
||||||
import Control.Concurrent.STM |
|
||||||
import Control.Monad |
|
||||||
import qualified Data.ByteString as BS |
|
||||||
import Data.IORef |
|
||||||
import qualified Data.Map as Map |
|
||||||
import Data.Maybe (fromMaybe) |
|
||||||
import Data.XML.Types |
|
||||||
import Network.Xmpp.Concurrent.Channels.Basic |
|
||||||
import Network.Xmpp.Concurrent.Channels.IQ |
|
||||||
import Network.Xmpp.Concurrent.Channels.Message |
|
||||||
import Network.Xmpp.Concurrent.Channels.Presence |
|
||||||
import Network.Xmpp.Concurrent.Channels.Types |
|
||||||
import Network.Xmpp.Concurrent.Threads |
|
||||||
import Network.Xmpp.Concurrent.Types |
|
||||||
import Network.Xmpp.Marshal |
|
||||||
import Network.Xmpp.Pickle |
|
||||||
import Network.Xmpp.Types |
|
||||||
import Text.XML.Stream.Elements |
|
||||||
|
|
||||||
toChans :: TChan Stanza |
|
||||||
-> TVar IQHandlers |
|
||||||
-> Stanza |
|
||||||
-> IO () |
|
||||||
toChans stanzaC iqHands sta = atomically $ do |
|
||||||
writeTChan stanzaC sta |
|
||||||
case sta of |
|
||||||
IQRequestS i -> handleIQRequest iqHands i |
|
||||||
IQResultS i -> handleIQResponse iqHands (Right i) |
|
||||||
IQErrorS i -> handleIQResponse iqHands (Left i) |
|
||||||
_ -> return () |
|
||||||
where |
|
||||||
-- If the IQ request has a namespace, send it through the appropriate channel. |
|
||||||
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () |
|
||||||
handleIQRequest handlers iq = do |
|
||||||
(byNS, _) <- readTVar handlers |
|
||||||
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) |
|
||||||
case Map.lookup (iqRequestType iq, iqNS) byNS of |
|
||||||
Nothing -> return () -- TODO: send error stanza |
|
||||||
Just ch -> do |
|
||||||
sent <- newTVar False |
|
||||||
writeTChan ch $ IQRequestTicket sent iq |
|
||||||
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () |
|
||||||
handleIQResponse handlers iq = do |
|
||||||
(byNS, byID) <- readTVar handlers |
|
||||||
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of |
|
||||||
(Nothing, _) -> return () -- We are not supposed to send an error. |
|
||||||
(Just tmvar, byID') -> do |
|
||||||
let answer = either IQResponseError IQResponseResult iq |
|
||||||
_ <- tryPutTMVar tmvar answer -- Don't block. |
|
||||||
writeTVar handlers (byNS, byID') |
|
||||||
where |
|
||||||
iqID (Left err) = iqErrorID err |
|
||||||
iqID (Right iq') = iqResultID iq' |
|
||||||
|
|
||||||
|
|
||||||
-- | Creates and initializes a new Xmpp context. |
|
||||||
newSession :: Connection -> IO Session |
|
||||||
newSession con = do |
|
||||||
outC <- newTChanIO |
|
||||||
stanzaChan <- newTChanIO |
|
||||||
iqHandlers <- newTVarIO (Map.empty, Map.empty) |
|
||||||
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } |
|
||||||
let stanzaHandler = toChans stanzaChan iqHandlers |
|
||||||
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con |
|
||||||
writer <- forkIO $ writeWorker outC wLock |
|
||||||
idRef <- newTVarIO 1 |
|
||||||
let getId = atomically $ do |
|
||||||
curId <- readTVar idRef |
|
||||||
writeTVar idRef (curId + 1 :: Integer) |
|
||||||
return . read. show $ curId |
|
||||||
let cont = Context { writeRef = wLock |
|
||||||
, readerThread = readerThread |
|
||||||
, idGenerator = getId |
|
||||||
, conRef = conState |
|
||||||
, eventHandlers = eh |
|
||||||
, stopThreads = kill >> killThread writer |
|
||||||
} |
|
||||||
return $ Session { context = cont |
|
||||||
, stanzaCh = stanzaChan |
|
||||||
, outCh = outC |
|
||||||
, iqHandlers = iqHandlers |
|
||||||
} |
|
||||||
|
|
||||||
-- Worker to write stanzas to the stream concurrently. |
|
||||||
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () |
|
||||||
writeWorker stCh writeR = forever $ do |
|
||||||
(write, next) <- atomically $ (,) <$> |
|
||||||
takeTMVar writeR <*> |
|
||||||
readTChan stCh |
|
||||||
r <- write $ renderElement (pickleElem xpStanza next) |
|
||||||
atomically $ putTMVar writeR write |
|
||||||
unless r $ do |
|
||||||
atomically $ unGetTChan stCh next -- If the writing failed, the |
|
||||||
-- connection is dead. |
|
||||||
threadDelay 250000 -- Avoid free spinning. |
|
||||||
@ -1,12 +1,12 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
{-# OPTIONS_HADDOCK hide #-} |
||||||
module Network.Xmpp.Concurrent.Channels.Message where |
module Network.Xmpp.Concurrent.Message where |
||||||
|
|
||||||
import Network.Xmpp.Concurrent.Channels.Types |
import Network.Xmpp.Concurrent.Types |
||||||
import Control.Concurrent.STM |
import Control.Concurrent.STM |
||||||
import Data.IORef |
import Data.IORef |
||||||
import Network.Xmpp.Types |
import Network.Xmpp.Types |
||||||
import Network.Xmpp.Concurrent.Types |
import Network.Xmpp.Concurrent.Types |
||||||
import Network.Xmpp.Concurrent.Channels.Basic |
import Network.Xmpp.Concurrent.Basic |
||||||
|
|
||||||
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
||||||
-- channel as necessary. |
-- channel as necessary. |
||||||
@ -1,12 +1,11 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
{-# OPTIONS_HADDOCK hide #-} |
||||||
module Network.Xmpp.Concurrent.Channels.Presence where |
module Network.Xmpp.Concurrent.Presence where |
||||||
|
|
||||||
import Network.Xmpp.Concurrent.Channels.Types |
|
||||||
import Control.Concurrent.STM |
import Control.Concurrent.STM |
||||||
import Data.IORef |
import Data.IORef |
||||||
import Network.Xmpp.Types |
import Network.Xmpp.Types |
||||||
import Network.Xmpp.Concurrent.Types |
import Network.Xmpp.Concurrent.Types |
||||||
import Network.Xmpp.Concurrent.Channels.Basic |
import Network.Xmpp.Concurrent.Basic |
||||||
|
|
||||||
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
-- | Read an element from the inbound stanza channel, acquiring a copy of the |
||||||
-- channel as necessary. |
-- channel as necessary. |
||||||
@ -1,262 +1,41 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
-- | |
||||||
{-# LANGUAGE ScopedTypeVariables #-} |
-- Module: $Header$ |
||||||
{-# LANGUAGE OverloadedStrings #-} |
-- |
||||||
|
-- Maintainer: info@jonkri.com |
||||||
module Network.Xmpp.Connection where |
-- Stability: unstable |
||||||
|
-- Portability: portable |
||||||
import Control.Applicative((<$>)) |
-- |
||||||
import Control.Concurrent (forkIO, threadDelay) |
-- This module allows for low-level access to Pontarius XMPP. Generally, the |
||||||
import Control.Monad |
-- "Network.Xmpp" module should be used instead. |
||||||
import Control.Monad.IO.Class |
-- |
||||||
import Control.Monad.Trans.Class |
-- The 'Connection' object provides the most low-level access to the XMPP |
||||||
--import Control.Monad.Trans.Resource |
-- stream: a simple and single-threaded interface which exposes the conduit |
||||||
import qualified Control.Exception.Lifted as Ex |
-- 'Event' source, as well as the input and output byte streams. Custom stateful |
||||||
import qualified GHC.IO.Exception as GIE |
-- 'Connection' functions can be executed using 'withConnection'. |
||||||
import Control.Monad.State.Strict |
-- |
||||||
|
-- The TLS, SASL, and 'Session' functionalities of Pontarius XMPP are built on |
||||||
import Data.ByteString as BS |
-- top of this API. |
||||||
import Data.Conduit |
|
||||||
import Data.Conduit.Binary as CB |
module Network.Xmpp.Connection |
||||||
import Data.Conduit.Internal as DCI |
( Connection(..) |
||||||
import qualified Data.Conduit.List as CL |
, ConnectionState(..) |
||||||
import Data.IORef |
, ConnectionHandle(..) |
||||||
import Data.Text(Text) |
, ServerFeatures(..) |
||||||
import Data.XML.Pickle |
, connect |
||||||
import Data.XML.Types |
, withConnection |
||||||
|
, startTls |
||||||
import Network |
, simpleAuth |
||||||
import Network.Xmpp.Types |
, auth |
||||||
import Network.Xmpp.Marshal |
, pushStanza |
||||||
import Network.Xmpp.Pickle |
, pullStanza |
||||||
|
, closeConnection |
||||||
import System.IO |
, newSession |
||||||
|
) |
||||||
import Text.XML.Stream.Elements |
|
||||||
import Text.XML.Stream.Parse as XP |
where |
||||||
import Text.XML.Unresolved(InvalidEventStream(..)) |
|
||||||
|
import Network.Xmpp.Connection_ |
||||||
-- Enable/disable debug output |
import Network.Xmpp.Session |
||||||
-- This will dump all incoming and outgoing network taffic to the console, |
import Network.Xmpp.Tls |
||||||
-- prefixed with "in: " and "out: " respectively |
import Network.Xmpp.Types |
||||||
debug :: Bool |
import Network.Xmpp.Concurrent |
||||||
debug = False |
|
||||||
|
|
||||||
pushElement :: Element -> StateT Connection_ IO Bool |
|
||||||
pushElement x = do |
|
||||||
send <- gets (cSend . cHand) |
|
||||||
liftIO . send $ renderElement x |
|
||||||
|
|
||||||
-- | Encode and send stanza |
|
||||||
pushStanza :: Stanza -> Connection -> IO Bool |
|
||||||
pushStanza s = withConnection' . pushElement $ pickleElem xpStanza s |
|
||||||
|
|
||||||
-- XML documents and XMPP streams SHOULD be preceeded by an XML declaration. |
|
||||||
-- UTF-8 is the only supported XMPP encoding. The standalone document |
|
||||||
-- declaration (matching "SDDecl" in the XML standard) MUST NOT be included in |
|
||||||
-- XMPP streams. RFC 6120 defines XMPP only in terms of XML 1.0. |
|
||||||
pushXmlDecl :: StateT Connection_ IO Bool |
|
||||||
pushXmlDecl = do |
|
||||||
con <- gets cHand |
|
||||||
liftIO $ (cSend con) "<?xml version='1.0' encoding='UTF-8' ?>" |
|
||||||
|
|
||||||
pushOpenElement :: Element -> StateT Connection_ IO Bool |
|
||||||
pushOpenElement e = do |
|
||||||
sink <- gets (cSend . cHand ) |
|
||||||
liftIO . sink $ renderOpenElement e |
|
||||||
|
|
||||||
-- `Connect-and-resumes' the given sink to the connection source, and pulls a |
|
||||||
-- `b' value. |
|
||||||
runEventsSink :: Sink Event IO b -> StateT Connection_ IO b |
|
||||||
runEventsSink snk = do |
|
||||||
source <- gets cEventSource |
|
||||||
(src', r) <- lift $ source $$++ snk |
|
||||||
modify (\s -> s{cEventSource = src'}) |
|
||||||
return r |
|
||||||
|
|
||||||
pullElement :: StateT Connection_ IO Element |
|
||||||
pullElement = do |
|
||||||
Ex.catches (do |
|
||||||
e <- runEventsSink (elements =$ await) |
|
||||||
case e of |
|
||||||
Nothing -> liftIO $ Ex.throwIO StreamConnectionError |
|
||||||
Just r -> return r |
|
||||||
) |
|
||||||
[ Ex.Handler (\StreamEnd -> Ex.throwIO StreamStreamEnd) |
|
||||||
, Ex.Handler (\(InvalidXmppXml s) |
|
||||||
-> liftIO . Ex.throwIO $ StreamXMLError s) |
|
||||||
, Ex.Handler $ \(e :: InvalidEventStream) |
|
||||||
-> liftIO . Ex.throwIO $ StreamXMLError (show e) |
|
||||||
] |
|
||||||
|
|
||||||
-- Pulls an element and unpickles it. |
|
||||||
pullUnpickle :: PU [Node] a -> StateT Connection_ IO a |
|
||||||
pullUnpickle p = do |
|
||||||
res <- unpickleElem p <$> pullElement |
|
||||||
case res of |
|
||||||
Left e -> liftIO . Ex.throwIO $ StreamXMLError (show e) |
|
||||||
Right r -> return r |
|
||||||
|
|
||||||
-- | Pulls a stanza (or stream error) from the stream. Throws an error on a stream |
|
||||||
-- error. |
|
||||||
pullStanza :: Connection -> IO Stanza |
|
||||||
pullStanza = withConnection' $ do |
|
||||||
res <- pullUnpickle xpStreamStanza |
|
||||||
case res of |
|
||||||
Left e -> liftIO . Ex.throwIO $ StreamError e |
|
||||||
Right r -> return r |
|
||||||
|
|
||||||
-- Performs the given IO operation, catches any errors and re-throws everything |
|
||||||
-- except 'ResourceVanished' and IllegalOperation, in which case it will return False instead |
|
||||||
catchPush :: IO () -> IO Bool |
|
||||||
catchPush p = Ex.catch |
|
||||||
(p >> return True) |
|
||||||
(\e -> case GIE.ioe_type e of |
|
||||||
GIE.ResourceVanished -> return False |
|
||||||
GIE.IllegalOperation -> return False |
|
||||||
_ -> Ex.throwIO e |
|
||||||
) |
|
||||||
|
|
||||||
-- -- Connection_ state used when there is no connection. |
|
||||||
xmppNoConnection :: Connection_ |
|
||||||
xmppNoConnection = Connection_ |
|
||||||
{ cHand = Hand { cSend = \_ -> return False |
|
||||||
, cRecv = \_ -> Ex.throwIO |
|
||||||
$ StreamConnectionError |
|
||||||
, cFlush = return () |
|
||||||
, cClose = return () |
|
||||||
} |
|
||||||
, cEventSource = DCI.ResumableSource zeroSource (return ()) |
|
||||||
, sFeatures = SF Nothing [] [] |
|
||||||
, sConnectionState = ConnectionClosed |
|
||||||
, sHostname = Nothing |
|
||||||
, sJid = Nothing |
|
||||||
, sStreamLang = Nothing |
|
||||||
, sStreamId = Nothing |
|
||||||
, sPreferredLang = Nothing |
|
||||||
, sToJid = Nothing |
|
||||||
, sJidWhenPlain = False |
|
||||||
, sFrom = Nothing |
|
||||||
} |
|
||||||
where |
|
||||||
zeroSource :: Source IO output |
|
||||||
zeroSource = liftIO . Ex.throwIO $ StreamConnectionError |
|
||||||
|
|
||||||
-- Connects to the given hostname on port 5222 (TODO: Make this dynamic) and |
|
||||||
-- updates the XmppConMonad Connection_ state. |
|
||||||
connectTcpRaw :: HostName -> PortID -> Text -> IO Connection |
|
||||||
connectTcpRaw host port hostname = do |
|
||||||
h <- connectTo host port |
|
||||||
hSetBuffering h NoBuffering |
|
||||||
let eSource = if debug then |
|
||||||
DCI.ResumableSource (sourceHandle h |
|
||||||
$= debugOut |
|
||||||
$= XP.parseBytes def) |
|
||||||
(return ()) |
|
||||||
else DCI.ResumableSource (sourceHandle h |
|
||||||
$= XP.parseBytes def) |
|
||||||
(return ()) |
|
||||||
let hand = Hand { cSend = if debug |
|
||||||
then \d -> do |
|
||||||
BS.putStrLn (BS.append "out: " d) |
|
||||||
catchPush $ BS.hPut h d |
|
||||||
else catchPush . BS.hPut h |
|
||||||
, cRecv = if debug then |
|
||||||
\n -> do |
|
||||||
bs <- BS.hGetSome h n |
|
||||||
Prelude.putStr "in: " |
|
||||||
BS.putStrLn bs |
|
||||||
return bs |
|
||||||
else BS.hGetSome h |
|
||||||
, cFlush = hFlush h |
|
||||||
, cClose = hClose h |
|
||||||
} |
|
||||||
let con = Connection_ |
|
||||||
{ cHand = hand |
|
||||||
, cEventSource = eSource |
|
||||||
, sFeatures = (SF Nothing [] []) |
|
||||||
, sConnectionState = ConnectionPlain |
|
||||||
, sHostname = (Just hostname) |
|
||||||
, sJid = Nothing |
|
||||||
, sPreferredLang = Nothing -- TODO: Allow user to set |
|
||||||
, sStreamLang = Nothing |
|
||||||
, sStreamId = Nothing |
|
||||||
, sToJid = Nothing -- TODO: Allow user to set |
|
||||||
, sJidWhenPlain = False -- TODO: Allow user to set |
|
||||||
, sFrom = Nothing |
|
||||||
} |
|
||||||
mkConnection con |
|
||||||
where |
|
||||||
debugOut = do |
|
||||||
d <- await |
|
||||||
case d of |
|
||||||
Nothing -> return () |
|
||||||
Just bs -> do |
|
||||||
liftIO $ BS.putStr "in: " |
|
||||||
liftIO $ BS.putStrLn bs |
|
||||||
yield bs |
|
||||||
debugOut |
|
||||||
|
|
||||||
-- Closes the connection and updates the XmppConMonad Connection_ state. |
|
||||||
killConnection :: Connection -> IO (Either Ex.SomeException ()) |
|
||||||
killConnection = withConnection $ do |
|
||||||
cc <- gets (cClose . cHand) |
|
||||||
err <- liftIO $ (Ex.try cc :: IO (Either Ex.SomeException ())) |
|
||||||
put xmppNoConnection |
|
||||||
return err |
|
||||||
|
|
||||||
-- Sends an IQ request and waits for the response. If the response ID does not |
|
||||||
-- match the outgoing ID, an error is thrown. |
|
||||||
pushIQ' :: StanzaID |
|
||||||
-> Maybe Jid |
|
||||||
-> IQRequestType |
|
||||||
-> Maybe LangTag |
|
||||||
-> Element |
|
||||||
-> Connection |
|
||||||
-> IO (Either IQError IQResult) |
|
||||||
pushIQ' iqID to tp lang body con = do |
|
||||||
pushStanza (IQRequestS $ IQRequest iqID Nothing to lang tp body) con |
|
||||||
res <- pullStanza con |
|
||||||
case res of |
|
||||||
IQErrorS e -> return $ Left e |
|
||||||
IQResultS r -> do |
|
||||||
unless |
|
||||||
(iqID == iqResultID r) . liftIO . Ex.throwIO $ |
|
||||||
StreamXMLError |
|
||||||
("In sendIQ' IDs don't match: " ++ show iqID ++ " /= " ++ |
|
||||||
show (iqResultID r) ++ " .") |
|
||||||
return $ Right r |
|
||||||
_ -> liftIO . Ex.throwIO . StreamXMLError $ |
|
||||||
"sendIQ': unexpected stanza type " |
|
||||||
|
|
||||||
-- | Send "</stream:stream>" and wait for the server to finish processing and to |
|
||||||
-- close the connection. Any remaining elements from the server and whether or |
|
||||||
-- not we received a </stream:stream> element from the server is returned. |
|
||||||
closeStreams :: Connection -> IO ([Element], Bool) |
|
||||||
closeStreams = withConnection $ do |
|
||||||
send <- gets (cSend . cHand) |
|
||||||
cc <- gets (cClose . cHand) |
|
||||||
liftIO $ send "</stream:stream>" |
|
||||||
void $ liftIO $ forkIO $ do |
|
||||||
threadDelay 3000000 |
|
||||||
(Ex.try cc) :: IO (Either Ex.SomeException ()) |
|
||||||
return () |
|
||||||
collectElems [] |
|
||||||
where |
|
||||||
-- Pulls elements from the stream until the stream ends, or an error is |
|
||||||
-- raised. |
|
||||||
collectElems :: [Element] -> StateT Connection_ IO ([Element], Bool) |
|
||||||
collectElems es = do |
|
||||||
result <- Ex.try pullElement |
|
||||||
case result of |
|
||||||
Left StreamStreamEnd -> return (es, True) |
|
||||||
Left _ -> return (es, False) |
|
||||||
Right e -> collectElems (e:es) |
|
||||||
|
|
||||||
debugConduit :: Pipe l ByteString ByteString u IO b |
|
||||||
debugConduit = forever $ do |
|
||||||
s' <- await |
|
||||||
case s' of |
|
||||||
Just s -> do |
|
||||||
liftIO $ BS.putStrLn (BS.append "in: " s) |
|
||||||
yield s |
|
||||||
Nothing -> return () |
|
||||||
|
|||||||
@ -0,0 +1,285 @@ |
|||||||
|
{-# OPTIONS_HADDOCK hide #-} |
||||||
|
{-# LANGUAGE ScopedTypeVariables #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
|
||||||
|
module Network.Xmpp.Connection_ where |
||||||
|
|
||||||
|
import Control.Applicative((<$>)) |
||||||
|
import Control.Concurrent (forkIO, threadDelay) |
||||||
|
import System.IO.Error (tryIOError) |
||||||
|
import Control.Monad |
||||||
|
import Control.Monad.IO.Class |
||||||
|
import Control.Monad.Trans.Class |
||||||
|
--import Control.Monad.Trans.Resource |
||||||
|
import qualified Control.Exception.Lifted as Ex |
||||||
|
import qualified GHC.IO.Exception as GIE |
||||||
|
import Control.Monad.State.Strict |
||||||
|
|
||||||
|
import Data.ByteString as BS |
||||||
|
import Data.ByteString.Char8 as BSC8 |
||||||
|
import Data.Conduit |
||||||
|
import Data.Conduit.Binary as CB |
||||||
|
import Data.Conduit.Internal as DCI |
||||||
|
import qualified Data.Conduit.List as CL |
||||||
|
import Data.IORef |
||||||
|
import Data.Text(Text) |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.XML.Pickle |
||||||
|
import Data.XML.Types |
||||||
|
|
||||||
|
import Network |
||||||
|
import Network.Xmpp.Types |
||||||
|
import Network.Xmpp.Marshal |
||||||
|
import Network.Xmpp.Pickle |
||||||
|
|
||||||
|
import System.IO |
||||||
|
|
||||||
|
import Text.Xml.Stream.Elements |
||||||
|
import Text.XML.Stream.Parse as XP |
||||||
|
import Text.XML.Unresolved(InvalidEventStream(..)) |
||||||
|
|
||||||
|
import System.Log.Logger |
||||||
|
import Data.ByteString.Base64 |
||||||
|
|
||||||
|
import Control.Concurrent.STM.TMVar |
||||||
|
import Control.Monad.Error |
||||||
|
|
||||||
|
-- Enable/disable debug output |
||||||
|
-- This will dump all incoming and outgoing network taffic to the console, |
||||||
|
-- prefixed with "in: " and "out: " respectively |
||||||
|
debug :: Bool |
||||||
|
debug = False |
||||||
|
|
||||||
|
-- TODO: Can the TLS send/recv functions throw something other than an IO error? |
||||||
|
|
||||||
|
wrapIOException :: IO a -> StateT Connection IO (Either XmppFailure a) |
||||||
|
wrapIOException action = do |
||||||
|
r <- liftIO $ tryIOError action |
||||||
|
case r of |
||||||
|
Right b -> return $ Right b |
||||||
|
Left e -> return $ Left $ XmppIOException e |
||||||
|
|
||||||
|
pushElement :: Element -> StateT Connection IO (Either XmppFailure Bool) |
||||||
|
pushElement x = do |
||||||
|
send <- gets (cSend . cHandle) |
||||||
|
wrapIOException $ send $ renderElement x |
||||||
|
|
||||||
|
-- | Encode and send stanza |
||||||
|
pushStanza :: Stanza -> TMVar Connection -> IO (Either XmppFailure Bool) |
||||||
|
pushStanza s = withConnection' . pushElement $ pickleElem xpStanza s |
||||||
|
|
||||||
|
-- XML documents and XMPP streams SHOULD be preceeded by an XML declaration. |
||||||
|
-- UTF-8 is the only supported XMPP encoding. The standalone document |
||||||
|
-- declaration (matching "SDDecl" in the XML standard) MUST NOT be included in |
||||||
|
-- XMPP streams. RFC 6120 defines XMPP only in terms of XML 1.0. |
||||||
|
pushXmlDecl :: StateT Connection IO (Either XmppFailure Bool) |
||||||
|
pushXmlDecl = do |
||||||
|
con <- gets cHandle |
||||||
|
wrapIOException $ (cSend con) "<?xml version=\"1.0\" encoding=\"UTF-8\" ?>" |
||||||
|
|
||||||
|
pushOpenElement :: Element -> StateT Connection IO (Either XmppFailure Bool) |
||||||
|
pushOpenElement e = do |
||||||
|
sink <- gets (cSend . cHandle) |
||||||
|
wrapIOException $ sink $ renderOpenElement e |
||||||
|
|
||||||
|
-- `Connect-and-resumes' the given sink to the connection source, and pulls a |
||||||
|
-- `b' value. |
||||||
|
runEventsSink :: Sink Event IO b -> StateT Connection IO (Either XmppFailure b) |
||||||
|
runEventsSink snk = do -- TODO: Wrap exceptions? |
||||||
|
source <- gets cEventSource |
||||||
|
(src', r) <- lift $ source $$++ snk |
||||||
|
modify (\s -> s{cEventSource = src'}) |
||||||
|
return $ Right r |
||||||
|
|
||||||
|
pullElement :: StateT Connection IO (Either XmppFailure Element) |
||||||
|
pullElement = do |
||||||
|
Ex.catches (do |
||||||
|
e <- runEventsSink (elements =$ await) |
||||||
|
case e of |
||||||
|
Left f -> return $ Left f |
||||||
|
Right Nothing -> return $ Left XmppOtherFailure -- TODO |
||||||
|
Right (Just r) -> return $ Right r |
||||||
|
) |
||||||
|
[ Ex.Handler (\StreamEnd -> return $ Left StreamEndFailure) |
||||||
|
, Ex.Handler (\(InvalidXmppXml s) -- Invalid XML `Event' encountered, or missing element close tag |
||||||
|
-> return $ Left XmppOtherFailure) -- TODO: Log: s |
||||||
|
, Ex.Handler $ \(e :: InvalidEventStream) -- xml-conduit exception |
||||||
|
-> return $ Left XmppOtherFailure -- TODO: Log: (show e) |
||||||
|
] |
||||||
|
|
||||||
|
-- Pulls an element and unpickles it. |
||||||
|
pullUnpickle :: PU [Node] a -> StateT Connection IO (Either XmppFailure a) |
||||||
|
pullUnpickle p = do |
||||||
|
elem <- pullElement |
||||||
|
case elem of |
||||||
|
Left e -> return $ Left e |
||||||
|
Right elem' -> do |
||||||
|
let res = unpickleElem p elem' |
||||||
|
case res of |
||||||
|
Left e -> return $ Left XmppOtherFailure -- TODO: Log |
||||||
|
Right r -> return $ Right r |
||||||
|
|
||||||
|
-- | Pulls a stanza (or stream error) from the stream. |
||||||
|
pullStanza :: TMVar Connection -> IO (Either XmppFailure Stanza) |
||||||
|
pullStanza = withConnection' $ do |
||||||
|
res <- pullUnpickle xpStreamStanza |
||||||
|
case res of |
||||||
|
Left e -> return $ Left e |
||||||
|
Right (Left e) -> return $ Left $ StreamErrorFailure e |
||||||
|
Right (Right r) -> return $ Right r |
||||||
|
|
||||||
|
-- Performs the given IO operation, catches any errors and re-throws everything |
||||||
|
-- except 'ResourceVanished' and IllegalOperation, in which case it will return False instead |
||||||
|
catchPush :: IO () -> IO Bool |
||||||
|
catchPush p = Ex.catch |
||||||
|
(p >> return True) |
||||||
|
(\e -> case GIE.ioe_type e of |
||||||
|
GIE.ResourceVanished -> return False |
||||||
|
GIE.IllegalOperation -> return False |
||||||
|
_ -> Ex.throwIO e |
||||||
|
) |
||||||
|
|
||||||
|
-- Connection state used when there is no connection. |
||||||
|
xmppNoConnection :: Connection |
||||||
|
xmppNoConnection = Connection |
||||||
|
{ cHandle = ConnectionHandle { cSend = \_ -> return False |
||||||
|
, cRecv = \_ -> Ex.throwIO |
||||||
|
XmppOtherFailure |
||||||
|
, cFlush = return () |
||||||
|
, cClose = return () |
||||||
|
} |
||||||
|
, cEventSource = DCI.ResumableSource zeroSource (return ()) |
||||||
|
, cFeatures = SF Nothing [] [] |
||||||
|
, cState = ConnectionClosed |
||||||
|
, cHostName = Nothing |
||||||
|
, cJid = Nothing |
||||||
|
, cStreamLang = Nothing |
||||||
|
, cStreamId = Nothing |
||||||
|
, cPreferredLang = Nothing |
||||||
|
, cToJid = Nothing |
||||||
|
, cJidWhenPlain = False |
||||||
|
, cFrom = Nothing |
||||||
|
} |
||||||
|
where |
||||||
|
zeroSource :: Source IO output |
||||||
|
zeroSource = liftIO . Ex.throwIO $ XmppOtherFailure |
||||||
|
|
||||||
|
connectTcp :: HostName -> PortID -> Text -> IO (Either XmppFailure (TMVar Connection)) |
||||||
|
connectTcp host port hostname = do |
||||||
|
let PortNumber portNumber = port |
||||||
|
debugM "Pontarius.Xmpp" $ "Connecting to " ++ host ++ " on port " ++ |
||||||
|
(show portNumber) ++ " through the realm " ++ (T.unpack hostname) ++ "." |
||||||
|
h <- connectTo host port |
||||||
|
debugM "Pontarius.Xmpp" "Setting NoBuffering mode on handle." |
||||||
|
hSetBuffering h NoBuffering |
||||||
|
let eSource = DCI.ResumableSource |
||||||
|
((sourceHandle h $= logConduit) $= XP.parseBytes def) |
||||||
|
(return ()) |
||||||
|
let hand = ConnectionHandle { cSend = \d -> do |
||||||
|
let d64 = encode d |
||||||
|
debugM "Pontarius.Xmpp" $ |
||||||
|
"Sending TCP data: " ++ (BSC8.unpack d64) |
||||||
|
++ "." |
||||||
|
catchPush $ BS.hPut h d |
||||||
|
, cRecv = \n -> do |
||||||
|
d <- BS.hGetSome h n |
||||||
|
let d64 = encode d |
||||||
|
debugM "Pontarius.Xmpp" $ |
||||||
|
"Received TCP data: " ++ |
||||||
|
(BSC8.unpack d64) ++ "." |
||||||
|
return d |
||||||
|
, cFlush = hFlush h |
||||||
|
, cClose = hClose h |
||||||
|
} |
||||||
|
let con = Connection |
||||||
|
{ cHandle = hand |
||||||
|
, cEventSource = eSource |
||||||
|
, cFeatures = (SF Nothing [] []) |
||||||
|
, cState = ConnectionPlain |
||||||
|
, cHostName = (Just hostname) |
||||||
|
, cJid = Nothing |
||||||
|
, cPreferredLang = Nothing -- TODO: Allow user to set |
||||||
|
, cStreamLang = Nothing |
||||||
|
, cStreamId = Nothing |
||||||
|
, cToJid = Nothing -- TODO: Allow user to set |
||||||
|
, cJidWhenPlain = False -- TODO: Allow user to set |
||||||
|
, cFrom = Nothing |
||||||
|
} |
||||||
|
con' <- mkConnection con |
||||||
|
return $ Right con' |
||||||
|
where |
||||||
|
logConduit :: Conduit ByteString IO ByteString |
||||||
|
logConduit = CL.mapM $ \d -> do |
||||||
|
let d64 = encode d |
||||||
|
debugM "Pontarius.Xmpp" $ "Received TCP data: " ++ (BSC8.unpack d64) ++ |
||||||
|
"." |
||||||
|
return d |
||||||
|
|
||||||
|
|
||||||
|
-- Closes the connection and updates the XmppConMonad Connection state. |
||||||
|
-- killConnection :: TMVar Connection -> IO (Either Ex.SomeException ()) |
||||||
|
killConnection :: TMVar Connection -> IO (Either XmppFailure ()) |
||||||
|
killConnection = withConnection $ do |
||||||
|
cc <- gets (cClose . cHandle) |
||||||
|
err <- wrapIOException cc |
||||||
|
-- (Ex.try cc :: IO (Either Ex.SomeException ())) |
||||||
|
put xmppNoConnection |
||||||
|
return err |
||||||
|
|
||||||
|
-- Sends an IQ request and waits for the response. If the response ID does not |
||||||
|
-- match the outgoing ID, an error is thrown. |
||||||
|
pushIQ' :: StanzaID |
||||||
|
-> Maybe Jid |
||||||
|
-> IQRequestType |
||||||
|
-> Maybe LangTag |
||||||
|
-> Element |
||||||
|
-> TMVar Connection |
||||||
|
-> IO (Either XmppFailure (Either IQError IQResult)) |
||||||
|
pushIQ' iqID to tp lang body con = do |
||||||
|
pushStanza (IQRequestS $ IQRequest iqID Nothing to lang tp body) con |
||||||
|
res <- pullStanza con |
||||||
|
case res of |
||||||
|
Left e -> return $ Left e |
||||||
|
Right (IQErrorS e) -> return $ Right $ Left e |
||||||
|
Right (IQResultS r) -> do |
||||||
|
unless |
||||||
|
(iqID == iqResultID r) . liftIO . Ex.throwIO $ |
||||||
|
XmppOtherFailure |
||||||
|
-- TODO: Log: ("In sendIQ' IDs don't match: " ++ show iqID ++ |
||||||
|
-- " /= " ++ show (iqResultID r) ++ " .") |
||||||
|
return $ Right $ Right r |
||||||
|
_ -> return $ Left XmppOtherFailure |
||||||
|
-- TODO: Log: "sendIQ': unexpected stanza type " |
||||||
|
|
||||||
|
-- | Send "</stream:stream>" and wait for the server to finish processing and to |
||||||
|
-- close the connection. Any remaining elements from the server are returned. |
||||||
|
-- Surpresses StreamEndFailure exceptions, but may throw a StreamCloseError. |
||||||
|
closeStreams :: TMVar Connection -> IO (Either XmppFailure [Element]) |
||||||
|
closeStreams = withConnection $ do |
||||||
|
send <- gets (cSend . cHandle) |
||||||
|
cc <- gets (cClose . cHandle) |
||||||
|
liftIO $ send "</stream:stream>" |
||||||
|
void $ liftIO $ forkIO $ do |
||||||
|
threadDelay 3000000 -- TODO: Configurable value |
||||||
|
(Ex.try cc) :: IO (Either Ex.SomeException ()) |
||||||
|
return () |
||||||
|
collectElems [] |
||||||
|
where |
||||||
|
-- Pulls elements from the stream until the stream ends, or an error is |
||||||
|
-- raised. |
||||||
|
collectElems :: [Element] -> StateT Connection IO (Either XmppFailure [Element]) |
||||||
|
collectElems es = do |
||||||
|
result <- pullElement |
||||||
|
case result of |
||||||
|
Left StreamEndFailure -> return $ Right es |
||||||
|
Left e -> return $ Left $ StreamCloseError (es, e) |
||||||
|
Right e -> collectElems (e:es) |
||||||
|
|
||||||
|
debugConduit :: Pipe l ByteString ByteString u IO b |
||||||
|
debugConduit = forever $ do |
||||||
|
s' <- await |
||||||
|
case s' of |
||||||
|
Just s -> do |
||||||
|
liftIO $ BS.putStrLn (BS.append "in: " s) |
||||||
|
yield s |
||||||
|
Nothing -> return () |
||||||
@ -1,7 +1,7 @@ |
|||||||
{-# OPTIONS_HADDOCK hide #-} |
{-# OPTIONS_HADDOCK hide #-} |
||||||
{-# LANGUAGE DeriveDataTypeable #-} |
{-# LANGUAGE DeriveDataTypeable #-} |
||||||
{-# LANGUAGE OverloadedStrings #-} |
{-# LANGUAGE OverloadedStrings #-} |
||||||
module Text.XML.Stream.Elements where |
module Text.Xml.Stream.Elements where |
||||||
|
|
||||||
import Control.Applicative ((<$>)) |
import Control.Applicative ((<$>)) |
||||||
import Control.Exception |
import Control.Exception |
||||||
Loading…
Reference in new issue