@ -108,7 +108,11 @@ notificationCallback state n = do
@@ -108,7 +108,11 @@ notificationCallback state n = do
tradeSinkHandler :: Context -> IORef BrokerServerState -> T . Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when ( tradeSinkEp /= " " ) $
whileM_ ( not <$> wasKilled ) $
withSocket c Dealer ( \ sock -> do
handle ( \ e -> do
warningM " Broker.Server " $ " Trade sink: exception: " ++ ( show ( e :: SomeException ) ) ++ " ; isZMQ: " ++ show ( isZMQError e )
when ( isZMQError e ) $ do
debugM " Broker.Server " " Rethrowing exception "
throwIO e ) $ withSocket c Dealer ( \ sock -> do
debugM " Broker.Server " " Connecting trade sink socket "
chan <- tradeSink <$> readIORef state
connect sock $ T . unpack tradeSinkEp
@ -129,6 +133,7 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
@@ -129,6 +133,7 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
warningM " Broker.Server " " Trade sink timeout " )
where
isZMQError e = " ZMQError " ` L . isPrefixOf ` show e
wasKilled = fmap killMvar ( readIORef state ) >>= fmap isJust . tryReadMVar
encodeTrade :: Trade -> B . ByteString
encodeTrade = BL . toStrict . encode . convertTrade