|
|
|
@ -13,9 +13,6 @@ import Network.XMPP.Types |
|
|
|
import Control.Applicative((<$>),(<*>)) |
|
|
|
import Control.Applicative((<$>),(<*>)) |
|
|
|
import Control.Concurrent |
|
|
|
import Control.Concurrent |
|
|
|
import Control.Concurrent.STM |
|
|
|
import Control.Concurrent.STM |
|
|
|
import Control.Concurrent.STM.TChan |
|
|
|
|
|
|
|
import Control.Concurrent.STM.TMVar |
|
|
|
|
|
|
|
import Control.Exception (throwTo) |
|
|
|
|
|
|
|
import qualified Control.Exception.Lifted as Ex |
|
|
|
import qualified Control.Exception.Lifted as Ex |
|
|
|
import Control.Monad |
|
|
|
import Control.Monad |
|
|
|
import Control.Monad.IO.Class |
|
|
|
import Control.Monad.IO.Class |
|
|
|
@ -24,38 +21,33 @@ import Control.Monad.Trans.Reader |
|
|
|
import Control.Monad.Trans.Resource |
|
|
|
import Control.Monad.Trans.Resource |
|
|
|
import Control.Monad.Trans.State |
|
|
|
import Control.Monad.Trans.State |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import qualified Data.ByteString as BS |
|
|
|
import qualified Data.ByteString as BS |
|
|
|
import Data.Conduit |
|
|
|
import Data.Conduit |
|
|
|
import qualified Data.Conduit.List as CL |
|
|
|
import qualified Data.Conduit.List as CL |
|
|
|
import Data.Default (def) |
|
|
|
import Data.Default (def) |
|
|
|
import Data.IORef |
|
|
|
import Data.IORef |
|
|
|
import qualified Data.Map as Map |
|
|
|
import qualified Data.Map as Map |
|
|
|
import Data.Maybe |
|
|
|
|
|
|
|
import qualified Data.Text as Text |
|
|
|
import qualified Data.Text as Text |
|
|
|
import Data.Text(Text) |
|
|
|
import Data.Text(Text) |
|
|
|
import Data.Typeable |
|
|
|
import Data.Typeable |
|
|
|
|
|
|
|
|
|
|
|
import Data.XML.Types |
|
|
|
import Data.XML.Types |
|
|
|
|
|
|
|
|
|
|
|
import Network.XMPP.Types |
|
|
|
|
|
|
|
import Network.XMPP.Monad |
|
|
|
import Network.XMPP.Monad |
|
|
|
import Network.XMPP.Marshal |
|
|
|
import Network.XMPP.Marshal |
|
|
|
import Network.XMPP.Pickle |
|
|
|
import Network.XMPP.Pickle |
|
|
|
|
|
|
|
|
|
|
|
import System.IO |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import Text.XML.Stream.Elements |
|
|
|
import Text.XML.Stream.Elements |
|
|
|
import qualified Text.XML.Stream.Render as XR |
|
|
|
import qualified Text.XML.Stream.Render as XR |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
type IQHandlers = (Map.Map (IQType, Text) (TChan IQ), Map.Map Text (TMVar IQ)) |
|
|
|
|
|
|
|
|
|
|
|
data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) |
|
|
|
data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) |
|
|
|
, presenceRef :: IORef (Maybe (TChan Presence)) |
|
|
|
, presenceRef :: IORef (Maybe (TChan Presence)) |
|
|
|
, mShadow :: TChan Message -- the original chan |
|
|
|
, mShadow :: TChan Message -- the original chan |
|
|
|
, pShadow :: TChan Presence -- the original chan |
|
|
|
, pShadow :: TChan Presence -- the original chan |
|
|
|
, outCh :: TChan Stanza |
|
|
|
, outCh :: TChan Stanza |
|
|
|
, iqHandlers :: TVar ( Map.Map (IQType, Text) (TChan IQ) |
|
|
|
, iqHandlers :: TVar IQHandlers |
|
|
|
, Map.Map Text (TMVar IQ) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
, writeRef :: TMVar (BS.ByteString -> IO () ) |
|
|
|
, writeRef :: TMVar (BS.ByteString -> IO () ) |
|
|
|
, readerThread :: ThreadId |
|
|
|
, readerThread :: ThreadId |
|
|
|
, idGenerator :: IO Text |
|
|
|
, idGenerator :: IO Text |
|
|
|
@ -95,16 +87,17 @@ readWorker messageC presenceC iqC s = Ex.catch (forever . flip runStateT s $ do |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO () |
|
|
|
writeWorker :: TChan Stanza -> TMVar (BS.ByteString -> IO ()) -> IO () |
|
|
|
writeWorker stCh writeRef = forever $ do |
|
|
|
writeWorker stCh writeR = forever $ do |
|
|
|
(write, next) <- atomically $ (,) <$> |
|
|
|
(write, next) <- atomically $ (,) <$> |
|
|
|
takeTMVar writeRef <*> |
|
|
|
takeTMVar writeR <*> |
|
|
|
readTChan stCh |
|
|
|
readTChan stCh |
|
|
|
outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next) |
|
|
|
outBS <- CL.sourceList (elementToEvents $ pickleElem stanzaP next) |
|
|
|
$= XR.renderBytes def $$ CL.consume |
|
|
|
$= XR.renderBytes def $$ CL.consume |
|
|
|
forM outBS write |
|
|
|
_ <- forM outBS write |
|
|
|
atomically $ putTMVar writeRef write |
|
|
|
atomically $ putTMVar writeR write |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
handleIQs :: MonadIO m => TVar IQHandlers -> TChan IQ -> m a |
|
|
|
handleIQs handlers iqC = liftIO . forever . atomically $ do |
|
|
|
handleIQs handlers iqC = liftIO . forever . atomically $ do |
|
|
|
iq <- readTChan iqC |
|
|
|
iq <- readTChan iqC |
|
|
|
(byNS, byID) <- readTVar handlers |
|
|
|
(byNS, byID) <- readTVar handlers |
|
|
|
@ -118,13 +111,15 @@ handleIQs handlers iqC = liftIO . forever . atomically $ do |
|
|
|
Set -> case Map.lookup (Set, iqNS) byNS of |
|
|
|
Set -> case Map.lookup (Set, iqNS) byNS of |
|
|
|
Nothing -> return () -- TODO: send error stanza |
|
|
|
Nothing -> return () -- TODO: send error stanza |
|
|
|
Just ch -> writeTChan ch iq |
|
|
|
Just ch -> writeTChan ch iq |
|
|
|
Result -> case Map.updateLookupWithKey (\_ _ -> Nothing) |
|
|
|
-- Result / Error : |
|
|
|
|
|
|
|
_ -> case Map.updateLookupWithKey (\_ _ -> Nothing) |
|
|
|
(iqId iq) byID of |
|
|
|
(iqId iq) byID of |
|
|
|
(Nothing, _) -> return () -- we are not supposed |
|
|
|
(Nothing, _) -> return () -- we are not supposed |
|
|
|
-- to send an error |
|
|
|
-- to send an error |
|
|
|
(Just tmvar, byID') -> do |
|
|
|
(Just tmvar, byID') -> do |
|
|
|
tryPutTMVar tmvar iq -- don't block |
|
|
|
_ <- tryPutTMVar tmvar iq -- don't block |
|
|
|
writeTVar handlers (byNS, byID) |
|
|
|
writeTVar handlers (byNS, byID') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- Two streams: input and output. Threads read from input stream and write to output stream. |
|
|
|
-- Two streams: input and output. Threads read from input stream and write to output stream. |
|
|
|
@ -149,25 +144,17 @@ startThreads = do |
|
|
|
presenceC <- liftIO newTChanIO |
|
|
|
presenceC <- liftIO newTChanIO |
|
|
|
iqC <- liftIO newTChanIO |
|
|
|
iqC <- liftIO newTChanIO |
|
|
|
outC <- liftIO newTChanIO |
|
|
|
outC <- liftIO newTChanIO |
|
|
|
iqHandlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) |
|
|
|
handlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) |
|
|
|
pushEvents <- gets sConPush |
|
|
|
|
|
|
|
pushBS <- gets sConPushBS |
|
|
|
|
|
|
|
lw <- liftIO . forkIO $ writeWorker outC writeLock |
|
|
|
lw <- liftIO . forkIO $ writeWorker outC writeLock |
|
|
|
cp <- liftIO . forkIO $ connPersist writeLock |
|
|
|
cp <- liftIO . forkIO $ connPersist writeLock |
|
|
|
iqh <- liftIO . forkIO $ handleIQs iqHandlers iqC |
|
|
|
iqh <- liftIO . forkIO $ handleIQs handlers iqC |
|
|
|
s <- get |
|
|
|
s <- get |
|
|
|
rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s |
|
|
|
rd <- lift . resourceForkIO $ readWorker messageC presenceC iqC s |
|
|
|
return (messageC, presenceC, iqHandlers, outC, killConnection writeLock [lw, rd, cp], writeLock, rd) |
|
|
|
return (messageC, presenceC, handlers, outC, killConnection writeLock [lw, rd, cp, iqh], writeLock, rd) |
|
|
|
where |
|
|
|
where |
|
|
|
loopWrite writeLock pushEvents out' = forever $ do |
|
|
|
|
|
|
|
next <- liftIO . atomically $ ( takeTMVar writeLock |
|
|
|
|
|
|
|
>> readTChan out') |
|
|
|
|
|
|
|
pushEvents . elementToEvents $ pickleElem stanzaP next |
|
|
|
|
|
|
|
liftIO . atomically $ putTMVar writeLock () |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
killConnection writeLock threads = liftIO $ do |
|
|
|
killConnection writeLock threads = liftIO $ do |
|
|
|
atomically $ takeTMVar writeLock |
|
|
|
_ <- atomically $ takeTMVar writeLock -- Should we put it back? |
|
|
|
forM threads killThread |
|
|
|
_ <- forM threads killThread |
|
|
|
return() |
|
|
|
return() |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@ -195,7 +182,7 @@ listenIQChan tp ns = do |
|
|
|
runThreaded :: XMPPThread a |
|
|
|
runThreaded :: XMPPThread a |
|
|
|
-> XMPPMonad a |
|
|
|
-> XMPPMonad a |
|
|
|
runThreaded a = do |
|
|
|
runThreaded a = do |
|
|
|
(mC, pC, hand, outC, stopThreads, writeR, reader ) <- startThreads |
|
|
|
(mC, pC, hand, outC, _stopThreads, writeR, rdr ) <- startThreads |
|
|
|
workermCh <- liftIO . newIORef $ Nothing |
|
|
|
workermCh <- liftIO . newIORef $ Nothing |
|
|
|
workerpCh <- liftIO . newIORef $ Nothing |
|
|
|
workerpCh <- liftIO . newIORef $ Nothing |
|
|
|
idRef <- liftIO $ newTVarIO 1 |
|
|
|
idRef <- liftIO $ newTVarIO 1 |
|
|
|
@ -203,13 +190,14 @@ runThreaded a = do |
|
|
|
curId <- readTVar idRef |
|
|
|
curId <- readTVar idRef |
|
|
|
writeTVar idRef (curId + 1 :: Integer) |
|
|
|
writeTVar idRef (curId + 1 :: Integer) |
|
|
|
return . Text.pack $ show curId |
|
|
|
return . Text.pack $ show curId |
|
|
|
liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR reader getId) |
|
|
|
liftIO $ runReaderT a (Thread workermCh workerpCh mC pC outC hand writeR rdr getId) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | get the inbound stanza channel, duplicates from master if necessary |
|
|
|
-- | get the inbound stanza channel, duplicates from master if necessary |
|
|
|
-- please note that once duplicated it will keep filling up, call |
|
|
|
-- please note that once duplicated it will keep filling up, call |
|
|
|
-- 'dropMessageChan' to allow it to be garbage collected |
|
|
|
-- 'dropMessageChan' to allow it to be garbage collected |
|
|
|
|
|
|
|
getMessageChan :: XMPPThread (TChan Message) |
|
|
|
getMessageChan = do |
|
|
|
getMessageChan = do |
|
|
|
mChR <- asks messagesRef |
|
|
|
mChR <- asks messagesRef |
|
|
|
mCh <- liftIO $ readIORef mChR |
|
|
|
mCh <- liftIO $ readIORef mChR |
|
|
|
@ -219,9 +207,10 @@ getMessageChan = do |
|
|
|
mCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
mCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
liftIO $ writeIORef mChR (Just mCh') |
|
|
|
liftIO $ writeIORef mChR (Just mCh') |
|
|
|
return mCh' |
|
|
|
return mCh' |
|
|
|
Just mCh -> return mCh |
|
|
|
Just mCh' -> return mCh' |
|
|
|
|
|
|
|
|
|
|
|
-- | see 'getMessageChan' |
|
|
|
-- | see 'getMessageChan' |
|
|
|
|
|
|
|
getPresenceChan :: XMPPThread (TChan Presence) |
|
|
|
getPresenceChan = do |
|
|
|
getPresenceChan = do |
|
|
|
pChR <- asks presenceRef |
|
|
|
pChR <- asks presenceRef |
|
|
|
pCh <- liftIO $ readIORef pChR |
|
|
|
pCh <- liftIO $ readIORef pChR |
|
|
|
@ -231,7 +220,7 @@ getPresenceChan = do |
|
|
|
pCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
pCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
liftIO $ writeIORef pChR (Just pCh') |
|
|
|
liftIO $ writeIORef pChR (Just pCh') |
|
|
|
return pCh' |
|
|
|
return pCh' |
|
|
|
Just pCh -> return pCh |
|
|
|
Just pCh' -> return pCh' |
|
|
|
|
|
|
|
|
|
|
|
-- | Drop the local end of the inbound stanza channel |
|
|
|
-- | Drop the local end of the inbound stanza channel |
|
|
|
-- from our context so it can be GC-ed |
|
|
|
-- from our context so it can be GC-ed |
|
|
|
@ -313,9 +302,10 @@ connPersist lock = forever $ do |
|
|
|
singleThreaded :: XMPPMonad () -> XMPPThread () |
|
|
|
singleThreaded :: XMPPMonad () -> XMPPThread () |
|
|
|
singleThreaded a = do |
|
|
|
singleThreaded a = do |
|
|
|
writeLock <- asks writeRef |
|
|
|
writeLock <- asks writeRef |
|
|
|
reader <- asks readerThread |
|
|
|
rdr <- asks readerThread |
|
|
|
liftIO . atomically $ takeTMVar writeLock |
|
|
|
_ <- liftIO . atomically $ takeTMVar writeLock -- we replace it with the |
|
|
|
liftIO . throwTo reader . ReaderSignal $ do |
|
|
|
-- one returned by a |
|
|
|
|
|
|
|
liftIO . throwTo rdr . ReaderSignal $ do |
|
|
|
a |
|
|
|
a |
|
|
|
out <- gets sConPushBS |
|
|
|
out <- gets sConPushBS |
|
|
|
liftIO . atomically $ putTMVar writeLock out |
|
|
|
liftIO . atomically $ putTMVar writeLock out |
|
|
|
|