From a7e92fee2b4db5e396a845d25bf8035794a9bcc2 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 12 Dec 2016 22:05:58 +0700 Subject: [PATCH] Broker Client: fix reconnection --- src/ATrade/Broker/Client.hs | 3 ++- src/ATrade/Broker/Server.hs | 2 +- src/ATrade/QuoteSource/Server.hs | 7 ++++++- src/ATrade/Types.hs | 2 +- stack.yaml | 1 + 5 files changed, 11 insertions(+), 4 deletions(-) diff --git a/src/ATrade/Broker/Client.hs b/src/ATrade/Broker/Client.hs index f2b7c94..41d5066 100644 --- a/src/ATrade/Broker/Client.hs +++ b/src/ATrade/Broker/Client.hs @@ -69,7 +69,8 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle Nothing -> putMVar resp (ResponseError "Unable to decode response") Nothing -> do putMVar resp (ResponseError "Response timeout") - writeIORef isTimeout True) + writeIORef isTimeout True + threadDelay 1000000) isZMQError e = "ZMQError" `L.isPrefixOf` show e diff --git a/src/ATrade/Broker/Server.hs b/src/ATrade/Broker/Server.hs index c3c6d2f..6bfbee3 100644 --- a/src/ATrade/Broker/Server.hs +++ b/src/ATrade/Broker/Server.hs @@ -77,7 +77,7 @@ startBrokerServer brokers c ep tradeSinkEp = do mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers debugM "Broker.Server" "Forking broker server thread" - BrokerServerHandle <$> forkOS (brokerServerThread state) <*> forkOS (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv + BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback state n = do diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index eced023..341f528 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -10,12 +10,14 @@ import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan, writeChan) import Control.Exception import Control.Monad +import qualified Data.List as L import qualified Data.Text as T import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Lazy as BL import Data.List.NonEmpty hiding (map) import System.Log.Logger import System.ZMQ4 +import Prelude hiding ((!!)) data QuoteSourceServer = QuoteSourceServerState { ctx :: Context, @@ -47,6 +49,9 @@ serverThread state = do serverThread' QSSTick tick -> do sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick + {-let t = map BL.toStrict $ serializeTick tick-} + {-sendDirect (outSocket state) [SendMore] (L.head t)-} + {-sendDirect (outSocket state) [] (t L.!! 1)-} serverThread' startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer @@ -67,7 +72,7 @@ startQuoteSourceServer chan c ep = do serverThreadId = tid, heartbeatThreadId = hbTid } - stid <- forkOS $ serverThread state + stid <- forkIO $ serverThread state return $ state { serverThreadId = stid } stopQuoteSourceServer :: QuoteSourceServer -> IO () diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index a6408fe..222fcfd 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -92,7 +92,7 @@ serializeTick tick = header : [rawdata] putWord32le $ fromIntegral . fracSeconds . timestamp $ tick, putWord32le $ fromIntegral . fromEnum . datatype $ tick, putWord64le $ truncate . value $ tick, - putWord32le $ truncate . (* 1000000000) . fractionalPart $ value tick, + putWord32le $ truncate . (*. 1000000000) . fractionalPart $ value tick, putWord32le $ fromIntegral $ volume tick ] floorPart :: (RealFrac a) => a -> a floorPart x = x - fromIntegral (floor x) diff --git a/stack.yaml b/stack.yaml index 10d69fd..24eff89 100644 --- a/stack.yaml +++ b/stack.yaml @@ -37,6 +37,7 @@ resolver: lts-7.7 # will not be run. This is useful for tweaking upstream packages. packages: - '.' +- '../zeromq-haskell' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) extra-deps: [ "datetime-0.3.1", "hexdump-0.1"]