Browse Source

Broker Client: fix reconnection

master
Denis Tereshkin 9 years ago
parent
commit
a7e92fee2b
  1. 3
      src/ATrade/Broker/Client.hs
  2. 2
      src/ATrade/Broker/Server.hs
  3. 7
      src/ATrade/QuoteSource/Server.hs
  4. 2
      src/ATrade/Types.hs
  5. 1
      stack.yaml

3
src/ATrade/Broker/Client.hs

@ -69,7 +69,8 @@ brokerClientThread ctx ep cmd resp comp killMv = finally brokerClientThread' cle
Nothing -> putMVar resp (ResponseError "Unable to decode response") Nothing -> putMVar resp (ResponseError "Unable to decode response")
Nothing -> do Nothing -> do
putMVar resp (ResponseError "Response timeout") putMVar resp (ResponseError "Response timeout")
writeIORef isTimeout True) writeIORef isTimeout True
threadDelay 1000000)
isZMQError e = "ZMQError" `L.isPrefixOf` show e isZMQError e = "ZMQError" `L.isPrefixOf` show e

2
src/ATrade/Broker/Server.hs

@ -77,7 +77,7 @@ startBrokerServer brokers c ep tradeSinkEp = do
mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers mapM_ (\bro -> setNotificationCallback bro (Just $ notificationCallback state)) brokers
debugM "Broker.Server" "Forking broker server thread" debugM "Broker.Server" "Forking broker server thread"
BrokerServerHandle <$> forkOS (brokerServerThread state) <*> forkOS (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv BrokerServerHandle <$> forkIO (brokerServerThread state) <*> forkIO (tradeSinkHandler c state tradeSinkEp) <*> pure compMv <*> pure killMv
notificationCallback :: IORef BrokerServerState -> Notification -> IO () notificationCallback :: IORef BrokerServerState -> Notification -> IO ()
notificationCallback state n = do notificationCallback state n = do

7
src/ATrade/QuoteSource/Server.hs

@ -10,12 +10,14 @@ import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent hiding (readChan, writeChan)
import Control.Exception import Control.Exception
import Control.Monad import Control.Monad
import qualified Data.List as L
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.ByteString.Char8 as B8 import qualified Data.ByteString.Char8 as B8
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty hiding (map) import Data.List.NonEmpty hiding (map)
import System.Log.Logger import System.Log.Logger
import System.ZMQ4 import System.ZMQ4
import Prelude hiding ((!!))
data QuoteSourceServer = QuoteSourceServerState { data QuoteSourceServer = QuoteSourceServerState {
ctx :: Context, ctx :: Context,
@ -47,6 +49,9 @@ serverThread state = do
serverThread' serverThread'
QSSTick tick -> do QSSTick tick -> do
sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick sendMulti (outSocket state) $ fromList . map BL.toStrict $ serializeTick tick
{-let t = map BL.toStrict $ serializeTick tick-}
{-sendDirect (outSocket state) [SendMore] (L.head t)-}
{-sendDirect (outSocket state) [] (t L.!! 1)-}
serverThread' serverThread'
startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer startQuoteSourceServer :: BoundedChan QuoteSourceServerData -> Context -> T.Text -> IO QuoteSourceServer
@ -67,7 +72,7 @@ startQuoteSourceServer chan c ep = do
serverThreadId = tid, serverThreadId = tid,
heartbeatThreadId = hbTid heartbeatThreadId = hbTid
} }
stid <- forkOS $ serverThread state stid <- forkIO $ serverThread state
return $ state { serverThreadId = stid } return $ state { serverThreadId = stid }
stopQuoteSourceServer :: QuoteSourceServer -> IO () stopQuoteSourceServer :: QuoteSourceServer -> IO ()

2
src/ATrade/Types.hs

@ -92,7 +92,7 @@ serializeTick tick = header : [rawdata]
putWord32le $ fromIntegral . fracSeconds . timestamp $ tick, putWord32le $ fromIntegral . fracSeconds . timestamp $ tick,
putWord32le $ fromIntegral . fromEnum . datatype $ tick, putWord32le $ fromIntegral . fromEnum . datatype $ tick,
putWord64le $ truncate . value $ tick, putWord64le $ truncate . value $ tick,
putWord32le $ truncate . (* 1000000000) . fractionalPart $ value tick, putWord32le $ truncate . (*. 1000000000) . fractionalPart $ value tick,
putWord32le $ fromIntegral $ volume tick ] putWord32le $ fromIntegral $ volume tick ]
floorPart :: (RealFrac a) => a -> a floorPart :: (RealFrac a) => a -> a
floorPart x = x - fromIntegral (floor x) floorPart x = x - fromIntegral (floor x)

1
stack.yaml

@ -37,6 +37,7 @@ resolver: lts-7.7
# will not be run. This is useful for tweaking upstream packages. # will not be run. This is useful for tweaking upstream packages.
packages: packages:
- '.' - '.'
- '../zeromq-haskell'
# Dependency packages to be pulled from upstream that are not in the resolver # Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3) # (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "hexdump-0.1"] extra-deps: [ "datetime-0.3.1", "hexdump-0.1"]

Loading…
Cancel
Save