|
|
|
@ -102,15 +102,18 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ |
|
|
|
withSocket c Req (\sock -> do |
|
|
|
withSocket c Req (\sock -> do |
|
|
|
chan <- tradeSink <$> readIORef state |
|
|
|
chan <- tradeSink <$> readIORef state |
|
|
|
connect sock $ T.unpack tradeSinkEp |
|
|
|
connect sock $ T.unpack tradeSinkEp |
|
|
|
setReceiveTimeout (restrict 5000) sock |
|
|
|
timeoutMv <- newEmptyMVar |
|
|
|
whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ do |
|
|
|
whileM_ (andM [fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar, isNothing <$> tryReadMVar timeoutMv]) $ do |
|
|
|
threadDelay 500000 |
|
|
|
threadDelay 500000 |
|
|
|
maybeTrade <- tryReadChan chan |
|
|
|
maybeTrade <- tryReadChan chan |
|
|
|
case maybeTrade of |
|
|
|
case maybeTrade of |
|
|
|
Just trade -> send sock [] $ encodeTrade trade |
|
|
|
Just trade -> send sock [] $ encodeTrade trade |
|
|
|
Nothing -> do |
|
|
|
Nothing -> do |
|
|
|
send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat |
|
|
|
send sock [] $ BL.toStrict $ encode TradeSinkHeartBeat |
|
|
|
void $ receive sock -- anything will do |
|
|
|
events <- poll 5000 [Sock sock [In] Nothing] |
|
|
|
|
|
|
|
if not . L.null . L.head $ events |
|
|
|
|
|
|
|
then void . receive $ sock -- anything will do |
|
|
|
|
|
|
|
else putMVar timeoutMv () |
|
|
|
) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
where |
|
|
|
where |
|
|
|
|