@ -109,13 +109,17 @@ notificationCallback state n = do
@@ -109,13 +109,17 @@ notificationCallback state n = do
tradeSinkHandler :: Context -> IORef BrokerServerState -> T . Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when ( tradeSinkEp /= " " ) $
whileM_ ( not <$> wasKilled ) $
withSocket c Dealer ( \ sock -> do
setLinger ( restrict 0 ) sock
handle ( \ e -> do
warningM " Broker.Server " $ " Trade sink: exception: " ++ ( show ( e :: SomeException ) ) ++ " ; isZMQ: " ++ show ( isZMQError e )
when ( isZMQError e ) $ do
debugM " Broker.Server " " Rethrowing exception "
throwIO e ) $ withSocket c Dealer ( \ sock -> do
debugM " Broker.Server " " Connecting trade sink socket "
chan <- tradeSink <$> readIORef state
connect sock $ T . unpack tradeSinkEp
timeoutMv <- newEmptyMVar
timeoutMv <- newIORef False
threadDelay 1000000
whileM_ ( andM [ not <$> wasKilled , isNothing <$> tryReadMVar timeoutMv ] ) $ do
whileM_ ( andM [ not <$> wasKilled , readIORef timeoutMv ] ) $ do
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> sendMulti sock $ B . empty :| [ encodeTrade trade ]
@ -125,9 +129,12 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
@@ -125,9 +129,12 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
events <- poll 5000 [ Sock sock [ In ] Nothing ]
if not . L . null . L . head $ events
then void . receive $ sock -- anything will do
else putMVar timeoutMv () )
else do
writeIORef timeoutMv True
warningM " Broker.Server " " Trade sink timeout " )
where
isZMQError e = " ZMQError " ` L . isPrefixOf ` show e
wasKilled = fmap killMvar ( readIORef state ) >>= fmap isJust . tryReadMVar
encodeTrade :: Trade -> B . ByteString
encodeTrade = BL . toStrict . encode . convertTrade