|
|
|
|
@ -117,6 +117,7 @@ brokerClientThread socketIdentity ep cmd comp killMv notificationCallbacks logge
@@ -117,6 +117,7 @@ brokerClientThread socketIdentity ep cmd comp killMv notificationCallbacks logge
|
|
|
|
|
Just rawData -> do |
|
|
|
|
if B.length rawData > 0 |
|
|
|
|
then do |
|
|
|
|
log Debug "Broker.Client" $ TL.toStrict $ [t|Got data: %?|] rawData |
|
|
|
|
atomicMapIORef pipeRef (push rawData) |
|
|
|
|
messages <- atomicModifyIORef' pipeRef getMessages |
|
|
|
|
let parsed = mapMaybe decodeEvent messages |
|
|
|
|
@ -154,6 +155,7 @@ brokerClientThread socketIdentity ep cmd comp killMv notificationCallbacks logge
@@ -154,6 +155,7 @@ brokerClientThread socketIdentity ep cmd comp killMv notificationCallbacks logge
|
|
|
|
|
|
|
|
|
|
handleMessage :: MVar (BrokerServerRequest, MVar BrokerServerResponse, UTCTime) -> BrokerClientEvent -> IO () |
|
|
|
|
handleMessage respVar (IncomingResponse resp) = do |
|
|
|
|
log Debug "Broker.Client" $ TL.toStrict $ [t|Incoming message: %?|] resp |
|
|
|
|
(req, respVar, _) <- takeMVar respVar |
|
|
|
|
if getRequestId req == getResponseRequestId resp |
|
|
|
|
then putMVar respVar resp |
|
|
|
|
|