|
|
|
@ -34,7 +34,7 @@ import Control.Concurrent (MVar, ThreadId, forkIO, |
|
|
|
killThread, newEmptyMVar, |
|
|
|
killThread, newEmptyMVar, |
|
|
|
putMVar, readMVar, takeMVar, |
|
|
|
putMVar, readMVar, takeMVar, |
|
|
|
threadDelay, tryReadMVar, |
|
|
|
threadDelay, tryReadMVar, |
|
|
|
yield) |
|
|
|
tryTakeMVar, yield) |
|
|
|
import Control.Concurrent.BoundedChan () |
|
|
|
import Control.Concurrent.BoundedChan () |
|
|
|
import Control.Concurrent.MVar (tryPutMVar) |
|
|
|
import Control.Concurrent.MVar (tryPutMVar) |
|
|
|
import Control.Exception (SomeException, bracket, catch, |
|
|
|
import Control.Exception (SomeException, bracket, catch, |
|
|
|
@ -155,12 +155,16 @@ brokerClientThread socketIdentity ep cmd comp killMv notificationCallbacks logge |
|
|
|
|
|
|
|
|
|
|
|
handleMessage :: MVar (BrokerServerRequest, MVar BrokerServerResponse, UTCTime) -> BrokerClientEvent -> IO () |
|
|
|
handleMessage :: MVar (BrokerServerRequest, MVar BrokerServerResponse, UTCTime) -> BrokerClientEvent -> IO () |
|
|
|
handleMessage respVar (IncomingResponse resp) = do |
|
|
|
handleMessage respVar (IncomingResponse resp) = do |
|
|
|
log Debug "Broker.Client" $ TL.toStrict $ [t|Incoming message: %?|] resp |
|
|
|
log Debug "Broker.Client" $ TL.toStrict $ [t|Incoming response: %?|] resp |
|
|
|
(req, respVar, _) <- takeMVar respVar |
|
|
|
maybePending <- tryTakeMVar respVar |
|
|
|
log Debug "Broker.Client" $ TL.toStrict $ [t|Pending request: %?|] req |
|
|
|
case maybePending of |
|
|
|
if getRequestId req == getResponseRequestId resp |
|
|
|
Just (req, respVar, _) -> do |
|
|
|
then putMVar respVar resp |
|
|
|
log Debug "Broker.Client" $ TL.toStrict $ [t|Pending request: %?|] req |
|
|
|
else log Warning "Broker.Client" $ TL.toStrict $ [t|Request ID mismatch: %?/%?|] (getRequestId req) (getResponseRequestId resp) |
|
|
|
if getRequestId req == getResponseRequestId resp |
|
|
|
|
|
|
|
then putMVar respVar resp |
|
|
|
|
|
|
|
else log Warning "Broker.Client" $ TL.toStrict $ [t|Request ID mismatch: %?/%?|] (getRequestId req) (getResponseRequestId resp) |
|
|
|
|
|
|
|
_ -> log Warning "Broker.Client" $ TL.toStrict $ [t|Ignore unsolicited response|] |
|
|
|
|
|
|
|
|
|
|
|
handleMessage _ (IncomingNotification notif) = callNotificationCallbacks notif |
|
|
|
handleMessage _ (IncomingNotification notif) = callNotificationCallbacks notif |
|
|
|
|
|
|
|
|
|
|
|
callNotificationCallbacks notif = mapM_ (\cb -> cb notif) notificationCallbacks |
|
|
|
callNotificationCallbacks notif = mapM_ (\cb -> cb notif) notificationCallbacks |
|
|
|
|