@ -109,11 +109,12 @@ tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
@@ -109,11 +109,12 @@ tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO ()
tradeSinkHandler c state tradeSinkEp = when ( tradeSinkEp /= " " ) $
whileM_ ( not <$> wasKilled ) $
withSocket c Dealer ( \ sock -> do
debugM " Broker.Server " " Connecting trade sink socket "
chan <- tradeSink <$> readIORef state
connect sock $ T . unpack tradeSinkEp
timeoutMv <- newEmptyMVar
timeoutMv <- newIORef False
threadDelay 1000000
whileM_ ( andM [ not <$> wasKilled , isNothing <$> tryReadMVar timeoutMv ] ) $ do
whileM_ ( andM [ not <$> wasKilled , readIORef timeoutMv ] ) $ do
maybeTrade <- tryReadChan chan
case maybeTrade of
Just trade -> sendMulti sock $ B . empty :| [ encodeTrade trade ]
@ -123,7 +124,9 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
@@ -123,7 +124,9 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $
events <- poll 5000 [ Sock sock [ In ] Nothing ]
if not . L . null . L . head $ events
then void . receive $ sock -- anything will do
else putMVar timeoutMv () )
else do
writeIORef timeoutMv True
warningM " Broker.Server " " Trade sink timeout " )
where
wasKilled = fmap killMvar ( readIORef state ) >>= fmap isJust . tryReadMVar