diff --git a/src/Data/Conduit/TLS.hs b/src/Data/Conduit/TLS.hs index 61aeb5e..141eeb0 100644 --- a/src/Data/Conduit/TLS.hs +++ b/src/Data/Conduit/TLS.hs @@ -1,7 +1,7 @@ {-# Language NoMonomorphismRestriction #-} module Data.Conduit.TLS ( tlsinit - , conduitStdout +-- , conduitStdout , module TLS , module TLSExtra ) @@ -50,14 +50,3 @@ tlsinit tlsParams handle = do , snk , \s -> sendData clientContext $ BL.fromChunks [s] ) --- TODO: remove - -conduitStdout - :: MonadResource m => Conduit BS.ByteString m BS.ByteString -conduitStdout = conduitIO - (return ()) - (\_ -> return ()) - (\_ bs -> do - liftIO $ BS.putStrLn bs - return $ IOProducing [bs]) - (const $ return []) \ No newline at end of file diff --git a/src/Network/XMPP/Bind.hs b/src/Network/XMPP/Bind.hs index e8610df..10cdf60 100644 --- a/src/Network/XMPP/Bind.hs +++ b/src/Network/XMPP/Bind.hs @@ -42,9 +42,7 @@ bindP c = xpElemNodes "{urn:ietf:params:xml:ns:xmpp-bind}bind" c xmppThreadedBind :: Maybe Text -> XMPPThread Text xmppThreadedBind rsrc = do - liftIO $ putStrLn "bind..." answer <- sendIQ' Nothing Set (bindBody rsrc) - liftIO . putStrLn $ "Answer: " ++ show answer let (IQ Nothing Nothing _ Result b) = answer let (JID _n _d (Just r)) = unpickleElem jidP b return r diff --git a/src/Network/XMPP/Concurrent.hs b/src/Network/XMPP/Concurrent.hs index 595d154..cd1cf74 100644 --- a/src/Network/XMPP/Concurrent.hs +++ b/src/Network/XMPP/Concurrent.hs @@ -41,7 +41,9 @@ import Network.XMPP.Pickle import Text.XML.Stream.Elements import qualified Text.XML.Stream.Render as XR -type IQHandlers = (Map.Map (IQType, Text) (TChan IQ), Map.Map Text (TMVar IQ)) +type IQHandlers = (Map.Map (IQType, Text) (TChan (IQ, TVar Bool)) + , Map.Map Text (TMVar IQ) + ) data Thread = Thread { messagesRef :: IORef (Maybe (TChan Message)) , presenceRef :: IORef (Maybe (TChan Presence)) @@ -101,21 +103,19 @@ handleIQs handlers iqC = liftIO . forever . atomically $ do iq <- readTChan iqC (byNS, byID) <- readTVar handlers let iqNS = fromMaybe ("") (nameNamespace . elementName . iqBody $ iq) - case iqType iq of - Get -> case Map.lookup (Get, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> writeTChan ch iq - Set -> case Map.lookup (Set, iqNS) byNS of - Nothing -> return () -- TODO: send error stanza - Just ch -> writeTChan ch iq - -- Result / Error : - _ -> case Map.updateLookupWithKey (\_ _ -> Nothing) - (iqId iq) byID of - (Nothing, _) -> return () -- we are not supposed - -- to send an error - (Just tmvar, byID') -> do - _ <- tryPutTMVar tmvar iq -- don't block - writeTVar handlers (byNS, byID') + case () of () | (iqType iq) `elem` [Get, Set] -> + case Map.lookup (Get, iqNS) byNS of + Nothing -> return () -- TODO: send error stanza + Just ch -> do + sent <- newTVar False + writeTChan ch (iq, sent) + | otherwise -> case Map.updateLookupWithKey (\_ _ -> Nothing) + (iqId iq) byID of + (Nothing, _) -> return () -- we are not supposed + -- to send an error + (Just tmvar, byID') -> do + _ <- tryPutTMVar tmvar iq -- don't block + writeTVar handlers (byNS, byID') @@ -126,9 +126,7 @@ handleIQs handlers iqC = liftIO . forever . atomically $ do startThreads :: XMPPMonad ( TChan Message , TChan Presence - , TVar ( Map.Map (IQType, Text) (TChan IQ) - , Map.Map Text (TMVar IQ) - ) + , TVar IQHandlers , TChan Stanza, IO () , TMVar (BS.ByteString -> IO ()) , ThreadId @@ -161,7 +159,7 @@ startThreads = do -- them listenIQChan :: IQType -- ^ type of IQs to receive (Get / Set) -> Text -- ^ namespace of the child element - -> XMPPThread (Bool, TChan IQ) + -> XMPPThread (Bool, TChan (IQ, TVar Bool)) listenIQChan tp ns = do handlers <- asks iqHandlers liftIO . atomically $ do @@ -171,8 +169,8 @@ listenIQChan tp ns = do (tp,ns) iqCh byNS writeTVar handlers (byNS', byID) return $ case present of - Nothing -> (False, iqCh) - Just iqCh' -> (True, iqCh') + Nothing -> (True, iqCh) + Just iqCh' -> (False, iqCh') -- | Start worker threads and run action. The supplied action will run -- in the calling thread. use 'forkXMPP' to start another thread. @@ -330,4 +328,16 @@ sendIQ to tp body = do -- TODO: add timeout sendIQ' :: Maybe JID -> IQType -> Element -> XMPPThread IQ sendIQ' to tp body = do ref <- sendIQ to tp body - liftIO . atomically $ takeTMVar ref \ No newline at end of file + liftIO . atomically $ takeTMVar ref + +answerIQ :: MonadIO m => (IQ, TVar Bool) -> Element -> ReaderT Thread m Bool +answerIQ ((IQ from _to id _tp _bd), sentRef) body = do + out <- asks outCh + liftIO . atomically $ do + sent <- readTVar sentRef + case sent of + False -> do + writeTVar sentRef True + writeTChan out . SIQ $ IQ Nothing from id Result body + return True + True -> return False diff --git a/src/Network/XMPP/Monad.hs b/src/Network/XMPP/Monad.hs index dac363a..5e1631a 100644 --- a/src/Network/XMPP/Monad.hs +++ b/src/Network/XMPP/Monad.hs @@ -12,7 +12,6 @@ import Data.ByteString as BS import Data.Conduit import Data.Conduit.Binary as CB import Data.Conduit.List as CL -import Data.Conduit.TLS import Data.Text(Text) import Data.XML.Pickle import Data.XML.Types @@ -64,13 +63,13 @@ xmppFromHandle -> IO (a, XMPPState) xmppFromHandle handle hostname username res f = runResourceT $ do liftIO $ hSetBuffering handle NoBuffering - let raw = CB.sourceHandle handle $= conduitStdout + let raw = CB.sourceHandle handle let src = raw $= XP.parseBytes def let st = XMPPState src (raw) (\xs -> CL.sourceList xs - $$ XR.renderBytes def =$ conduitStdout =$ CB.sinkHandle handle) + $$ XR.renderBytes def =$ CB.sinkHandle handle) (BS.hPut handle) (Just handle) def diff --git a/src/Network/XMPP/Pickle.hs b/src/Network/XMPP/Pickle.hs index 4260086..c1b15c9 100644 --- a/src/Network/XMPP/Pickle.hs +++ b/src/Network/XMPP/Pickle.hs @@ -53,7 +53,9 @@ right (Right r) = r unpickleElem :: PU [Node] c -> Element -> c -unpickleElem p = right . unpickle (xpNodeElem p) +unpickleElem p x = case unpickle (xpNodeElem p) x of + Left l -> error $ l ++ "\n saw: " ++ show x + Right r -> r pickleElem :: PU [Node] a -> a -> Element pickleElem p = pickle $ xpNodeElem p diff --git a/src/Network/XMPP/Types.hs b/src/Network/XMPP/Types.hs index f4a5eeb..205454a 100644 --- a/src/Network/XMPP/Types.hs +++ b/src/Network/XMPP/Types.hs @@ -1,17 +1,17 @@ module Network.XMPP.Types where -- proudly "borrowed" from haskell-xmpp -import Control.Monad.Trans.State +import Control.Monad.Trans.State import qualified Data.ByteString as BS -import Data.Conduit -import Data.Default -import Data.List.Split as L -import Data.Text as Text +import Data.Conduit +import Data.Default +import Data.List.Split as L +import Data.Text as Text -import Data.XML.Types +import Data.XML.Types -import System.IO +import System.IO -- | Jabber ID (JID) datatype data JID = JID { node :: Maybe Text diff --git a/src/Tests.hs b/src/Tests.hs new file mode 100644 index 0000000..f18c6b9 --- /dev/null +++ b/src/Tests.hs @@ -0,0 +1,121 @@ +{-# LANGUAGE PackageImports, OverloadedStrings #-} +module Example where + +import Network.XMPP +import Control.Concurrent +import Control.Concurrent.STM +import Control.Monad +import Control.Monad.IO.Class + +import Data.Maybe +import Data.Text (Text) +import qualified Data.Text as Text +import Data.XML.Pickle +import Data.XML.Types + +import Network.XMPP.Pickle + +import System.Environment + +testUser1 :: JID +testUser1 = read "testuser1@species64739.dyndns.org/bot1" + +testUser2 :: JID +testUser2 = read "testuser2@species64739.dyndns.org/bot2" + +superviser :: JID +superviser = read "uart14@species64739.dyndns.org" + + +attXmpp :: STM a -> XMPPThread a +attXmpp = liftIO . atomically + +testNS :: Text +testNS = "xmpp:library:test" + +data Payload = Payload Int Bool Text deriving (Eq, Show) + +payloadP = xpWrap (\((counter,flag) , message) -> Payload counter flag message) + (\(Payload counter flag message) ->((counter,flag) , message)) $ + xpElem (Name "request" (Just testNS) Nothing) + (xpPair + (xpAttr "counter" xpPrim) + (xpAttr "flag" xpPrim) + ) + (xpElemNodes (Name "message" (Just testNS) Nothing) + (xpContent xpId)) + +invertPayload (Payload count flag message) = Payload (count + 1) (not flag) (Text.reverse message) + +iqResponder = do + (free, chan) <- listenIQChan Get testNS + unless free $ liftIO $ putStrLn "Channel was already taken" + >> error "hanging up" + forever $ do + next@(iq,_) <- liftIO . atomically $ readTChan chan + let payload = unpickleElem payloadP $ iqBody iq + let answerPayload = invertPayload payload + let answerBody = pickleElem payloadP answerPayload + answerIQ next answerBody + +autoAccept :: XMPPThread () +autoAccept = forever $ do + st <- pullPresence + case st of + Presence from _ idq (Just Subscribe) _ _ _ _ -> + sendS . SPresence $ + Presence Nothing from idq (Just Subscribed) Nothing Nothing Nothing [] + _ -> return () + +sendUser txt = sendS . SMessage $ Message Nothing superviser Nothing Nothing Nothing + (Just (Text.pack txt)) Nothing [] + + +expect debug x y | x == y = debug "Ok." + | otherwise = do + let failMSG = "failed" ++ show x ++ " /= " ++ show y + debug failMSG + sendUser failMSG + + + +runMain :: (String -> STM ()) -> Int -> IO () +runMain debug number = do + let (we, them, active) = case number of + 1 -> (testUser1, testUser2,True) + 2 -> (testUser2, testUser1,False) + _ -> error "Need either 1 or 2" + sessionConnect "localhost" + "species64739.dyndns.org" + (fromJust $ node we) (resource we) $ do + let debug' = liftIO . atomically . debug . + (("Thread " ++ show number ++ ":") ++) + singleThreaded $ xmppSASL "pwd" + xmppThreadedBind (resource we) + singleThreaded $ xmppSession + sendS . SPresence $ Presence Nothing Nothing Nothing Nothing (Just Available) Nothing Nothing [] + forkXMPP autoAccept + forkXMPP iqResponder + -- sendS . SPresence $ Presence Nothing (Just them) Nothing (Just Subscribe) Nothing Nothing Nothing [] + let delay = if active then 1000000 else 5000000 + when active . void . forkXMPP . void . forM [1..10] $ \count -> do + let message = Text.pack . show $ node we + let payload = Payload count (even count) (Text.pack $ show count) + let body = pickleElem payloadP payload + answer <- sendIQ' (Just them) Get body + let answerPayload = unpickleElem payloadP (iqBody answer) + expect debug' (invertPayload payload) answerPayload + liftIO $ threadDelay delay + sendUser "All tests done" + liftIO . forever $ threadDelay 10000000 + return () + return () + + +main = do + out <- newTChanIO + forkIO . forever $ atomically (readTChan out) >>= putStrLn + let debugOut = writeTChan out + forkIO $ runMain debugOut 1 + runMain debugOut 2 +