From 33f9433fa9346702cd556e45cfabace04d6524b5 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sun, 5 Dec 2021 19:04:05 +0700 Subject: [PATCH] Param pipe reader --- app/Main.hs | 54 ++++++++++++++++------------------- src/QuoteSource/PipeReader.hs | 38 ++++++++++++++++++++---- 2 files changed, 57 insertions(+), 35 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index d6f4a4e..e18b3e5 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -8,20 +8,17 @@ import System.IO import ATrade.QuoteSource.Server import ATrade.Types -import Control.Concurrent hiding (readChan, - writeChan) +import Control.Concurrent hiding (readChan, + writeChan) import Control.Concurrent.BoundedChan import Control.Error.Util import Control.Exception.Safe import Control.Monad -import Control.Monad.IO.Class (MonadIO) +import Control.Monad.IO.Class (MonadIO) import Data.GI.Base -import qualified GI.Gtk as Gtk -import Prelude hiding (log) -import QuoteSource.DataImport +import qualified GI.Gtk as Gtk +import Prelude hiding (log) import QuoteSource.PipeReader -import QuoteSource.TableParser -import QuoteSource.TableParsers.AllParamsTableParser import ATrade.Broker.Server import ATrade.Broker.TradeSinks.ZMQTradeSink @@ -34,32 +31,33 @@ import System.ZMQ4 import System.ZMQ4.ZAP import Data.Maybe -import qualified Data.Text as T +import qualified Data.Text as T import Data.Version -import ATrade (libatrade_gitrev, - libatrade_version) -import ATrade.Logging (Message, Severity (Debug, Info, Warning), - fmtMessage, - logWith) -import Colog (LogAction, - logTextStdout, - (>$<)) -import Colog.Actions (logTextHandle) +import ATrade (libatrade_gitrev, + libatrade_version) +import ATrade.Logging (Message, Severity (Debug, Info, Warning), + fmtMessage, logWith) +import Colog (LogAction, + logTextStdout, (>$<)) +import Colog.Actions (logTextHandle) import Config -import TickTable (mkTickTable) +import TickTable (mkTickTable) import Version -forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData) +forkBoundedChan :: Int -> BoundedChan QuoteSourceServerData -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan size sourceChan = do sink1 <- newBoundedChan size sink2 <- newBoundedChan size sinkQss <- newBoundedChan size tid <- forkIO $ forever $ do v <- readChan sourceChan - writeChan sink1 v - writeChan sink2 v - writeChan sinkQss (QSSTick v) + case v of + QSSTick t -> do + writeChan sink1 t + writeChan sink2 t + _ -> return () + writeChan sinkQss v return (tid, sink1, sink2, sinkQss) @@ -83,8 +81,6 @@ main = do log Info "main" "Config loaded" chan <- newBoundedChan 10000 - log Info "main" "Starting data import server" - _ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" (forkId, c0, c1, c2) <- forkBoundedChan 10000 chan @@ -114,7 +110,7 @@ main = do let serverParams = defaultServerSecurityParams { sspDomain = Just "global", sspCertificate = serverCert } - bracket (forkIO $ pipeReaderThread ctx config c2 logger) killThread (\_ -> do + bracket (forkIO $ pipeReaderThread ctx config chan logger) killThread (\_ -> do withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do @@ -140,10 +136,10 @@ main = do log Info "main" "Main thread done" where pipeReaderThread ctx config qsdataChan logger = - case pipeReaderQsEndpoint config of - Just qsep -> do + case (pipeReaderQsEndpoint config, tickPipePath config) of + (Just qsep, Just tep) -> do logWith logger Info "main" $ "QS: " <> T.pack qsep - bracket (startPipeReader ctx (T.pack qsep) qsdataChan logger) stopPipeReader (\_ -> forever $ threadDelay 1000000) + bracket (startPipeReader ctx (T.pack qsep) (T.pack tep) qsdataChan logger) stopPipeReader (\_ -> forever $ threadDelay 1000000) _ -> return () quoteSourceServerSecurityParams = defaultServerSecurityParams { sspDomain = Just "global" } diff --git a/src/QuoteSource/PipeReader.hs b/src/QuoteSource/PipeReader.hs index 98995aa..2d7df50 100644 --- a/src/QuoteSource/PipeReader.hs +++ b/src/QuoteSource/PipeReader.hs @@ -43,10 +43,21 @@ import System.ZMQ4 data PipeReaderHandle = PipeReaderHandle { - prThreadId :: ThreadId, - running :: IORef Bool + prThreadId :: ThreadId, + prTickThreadId :: ThreadId, + running :: IORef Bool } deriving (Eq) +deserializeTicks :: [B.ByteString] -> [Tick] +deserializeTicks (secname:raw:_) = case decodeUtf8' secname of + Right tid -> deserializeWithName tid $ BL.fromStrict raw + Left _ -> [] + where + deserializeWithName secNameT raw = case deserializeTickBody raw of + (rest, Just tick) -> tick { security = secNameT } : deserializeWithName secNameT rest + _ -> [] + +deserializeTicks _ = [] zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> LogAction IO Message -> Source IO [B.ByteString] zmqSocketConduit ep sock running' logger = do @@ -74,24 +85,39 @@ parseBarConduit = awaitForever $ \bs -> Just (tf, bar) -> yield (barSecurity bar, tf, bar) _ -> return () +parseTickConduit :: Conduit [B.ByteString] IO ([Tick]) +parseTickConduit = awaitForever $ \bs -> do + yield $ deserializeTicks bs + qssdataConduit :: Conduit (TickerId, BarTimeframe, Bar) IO QuoteSourceServerData qssdataConduit = awaitForever $ \(tid, tf, bar) -> yield $ QSSBar (tf, bar) +qsstickdataConduit :: Conduit [Tick] IO QuoteSourceServerData +qsstickdataConduit = awaitForever $ \ticks -> forM_ ticks $ \tick -> yield $ QSSTick tick + chanSink :: (Show a) => BoundedChan a -> Sink a IO () chanSink chan = awaitForever (\t -> do liftIO $ writeChan chan t) -startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> LogAction IO Message -> IO PipeReaderHandle -startPipeReader ctx pipeEndpoint tickChan logger = do +startPipeReader :: Context -> T.Text -> T.Text -> BoundedChan QuoteSourceServerData -> LogAction IO Message -> IO PipeReaderHandle +startPipeReader ctx pipeEndpoint tickPipeEndpoint tickChan logger = do logWith logger Debug "PipeReader" $ "Trying to open pipe: " <> pipeEndpoint s <- socket ctx Sub logWith logger Info "PipeReader" "Pipe opened" + tickSocket <- socket ctx Sub + logWith logger Info "PipeReader" "Tick pipe opened" running' <- newIORef True tid <- forkIO $ readerThread s running' - return PipeReaderHandle { prThreadId = tid, running = running' } + tid2 <- forkIO $ tickReaderThread tickSocket running' + return PipeReaderHandle { prThreadId = tid, prTickThreadId = tid2, running = running' } where readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running' logger) =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan + tickReaderThread s running' = runConduit $ + (zmqSocketConduit tickPipeEndpoint s running' logger) + =$= parseTickConduit + =$= qsstickdataConduit + =$= chanSink tickChan stopPipeReader :: PipeReaderHandle -> IO () -stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False +stopPipeReader h = killThread (prThreadId h) >> killThread (prTickThreadId h) >> writeIORef (running h) False