Browse Source

Remove Context

As previously mentioned, `Context' is simply a bunch of thread
management features. This patch moves the `Context' fields into
`Session', and the `Network.Xmpp.Concurrent.Channel' modules into
`Network.Xmpp.Concurrent'.
master
Jon Kristensen 13 years ago
parent
commit
29e2af8c33
  1. 10
      pontarius-xmpp.cabal
  2. 1
      source/Network/Xmpp.hs
  3. 109
      source/Network/Xmpp/Concurrent.hs
  4. 4
      source/Network/Xmpp/Concurrent/Basic.hs
  5. 112
      source/Network/Xmpp/Concurrent/Channels.hs
  6. 32
      source/Network/Xmpp/Concurrent/Channels/Types.hs
  7. 7
      source/Network/Xmpp/Concurrent/IQ.hs
  8. 6
      source/Network/Xmpp/Concurrent/Message.hs
  9. 10
      source/Network/Xmpp/Concurrent/Monad.hs
  10. 5
      source/Network/Xmpp/Concurrent/Presence.hs
  11. 43
      source/Network/Xmpp/Concurrent/Types.hs
  12. 2
      source/Network/Xmpp/Session.hs
  13. 1
      source/Network/Xmpp/Xep/ServiceDiscovery.hs

10
pontarius-xmpp.cabal

@ -61,12 +61,10 @@ Library @@ -61,12 +61,10 @@ Library
, Network.Xmpp.Bind
, Network.Xmpp.Concurrent
, Network.Xmpp.Concurrent.Types
, Network.Xmpp.Concurrent.Channels
, Network.Xmpp.Concurrent.Channels.Basic
, Network.Xmpp.Concurrent.Channels.IQ
, Network.Xmpp.Concurrent.Channels.Message
, Network.Xmpp.Concurrent.Channels.Presence
, Network.Xmpp.Concurrent.Channels.Types
, Network.Xmpp.Concurrent.Basic
, Network.Xmpp.Concurrent.IQ
, Network.Xmpp.Concurrent.Message
, Network.Xmpp.Concurrent.Presence
, Network.Xmpp.Concurrent.Threads
, Network.Xmpp.Concurrent.Monad
, Network.Xmpp.Connection_

1
source/Network/Xmpp.hs

@ -152,7 +152,6 @@ import Data.XML.Types (Element) @@ -152,7 +152,6 @@ import Data.XML.Types (Element)
import Network
import Network.Xmpp.Bind
import Network.Xmpp.Concurrent
import Network.Xmpp.Concurrent.Channels
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Connection_
import Network.Xmpp.Marshal

109
source/Network/Xmpp/Concurrent.hs

@ -1,12 +1,111 @@ @@ -1,12 +1,111 @@
{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent
( Context
, module Network.Xmpp.Concurrent.Monad
( module Network.Xmpp.Concurrent.Monad
, module Network.Xmpp.Concurrent.Threads
, module Network.Xmpp.Concurrent.Channels
, module Network.Xmpp.Concurrent.Basic
, module Network.Xmpp.Concurrent.Types
, module Network.Xmpp.Concurrent.Message
, module Network.Xmpp.Concurrent.Presence
, module Network.Xmpp.Concurrent.IQ
, toChans
, newSession
, writeWorker
) where
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Monad
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.Channels
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.XML.Types
import Network.Xmpp.Concurrent.Basic
import Network.Xmpp.Concurrent.IQ
import Network.Xmpp.Concurrent.Message
import Network.Xmpp.Concurrent.Presence
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle
import Network.Xmpp.Types
import Text.Xml.Stream.Elements
toChans :: TChan Stanza
-> TVar IQHandlers
-> Stanza
-> IO ()
toChans stanzaC iqHands sta = atomically $ do
writeTChan stanzaC sta
case sta of
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
_ -> return ()
where
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
-- | Creates and initializes a new Xmpp context.
newSession :: TMVar Connection -> IO Session
newSession con = do
outC <- newTChanIO
stanzaChan <- newTChanIO
iqHandlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
let stanzaHandler = toChans stanzaChan iqHandlers
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con
writer <- forkIO $ writeWorker outC wLock
idRef <- newTVarIO 1
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
return $ Session { stanzaCh = stanzaChan
, outCh = outC
, iqHandlers = iqHandlers
, writeRef = wLock
, readerThread = readerThread
, idGenerator = getId
, conRef = conState
, eventHandlers = eh
, stopThreads = kill >> killThread writer
}
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.

4
source/Network/Xmpp/Concurrent/Channels/Basic.hs → source/Network/Xmpp/Concurrent/Basic.hs

@ -1,8 +1,8 @@ @@ -1,8 +1,8 @@
{-# OPTIONS_HADDOCK hide #-}
module Network.Xmpp.Concurrent.Channels.Basic where
module Network.Xmpp.Concurrent.Basic where
import Control.Concurrent.STM
import Network.Xmpp.Concurrent.Channels.Types
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Types
-- | Send a stanza to the server.

112
source/Network/Xmpp/Concurrent/Channels.hs

@ -1,112 +0,0 @@ @@ -1,112 +0,0 @@
{-# OPTIONS_HADDOCK hide #-}
{-# LANGUAGE OverloadedStrings #-}
module Network.Xmpp.Concurrent.Channels
( module Network.Xmpp.Concurrent.Channels.Basic
, module Network.Xmpp.Concurrent.Channels.Types
, module Network.Xmpp.Concurrent.Channels.Message
, module Network.Xmpp.Concurrent.Channels.Presence
, module Network.Xmpp.Concurrent.Channels.IQ
, toChans
, newSession
, writeWorker
)
where
import Control.Applicative((<$>),(<*>))
import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad
import qualified Data.ByteString as BS
import Data.IORef
import qualified Data.Map as Map
import Data.Maybe (fromMaybe)
import Data.XML.Types
import Network.Xmpp.Concurrent.Channels.Basic
import Network.Xmpp.Concurrent.Channels.IQ
import Network.Xmpp.Concurrent.Channels.Message
import Network.Xmpp.Concurrent.Channels.Presence
import Network.Xmpp.Concurrent.Channels.Types
import Network.Xmpp.Concurrent.Threads
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle
import Network.Xmpp.Types
import Text.Xml.Stream.Elements
toChans :: TChan Stanza
-> TVar IQHandlers
-> Stanza
-> IO ()
toChans stanzaC iqHands sta = atomically $ do
writeTChan stanzaC sta
case sta of
IQRequestS i -> handleIQRequest iqHands i
IQResultS i -> handleIQResponse iqHands (Right i)
IQErrorS i -> handleIQResponse iqHands (Left i)
_ -> return ()
where
-- If the IQ request has a namespace, send it through the appropriate channel.
handleIQRequest :: TVar IQHandlers -> IQRequest -> STM ()
handleIQRequest handlers iq = do
(byNS, _) <- readTVar handlers
let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq)
case Map.lookup (iqRequestType iq, iqNS) byNS of
Nothing -> return () -- TODO: send error stanza
Just ch -> do
sent <- newTVar False
writeTChan ch $ IQRequestTicket sent iq
handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM ()
handleIQResponse handlers iq = do
(byNS, byID) <- readTVar handlers
case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of
(Nothing, _) -> return () -- We are not supposed to send an error.
(Just tmvar, byID') -> do
let answer = either IQResponseError IQResponseResult iq
_ <- tryPutTMVar tmvar answer -- Don't block.
writeTVar handlers (byNS, byID')
where
iqID (Left err) = iqErrorID err
iqID (Right iq') = iqResultID iq'
-- | Creates and initializes a new Xmpp context.
newSession :: TMVar Connection -> IO Session
newSession con = do
outC <- newTChanIO
stanzaChan <- newTChanIO
iqHandlers <- newTVarIO (Map.empty, Map.empty)
eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () }
let stanzaHandler = toChans stanzaChan iqHandlers
(kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con
writer <- forkIO $ writeWorker outC wLock
idRef <- newTVarIO 1
let getId = atomically $ do
curId <- readTVar idRef
writeTVar idRef (curId + 1 :: Integer)
return . read. show $ curId
let cont = Context { writeRef = wLock
, readerThread = readerThread
, idGenerator = getId
, conRef = conState
, eventHandlers = eh
, stopThreads = kill >> killThread writer
}
return $ Session { context = cont
, stanzaCh = stanzaChan
, outCh = outC
, iqHandlers = iqHandlers
}
-- Worker to write stanzas to the stream concurrently.
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO ()
writeWorker stCh writeR = forever $ do
(write, next) <- atomically $ (,) <$>
takeTMVar writeR <*>
readTChan stCh
r <- write $ renderElement (pickleElem xpStanza next)
atomically $ putTMVar writeR write
unless r $ do
atomically $ unGetTChan stCh next -- If the writing failed, the
-- connection is dead.
threadDelay 250000 -- Avoid free spinning.

32
source/Network/Xmpp/Concurrent/Channels/Types.hs

@ -1,32 +0,0 @@ @@ -1,32 +0,0 @@
{-# OPTIONS_HADDOCK hide #-}
module Network.Xmpp.Concurrent.Channels.Types where
import Control.Concurrent.STM
import Data.IORef
import qualified Data.Map as Map
import Data.Text (Text)
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Types
-- | A concurrent interface to Pontarius XMPP.
data Session = Session
{ context :: Context
, stanzaCh :: TChan Stanza -- All stanzas
, outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any
-- given time.
}
-- | IQHandlers holds the registered channels for incomming IQ requests and
-- TMVars of and TMVars for expected IQ responses
type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket)
, Map.Map StanzaId (TMVar IQResponse)
)
-- | Contains whether or not a reply has been sent, and the IQ request body to
-- reply to.
data IQRequestTicket = IQRequestTicket
{ sentRef :: (TVar Bool)
, iqRequestBody :: IQRequest
}

7
source/Network/Xmpp/Concurrent/Channels/IQ.hs → source/Network/Xmpp/Concurrent/IQ.hs

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
{-# OPTIONS_HADDOCK hide #-}
module Network.Xmpp.Concurrent.Channels.IQ where
module Network.Xmpp.Concurrent.IQ where
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM
@ -11,8 +11,7 @@ import qualified Data.Map as Map @@ -11,8 +11,7 @@ import qualified Data.Map as Map
import Data.Text (Text)
import Data.XML.Types
import Network.Xmpp.Concurrent.Channels.Basic
import Network.Xmpp.Concurrent.Channels.Types
import Network.Xmpp.Concurrent.Basic
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Types
@ -27,7 +26,7 @@ sendIQ :: Maybe Int -- ^ Timeout @@ -27,7 +26,7 @@ sendIQ :: Maybe Int -- ^ Timeout
-> Session
-> IO (TMVar IQResponse)
sendIQ timeOut to tp lang body session = do -- TODO: Add timeout
newId <- idGenerator (context session)
newId <- idGenerator session
ref <- atomically $ do
resRef <- newEmptyTMVar
(byNS, byId) <- readTVar (iqHandlers session)

6
source/Network/Xmpp/Concurrent/Channels/Message.hs → source/Network/Xmpp/Concurrent/Message.hs

@ -1,12 +1,12 @@ @@ -1,12 +1,12 @@
{-# OPTIONS_HADDOCK hide #-}
module Network.Xmpp.Concurrent.Channels.Message where
module Network.Xmpp.Concurrent.Message where
import Network.Xmpp.Concurrent.Channels.Types
import Network.Xmpp.Concurrent.Types
import Control.Concurrent.STM
import Data.IORef
import Network.Xmpp.Types
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Channels.Basic
import Network.Xmpp.Concurrent.Basic
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.

10
source/Network/Xmpp/Concurrent/Monad.hs

@ -59,7 +59,7 @@ import Network.Xmpp.Connection_ @@ -59,7 +59,7 @@ import Network.Xmpp.Connection_
-- ]
-- | Executes a function to update the event handlers.
modifyHandlers :: (EventHandlers -> EventHandlers) -> Context -> IO ()
modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO ()
modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f
where
-- Borrowing modifyTVar from
@ -71,18 +71,18 @@ modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f @@ -71,18 +71,18 @@ modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f
writeTVar var (f x)
-- | Sets the handler to be executed when the server connection is closed.
setConnectionClosedHandler_ :: (StreamFailure -> Context -> IO ()) -> Context -> IO ()
setConnectionClosedHandler_ :: (StreamFailure -> Session -> IO ()) -> Session -> IO ()
setConnectionClosedHandler_ eh session = do
modifyHandlers (\s -> s{connectionClosedHandler =
\e -> eh e session}) session
-- | Run an event handler.
runHandler :: (EventHandlers -> IO a) -> Context -> IO a
runHandler :: (EventHandlers -> IO a) -> Session -> IO a
runHandler h session = h =<< atomically (readTVar $ eventHandlers session)
-- | End the current Xmpp session.
endContext :: Context -> IO ()
endContext :: Session -> IO ()
endContext session = do -- TODO: This has to be idempotent (is it?)
closeConnection session
stopThreads session
@ -90,7 +90,7 @@ endContext session = do -- TODO: This has to be idempotent (is it?) @@ -90,7 +90,7 @@ endContext session = do -- TODO: This has to be idempotent (is it?)
-- | Close the connection to the server. Closes the stream (by enforcing a
-- write lock and sending a </stream:stream> element), waits (blocks) for three
-- seconds, and then closes the connection.
closeConnection :: Context -> IO ()
closeConnection :: Session -> IO ()
closeConnection session = Ex.mask_ $ do
(_send, connection) <- atomically $ liftM2 (,)
(takeTMVar $ writeRef session)

5
source/Network/Xmpp/Concurrent/Channels/Presence.hs → source/Network/Xmpp/Concurrent/Presence.hs

@ -1,12 +1,11 @@ @@ -1,12 +1,11 @@
{-# OPTIONS_HADDOCK hide #-}
module Network.Xmpp.Concurrent.Channels.Presence where
module Network.Xmpp.Concurrent.Presence where
import Network.Xmpp.Concurrent.Channels.Types
import Control.Concurrent.STM
import Data.IORef
import Network.Xmpp.Types
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Channels.Basic
import Network.Xmpp.Concurrent.Basic
-- | Read an element from the inbound stanza channel, acquiring a copy of the
-- channel as necessary.

43
source/Network/Xmpp/Concurrent/Types.hs

@ -12,15 +12,34 @@ import Data.Typeable @@ -12,15 +12,34 @@ import Data.Typeable
import Network.Xmpp.Types
import Data.IORef
import qualified Data.Map as Map
import Data.Text (Text)
import Network.Xmpp.Types
-- | Handlers to be run when the Xmpp session ends and when the Xmpp connection is
-- closed.
data EventHandlers = EventHandlers
{ connectionClosedHandler :: StreamFailure -> IO ()
}
-- | Xmpp Context object
data Context = Context
{ writeRef :: TMVar (BS.ByteString -> IO Bool)
-- | Interrupt is used to signal to the reader thread that it should stop. Th contained semphore signals the reader to resume it's work.
data Interrupt = Interrupt (TMVar ()) deriving Typeable
instance Show Interrupt where show _ = "<Interrupt>"
instance Ex.Exception Interrupt
-- | A concurrent interface to Pontarius XMPP.
data Session = Session
{ stanzaCh :: TChan Stanza -- All stanzas
, outCh :: TChan Stanza
, iqHandlers :: TVar IQHandlers
-- Writing lock, so that only one thread could write to the stream at any
-- given time.
-- Fields below are from Context.
, writeRef :: TMVar (BS.ByteString -> IO Bool)
, readerThread :: ThreadId
, idGenerator :: IO StanzaId
-- | Lock (used by withConnection) to make sure that a maximum of one
@ -30,9 +49,15 @@ data Context = Context @@ -30,9 +49,15 @@ data Context = Context
, stopThreads :: IO ()
}
-- | Interrupt is used to signal to the reader thread that it should stop. Th contained semphore signals the reader to resume it's work.
data Interrupt = Interrupt (TMVar ()) deriving Typeable
instance Show Interrupt where show _ = "<Interrupt>"
instance Ex.Exception Interrupt
-- | IQHandlers holds the registered channels for incomming IQ requests and
-- TMVars of and TMVars for expected IQ responses
type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket)
, Map.Map StanzaId (TMVar IQResponse)
)
-- | Contains whether or not a reply has been sent, and the IQ request body to
-- reply to.
data IQRequestTicket = IQRequestTicket
{ sentRef :: (TVar Bool)
, iqRequestBody :: IQRequest
}

2
source/Network/Xmpp/Session.hs

@ -11,7 +11,7 @@ import Network @@ -11,7 +11,7 @@ import Network
import qualified Network.TLS as TLS
import Network.Xmpp.Bind
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Concurrent.Channels
import Network.Xmpp.Concurrent
import Network.Xmpp.Connection_
import Network.Xmpp.Marshal
import Network.Xmpp.Pickle

1
source/Network/Xmpp/Xep/ServiceDiscovery.hs

@ -26,7 +26,6 @@ import Data.XML.Types @@ -26,7 +26,6 @@ import Data.XML.Types
import Network.Xmpp
import Network.Xmpp.Concurrent
import Network.Xmpp.Concurrent.Channels
import Network.Xmpp.Concurrent.Types
import Network.Xmpp.Connection_
import Network.Xmpp.Pickle

Loading…
Cancel
Save