|
|
|
@ -98,12 +98,12 @@ notificationCallback state n = do |
|
|
|
|
|
|
|
|
|
|
|
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () |
|
|
|
tradeSinkHandler :: Context -> IORef BrokerServerState -> T.Text -> IO () |
|
|
|
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ |
|
|
|
tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ |
|
|
|
whileM_ (fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar) $ |
|
|
|
whileM_ (not <$> wasKilled) $ |
|
|
|
withSocket c Req (\sock -> do |
|
|
|
withSocket c Req (\sock -> do |
|
|
|
chan <- tradeSink <$> readIORef state |
|
|
|
chan <- tradeSink <$> readIORef state |
|
|
|
connect sock $ T.unpack tradeSinkEp |
|
|
|
connect sock $ T.unpack tradeSinkEp |
|
|
|
timeoutMv <- newEmptyMVar |
|
|
|
timeoutMv <- newEmptyMVar |
|
|
|
whileM_ (andM [fmap killMvar (readIORef state) >>= fmap isNothing . tryReadMVar, isNothing <$> tryReadMVar timeoutMv]) $ do |
|
|
|
whileM_ (andM [not <$> wasKilled, isNothing <$> tryReadMVar timeoutMv]) $ do |
|
|
|
threadDelay 500000 |
|
|
|
threadDelay 500000 |
|
|
|
maybeTrade <- tryReadChan chan |
|
|
|
maybeTrade <- tryReadChan chan |
|
|
|
case maybeTrade of |
|
|
|
case maybeTrade of |
|
|
|
@ -113,10 +113,10 @@ tradeSinkHandler c state tradeSinkEp = when (tradeSinkEp /= "") $ |
|
|
|
events <- poll 5000 [Sock sock [In] Nothing] |
|
|
|
events <- poll 5000 [Sock sock [In] Nothing] |
|
|
|
if not . L.null . L.head $ events |
|
|
|
if not . L.null . L.head $ events |
|
|
|
then void . receive $ sock -- anything will do |
|
|
|
then void . receive $ sock -- anything will do |
|
|
|
else putMVar timeoutMv () |
|
|
|
else putMVar timeoutMv ()) |
|
|
|
) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
where |
|
|
|
where |
|
|
|
|
|
|
|
wasKilled = fmap killMvar (readIORef state) >>= fmap isJust . tryReadMVar |
|
|
|
encodeTrade :: Trade -> B.ByteString |
|
|
|
encodeTrade :: Trade -> B.ByteString |
|
|
|
encodeTrade = BL.toStrict . encode . convertTrade |
|
|
|
encodeTrade = BL.toStrict . encode . convertTrade |
|
|
|
convertTrade trade = TradeSinkTrade { |
|
|
|
convertTrade trade = TradeSinkTrade { |
|
|
|
|