|
|
|
@ -1,4 +1,5 @@ |
|
|
|
{-# LANGUAGE OverloadedStrings #-} |
|
|
|
{-# LANGUAGE OverloadedStrings #-} |
|
|
|
|
|
|
|
{-# LANGUAGE NoMonomorphismRestriction #-} |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
module Network.XMPP.Concurrent |
|
|
|
module Network.XMPP.Concurrent |
|
|
|
@ -20,24 +21,26 @@ import Control.Monad.Trans.State |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import qualified Data.ByteString as BS |
|
|
|
import qualified Data.ByteString as BS |
|
|
|
|
|
|
|
import qualified Data.Map as Map |
|
|
|
import Data.Maybe |
|
|
|
import Data.Maybe |
|
|
|
import Data.IORef |
|
|
|
import Data.IORef |
|
|
|
|
|
|
|
import Data.Text(Text) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import Data.XML.Types |
|
|
|
|
|
|
|
|
|
|
|
import Network.XMPP.Types |
|
|
|
import Network.XMPP.Types |
|
|
|
import Network.XMPP.Monad |
|
|
|
import Network.XMPP.Monad |
|
|
|
import Network.XMPP.Marshal |
|
|
|
import Network.XMPP.Marshal |
|
|
|
import Network.XMPP.Pickle |
|
|
|
import Network.XMPP.Pickle |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import System.IO |
|
|
|
import System.IO |
|
|
|
|
|
|
|
|
|
|
|
import Text.XML.Expat.Format |
|
|
|
import Text.XML.Stream.Elements |
|
|
|
import Text.XML.Expat.Pickle |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) |
|
|
|
data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) |
|
|
|
, presenceRef :: IORef (Maybe (TChan Presence)) |
|
|
|
, presenceRef :: IORef (Maybe (TChan Presence)) |
|
|
|
, mShadow :: TChan Stanza -- the original chan |
|
|
|
, mShadow :: TChan Message -- the original chan |
|
|
|
, pShadow :: TChan Stanza -- the original chan |
|
|
|
, pShadow :: TChan Presence -- the original chan |
|
|
|
, outCh :: TChan Stanza |
|
|
|
, outCh :: TChan Stanza |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
@ -47,34 +50,56 @@ type XMPPThread a = ReaderT Thread IO a |
|
|
|
-- | Runs thread in XmppState monad |
|
|
|
-- | Runs thread in XmppState monad |
|
|
|
-- returns channel of incoming and outgoing stances, respectively |
|
|
|
-- returns channel of incoming and outgoing stances, respectively |
|
|
|
-- and an Action to stop the Threads and close the connection |
|
|
|
-- and an Action to stop the Threads and close the connection |
|
|
|
startThreads :: XMPPMonad (TChan Stanza, TChan Stanza, IO ()) |
|
|
|
startThreads |
|
|
|
|
|
|
|
:: XMPPMonad ( TChan Message |
|
|
|
|
|
|
|
, TChan Presence |
|
|
|
|
|
|
|
, TVar ( Map.Map (IQType, Text) (TChan IQ) |
|
|
|
|
|
|
|
, Map.Map Text (TMVar IQ) |
|
|
|
|
|
|
|
) |
|
|
|
|
|
|
|
, TChan Stanza, IO () |
|
|
|
|
|
|
|
) |
|
|
|
startThreads = do |
|
|
|
startThreads = do |
|
|
|
writeLock <- liftIO $ newTMVarIO () |
|
|
|
writeLock <- liftIO $ newTMVarIO () |
|
|
|
messagesC <- liftIO newTChanIO |
|
|
|
messageC <- liftIO newTChanIO |
|
|
|
presenceC <- liftIO newTChanIO |
|
|
|
presenceC <- liftIO newTChanIO |
|
|
|
iqC <- liftIO newTChanIO |
|
|
|
iqC <- liftIO newTChanIO |
|
|
|
outC <- liftIO newTChanIO |
|
|
|
outC <- liftIO newTChanIO |
|
|
|
iqHandlers <- liftIO newTVarIO |
|
|
|
iqHandlers <- liftIO $ newTVarIO ( Map.empty, Map.empty) |
|
|
|
pushBS <- gets sConPush |
|
|
|
pushEvents <- gets sConPush |
|
|
|
lw <- liftIO . forkIO $ loopWrite writeLock pushBS outC |
|
|
|
pushBS <- gets sConPushBS |
|
|
|
|
|
|
|
lw <- lift . resourceForkIO $ loopWrite writeLock pushEvents outC |
|
|
|
cp <- liftIO . forkIO $ connPersist pushBS writeLock |
|
|
|
cp <- liftIO . forkIO $ connPersist pushBS writeLock |
|
|
|
|
|
|
|
iqh <- lift . resourceForkIO $ handleIQs iqHandlers iqC |
|
|
|
s <- get |
|
|
|
s <- get |
|
|
|
rd <- lift . resourceForkIO . void . flip runStateT s . forever $ do |
|
|
|
rd <- lift . resourceForkIO . void . flip runStateT s . forever $ do |
|
|
|
s <- pull |
|
|
|
sta <- pull |
|
|
|
case s of |
|
|
|
case sta of |
|
|
|
SMessage m -> liftIO . atomically $ writeTChan messageC m |
|
|
|
SMessage m -> liftIO . atomically $ writeTChan messageC m |
|
|
|
SPresence p -> liftIO . atomically $ writeTChan presenceC p |
|
|
|
SPresence p -> liftIO . atomically $ writeTChan presenceC p |
|
|
|
SIQ i -> liftIO . atomically $ writeTChan presenceC i |
|
|
|
SIQ i -> liftIO . atomically $ writeTChan iqC i |
|
|
|
return (inC, outC, killConnection writeLock [lw, rd, cp]) |
|
|
|
return (messageC, presenceC, iqHandlers, outC, killConnection writeLock [lw, rd, cp]) |
|
|
|
where |
|
|
|
where |
|
|
|
loopWrite writeLock pushBS out' = forever $ do |
|
|
|
loopWrite writeLock pushEvents out' = forever $ do |
|
|
|
next <- liftIO . atomically $ ( takeTMVar writeLock >> readTChan out') |
|
|
|
next <- liftIO . atomically $ ( takeTMVar writeLock |
|
|
|
liftIO . pushBS . formatNode' $ pickleElem stanzaP next |
|
|
|
>> readTChan out') |
|
|
|
|
|
|
|
pushEvents . elementToEvents $ pickleElem stanzaP next |
|
|
|
liftIO . atomically $ putTMVar writeLock () |
|
|
|
liftIO . atomically $ putTMVar writeLock () |
|
|
|
iqHandler handlers iqC = forever $ do |
|
|
|
handleIQs handlers iqC = liftIO . forever . atomically $ do |
|
|
|
iq <- liftIO . atomically $ readTChan iqC |
|
|
|
iq <- readTChan iqC |
|
|
|
|
|
|
|
(byNS, byID) <- readTVar handlers |
|
|
|
|
|
|
|
let iqNS' = nameNamespace . elementName . iqBody $ iq |
|
|
|
|
|
|
|
case iqNS' of |
|
|
|
|
|
|
|
Nothing -> return () -- TODO: send error stanza |
|
|
|
|
|
|
|
Just iqNS -> case iqType iq of |
|
|
|
|
|
|
|
Get -> case Map.lookup (Get, iqNS) byNS of |
|
|
|
|
|
|
|
Nothing -> return () -- TODO: send error stanza |
|
|
|
|
|
|
|
Just ch -> writeTChan ch iq |
|
|
|
|
|
|
|
Set -> case Map.lookup (Set, iqNS) byNS of |
|
|
|
|
|
|
|
Nothing -> return () -- TODO: send error stanza |
|
|
|
|
|
|
|
Just ch -> writeTChan ch iq |
|
|
|
|
|
|
|
Result -> case Map.lookup (iqId iq) byID of |
|
|
|
|
|
|
|
Nothing -> return () -- ?? Should we be sending an error? |
|
|
|
|
|
|
|
Just tmvar -> putTMVar tmvar iq |
|
|
|
killConnection writeLock threads = liftIO $ do |
|
|
|
killConnection writeLock threads = liftIO $ do |
|
|
|
atomically $ takeTMVar writeLock |
|
|
|
atomically $ takeTMVar writeLock |
|
|
|
forM threads killThread |
|
|
|
forM threads killThread |
|
|
|
@ -83,44 +108,70 @@ startThreads = do |
|
|
|
runThreaded :: XMPPThread a |
|
|
|
runThreaded :: XMPPThread a |
|
|
|
-> XMPPMonad ThreadId |
|
|
|
-> XMPPMonad ThreadId |
|
|
|
runThreaded a = do |
|
|
|
runThreaded a = do |
|
|
|
(inC, outC, stopThreads) <- startThreads |
|
|
|
(mC, pC, hand, outC, stopThreads) <- startThreads |
|
|
|
workerInCh <- liftIO . newIORef $ Just inC |
|
|
|
workermCh <- liftIO . newIORef $ Just mC |
|
|
|
|
|
|
|
workerpCh <- liftIO . newIORef $ Just pC |
|
|
|
worker <- liftIO . forkIO $ do |
|
|
|
worker <- liftIO . forkIO $ do |
|
|
|
runReaderT a (Thread workerInCh inC outC) |
|
|
|
runReaderT a (Thread workermCh workerpCh mC pC outC) |
|
|
|
return () |
|
|
|
return () |
|
|
|
return worker |
|
|
|
return worker |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | get the inbound stanza channel, duplicate from master if necessary |
|
|
|
-- | get the inbound stanza channel, duplicate from master if necessary |
|
|
|
-- please note that once duplicated it will keep filling up |
|
|
|
-- please note that once duplicated it will keep filling up |
|
|
|
getInChan = do |
|
|
|
getMessageChan = do |
|
|
|
inChR <- asks inChRef |
|
|
|
mChR <- asks messagesRef |
|
|
|
inCh <- liftIO $ readIORef inChR |
|
|
|
mCh <- liftIO $ readIORef mChR |
|
|
|
case inCh of |
|
|
|
case mCh of |
|
|
|
Nothing -> do |
|
|
|
Nothing -> do |
|
|
|
shadow <- asks shadowInCh |
|
|
|
shadow <- asks mShadow |
|
|
|
inCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
mCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
liftIO $ writeIORef inChR (Just inCh') |
|
|
|
liftIO $ writeIORef mChR (Just mCh') |
|
|
|
return inCh' |
|
|
|
return mCh' |
|
|
|
Just inCh -> return inCh |
|
|
|
Just mCh -> return mCh |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | get the inbound stanza channel, duplicate from master if necessary |
|
|
|
|
|
|
|
-- please note that once duplicated it will keep filling up |
|
|
|
|
|
|
|
getPresenceChan = do |
|
|
|
|
|
|
|
pChR <- asks presenceRef |
|
|
|
|
|
|
|
pCh <- liftIO $ readIORef pChR |
|
|
|
|
|
|
|
case pCh of |
|
|
|
|
|
|
|
Nothing -> do |
|
|
|
|
|
|
|
shadow <- asks pShadow |
|
|
|
|
|
|
|
pCh' <- liftIO $ atomically $ dupTChan shadow |
|
|
|
|
|
|
|
liftIO $ writeIORef pChR (Just pCh') |
|
|
|
|
|
|
|
return pCh' |
|
|
|
|
|
|
|
Just pCh -> return pCh |
|
|
|
|
|
|
|
|
|
|
|
-- | Drop the local end of the inbound stanza channel |
|
|
|
-- | Drop the local end of the inbound stanza channel |
|
|
|
-- from our context so it can be GC-ed |
|
|
|
-- from our context so it can be GC-ed |
|
|
|
dropInChan :: XMPPThread () |
|
|
|
dropMessageChan :: XMPPThread () |
|
|
|
dropInChan = do |
|
|
|
dropMessageChan = do |
|
|
|
r <- asks inChRef |
|
|
|
r <- asks messagesRef |
|
|
|
|
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
dropPresenceChan :: XMPPThread () |
|
|
|
|
|
|
|
dropPresenceChan = do |
|
|
|
|
|
|
|
r <- asks presenceRef |
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
liftIO $ writeIORef r Nothing |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy |
|
|
|
|
|
|
|
-- of the channel as necessary |
|
|
|
|
|
|
|
pullMessage :: XMPPThread Message |
|
|
|
|
|
|
|
pullMessage = do |
|
|
|
|
|
|
|
c <- getMessageChan |
|
|
|
|
|
|
|
st <- liftIO $ atomically $ readTChan c |
|
|
|
|
|
|
|
return st |
|
|
|
|
|
|
|
|
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy |
|
|
|
-- | Read an element from the inbound stanza channel, acquiring a copy |
|
|
|
-- of the channel as necessary |
|
|
|
-- of the channel as necessary |
|
|
|
pullS :: XMPPThread Stanza |
|
|
|
pullPresence :: XMPPThread Presence |
|
|
|
pullS = do |
|
|
|
pullPresence = do |
|
|
|
c <- getInChan |
|
|
|
c <- getPresenceChan |
|
|
|
st <- liftIO $ atomically $ readTChan c |
|
|
|
st <- liftIO $ atomically $ readTChan c |
|
|
|
return st |
|
|
|
return st |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
-- | Send a stanza to the server |
|
|
|
-- | Send a stanza to the server |
|
|
|
sendS :: Stanza -> XMPPThread () |
|
|
|
sendS :: Stanza -> XMPPThread () |
|
|
|
sendS a = do |
|
|
|
sendS a = do |
|
|
|
@ -132,16 +183,28 @@ sendS a = do |
|
|
|
withNewThread :: XMPPThread () -> XMPPThread ThreadId |
|
|
|
withNewThread :: XMPPThread () -> XMPPThread ThreadId |
|
|
|
withNewThread a = do |
|
|
|
withNewThread a = do |
|
|
|
thread <- ask |
|
|
|
thread <- ask |
|
|
|
inCH' <- liftIO $ newIORef Nothing |
|
|
|
mCH' <- liftIO $ newIORef Nothing |
|
|
|
liftIO $ forkIO $ runReaderT a (thread {inChRef = inCH'}) |
|
|
|
pCH' <- liftIO $ newIORef Nothing |
|
|
|
|
|
|
|
liftIO $ forkIO $ runReaderT a (thread {messagesRef = mCH' |
|
|
|
|
|
|
|
,presenceRef = pCH' |
|
|
|
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
waitForMessage :: (Message -> Bool) -> XMPPThread Message |
|
|
|
|
|
|
|
waitForMessage f = do |
|
|
|
|
|
|
|
s <- pullMessage |
|
|
|
|
|
|
|
if (f s) then |
|
|
|
|
|
|
|
return s |
|
|
|
|
|
|
|
else do |
|
|
|
|
|
|
|
waitForMessage f |
|
|
|
|
|
|
|
|
|
|
|
waitFor :: (Stanza -> Bool) -> XMPPThread Stanza |
|
|
|
waitForPresence :: (Presence -> Bool) -> XMPPThread Presence |
|
|
|
waitFor f = do |
|
|
|
waitForPresence f = do |
|
|
|
s <- pullS |
|
|
|
s <- pullPresence |
|
|
|
if (f s) then |
|
|
|
if (f s) then |
|
|
|
return s |
|
|
|
return s |
|
|
|
else do |
|
|
|
else do |
|
|
|
waitFor f |
|
|
|
waitForPresence f |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
connPersist :: (BS.ByteString -> IO ()) -> TMVar () -> IO () |
|
|
|
connPersist :: (BS.ByteString -> IO ()) -> TMVar () -> IO () |
|
|
|
connPersist pushBS lock = forever $ do |
|
|
|
connPersist pushBS lock = forever $ do |
|
|
|
|