Browse Source

PipeReader: read from zmq socket

master
Denis Tereshkin 7 years ago
parent
commit
e2608a0bec
  1. 5
      app/Main.hs
  2. 114
      src/QuoteSource/PipeReader.hs

5
app/Main.hs

@ -130,9 +130,10 @@ main = do @@ -130,9 +130,10 @@ main = do
pipeReaderThread ctx config =
case (tickPipePath config, pipeReaderQsEndpoint config) of
(Just pipe, Just qsep) -> do
infoM "main" $ "Pipe/QS: " ++ pipe ++ "/" ++ qsep
tickChan <- newBoundedChan 10000
bracket (startPipeReader (T.pack pipe) tickChan) stopPipeReader (\_ -> do
bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> threadDelay 1000000))
bracket (startPipeReader ctx (T.pack pipe) tickChan) stopPipeReader (\_ -> do
bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> forever $ threadDelay 1000000))
_ -> return ()

114
src/QuoteSource/PipeReader.hs

@ -1,4 +1,4 @@ @@ -1,4 +1,4 @@
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedStrings #-}
module QuoteSource.PipeReader (
@ -6,60 +6,64 @@ module QuoteSource.PipeReader ( @@ -6,60 +6,64 @@ module QuoteSource.PipeReader (
stopPipeReader
) where
import Data.IORef
import qualified Data.Text as T
import qualified Data.Map.Strict as M
import qualified Data.HashSet as HS
import Data.Time.Clock
import Data.Time.Calendar
import ATrade.Types
import Control.Monad
import Control.Monad.Extra
import Control.Monad.Loops
import Data.Maybe
import Foreign.Marshal.Alloc
import qualified Data.Text.Foreign as FT
import System.Win32.File
import System.Win32.Types
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan, writeChan, writeList2Chan, yield)
import Control.Exception
import Control.Monad.IO.Class
import Control.Applicative
import Safe
import Control.Error.Util
import Data.Attoparsec.Text
import Data.Conduit
import qualified Data.Conduit.List as CL
import Data.Conduit.Attoparsec
import ATrade.QuoteSource.Server
import ATrade.QuoteSource.Server
import ATrade.Types
import Control.Applicative
import Control.Concurrent hiding (readChan, writeChan,
writeList2Chan, yield)
import Control.Concurrent.BoundedChan
import Control.Error.Util
import Control.Exception
import Control.Monad
import Control.Monad.Extra
import Control.Monad.IO.Class
import Control.Monad.Loops
import Data.Attoparsec.Text
import qualified Data.ByteString as B
import Data.Conduit hiding (connect)
import Data.Conduit.Attoparsec
import qualified Data.Conduit.List as CL
import qualified Data.HashSet as HS
import Data.IORef
import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import Data.Text.Encoding
import qualified Data.Text.Foreign as FT
import Data.Time.Calendar
import Data.Time.Clock
import Foreign.Marshal.Alloc
import Safe
import System.IO
import System.Log.Logger (debugM, warningM)
import System.ZMQ4
data PipeReaderHandle =
PipeReaderHandle {
prThreadId :: ThreadId,
running :: IORef Bool
running :: IORef Bool
} deriving (Eq)
data DataLine = CurrentParamLine T.Text Double Integer Double Integer Integer Double Integer Integer
| AllTradeLine T.Text Integer Double Integer UTCTime
deriving (Show, Eq)
yieldJust :: Maybe a -> Source IO a
yieldJust maybeV = do -- Probably already present in some library
case maybeV of
Just v -> yield v
Just v -> yield v
Nothing -> return ()
win32FileConduit :: HANDLE -> Source IO T.Text
win32FileConduit handle = do -- TODO actually i have to check if pipe is closed
maybeStr <- liftIO $ allocaBytes 4096 $ \buf -> do
r <- win32_ReadFile handle buf 4096 Nothing
if r > 0
then do
s <- FT.peekCStringLen (buf, fromEnum r)
return $ Just s
else return Nothing
zmqSocketConduit :: (Receiver a) => Socket a -> Source IO T.Text
zmqSocketConduit sock = do
maybeStr <- liftIO $ do
bs <- receive sock
case decodeUtf8' bs of
Left _ -> return Nothing
Right v -> return (Just v)
yieldJust maybeStr
win32FileConduit handle
zmqSocketConduit sock
line2TickConduit :: Conduit DataLine IO Tick
line2TickConduit = do
@ -90,7 +94,7 @@ line2TickConduit = do @@ -90,7 +94,7 @@ line2TickConduit = do
AllTradeLine tickerId flags price volume ts -> do
liftIO $ writeIORef lastTimestamp ts
if
| flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) (-volume)
| flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume
| flags == 2 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume
| otherwise -> return ()
liftIO $ atomicModifyIORef' ignoreCPSet (\s -> (HS.insert tickerId s, ()))
@ -104,17 +108,22 @@ line2TickConduit = do @@ -104,17 +108,22 @@ line2TickConduit = do
volume = vol }
chanSink :: BoundedChan QuoteSourceServerData -> Sink Tick IO ()
chanSink chan = awaitForever (\t -> liftIO $ writeChan chan (QSSTick t))
startPipeReader :: T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle
startPipeReader pipeName tickChan = do
f <- createFile (T.unpack pipeName) gENERIC_READ 0 Nothing oPEN_EXISTING 0 Nothing
when (f == iNVALID_HANDLE_VALUE) $ error $ "Unable to open pipe: " ++ T.unpack pipeName
chanSink chan = awaitForever
(\t -> liftIO $ do
writeChan chan (QSSTick t))
startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle
startPipeReader ctx pipeEndpoint tickChan = do
debugM "PipeReader" $ "Trying to open pipe: " ++ T.unpack pipeEndpoint
s <- socket ctx Sub
connect s (T.unpack pipeEndpoint)
subscribe s B.empty
debugM "PipeReader" "Pipe opened"
running' <- newIORef True
tid <- forkIO $ readerThread f running'
tid <- forkIO $ readerThread s running'
return PipeReaderHandle { prThreadId = tid, running = running' }
where
readerThread f running' = runConduit $ (win32FileConduit f) =$= conduitParser parseTrade =$= CL.map snd =$= line2TickConduit =$= chanSink tickChan
readerThread s running' = runConduit $ (zmqSocketConduit s) =$= conduitParserEither parseTrade =$= handleParseResult =$= line2TickConduit =$= chanSink tickChan
parseTrade = parseCurrentParam <|> parseAllTrade
parseCurrentParam = do
string "CT:"
@ -168,5 +177,12 @@ startPipeReader pipeName tickChan = do @@ -168,5 +177,12 @@ startPipeReader pipeName tickChan = do
ms <- fromInteger <$> decimal
return $ UTCTime (fromGregorian y mon day) $ h * 3600 + m * 60 + s + ms / 1000
handleParseResult = do
awaitForever $ \res ->
case res of
Left err -> liftIO $ warningM "PipeReader" $ "Can't parse: " ++ show err
Right (_, r) -> yield r
stopPipeReader :: PipeReaderHandle -> IO ()
stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False

Loading…
Cancel
Save