|
|
|
@ -27,6 +27,7 @@ import qualified Data.ByteString.Lazy as BL |
|
|
|
import Data.Text.Encoding |
|
|
|
import Data.Text.Encoding |
|
|
|
import System.ZMQ4 |
|
|
|
import System.ZMQ4 |
|
|
|
import System.Log.Logger |
|
|
|
import System.Log.Logger |
|
|
|
|
|
|
|
import System.Timeout |
|
|
|
|
|
|
|
|
|
|
|
data BrokerClientHandle = BrokerClientHandle { |
|
|
|
data BrokerClientHandle = BrokerClientHandle { |
|
|
|
tid :: ThreadId, |
|
|
|
tid :: ThreadId, |
|
|
|
@ -45,19 +46,15 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle |
|
|
|
cleanup = putMVar comp () |
|
|
|
cleanup = putMVar comp () |
|
|
|
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Req (\sock -> do |
|
|
|
brokerClientThread' = whileM_ (isNothing <$> tryReadMVar killMv) $ withSocket ctx Req (\sock -> do |
|
|
|
connect sock $ T.unpack ep |
|
|
|
connect sock $ T.unpack ep |
|
|
|
finally (brokerClientThread'' sock) (close sock)) |
|
|
|
whileM_ (isNothing <$> tryReadMVar killMv) $ do |
|
|
|
brokerClientThread'' sock = whileM_ (isNothing <$> tryReadMVar killMv) $ do |
|
|
|
|
|
|
|
request <- takeMVar cmd |
|
|
|
request <- takeMVar cmd |
|
|
|
send sock [] (BL.toStrict $ encode request) |
|
|
|
send sock [] (BL.toStrict $ encode request) |
|
|
|
events <- poll 1000 [Sock sock [In] Nothing] |
|
|
|
incomingMessage <- timeout 1000000 $ receive sock |
|
|
|
if (not . null) $ L.head events |
|
|
|
case incomingMessage of |
|
|
|
then do |
|
|
|
Just msg -> case decode . BL.fromStrict $ msg of |
|
|
|
maybeResponse <- decode . BL.fromStrict <$> receive sock |
|
|
|
Just response -> putMVar resp response |
|
|
|
case maybeResponse of |
|
|
|
Nothing -> putMVar resp (ResponseError "Unable to decode response") |
|
|
|
Just response -> putMVar resp response |
|
|
|
Nothing -> putMVar resp (ResponseError "Response timeout")) |
|
|
|
Nothing -> putMVar resp (ResponseError "Unable to decode response") |
|
|
|
|
|
|
|
else |
|
|
|
|
|
|
|
putMVar resp (ResponseError "Response timeout") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle |
|
|
|
startBrokerClient :: Context -> T.Text -> IO BrokerClientHandle |
|
|
|
startBrokerClient ctx endpoint = do |
|
|
|
startBrokerClient ctx endpoint = do |
|
|
|
|