From 1545c28943964c4ff473d800b4ebeba93d79d430 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 17 Jun 2019 23:32:01 +0700 Subject: [PATCH] Update pipe reader --- app/Main.hs | 14 ++- app/Version.hs | 2 +- quik-connector.cabal | 4 +- src/QuoteSource/DataImport.hs | 22 ++--- src/QuoteSource/PipeReader.hs | 161 +++++++--------------------------- 5 files changed, 54 insertions(+), 149 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index 64dcd2c..0210b1c 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -109,7 +109,7 @@ main = do let serverParams = defaultServerSecurityParams { sspDomain = Just "global", sspCertificate = serverCert } - bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do + bracket (forkIO $ pipeReaderThread ctx config c2) killThread (\_ -> do withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do @@ -127,13 +127,11 @@ main = do void $ timeout 1000000 $ killThread forkId infoM "main" "Main thread done" where - pipeReaderThread ctx config = - case (tickPipePath config, pipeReaderQsEndpoint config) of - (Just pipe, Just qsep) -> do - infoM "main" $ "Pipe/QS: " ++ pipe ++ "/" ++ qsep - tickChan <- newBoundedChan 10000 - bracket (startPipeReader ctx (T.pack pipe) tickChan) stopPipeReader (\_ -> do - bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> forever $ threadDelay 1000000)) + pipeReaderThread ctx config qsdataChan = + case pipeReaderQsEndpoint config of + Just qsep -> do + infoM "main" $ "QS: " ++ qsep + bracket (startPipeReader ctx (T.pack qsep) qsdataChan) stopPipeReader (\_ -> forever $ threadDelay 1000000) _ -> return () diff --git a/app/Version.hs b/app/Version.hs index 325f1ff..9bee198 100644 --- a/app/Version.hs +++ b/app/Version.hs @@ -10,7 +10,7 @@ import qualified Data.Text as T import Text.Printf.TH quikConnectorVersion :: (Int, Int, Int, Int) -quikConnectorVersion = (0, 3, 0, 0) +quikConnectorVersion = (0, 4, 0, 0) quikConnectorVersionText :: T.Text quikConnectorVersionText = diff --git a/quik-connector.cabal b/quik-connector.cabal index df7b289..ad60a9a 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -1,5 +1,5 @@ name: quik-connector -version: 0.3.0.0 +version: 0.4.0.0 synopsis: Atrade-Quik Connector application description: Please see README.md homepage: https://github.com/asakul/quik-connector @@ -49,7 +49,7 @@ library , aeson , cond , scientific - , libatrade == 0.7.0.0 + , libatrade == 0.8.0.0 , deepseq , errors , split diff --git a/src/QuoteSource/DataImport.hs b/src/QuoteSource/DataImport.hs index d499e5b..d897959 100644 --- a/src/QuoteSource/DataImport.hs +++ b/src/QuoteSource/DataImport.hs @@ -6,20 +6,20 @@ module QuoteSource.DataImport shutdownDataImportServer ) where -import Control.Concurrent.BoundedChan -import Control.Monad.State.Strict -import ATrade.Types -import Data.IORef -import Data.Time.Clock -import QuoteSource.TableParser -import System.Win32.DDE -import System.Win32.XlParser +import ATrade.Types +import Control.Concurrent.BoundedChan +import Control.Monad.State.Strict +import Data.IORef +import Data.Time.Clock +import QuoteSource.TableParser +import System.Win32.DDE +import System.Win32.XlParser -import qualified Data.Map as M +import qualified Data.Map as M data ServerState = ServerState { - appName :: String, - parsers :: IORef (M.Map String TableParserInstance), + appName :: String, + parsers :: IORef (M.Map String TableParserInstance), tickChannel :: BoundedChan Tick } diff --git a/src/QuoteSource/PipeReader.hs b/src/QuoteSource/PipeReader.hs index d33ee32..8737af5 100644 --- a/src/QuoteSource/PipeReader.hs +++ b/src/QuoteSource/PipeReader.hs @@ -17,11 +17,12 @@ import Control.Exception import Control.Monad import Control.Monad.Extra import Control.Monad.IO.Class -import Control.Monad.Loops -import Data.Attoparsec.Text +import Control.Monad.Loops (whileM_) +import Data.Binary +import Data.Binary.Get import qualified Data.ByteString as B +import qualified Data.ByteString.Lazy as BL import Data.Conduit hiding (connect) -import Data.Conduit.Attoparsec import qualified Data.Conduit.List as CL import qualified Data.HashSet as HS import Data.IORef @@ -29,7 +30,6 @@ import qualified Data.Map.Strict as M import Data.Maybe import qualified Data.Text as T import Data.Text.Encoding -import qualified Data.Text.Foreign as FT import Data.Time.Calendar import Data.Time.Clock import Foreign.Marshal.Alloc @@ -45,144 +45,51 @@ data PipeReaderHandle = running :: IORef Bool } deriving (Eq) -data DataLine = CurrentParamLine T.Text Double Integer Double Integer Integer Double Integer Integer - | AllTradeLine T.Text Integer Double Integer UTCTime - deriving (Show, Eq) -yieldJust :: Maybe a -> Source IO a -yieldJust maybeV = do -- Probably already present in some library - case maybeV of - Just v -> yield v - Nothing -> return () - -zmqSocketConduit :: (Receiver a) => Socket a -> Source IO T.Text -zmqSocketConduit sock = do - maybeStr <- liftIO $ do - bs <- receive sock - case decodeUtf8' bs of - Left _ -> return Nothing - Right v -> return (Just v) - yieldJust maybeStr - zmqSocketConduit sock - -line2TickConduit :: Conduit DataLine IO Tick -line2TickConduit = do - volumeMap <- liftIO $ newIORef M.empty - ignoreCPSet <- liftIO $ newIORef HS.empty - lastTimestamp <- liftIO $ newIORef $ UTCTime (fromGregorian 1970 1 1) 0 - awaitForever $ \line -> - case line of - CurrentParamLine tickerId last voltoday bid biddepth biddeptht offer offerdepth offerdeptht -> do - ts <- liftIO $ readIORef lastTimestamp - yieldTick tickerId BestBid ts (fromDouble bid) biddepth - yieldTick tickerId BestOffer ts (fromDouble offer) offerdepth - yieldTick tickerId TotalSupply ts (fromInteger offerdeptht) 0 - yieldTick tickerId TotalDemand ts (fromInteger biddeptht) 0 - - shouldParsePrice <- liftIO $ HS.member tickerId <$> readIORef ignoreCPSet - when shouldParsePrice $ do - m <- liftIO $ readIORef volumeMap - case M.lookup tickerId m of - Just vol -> - if | vol < voltoday -> yieldTick tickerId LastTradePrice ts (fromDouble last) (voltoday - vol) - | vol > voltoday -> yieldTick tickerId LastTradePrice ts (fromDouble last) vol - | otherwise -> return () - Nothing -> yieldTick tickerId LastTradePrice ts (fromDouble last) 1 - - liftIO $ atomicModifyIORef' volumeMap (\m -> (M.insert tickerId voltoday m, ())) +zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> Source IO [B.ByteString] +zmqSocketConduit ep sock running' = do + liftIO $ do + debugM "PipeReader" $ "Connecting to: " ++ T.unpack ep + connect sock (T.unpack ep) + subscribe sock B.empty + lastHeartbeat <- liftIO $ getCurrentTime >>= newIORef + whileM_ (andM [notTimeout lastHeartbeat, liftIO (readIORef running')]) $ do + evs <- liftIO $ poll 200 [Sock sock [In] Nothing] + unless (null (head evs)) $ do + bs <- liftIO $ receiveMulti sock + when ((not . null $ bs) && (head bs == "SYSTEM#HEARTBEAT")) $ liftIO $ getCurrentTime >>= writeIORef lastHeartbeat + yield bs + zmqSocketConduit ep sock running' + where + notTimeout hb = do + now <- liftIO $ getCurrentTime + last <- liftIO $ readIORef hb + return $ now `diffUTCTime` last < 10 - AllTradeLine tickerId flags price volume ts -> do - liftIO $ writeIORef lastTimestamp ts - if - | flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume - | flags == 2 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume - | otherwise -> return () - liftIO $ atomicModifyIORef' ignoreCPSet (\s -> (HS.insert tickerId s, ())) +parseBarConduit :: Conduit [B.ByteString] IO (TickerId, BarTimeframe, Bar) +parseBarConduit = awaitForever $ \bs -> + case deserializeBar (BL.fromStrict <$> bs) of + Just (tf, bar) -> yield (barSecurity bar, tf, bar) + _ -> return () - where - yieldTick tickerId dtype ts val vol = - yield $ Tick { security = tickerId, - datatype = dtype, - timestamp = ts, - value = val, - volume = vol } +qssdataConduit :: Conduit (TickerId, BarTimeframe, Bar) IO QuoteSourceServerData +qssdataConduit = awaitForever $ \(tid, tf, bar) -> yield $ QSSBar (tf, bar) -chanSink :: BoundedChan QuoteSourceServerData -> Sink Tick IO () +chanSink :: (Show a) => BoundedChan a -> Sink a IO () chanSink chan = awaitForever - (\t -> liftIO $ do - writeChan chan (QSSTick t)) + (\t -> do + liftIO $ writeChan chan t) startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle startPipeReader ctx pipeEndpoint tickChan = do debugM "PipeReader" $ "Trying to open pipe: " ++ T.unpack pipeEndpoint s <- socket ctx Sub - connect s (T.unpack pipeEndpoint) - subscribe s B.empty debugM "PipeReader" "Pipe opened" running' <- newIORef True tid <- forkIO $ readerThread s running' return PipeReaderHandle { prThreadId = tid, running = running' } where - readerThread s running' = runConduit $ (zmqSocketConduit s) =$= conduitParserEither parseTrade =$= handleParseResult =$= line2TickConduit =$= chanSink tickChan - parseTrade = parseCurrentParam <|> parseAllTrade - parseCurrentParam = do - string "CT:" - secName <- takeTill (== ':') - string ":" - last <- double - string ";" - voltoday <- decimal - string ";" - bid <- double - string ";" - biddepth <- decimal - string ";" - biddeptht <- decimal - string ";" - offer <- double - string ";" - offerdepth <- decimal - string ";" - offerdeptht <- decimal - string ";" - return $ CurrentParamLine secName last voltoday bid biddepth biddeptht offer offerdepth offerdeptht - - parseAllTrade = do - string "AT:" - secName <- takeTill (== ':') - string ":" - flags <- decimal - string ";" - price <- double - string ";" - qty <- decimal - string ";" - dt <- parseDateTime - string ";" - return $ AllTradeLine secName flags price qty dt - - parseDateTime = do - y <- decimal - string "." - mon <- decimal - string "." - day <- decimal - string " " - h <- fromInteger <$> decimal - string ":" - m <- fromInteger <$> decimal - string ":" - s <- fromInteger <$> decimal - string "." - ms <- fromInteger <$> decimal - return $ UTCTime (fromGregorian y mon day) $ h * 3600 + m * 60 + s + ms / 1000 - - handleParseResult = do - awaitForever $ \res -> - case res of - Left err -> liftIO $ warningM "PipeReader" $ "Can't parse: " ++ show err - Right (_, r) -> yield r - + readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running') =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan stopPipeReader :: PipeReaderHandle -> IO () stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False