From f22dd216b8b08a3a6e7a4175fe3bc9970971eeff Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 18 Oct 2016 19:36:58 +0700 Subject: [PATCH] Various fixes --- src/ATrade/QuoteSource/Client.hs | 5 ++++- src/ATrade/QuoteSource/Server.hs | 12 +++++++++--- src/ATrade/Types.hs | 11 +++++++++++ 3 files changed, 24 insertions(+), 4 deletions(-) diff --git a/src/ATrade/QuoteSource/Client.hs b/src/ATrade/QuoteSource/Client.hs index 3d4a195..5a89682 100644 --- a/src/ATrade/QuoteSource/Client.hs +++ b/src/ATrade/QuoteSource/Client.hs @@ -43,6 +43,8 @@ startQuoteSourceClient chan tickers ctx endpoint = do mapM_ (\t -> subscribe sock $ encodeUtf8 t) tickers subscribe sock $ B8.pack "SYSTEM#HEARTBEAT" + now <- getCurrentTime + writeIORef lastHeartbeat now whileM_ (notTimeout lastHeartbeat) $ do evs <- poll 200 [Sock sock [In] Nothing] when ((L.length . L.head) evs > 0) $ do @@ -53,7 +55,8 @@ startQuoteSourceClient chan tickers ctx endpoint = do then writeIORef lastHeartbeat now else case deserializeTick rawTick of Just tick -> writeChan chan tick - Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick") + Nothing -> warningM "QuoteSource.Client" "Error: can't deserialize tick" + debugM "QuoteSource.Client" "Heartbeat timeout") notTimeout ts = do now <- getCurrentTime diff --git a/src/ATrade/QuoteSource/Server.hs b/src/ATrade/QuoteSource/Server.hs index bbbcdcc..dae1225 100644 --- a/src/ATrade/QuoteSource/Server.hs +++ b/src/ATrade/QuoteSource/Server.hs @@ -22,7 +22,8 @@ data QuoteSourceServer = QuoteSourceServerState { outSocket :: Socket Pub, tickChannel :: BoundedChan QuoteSourceServerData, completionMvar :: MVar (), - serverThreadId :: ThreadId + serverThreadId :: ThreadId, + heartbeatThreadId :: ThreadId } data QuoteSourceServerData = QSSTick Tick | QSSHeartbeat | QSSKill @@ -53,17 +54,22 @@ startQuoteSourceServer chan c ep = do sock <- socket c Pub bind sock $ T.unpack ep tid <- myThreadId + hbTid <- forkIO $ forever $ do + threadDelay 1000000 + writeChan chan QSSHeartbeat + mv <- newEmptyMVar let state = QuoteSourceServerState { ctx = c, outSocket = sock, tickChannel = chan, completionMvar = mv, - serverThreadId = tid + serverThreadId = tid, + heartbeatThreadId = hbTid } stid <- forkIO $ serverThread state return $ state { serverThreadId = stid } stopQuoteSourceServer :: QuoteSourceServer -> IO () -stopQuoteSourceServer server = writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server) +stopQuoteSourceServer server = killThread (heartbeatThreadId server) >> writeChan (tickChannel server) QSSKill >> readMVar (completionMvar server) diff --git a/src/ATrade/Types.hs b/src/ATrade/Types.hs index 459afaa..3033083 100644 --- a/src/ATrade/Types.hs +++ b/src/ATrade/Types.hs @@ -3,6 +3,7 @@ module ATrade.Types ( TickerId, Tick(..), + Bar(..), DataType(..), serializeTick, deserializeTick, @@ -136,6 +137,16 @@ deserializeTick (header:rawData:_) = case runGetOrFail parseTick rawData of deserializeTick _ = Nothing +data Bar = Bar { + barSecurity :: !T.Text, + barTimestamp :: !UTCTime, + barOpen :: !Decimal, + barHigh :: !Decimal, + barLow :: !Decimal, + barClose :: !Decimal, + barVolume :: !Integer +} deriving (Show, Eq) + data SignalId = SignalId { strategyId :: T.Text, signalName :: T.Text,