From bb7a6d6b11acd6e4a6dd5267ea76e505eb5b0c39 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 15 Oct 2017 17:50:54 +0700 Subject: [PATCH] Tradesinks fixes --- src/ATrade/Broker/Server.hs | 2 +- .../Broker/TradeSinks/TelegramTradeSink.hs | 6 ++-- src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs | 33 ++++++++++--------- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index b16ac82..18ca453 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -117,7 +117,7 @@ tradeSinkHandler c state tradeSinks = unless (null tradeSinks) $ maybeTrade <- tryReadChan chan case maybeTrade of Just trade -> mapM_ (\x -> x trade) tradeSinks - Nothing -> return () + Nothing -> threadDelay 1000000 where wasKilled = isJust <$> (killMvar <$> readIORef state >>= tryReadMVar) diff --git a/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs b/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs index dc5aeba..5c60139 100644 --- a/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs +++ b/src/ATrade/Broker/TradeSinks/TelegramTradeSink.hs @@ -41,14 +41,16 @@ sinkThread apitoken chatId killMv chan = do man <- newManager $ mkManagerSettings tlsSettings Nothing whileM_ (not <$> wasKilled) $ do maybeTrade <- BC.tryReadChan chan - whenJust maybeTrade (\trade -> sendMessage man apitoken chatId $ format "Trade: {} {} of {} at {} for {} ({}/{})" + case maybeTrade of + Just trade -> sendMessage man apitoken chatId $ format "Trade: {} {} of {} at {} for {} ({}/{})" (show (tradeOperation trade), show (tradeQuantity trade), tradeSecurity trade, show (tradePrice trade), tradeAccount trade, (strategyId . tradeSignalId) trade, - (signalName . tradeSignalId) trade)) + (signalName . tradeSignalId) trade) + Nothing -> threadDelay 1000000 where tlsSettings = TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False } wasKilled = isJust <$> tryReadMVar killMv diff --git a/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs b/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs index 76f4bdc..c0edc1d 100644 --- a/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs +++ b/src/ATrade/Broker/TradeSinks/ZMQTradeSink.hs @@ -30,7 +30,7 @@ withZMQTradeSink ctx tradeSinkEp f = do where sink = BC.writeChan -sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do +sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ handle (\e -> do warningM "Broker.Server" $ "Trade sink: exception: " ++ show (e :: SomeException) ++ "; isZMQ: " ++ show (isZMQError e) when (isZMQError e) $ do @@ -39,20 +39,23 @@ sinkThread ctx tradeSinkEp killMv chan = whileM_ (not <$> wasKilled) $ do where sinkThread' = withSocket ctx Dealer (\sock -> do connect sock $ T.unpack tradeSinkEp - whenM (not <$> wasKilled) $ do - maybeTrade <- BC.tryReadChan chan - case maybeTrade of - Just trade -> do - sendMulti sock $ B.empty :| [encodeTrade trade] - void $ receiveMulti sock - Nothing -> do - sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] - events <- poll 1000 [Sock sock [In] Nothing] - if L.null . L.head $ events - then warningM "Broker.Server" "Trade sink timeout" - else do - void . receive $ sock -- anything will do - sinkThread') + whenM (not <$> wasKilled) $ sinkThread'' sock) + + sinkThread'' sock = do + maybeTrade <- BC.tryReadChan chan + case maybeTrade of + Just trade -> do + sendMulti sock $ B.empty :| [encodeTrade trade] + void $ receiveMulti sock + Nothing -> do + threadDelay 1000000 + sendMulti sock $ B.empty :| [BL.toStrict $ encode TradeSinkHeartBeat] + events <- poll 5000 [Sock sock [In] Nothing] + if L.null . L.head $ events + then warningM "Broker.Server" "Trade sink timeout" + else do + void . receive $ sock -- anything will do + sinkThread'' sock isZMQError e = "ZMQError" `L.isPrefixOf` show e