Browse Source

Param pipe reader

master
Denis Tereshkin 4 years ago
parent
commit
33f9433fa9
  1. 30
      app/Main.hs
  2. 34
      src/QuoteSource/PipeReader.hs

30
app/Main.hs

@ -18,10 +18,7 @@ import Control.Monad.IO.Class (MonadIO)
import Data.GI.Base import Data.GI.Base
import qualified GI.Gtk as Gtk import qualified GI.Gtk as Gtk
import Prelude hiding (log) import Prelude hiding (log)
import QuoteSource.DataImport
import QuoteSource.PipeReader import QuoteSource.PipeReader
import QuoteSource.TableParser
import QuoteSource.TableParsers.AllParamsTableParser
import ATrade.Broker.Server import ATrade.Broker.Server
import ATrade.Broker.TradeSinks.ZMQTradeSink import ATrade.Broker.TradeSinks.ZMQTradeSink
@ -40,26 +37,27 @@ import Data.Version
import ATrade (libatrade_gitrev, import ATrade (libatrade_gitrev,
libatrade_version) libatrade_version)
import ATrade.Logging (Message, Severity (Debug, Info, Warning), import ATrade.Logging (Message, Severity (Debug, Info, Warning),
fmtMessage, fmtMessage, logWith)
logWith)
import Colog (LogAction, import Colog (LogAction,
logTextStdout, logTextStdout, (>$<))
(>$<))
import Colog.Actions (logTextHandle) import Colog.Actions (logTextHandle)
import Config import Config
import TickTable (mkTickTable) import TickTable (mkTickTable)
import Version import Version
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan :: Int -> BoundedChan QuoteSourceServerData -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData)
forkBoundedChan size sourceChan = do forkBoundedChan size sourceChan = do
sink1 <- newBoundedChan size sink1 <- newBoundedChan size
sink2 <- newBoundedChan size sink2 <- newBoundedChan size
sinkQss <- newBoundedChan size sinkQss <- newBoundedChan size
tid <- forkIO $ forever $ do tid <- forkIO $ forever $ do
v <- readChan sourceChan v <- readChan sourceChan
writeChan sink1 v case v of
writeChan sink2 v QSSTick t -> do
writeChan sinkQss (QSSTick v) writeChan sink1 t
writeChan sink2 t
_ -> return ()
writeChan sinkQss v
return (tid, sink1, sink2, sinkQss) return (tid, sink1, sink2, sinkQss)
@ -83,8 +81,6 @@ main = do
log Info "main" "Config loaded" log Info "main" "Config loaded"
chan <- newBoundedChan 10000 chan <- newBoundedChan 10000
log Info "main" "Starting data import server"
_ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"
(forkId, c0, c1, c2) <- forkBoundedChan 10000 chan (forkId, c0, c1, c2) <- forkBoundedChan 10000 chan
@ -114,7 +110,7 @@ main = do
let serverParams = defaultServerSecurityParams { sspDomain = Just "global", let serverParams = defaultServerSecurityParams { sspDomain = Just "global",
sspCertificate = serverCert } sspCertificate = serverCert }
bracket (forkIO $ pipeReaderThread ctx config c2 logger) killThread (\_ -> do bracket (forkIO $ pipeReaderThread ctx config chan logger) killThread (\_ -> do
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do
withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do
@ -140,10 +136,10 @@ main = do
log Info "main" "Main thread done" log Info "main" "Main thread done"
where where
pipeReaderThread ctx config qsdataChan logger = pipeReaderThread ctx config qsdataChan logger =
case pipeReaderQsEndpoint config of case (pipeReaderQsEndpoint config, tickPipePath config) of
Just qsep -> do (Just qsep, Just tep) -> do
logWith logger Info "main" $ "QS: " <> T.pack qsep logWith logger Info "main" $ "QS: " <> T.pack qsep
bracket (startPipeReader ctx (T.pack qsep) qsdataChan logger) stopPipeReader (\_ -> forever $ threadDelay 1000000) bracket (startPipeReader ctx (T.pack qsep) (T.pack tep) qsdataChan logger) stopPipeReader (\_ -> forever $ threadDelay 1000000)
_ -> return () _ -> return ()
quoteSourceServerSecurityParams = defaultServerSecurityParams { sspDomain = Just "global" } quoteSourceServerSecurityParams = defaultServerSecurityParams { sspDomain = Just "global" }

34
src/QuoteSource/PipeReader.hs

@ -44,9 +44,20 @@ import System.ZMQ4
data PipeReaderHandle = data PipeReaderHandle =
PipeReaderHandle { PipeReaderHandle {
prThreadId :: ThreadId, prThreadId :: ThreadId,
prTickThreadId :: ThreadId,
running :: IORef Bool running :: IORef Bool
} deriving (Eq) } deriving (Eq)
deserializeTicks :: [B.ByteString] -> [Tick]
deserializeTicks (secname:raw:_) = case decodeUtf8' secname of
Right tid -> deserializeWithName tid $ BL.fromStrict raw
Left _ -> []
where
deserializeWithName secNameT raw = case deserializeTickBody raw of
(rest, Just tick) -> tick { security = secNameT } : deserializeWithName secNameT rest
_ -> []
deserializeTicks _ = []
zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> LogAction IO Message -> Source IO [B.ByteString] zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> LogAction IO Message -> Source IO [B.ByteString]
zmqSocketConduit ep sock running' logger = do zmqSocketConduit ep sock running' logger = do
@ -74,24 +85,39 @@ parseBarConduit = awaitForever $ \bs ->
Just (tf, bar) -> yield (barSecurity bar, tf, bar) Just (tf, bar) -> yield (barSecurity bar, tf, bar)
_ -> return () _ -> return ()
parseTickConduit :: Conduit [B.ByteString] IO ([Tick])
parseTickConduit = awaitForever $ \bs -> do
yield $ deserializeTicks bs
qssdataConduit :: Conduit (TickerId, BarTimeframe, Bar) IO QuoteSourceServerData qssdataConduit :: Conduit (TickerId, BarTimeframe, Bar) IO QuoteSourceServerData
qssdataConduit = awaitForever $ \(tid, tf, bar) -> yield $ QSSBar (tf, bar) qssdataConduit = awaitForever $ \(tid, tf, bar) -> yield $ QSSBar (tf, bar)
qsstickdataConduit :: Conduit [Tick] IO QuoteSourceServerData
qsstickdataConduit = awaitForever $ \ticks -> forM_ ticks $ \tick -> yield $ QSSTick tick
chanSink :: (Show a) => BoundedChan a -> Sink a IO () chanSink :: (Show a) => BoundedChan a -> Sink a IO ()
chanSink chan = awaitForever chanSink chan = awaitForever
(\t -> do (\t -> do
liftIO $ writeChan chan t) liftIO $ writeChan chan t)
startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> LogAction IO Message -> IO PipeReaderHandle startPipeReader :: Context -> T.Text -> T.Text -> BoundedChan QuoteSourceServerData -> LogAction IO Message -> IO PipeReaderHandle
startPipeReader ctx pipeEndpoint tickChan logger = do startPipeReader ctx pipeEndpoint tickPipeEndpoint tickChan logger = do
logWith logger Debug "PipeReader" $ "Trying to open pipe: " <> pipeEndpoint logWith logger Debug "PipeReader" $ "Trying to open pipe: " <> pipeEndpoint
s <- socket ctx Sub s <- socket ctx Sub
logWith logger Info "PipeReader" "Pipe opened" logWith logger Info "PipeReader" "Pipe opened"
tickSocket <- socket ctx Sub
logWith logger Info "PipeReader" "Tick pipe opened"
running' <- newIORef True running' <- newIORef True
tid <- forkIO $ readerThread s running' tid <- forkIO $ readerThread s running'
return PipeReaderHandle { prThreadId = tid, running = running' } tid2 <- forkIO $ tickReaderThread tickSocket running'
return PipeReaderHandle { prThreadId = tid, prTickThreadId = tid2, running = running' }
where where
readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running' logger) =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running' logger) =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan
tickReaderThread s running' = runConduit $
(zmqSocketConduit tickPipeEndpoint s running' logger)
=$= parseTickConduit
=$= qsstickdataConduit
=$= chanSink tickChan
stopPipeReader :: PipeReaderHandle -> IO () stopPipeReader :: PipeReaderHandle -> IO ()
stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False stopPipeReader h = killThread (prThreadId h) >> killThread (prTickThreadId h) >> writeIORef (running h) False

Loading…
Cancel
Save