|
|
|
|
@ -48,11 +48,18 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
@@ -48,11 +48,18 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
|
|
|
|
|
debugM "Broker.Client" "Starting event loop" |
|
|
|
|
handle (\e -> do |
|
|
|
|
warningM "Broker.Client" $ "Broker client: exception: " ++ (show (e :: SomeException)) ++ "; isZMQ: " ++ show (isZMQError e) |
|
|
|
|
unless (isZMQError e) $ do |
|
|
|
|
debugM "Broker.Client" "Rethrowing exception" |
|
|
|
|
throwIO e) $ withSocket ctx Req (\sock -> do |
|
|
|
|
if isZMQError e |
|
|
|
|
then do |
|
|
|
|
debugM "Broker.Client" "Rethrowing exception" |
|
|
|
|
throwIO e |
|
|
|
|
else do |
|
|
|
|
putMVar resp (ResponseError "Response error")) $ withSocket ctx Req (\sock -> do |
|
|
|
|
debugM "Broker.Client" $ "Connecting to: " ++ show (T.unpack ep) |
|
|
|
|
connect sock $ T.unpack ep |
|
|
|
|
whileM_ (isNothing <$> tryReadMVar killMv) $ do |
|
|
|
|
debugM "Broker.Client" $ "Connected" |
|
|
|
|
isTimeout <- newIORef False |
|
|
|
|
|
|
|
|
|
whileM_ (andM [isNothing <$> tryReadMVar killMv, (== False) <$> readIORef isTimeout]) $ do |
|
|
|
|
request <- takeMVar cmd |
|
|
|
|
send sock [] (BL.toStrict $ encode request) |
|
|
|
|
incomingMessage <- timeout 1000000 $ receive sock |
|
|
|
|
@ -60,7 +67,9 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
@@ -60,7 +67,9 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
|
|
|
|
|
Just msg -> case decode . BL.fromStrict $ msg of |
|
|
|
|
Just response -> putMVar resp response |
|
|
|
|
Nothing -> putMVar resp (ResponseError "Unable to decode response") |
|
|
|
|
Nothing -> putMVar resp (ResponseError "Response timeout")) |
|
|
|
|
Nothing -> do |
|
|
|
|
putMVar resp (ResponseError "Response timeout") |
|
|
|
|
writeIORef isTimeout True) |
|
|
|
|
isZMQError e = "ZMQError" `L.isPrefixOf` show e |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|