From e2608a0becb9958c6991c337b2ce5bb54b3620c7 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Tue, 5 Mar 2019 09:28:22 +0700 Subject: [PATCH] PipeReader: read from zmq socket --- app/Main.hs | 5 +- src/QuoteSource/PipeReader.hs | 124 +++++++++++++++++++--------------- 2 files changed, 73 insertions(+), 56 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index c76ac9c..64dcd2c 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -130,9 +130,10 @@ main = do pipeReaderThread ctx config = case (tickPipePath config, pipeReaderQsEndpoint config) of (Just pipe, Just qsep) -> do + infoM "main" $ "Pipe/QS: " ++ pipe ++ "/" ++ qsep tickChan <- newBoundedChan 10000 - bracket (startPipeReader (T.pack pipe) tickChan) stopPipeReader (\_ -> do - bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> threadDelay 1000000)) + bracket (startPipeReader ctx (T.pack pipe) tickChan) stopPipeReader (\_ -> do + bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> forever $ threadDelay 1000000)) _ -> return () diff --git a/src/QuoteSource/PipeReader.hs b/src/QuoteSource/PipeReader.hs index 99d1249..d33ee32 100644 --- a/src/QuoteSource/PipeReader.hs +++ b/src/QuoteSource/PipeReader.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedStrings #-} module QuoteSource.PipeReader ( @@ -6,60 +6,64 @@ module QuoteSource.PipeReader ( stopPipeReader ) where -import Data.IORef -import qualified Data.Text as T -import qualified Data.Map.Strict as M -import qualified Data.HashSet as HS -import Data.Time.Clock -import Data.Time.Calendar -import ATrade.Types -import Control.Monad -import Control.Monad.Extra -import Control.Monad.Loops -import Data.Maybe -import Foreign.Marshal.Alloc -import qualified Data.Text.Foreign as FT -import System.Win32.File -import System.Win32.Types -import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (readChan, writeChan, writeList2Chan, yield) -import Control.Exception -import Control.Monad.IO.Class -import Control.Applicative -import Safe -import Control.Error.Util -import Data.Attoparsec.Text -import Data.Conduit -import qualified Data.Conduit.List as CL -import Data.Conduit.Attoparsec -import ATrade.QuoteSource.Server +import ATrade.QuoteSource.Server +import ATrade.Types +import Control.Applicative +import Control.Concurrent hiding (readChan, writeChan, + writeList2Chan, yield) +import Control.Concurrent.BoundedChan +import Control.Error.Util +import Control.Exception +import Control.Monad +import Control.Monad.Extra +import Control.Monad.IO.Class +import Control.Monad.Loops +import Data.Attoparsec.Text +import qualified Data.ByteString as B +import Data.Conduit hiding (connect) +import Data.Conduit.Attoparsec +import qualified Data.Conduit.List as CL +import qualified Data.HashSet as HS +import Data.IORef +import qualified Data.Map.Strict as M +import Data.Maybe +import qualified Data.Text as T +import Data.Text.Encoding +import qualified Data.Text.Foreign as FT +import Data.Time.Calendar +import Data.Time.Clock +import Foreign.Marshal.Alloc +import Safe +import System.IO +import System.Log.Logger (debugM, warningM) +import System.ZMQ4 + data PipeReaderHandle = PipeReaderHandle { prThreadId :: ThreadId, - running :: IORef Bool - } deriving (Eq) + running :: IORef Bool + } deriving (Eq) -data DataLine = CurrentParamLine T.Text Double Integer Double Integer Integer Double Integer Integer +data DataLine = CurrentParamLine T.Text Double Integer Double Integer Integer Double Integer Integer | AllTradeLine T.Text Integer Double Integer UTCTime + deriving (Show, Eq) yieldJust :: Maybe a -> Source IO a yieldJust maybeV = do -- Probably already present in some library case maybeV of - Just v -> yield v + Just v -> yield v Nothing -> return () -win32FileConduit :: HANDLE -> Source IO T.Text -win32FileConduit handle = do -- TODO actually i have to check if pipe is closed - maybeStr <- liftIO $ allocaBytes 4096 $ \buf -> do - r <- win32_ReadFile handle buf 4096 Nothing - if r > 0 - then do - s <- FT.peekCStringLen (buf, fromEnum r) - return $ Just s - else return Nothing +zmqSocketConduit :: (Receiver a) => Socket a -> Source IO T.Text +zmqSocketConduit sock = do + maybeStr <- liftIO $ do + bs <- receive sock + case decodeUtf8' bs of + Left _ -> return Nothing + Right v -> return (Just v) yieldJust maybeStr - win32FileConduit handle + zmqSocketConduit sock line2TickConduit :: Conduit DataLine IO Tick line2TickConduit = do @@ -67,7 +71,7 @@ line2TickConduit = do ignoreCPSet <- liftIO $ newIORef HS.empty lastTimestamp <- liftIO $ newIORef $ UTCTime (fromGregorian 1970 1 1) 0 awaitForever $ \line -> - case line of + case line of CurrentParamLine tickerId last voltoday bid biddepth biddeptht offer offerdepth offerdeptht -> do ts <- liftIO $ readIORef lastTimestamp yieldTick tickerId BestBid ts (fromDouble bid) biddepth @@ -90,11 +94,11 @@ line2TickConduit = do AllTradeLine tickerId flags price volume ts -> do liftIO $ writeIORef lastTimestamp ts if - | flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) (-volume) + | flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume | flags == 2 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume | otherwise -> return () liftIO $ atomicModifyIORef' ignoreCPSet (\s -> (HS.insert tickerId s, ())) - + where yieldTick tickerId dtype ts val vol = yield $ Tick { security = tickerId, @@ -102,20 +106,25 @@ line2TickConduit = do timestamp = ts, value = val, volume = vol } - + chanSink :: BoundedChan QuoteSourceServerData -> Sink Tick IO () -chanSink chan = awaitForever (\t -> liftIO $ writeChan chan (QSSTick t)) +chanSink chan = awaitForever + (\t -> liftIO $ do + writeChan chan (QSSTick t)) -startPipeReader :: T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle -startPipeReader pipeName tickChan = do - f <- createFile (T.unpack pipeName) gENERIC_READ 0 Nothing oPEN_EXISTING 0 Nothing - when (f == iNVALID_HANDLE_VALUE) $ error $ "Unable to open pipe: " ++ T.unpack pipeName +startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle +startPipeReader ctx pipeEndpoint tickChan = do + debugM "PipeReader" $ "Trying to open pipe: " ++ T.unpack pipeEndpoint + s <- socket ctx Sub + connect s (T.unpack pipeEndpoint) + subscribe s B.empty + debugM "PipeReader" "Pipe opened" running' <- newIORef True - tid <- forkIO $ readerThread f running' + tid <- forkIO $ readerThread s running' return PipeReaderHandle { prThreadId = tid, running = running' } where - readerThread f running' = runConduit $ (win32FileConduit f) =$= conduitParser parseTrade =$= CL.map snd =$= line2TickConduit =$= chanSink tickChan - parseTrade = parseCurrentParam <|> parseAllTrade + readerThread s running' = runConduit $ (zmqSocketConduit s) =$= conduitParserEither parseTrade =$= handleParseResult =$= line2TickConduit =$= chanSink tickChan + parseTrade = parseCurrentParam <|> parseAllTrade parseCurrentParam = do string "CT:" secName <- takeTill (== ':') @@ -168,5 +177,12 @@ startPipeReader pipeName tickChan = do ms <- fromInteger <$> decimal return $ UTCTime (fromGregorian y mon day) $ h * 3600 + m * 60 + s + ms / 1000 + handleParseResult = do + awaitForever $ \res -> + case res of + Left err -> liftIO $ warningM "PipeReader" $ "Can't parse: " ++ show err + Right (_, r) -> yield r + + stopPipeReader :: PipeReaderHandle -> IO () stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False