diff --git a/pontarius-xmpp.cabal b/pontarius-xmpp.cabal index b775eb5..a2304a7 100644 --- a/pontarius-xmpp.cabal +++ b/pontarius-xmpp.cabal @@ -61,12 +61,10 @@ Library , Network.Xmpp.Bind , Network.Xmpp.Concurrent , Network.Xmpp.Concurrent.Types - , Network.Xmpp.Concurrent.Channels - , Network.Xmpp.Concurrent.Channels.Basic - , Network.Xmpp.Concurrent.Channels.IQ - , Network.Xmpp.Concurrent.Channels.Message - , Network.Xmpp.Concurrent.Channels.Presence - , Network.Xmpp.Concurrent.Channels.Types + , Network.Xmpp.Concurrent.Basic + , Network.Xmpp.Concurrent.IQ + , Network.Xmpp.Concurrent.Message + , Network.Xmpp.Concurrent.Presence , Network.Xmpp.Concurrent.Threads , Network.Xmpp.Concurrent.Monad , Network.Xmpp.Connection_ diff --git a/source/Network/Xmpp.hs b/source/Network/Xmpp.hs index 7184ba2..93c78bb 100644 --- a/source/Network/Xmpp.hs +++ b/source/Network/Xmpp.hs @@ -152,7 +152,6 @@ import Data.XML.Types (Element) import Network import Network.Xmpp.Bind import Network.Xmpp.Concurrent -import Network.Xmpp.Concurrent.Channels import Network.Xmpp.Concurrent.Types import Network.Xmpp.Connection_ import Network.Xmpp.Marshal diff --git a/source/Network/Xmpp/Concurrent.hs b/source/Network/Xmpp/Concurrent.hs index 94f0f62..9156b33 100644 --- a/source/Network/Xmpp/Concurrent.hs +++ b/source/Network/Xmpp/Concurrent.hs @@ -1,12 +1,111 @@ {-# OPTIONS_HADDOCK hide #-} +{-# LANGUAGE OverloadedStrings #-} module Network.Xmpp.Concurrent - ( Context - , module Network.Xmpp.Concurrent.Monad + ( module Network.Xmpp.Concurrent.Monad , module Network.Xmpp.Concurrent.Threads - , module Network.Xmpp.Concurrent.Channels + , module Network.Xmpp.Concurrent.Basic + , module Network.Xmpp.Concurrent.Types + , module Network.Xmpp.Concurrent.Message + , module Network.Xmpp.Concurrent.Presence + , module Network.Xmpp.Concurrent.IQ + , toChans + , newSession + , writeWorker ) where -import Network.Xmpp.Concurrent.Types import Network.Xmpp.Concurrent.Monad import Network.Xmpp.Concurrent.Threads -import Network.Xmpp.Concurrent.Channels +import Control.Applicative((<$>),(<*>)) +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad +import qualified Data.ByteString as BS +import Data.IORef +import qualified Data.Map as Map +import Data.Maybe (fromMaybe) +import Data.XML.Types +import Network.Xmpp.Concurrent.Basic +import Network.Xmpp.Concurrent.IQ +import Network.Xmpp.Concurrent.Message +import Network.Xmpp.Concurrent.Presence +import Network.Xmpp.Concurrent.Types +import Network.Xmpp.Concurrent.Threads +import Network.Xmpp.Marshal +import Network.Xmpp.Pickle +import Network.Xmpp.Types +import Text.Xml.Stream.Elements + +toChans :: TChan Stanza + -> TVar IQHandlers + -> Stanza + -> IO () +toChans stanzaC iqHands sta = atomically $ do + writeTChan stanzaC sta + case sta of + IQRequestS i -> handleIQRequest iqHands i + IQResultS i -> handleIQResponse iqHands (Right i) + IQErrorS i -> handleIQResponse iqHands (Left i) + _ -> return () + where + -- If the IQ request has a namespace, send it through the appropriate channel. + handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () + handleIQRequest handlers iq = do + (byNS, _) <- readTVar handlers + let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) + case Map.lookup (iqRequestType iq, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> do + sent <- newTVar False + writeTChan ch $ IQRequestTicket sent iq + handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () + handleIQResponse handlers iq = do + (byNS, byID) <- readTVar handlers + case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of + (Nothing, _) -> return () -- We are not supposed to send an error. + (Just tmvar, byID') -> do + let answer = either IQResponseError IQResponseResult iq + _ <- tryPutTMVar tmvar answer -- Don't block. + writeTVar handlers (byNS, byID') + where + iqID (Left err) = iqErrorID err + iqID (Right iq') = iqResultID iq' + + +-- | Creates and initializes a new Xmpp context. +newSession :: TMVar Connection -> IO Session +newSession con = do + outC <- newTChanIO + stanzaChan <- newTChanIO + iqHandlers <- newTVarIO (Map.empty, Map.empty) + eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } + let stanzaHandler = toChans stanzaChan iqHandlers + (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con + writer <- forkIO $ writeWorker outC wLock + idRef <- newTVarIO 1 + let getId = atomically $ do + curId <- readTVar idRef + writeTVar idRef (curId + 1 :: Integer) + return . read. show $ curId + return $ Session { stanzaCh = stanzaChan + , outCh = outC + , iqHandlers = iqHandlers + , writeRef = wLock + , readerThread = readerThread + , idGenerator = getId + , conRef = conState + , eventHandlers = eh + , stopThreads = kill >> killThread writer + } + +-- Worker to write stanzas to the stream concurrently. +writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () +writeWorker stCh writeR = forever $ do + (write, next) <- atomically $ (,) <$> + takeTMVar writeR <*> + readTChan stCh + r <- write $ renderElement (pickleElem xpStanza next) + atomically $ putTMVar writeR write + unless r $ do + atomically $ unGetTChan stCh next -- If the writing failed, the + -- connection is dead. + threadDelay 250000 -- Avoid free spinning. diff --git a/source/Network/Xmpp/Concurrent/Channels/Basic.hs b/source/Network/Xmpp/Concurrent/Basic.hs similarity index 82% rename from source/Network/Xmpp/Concurrent/Channels/Basic.hs rename to source/Network/Xmpp/Concurrent/Basic.hs index e01d920..5b16e4e 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Basic.hs +++ b/source/Network/Xmpp/Concurrent/Basic.hs @@ -1,8 +1,8 @@ {-# OPTIONS_HADDOCK hide #-} -module Network.Xmpp.Concurrent.Channels.Basic where +module Network.Xmpp.Concurrent.Basic where import Control.Concurrent.STM -import Network.Xmpp.Concurrent.Channels.Types +import Network.Xmpp.Concurrent.Types import Network.Xmpp.Types -- | Send a stanza to the server. diff --git a/source/Network/Xmpp/Concurrent/Channels.hs b/source/Network/Xmpp/Concurrent/Channels.hs deleted file mode 100644 index 0e12fc0..0000000 --- a/source/Network/Xmpp/Concurrent/Channels.hs +++ /dev/null @@ -1,112 +0,0 @@ -{-# OPTIONS_HADDOCK hide #-} -{-# LANGUAGE OverloadedStrings #-} -module Network.Xmpp.Concurrent.Channels - ( module Network.Xmpp.Concurrent.Channels.Basic - , module Network.Xmpp.Concurrent.Channels.Types - , module Network.Xmpp.Concurrent.Channels.Message - , module Network.Xmpp.Concurrent.Channels.Presence - , module Network.Xmpp.Concurrent.Channels.IQ - , toChans - , newSession - , writeWorker - ) - - where - -import Control.Applicative((<$>),(<*>)) -import Control.Concurrent -import Control.Concurrent.STM -import Control.Monad -import qualified Data.ByteString as BS -import Data.IORef -import qualified Data.Map as Map -import Data.Maybe (fromMaybe) -import Data.XML.Types -import Network.Xmpp.Concurrent.Channels.Basic -import Network.Xmpp.Concurrent.Channels.IQ -import Network.Xmpp.Concurrent.Channels.Message -import Network.Xmpp.Concurrent.Channels.Presence -import Network.Xmpp.Concurrent.Channels.Types -import Network.Xmpp.Concurrent.Threads -import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Marshal -import Network.Xmpp.Pickle -import Network.Xmpp.Types -import Text.Xml.Stream.Elements - -toChans :: TChan Stanza - -> TVar IQHandlers - -> Stanza - -> IO () -toChans stanzaC iqHands sta = atomically $ do - writeTChan stanzaC sta - case sta of - IQRequestS i -> handleIQRequest iqHands i - IQResultS i -> handleIQResponse iqHands (Right i) - IQErrorS i -> handleIQResponse iqHands (Left i) - _ -> return () - where - -- If the IQ request has a namespace, send it through the appropriate channel. - handleIQRequest :: TVar IQHandlers -> IQRequest -> STM () - handleIQRequest handlers iq = do - (byNS, _) <- readTVar handlers - let iqNS = fromMaybe "" (nameNamespace . elementName $ iqRequestPayload iq) - case Map.lookup (iqRequestType iq, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> do - sent <- newTVar False - writeTChan ch $ IQRequestTicket sent iq - handleIQResponse :: TVar IQHandlers -> Either IQError IQResult -> STM () - handleIQResponse handlers iq = do - (byNS, byID) <- readTVar handlers - case Map.updateLookupWithKey (\_ _ -> Nothing) (iqID iq) byID of - (Nothing, _) -> return () -- We are not supposed to send an error. - (Just tmvar, byID') -> do - let answer = either IQResponseError IQResponseResult iq - _ <- tryPutTMVar tmvar answer -- Don't block. - writeTVar handlers (byNS, byID') - where - iqID (Left err) = iqErrorID err - iqID (Right iq') = iqResultID iq' - - --- | Creates and initializes a new Xmpp context. -newSession :: TMVar Connection -> IO Session -newSession con = do - outC <- newTChanIO - stanzaChan <- newTChanIO - iqHandlers <- newTVarIO (Map.empty, Map.empty) - eh <- newTVarIO $ EventHandlers { connectionClosedHandler = \_ -> return () } - let stanzaHandler = toChans stanzaChan iqHandlers - (kill, wLock, conState, readerThread) <- startThreadsWith stanzaHandler eh con - writer <- forkIO $ writeWorker outC wLock - idRef <- newTVarIO 1 - let getId = atomically $ do - curId <- readTVar idRef - writeTVar idRef (curId + 1 :: Integer) - return . read. show $ curId - let cont = Context { writeRef = wLock - , readerThread = readerThread - , idGenerator = getId - , conRef = conState - , eventHandlers = eh - , stopThreads = kill >> killThread writer - } - return $ Session { context = cont - , stanzaCh = stanzaChan - , outCh = outC - , iqHandlers = iqHandlers - } - --- Worker to write stanzas to the stream concurrently. -writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO Bool) -> IO () -writeWorker stCh writeR = forever $ do - (write, next) <- atomically $ (,) <$> - takeTMVar writeR <*> - readTChan stCh - r <- write $ renderElement (pickleElem xpStanza next) - atomically $ putTMVar writeR write - unless r $ do - atomically $ unGetTChan stCh next -- If the writing failed, the - -- connection is dead. - threadDelay 250000 -- Avoid free spinning. \ No newline at end of file diff --git a/source/Network/Xmpp/Concurrent/Channels/Types.hs b/source/Network/Xmpp/Concurrent/Channels/Types.hs deleted file mode 100644 index ca0cd3d..0000000 --- a/source/Network/Xmpp/Concurrent/Channels/Types.hs +++ /dev/null @@ -1,32 +0,0 @@ -{-# OPTIONS_HADDOCK hide #-} -module Network.Xmpp.Concurrent.Channels.Types where - -import Control.Concurrent.STM -import Data.IORef -import qualified Data.Map as Map -import Data.Text (Text) -import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Types - --- | A concurrent interface to Pontarius XMPP. -data Session = Session - { context :: Context - , stanzaCh :: TChan Stanza -- All stanzas - , outCh :: TChan Stanza - , iqHandlers :: TVar IQHandlers - -- Writing lock, so that only one thread could write to the stream at any - -- given time. - } - --- | IQHandlers holds the registered channels for incomming IQ requests and --- TMVars of and TMVars for expected IQ responses -type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) - , Map.Map StanzaId (TMVar IQResponse) - ) - --- | Contains whether or not a reply has been sent, and the IQ request body to --- reply to. -data IQRequestTicket = IQRequestTicket - { sentRef :: (TVar Bool) - , iqRequestBody :: IQRequest - } diff --git a/source/Network/Xmpp/Concurrent/Channels/IQ.hs b/source/Network/Xmpp/Concurrent/IQ.hs similarity index 94% rename from source/Network/Xmpp/Concurrent/Channels/IQ.hs rename to source/Network/Xmpp/Concurrent/IQ.hs index 4c6ce3d..bd79061 100644 --- a/source/Network/Xmpp/Concurrent/Channels/IQ.hs +++ b/source/Network/Xmpp/Concurrent/IQ.hs @@ -1,5 +1,5 @@ {-# OPTIONS_HADDOCK hide #-} -module Network.Xmpp.Concurrent.Channels.IQ where +module Network.Xmpp.Concurrent.IQ where import Control.Concurrent (forkIO, threadDelay) import Control.Concurrent.STM @@ -11,8 +11,7 @@ import qualified Data.Map as Map import Data.Text (Text) import Data.XML.Types -import Network.Xmpp.Concurrent.Channels.Basic -import Network.Xmpp.Concurrent.Channels.Types +import Network.Xmpp.Concurrent.Basic import Network.Xmpp.Concurrent.Types import Network.Xmpp.Types @@ -27,7 +26,7 @@ sendIQ :: Maybe Int -- ^ Timeout -> Session -> IO (TMVar IQResponse) sendIQ timeOut to tp lang body session = do -- TODO: Add timeout - newId <- idGenerator (context session) + newId <- idGenerator session ref <- atomically $ do resRef <- newEmptyTMVar (byNS, byId) <- readTVar (iqHandlers session) diff --git a/source/Network/Xmpp/Concurrent/Channels/Message.hs b/source/Network/Xmpp/Concurrent/Message.hs similarity index 93% rename from source/Network/Xmpp/Concurrent/Channels/Message.hs rename to source/Network/Xmpp/Concurrent/Message.hs index 5cff80a..b84dc2e 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Message.hs +++ b/source/Network/Xmpp/Concurrent/Message.hs @@ -1,12 +1,12 @@ {-# OPTIONS_HADDOCK hide #-} -module Network.Xmpp.Concurrent.Channels.Message where +module Network.Xmpp.Concurrent.Message where -import Network.Xmpp.Concurrent.Channels.Types +import Network.Xmpp.Concurrent.Types import Control.Concurrent.STM import Data.IORef import Network.Xmpp.Types import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Concurrent.Channels.Basic +import Network.Xmpp.Concurrent.Basic -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. diff --git a/source/Network/Xmpp/Concurrent/Monad.hs b/source/Network/Xmpp/Concurrent/Monad.hs index ac15313..863c985 100644 --- a/source/Network/Xmpp/Concurrent/Monad.hs +++ b/source/Network/Xmpp/Concurrent/Monad.hs @@ -59,7 +59,7 @@ import Network.Xmpp.Connection_ -- ] -- | Executes a function to update the event handlers. -modifyHandlers :: (EventHandlers -> EventHandlers) -> Context -> IO () +modifyHandlers :: (EventHandlers -> EventHandlers) -> Session -> IO () modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f where -- Borrowing modifyTVar from @@ -71,18 +71,18 @@ modifyHandlers f session = atomically $ modifyTVar (eventHandlers session) f writeTVar var (f x) -- | Sets the handler to be executed when the server connection is closed. -setConnectionClosedHandler_ :: (StreamFailure -> Context -> IO ()) -> Context -> IO () +setConnectionClosedHandler_ :: (StreamFailure -> Session -> IO ()) -> Session -> IO () setConnectionClosedHandler_ eh session = do modifyHandlers (\s -> s{connectionClosedHandler = \e -> eh e session}) session -- | Run an event handler. -runHandler :: (EventHandlers -> IO a) -> Context -> IO a +runHandler :: (EventHandlers -> IO a) -> Session -> IO a runHandler h session = h =<< atomically (readTVar $ eventHandlers session) -- | End the current Xmpp session. -endContext :: Context -> IO () +endContext :: Session -> IO () endContext session = do -- TODO: This has to be idempotent (is it?) closeConnection session stopThreads session @@ -90,7 +90,7 @@ endContext session = do -- TODO: This has to be idempotent (is it?) -- | Close the connection to the server. Closes the stream (by enforcing a -- write lock and sending a element), waits (blocks) for three -- seconds, and then closes the connection. -closeConnection :: Context -> IO () +closeConnection :: Session -> IO () closeConnection session = Ex.mask_ $ do (_send, connection) <- atomically $ liftM2 (,) (takeTMVar $ writeRef session) diff --git a/source/Network/Xmpp/Concurrent/Channels/Presence.hs b/source/Network/Xmpp/Concurrent/Presence.hs similarity index 87% rename from source/Network/Xmpp/Concurrent/Channels/Presence.hs rename to source/Network/Xmpp/Concurrent/Presence.hs index 32ec83f..3cb0d6a 100644 --- a/source/Network/Xmpp/Concurrent/Channels/Presence.hs +++ b/source/Network/Xmpp/Concurrent/Presence.hs @@ -1,12 +1,11 @@ {-# OPTIONS_HADDOCK hide #-} -module Network.Xmpp.Concurrent.Channels.Presence where +module Network.Xmpp.Concurrent.Presence where -import Network.Xmpp.Concurrent.Channels.Types import Control.Concurrent.STM import Data.IORef import Network.Xmpp.Types import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Concurrent.Channels.Basic +import Network.Xmpp.Concurrent.Basic -- | Read an element from the inbound stanza channel, acquiring a copy of the -- channel as necessary. diff --git a/source/Network/Xmpp/Concurrent/Types.hs b/source/Network/Xmpp/Concurrent/Types.hs index 0259c45..212ea1e 100644 --- a/source/Network/Xmpp/Concurrent/Types.hs +++ b/source/Network/Xmpp/Concurrent/Types.hs @@ -12,15 +12,34 @@ import Data.Typeable import Network.Xmpp.Types +import Data.IORef +import qualified Data.Map as Map +import Data.Text (Text) + +import Network.Xmpp.Types + -- | Handlers to be run when the Xmpp session ends and when the Xmpp connection is -- closed. data EventHandlers = EventHandlers { connectionClosedHandler :: StreamFailure -> IO () } --- | Xmpp Context object -data Context = Context - { writeRef :: TMVar (BS.ByteString -> IO Bool) +-- | Interrupt is used to signal to the reader thread that it should stop. Th contained semphore signals the reader to resume it's work. +data Interrupt = Interrupt (TMVar ()) deriving Typeable +instance Show Interrupt where show _ = "" + +instance Ex.Exception Interrupt + + +-- | A concurrent interface to Pontarius XMPP. +data Session = Session + { stanzaCh :: TChan Stanza -- All stanzas + , outCh :: TChan Stanza + , iqHandlers :: TVar IQHandlers + -- Writing lock, so that only one thread could write to the stream at any + -- given time. + -- Fields below are from Context. + , writeRef :: TMVar (BS.ByteString -> IO Bool) , readerThread :: ThreadId , idGenerator :: IO StanzaId -- | Lock (used by withConnection) to make sure that a maximum of one @@ -30,9 +49,15 @@ data Context = Context , stopThreads :: IO () } - --- | Interrupt is used to signal to the reader thread that it should stop. Th contained semphore signals the reader to resume it's work. -data Interrupt = Interrupt (TMVar ()) deriving Typeable -instance Show Interrupt where show _ = "" - -instance Ex.Exception Interrupt +-- | IQHandlers holds the registered channels for incomming IQ requests and +-- TMVars of and TMVars for expected IQ responses +type IQHandlers = (Map.Map (IQRequestType, Text) (TChan IQRequestTicket) + , Map.Map StanzaId (TMVar IQResponse) + ) + +-- | Contains whether or not a reply has been sent, and the IQ request body to +-- reply to. +data IQRequestTicket = IQRequestTicket + { sentRef :: (TVar Bool) + , iqRequestBody :: IQRequest + } diff --git a/source/Network/Xmpp/Session.hs b/source/Network/Xmpp/Session.hs index 857778b..7318067 100644 --- a/source/Network/Xmpp/Session.hs +++ b/source/Network/Xmpp/Session.hs @@ -11,7 +11,7 @@ import Network import qualified Network.TLS as TLS import Network.Xmpp.Bind import Network.Xmpp.Concurrent.Types -import Network.Xmpp.Concurrent.Channels +import Network.Xmpp.Concurrent import Network.Xmpp.Connection_ import Network.Xmpp.Marshal import Network.Xmpp.Pickle @@ -109,4 +109,4 @@ simpleAuth username passwd resource = flip auth resource $ [ -- TODO: scramSha1Plus scramSha1 username Nothing passwd , digestMd5 username Nothing passwd - ] \ No newline at end of file + ] diff --git a/source/Network/Xmpp/Xep/ServiceDiscovery.hs b/source/Network/Xmpp/Xep/ServiceDiscovery.hs index 85a22c2..c025677 100644 --- a/source/Network/Xmpp/Xep/ServiceDiscovery.hs +++ b/source/Network/Xmpp/Xep/ServiceDiscovery.hs @@ -26,7 +26,6 @@ import Data.XML.Types import Network.Xmpp import Network.Xmpp.Concurrent -import Network.Xmpp.Concurrent.Channels import Network.Xmpp.Concurrent.Types import Network.Xmpp.Connection_ import Network.Xmpp.Pickle