|
|
|
|
@ -90,10 +90,19 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro
@@ -90,10 +90,19 @@ brokerClientThread socketIdentity ctx ep cmd comp killMv secParams = finally bro
|
|
|
|
|
isZMQError e = "ZMQError" `L.isPrefixOf` show e |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> MVar () -> IO () |
|
|
|
|
notificationThread clientIdentity callbacks ctx ep killMv = flip finally (return ()) $ do |
|
|
|
|
notificationThread :: ClientIdentity -> [NotificationCallback] -> Context -> T.Text -> MVar () -> ClientSecurityParams -> IO () |
|
|
|
|
notificationThread clientIdentity callbacks ctx ep killMv secParams = flip finally (return ()) $ do |
|
|
|
|
whileM_ (isNothing <$> tryReadMVar killMv) $ |
|
|
|
|
withSocket ctx Sub $ \sock -> do |
|
|
|
|
|
|
|
|
|
setLinger (restrict 0) sock |
|
|
|
|
|
|
|
|
|
case cspCertificate secParams of |
|
|
|
|
Just clientCert -> zapApplyCertificate clientCert sock |
|
|
|
|
Nothing -> return () |
|
|
|
|
case cspServerCertificate secParams of |
|
|
|
|
Just serverCert -> zapSetServerCertificate serverCert sock |
|
|
|
|
Nothing -> return () |
|
|
|
|
setTcpKeepAlive On sock |
|
|
|
|
setTcpKeepAliveCount (restrict 5) sock |
|
|
|
|
setTcpKeepAliveIdle (restrict 60) sock |
|
|
|
|
@ -124,7 +133,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback
@@ -124,7 +133,7 @@ startBrokerClient socketIdentity ctx endpoint notifEndpoint notificationCallback
|
|
|
|
|
cmdVar <- newEmptyMVar :: IO (MVar (BrokerServerRequest, MVar BrokerServerResponse)) |
|
|
|
|
tid <- forkIO (brokerClientThread socketIdentity ctx endpoint cmdVar compMv killMv secParams) |
|
|
|
|
notifSqnumRef <- newIORef (NotificationSqnum 0) |
|
|
|
|
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint killMv) |
|
|
|
|
notifThreadId <- forkIO (notificationThread (T.decodeUtf8 socketIdentity) notificationCallbacks ctx notifEndpoint killMv secParams) |
|
|
|
|
|
|
|
|
|
return BrokerClientHandle { |
|
|
|
|
tid = tid, |
|
|
|
|
|