diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index 5dc08c6..54a9702 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -34,7 +34,7 @@ import Control.Concurrent (MVar, ThreadId, forkIO, killThread, newEmptyMVar, putMVar, readMVar, takeMVar, threadDelay, tryReadMVar, - yield) + tryTakeMVar, yield) import Control.Concurrent.BoundedChan () import Control.Concurrent.MVar (tryPutMVar) import Control.Exception (SomeException, bracket, catch, @@ -155,12 +155,16 @@ brokerClientThread socketIdentity ep cmd comp killMv notificationCallbacks logge handleMessage :: MVar (BrokerServerRequest, MVar BrokerServerResponse, UTCTime) -> BrokerClientEvent -> IO () handleMessage respVar (IncomingResponse resp) = do - log Debug "Broker.Client" $ TL.toStrict $ [t|Incoming message: %?|] resp - (req, respVar, _) <- takeMVar respVar - log Debug "Broker.Client" $ TL.toStrict $ [t|Pending request: %?|] req - if getRequestId req == getResponseRequestId resp - then putMVar respVar resp - else log Warning "Broker.Client" $ TL.toStrict $ [t|Request ID mismatch: %?/%?|] (getRequestId req) (getResponseRequestId resp) + log Debug "Broker.Client" $ TL.toStrict $ [t|Incoming response: %?|] resp + maybePending <- tryTakeMVar respVar + case maybePending of + Just (req, respVar, _) -> do + log Debug "Broker.Client" $ TL.toStrict $ [t|Pending request: %?|] req + if getRequestId req == getResponseRequestId resp + then putMVar respVar resp + else log Warning "Broker.Client" $ TL.toStrict $ [t|Request ID mismatch: %?/%?|] (getRequestId req) (getResponseRequestId resp) + _ -> log Warning "Broker.Client" $ TL.toStrict $ [t|Ignore unsolicited response|] + handleMessage _ (IncomingNotification notif) = callNotificationCallbacks notif callNotificationCallbacks notif = mapM_ (\cb -> cb notif) notificationCallbacks