From 0e0c5524f87a2037393c453fabc02472ea2b86d7 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Fri, 10 Feb 2017 17:07:37 +0700 Subject: [PATCH] Lua pipe reader --- quik-connector.cabal | 7 +- src/QuoteSource/PipeReader.hs | 174 ++++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+), 1 deletion(-) create mode 100644 src/QuoteSource/PipeReader.hs diff --git a/quik-connector.cabal b/quik-connector.cabal index c251b97..4d5f744 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -18,6 +18,7 @@ library exposed-modules: QuoteSource.DataImport , QuoteSource.TableParser , QuoteSource.TableParsers.AllParamsTableParser + , QuoteSource.PipeReader , Broker.PaperBroker , Broker.QuikBroker , Broker.QuikBroker.Trans2QuikApi @@ -40,7 +41,6 @@ library , BoundedChan , hslogger , zeromq4-haskell - , hashmap , hashable , unordered-containers , aeson @@ -53,6 +53,7 @@ library , bimap , safe , conduit + , conduit-extra , stm , stm-conduit , http-client @@ -60,6 +61,10 @@ library , utf8-string , connection , text-format + , monad-loops + , extra + , incremental-parser + , attoparsec default-language: Haskell2010 extra-libraries: "user32" other-modules: System.Win32.XlParser diff --git a/src/QuoteSource/PipeReader.hs b/src/QuoteSource/PipeReader.hs new file mode 100644 index 0000000..4c412b3 --- /dev/null +++ b/src/QuoteSource/PipeReader.hs @@ -0,0 +1,174 @@ +{-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE OverloadedStrings #-} + +module QuoteSource.PipeReader ( + startPipeReader, + stopPipeReader + ) where + +import Data.IORef +import qualified Data.Text as T +import qualified Data.Map.Strict as M +import qualified Data.HashSet as HS +import Data.Decimal +import Data.Time.Clock +import Data.Time.Calendar +import ATrade.Types +import Control.Monad +import Control.Monad.Extra +import Control.Monad.Loops +import Data.Maybe +import Foreign.Marshal.Alloc +import qualified Data.Text.Foreign as FT +import System.Win32.File +import System.Win32.Types +import Control.Concurrent.BoundedChan +import Control.Concurrent hiding (readChan, writeChan, writeList2Chan, yield) +import Control.Exception +import Control.Monad.IO.Class +import Control.Applicative +import Safe +import Control.Error.Util +import Data.Attoparsec.Text +import Data.Conduit +import qualified Data.Conduit.List as CL +import Data.Conduit.Attoparsec + +fromDouble = realFracToDecimal 10 + +data PipeReaderHandle = + PipeReaderHandle { + prThreadId :: ThreadId, + running :: IORef Bool + } deriving (Eq) + +data DataLine = CurrentParamLine T.Text Double Integer Double Integer Integer Double Integer Integer + | AllTradeLine T.Text Integer Double Integer UTCTime + +yieldJust :: Maybe a -> Source IO a +yieldJust maybeV = do -- Probably already present in some library + case maybeV of + Just v -> yield v + Nothing -> return () + +win32FileConduit :: HANDLE -> Source IO T.Text +win32FileConduit handle = do -- TODO actually i have to check if pipe is closed + maybeStr <- liftIO $ allocaBytes 4096 $ \buf -> do + r <- win32_ReadFile handle buf 4096 Nothing + if r > 0 + then do + s <- FT.peekCStringLen (buf, fromEnum r) + return $ Just s + else return Nothing + yieldJust maybeStr + win32FileConduit handle + +line2TickConduit :: Conduit DataLine IO Tick +line2TickConduit = do + volumeMap <- liftIO $ newIORef M.empty + ignoreCPSet <- liftIO $ newIORef HS.empty + lastTimestamp <- liftIO $ newIORef $ UTCTime (fromGregorian 1970 1 1) 0 + awaitForever $ \line -> + case line of + CurrentParamLine tickerId last voltoday bid biddepth biddeptht offer offerdepth offerdeptht -> do + ts <- liftIO $ readIORef lastTimestamp + yieldTick tickerId BestBid ts (fromDouble bid) biddepth + yieldTick tickerId BestOffer ts (fromDouble offer) offerdepth + yieldTick tickerId TotalSupply ts (fromInteger offerdeptht) 0 + yieldTick tickerId TotalDemand ts (fromInteger biddeptht) 0 + + shouldParsePrice <- liftIO $ HS.member tickerId <$> readIORef ignoreCPSet + when shouldParsePrice $ do + m <- liftIO $ readIORef volumeMap + case M.lookup tickerId m of + Just vol -> + if | vol < voltoday -> yieldTick tickerId Price ts (fromDouble last) (voltoday - vol) + | vol > voltoday -> yieldTick tickerId Price ts (fromDouble last) vol + | otherwise -> return () + Nothing -> yieldTick tickerId Price ts (fromDouble last) 1 + + liftIO $ atomicModifyIORef' volumeMap (\m -> (M.insert tickerId voltoday m, ())) + + AllTradeLine tickerId flags price volume ts -> do + liftIO $ writeIORef lastTimestamp ts + if + | flags == 1 -> yieldTick tickerId Price ts (fromDouble price) (-volume) + | flags == 2 -> yieldTick tickerId Price ts (fromDouble price) volume + | otherwise -> return () + liftIO $ atomicModifyIORef' ignoreCPSet (\s -> (HS.insert tickerId s, ())) + + where + yieldTick tickerId dtype ts val vol = + yield $ Tick { security = tickerId, + datatype = dtype, + timestamp = ts, + value = val, + volume = vol } + +chanSink :: BoundedChan a -> Sink a IO () +chanSink chan = awaitForever (\t -> liftIO $ writeChan chan t) + +startPipeReader :: T.Text -> BoundedChan Tick -> IO PipeReaderHandle +startPipeReader pipeName tickChan = do + f <- createFile (T.unpack pipeName) gENERIC_READ 0 Nothing oPEN_EXISTING 0 Nothing + when (f == iNVALID_HANDLE_VALUE) $ error $ "Unable to open pipe: " ++ T.unpack pipeName + running' <- newIORef True + tid <- forkIO $ readerThread f running' + return PipeReaderHandle { prThreadId = tid, running = running' } + where + readerThread f running' = runConduit $ (win32FileConduit f) =$= conduitParser parseTrade =$= CL.map snd =$= line2TickConduit =$= chanSink tickChan + parseTrade = parseCurrentParam <|> parseAllTrade + parseCurrentParam = do + string "CT:" + secName <- takeTill (== ':') + string ":" + last <- double + string ";" + voltoday <- decimal + string ";" + bid <- double + string ";" + biddepth <- decimal + string ";" + biddeptht <- decimal + string ";" + offer <- double + string ";" + offerdepth <- decimal + string ";" + offerdeptht <- decimal + string ";" + return $ CurrentParamLine secName last voltoday bid biddepth biddeptht offer offerdepth offerdeptht + + parseAllTrade = do + string "AT:" + secName <- takeTill (== ':') + string ":" + flags <- decimal + string ";" + price <- double + string ";" + qty <- decimal + string ";" + dt <- parseDateTime + string ";" + return $ AllTradeLine secName flags price qty dt + + parseDateTime = do + y <- decimal + string "." + mon <- decimal + string "." + day <- decimal + string " " + h <- fromInteger <$> decimal + string ":" + m <- fromInteger <$> decimal + string ":" + s <- fromInteger <$> decimal + string "." + ms <- fromInteger <$> decimal + return $ UTCTime (fromGregorian y mon day) $ h * 3600 + m * 60 + s + ms / 1000 + +stopPipeReader :: PipeReaderHandle -> IO () +stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False