Browse Source

Update pipe reader

master
Denis Tereshkin 7 years ago
parent
commit
1545c28943
  1. 14
      app/Main.hs
  2. 2
      app/Version.hs
  3. 4
      quik-connector.cabal
  4. 2
      src/QuoteSource/DataImport.hs
  5. 161
      src/QuoteSource/PipeReader.hs

14
app/Main.hs

@ -109,7 +109,7 @@ main = do @@ -109,7 +109,7 @@ main = do
let serverParams = defaultServerSecurityParams { sspDomain = Just "global",
sspCertificate = serverCert }
bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do
bracket (forkIO $ pipeReaderThread ctx config c2) killThread (\_ -> do
withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do
withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do
bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do
@ -127,13 +127,11 @@ main = do @@ -127,13 +127,11 @@ main = do
void $ timeout 1000000 $ killThread forkId
infoM "main" "Main thread done"
where
pipeReaderThread ctx config =
case (tickPipePath config, pipeReaderQsEndpoint config) of
(Just pipe, Just qsep) -> do
infoM "main" $ "Pipe/QS: " ++ pipe ++ "/" ++ qsep
tickChan <- newBoundedChan 10000
bracket (startPipeReader ctx (T.pack pipe) tickChan) stopPipeReader (\_ -> do
bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> forever $ threadDelay 1000000))
pipeReaderThread ctx config qsdataChan =
case pipeReaderQsEndpoint config of
Just qsep -> do
infoM "main" $ "QS: " ++ qsep
bracket (startPipeReader ctx (T.pack qsep) qsdataChan) stopPipeReader (\_ -> forever $ threadDelay 1000000)
_ -> return ()

2
app/Version.hs

@ -10,7 +10,7 @@ import qualified Data.Text as T @@ -10,7 +10,7 @@ import qualified Data.Text as T
import Text.Printf.TH
quikConnectorVersion :: (Int, Int, Int, Int)
quikConnectorVersion = (0, 3, 0, 0)
quikConnectorVersion = (0, 4, 0, 0)
quikConnectorVersionText :: T.Text
quikConnectorVersionText =

4
quik-connector.cabal

@ -1,5 +1,5 @@ @@ -1,5 +1,5 @@
name: quik-connector
version: 0.3.0.0
version: 0.4.0.0
synopsis: Atrade-Quik Connector application
description: Please see README.md
homepage: https://github.com/asakul/quik-connector
@ -49,7 +49,7 @@ library @@ -49,7 +49,7 @@ library
, aeson
, cond
, scientific
, libatrade == 0.7.0.0
, libatrade == 0.8.0.0
, deepseq
, errors
, split

2
src/QuoteSource/DataImport.hs

@ -6,9 +6,9 @@ module QuoteSource.DataImport @@ -6,9 +6,9 @@ module QuoteSource.DataImport
shutdownDataImportServer
) where
import ATrade.Types
import Control.Concurrent.BoundedChan
import Control.Monad.State.Strict
import ATrade.Types
import Data.IORef
import Data.Time.Clock
import QuoteSource.TableParser

161
src/QuoteSource/PipeReader.hs

@ -17,11 +17,12 @@ import Control.Exception @@ -17,11 +17,12 @@ import Control.Exception
import Control.Monad
import Control.Monad.Extra
import Control.Monad.IO.Class
import Control.Monad.Loops
import Data.Attoparsec.Text
import Control.Monad.Loops (whileM_)
import Data.Binary
import Data.Binary.Get
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.Conduit hiding (connect)
import Data.Conduit.Attoparsec
import qualified Data.Conduit.List as CL
import qualified Data.HashSet as HS
import Data.IORef
@ -29,7 +30,6 @@ import qualified Data.Map.Strict as M @@ -29,7 +30,6 @@ import qualified Data.Map.Strict as M
import Data.Maybe
import qualified Data.Text as T
import Data.Text.Encoding
import qualified Data.Text.Foreign as FT
import Data.Time.Calendar
import Data.Time.Clock
import Foreign.Marshal.Alloc
@ -45,144 +45,51 @@ data PipeReaderHandle = @@ -45,144 +45,51 @@ data PipeReaderHandle =
running :: IORef Bool
} deriving (Eq)
data DataLine = CurrentParamLine T.Text Double Integer Double Integer Integer Double Integer Integer
| AllTradeLine T.Text Integer Double Integer UTCTime
deriving (Show, Eq)
yieldJust :: Maybe a -> Source IO a
yieldJust maybeV = do -- Probably already present in some library
case maybeV of
Just v -> yield v
Nothing -> return ()
zmqSocketConduit :: (Receiver a) => Socket a -> Source IO T.Text
zmqSocketConduit sock = do
maybeStr <- liftIO $ do
bs <- receive sock
case decodeUtf8' bs of
Left _ -> return Nothing
Right v -> return (Just v)
yieldJust maybeStr
zmqSocketConduit sock
line2TickConduit :: Conduit DataLine IO Tick
line2TickConduit = do
volumeMap <- liftIO $ newIORef M.empty
ignoreCPSet <- liftIO $ newIORef HS.empty
lastTimestamp <- liftIO $ newIORef $ UTCTime (fromGregorian 1970 1 1) 0
awaitForever $ \line ->
case line of
CurrentParamLine tickerId last voltoday bid biddepth biddeptht offer offerdepth offerdeptht -> do
ts <- liftIO $ readIORef lastTimestamp
yieldTick tickerId BestBid ts (fromDouble bid) biddepth
yieldTick tickerId BestOffer ts (fromDouble offer) offerdepth
yieldTick tickerId TotalSupply ts (fromInteger offerdeptht) 0
yieldTick tickerId TotalDemand ts (fromInteger biddeptht) 0
shouldParsePrice <- liftIO $ HS.member tickerId <$> readIORef ignoreCPSet
when shouldParsePrice $ do
m <- liftIO $ readIORef volumeMap
case M.lookup tickerId m of
Just vol ->
if | vol < voltoday -> yieldTick tickerId LastTradePrice ts (fromDouble last) (voltoday - vol)
| vol > voltoday -> yieldTick tickerId LastTradePrice ts (fromDouble last) vol
| otherwise -> return ()
Nothing -> yieldTick tickerId LastTradePrice ts (fromDouble last) 1
liftIO $ atomicModifyIORef' volumeMap (\m -> (M.insert tickerId voltoday m, ()))
zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> Source IO [B.ByteString]
zmqSocketConduit ep sock running' = do
liftIO $ do
debugM "PipeReader" $ "Connecting to: " ++ T.unpack ep
connect sock (T.unpack ep)
subscribe sock B.empty
lastHeartbeat <- liftIO $ getCurrentTime >>= newIORef
whileM_ (andM [notTimeout lastHeartbeat, liftIO (readIORef running')]) $ do
evs <- liftIO $ poll 200 [Sock sock [In] Nothing]
unless (null (head evs)) $ do
bs <- liftIO $ receiveMulti sock
when ((not . null $ bs) && (head bs == "SYSTEM#HEARTBEAT")) $ liftIO $ getCurrentTime >>= writeIORef lastHeartbeat
yield bs
zmqSocketConduit ep sock running'
where
notTimeout hb = do
now <- liftIO $ getCurrentTime
last <- liftIO $ readIORef hb
return $ now `diffUTCTime` last < 10
AllTradeLine tickerId flags price volume ts -> do
liftIO $ writeIORef lastTimestamp ts
if
| flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume
| flags == 2 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume
| otherwise -> return ()
liftIO $ atomicModifyIORef' ignoreCPSet (\s -> (HS.insert tickerId s, ()))
parseBarConduit :: Conduit [B.ByteString] IO (TickerId, BarTimeframe, Bar)
parseBarConduit = awaitForever $ \bs ->
case deserializeBar (BL.fromStrict <$> bs) of
Just (tf, bar) -> yield (barSecurity bar, tf, bar)
_ -> return ()
where
yieldTick tickerId dtype ts val vol =
yield $ Tick { security = tickerId,
datatype = dtype,
timestamp = ts,
value = val,
volume = vol }
qssdataConduit :: Conduit (TickerId, BarTimeframe, Bar) IO QuoteSourceServerData
qssdataConduit = awaitForever $ \(tid, tf, bar) -> yield $ QSSBar (tf, bar)
chanSink :: BoundedChan QuoteSourceServerData -> Sink Tick IO ()
chanSink :: (Show a) => BoundedChan a -> Sink a IO ()
chanSink chan = awaitForever
(\t -> liftIO $ do
writeChan chan (QSSTick t))
(\t -> do
liftIO $ writeChan chan t)
startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle
startPipeReader ctx pipeEndpoint tickChan = do
debugM "PipeReader" $ "Trying to open pipe: " ++ T.unpack pipeEndpoint
s <- socket ctx Sub
connect s (T.unpack pipeEndpoint)
subscribe s B.empty
debugM "PipeReader" "Pipe opened"
running' <- newIORef True
tid <- forkIO $ readerThread s running'
return PipeReaderHandle { prThreadId = tid, running = running' }
where
readerThread s running' = runConduit $ (zmqSocketConduit s) =$= conduitParserEither parseTrade =$= handleParseResult =$= line2TickConduit =$= chanSink tickChan
parseTrade = parseCurrentParam <|> parseAllTrade
parseCurrentParam = do
string "CT:"
secName <- takeTill (== ':')
string ":"
last <- double
string ";"
voltoday <- decimal
string ";"
bid <- double
string ";"
biddepth <- decimal
string ";"
biddeptht <- decimal
string ";"
offer <- double
string ";"
offerdepth <- decimal
string ";"
offerdeptht <- decimal
string ";"
return $ CurrentParamLine secName last voltoday bid biddepth biddeptht offer offerdepth offerdeptht
parseAllTrade = do
string "AT:"
secName <- takeTill (== ':')
string ":"
flags <- decimal
string ";"
price <- double
string ";"
qty <- decimal
string ";"
dt <- parseDateTime
string ";"
return $ AllTradeLine secName flags price qty dt
parseDateTime = do
y <- decimal
string "."
mon <- decimal
string "."
day <- decimal
string " "
h <- fromInteger <$> decimal
string ":"
m <- fromInteger <$> decimal
string ":"
s <- fromInteger <$> decimal
string "."
ms <- fromInteger <$> decimal
return $ UTCTime (fromGregorian y mon day) $ h * 3600 + m * 60 + s + ms / 1000
handleParseResult = do
awaitForever $ \res ->
case res of
Left err -> liftIO $ warningM "PipeReader" $ "Can't parse: " ++ show err
Right (_, r) -> yield r
readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running') =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan
stopPipeReader :: PipeReaderHandle -> IO ()
stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False

Loading…
Cancel
Save