Browse Source

Parse rest of the messages via xeno

master
Denis Tereshkin 2 years ago
parent
commit
fba2f78fa1
  1. 45
      src/TXMLConnector/Internal.hs
  2. 24
      src/Transaq/Parsing.hs

45
src/TXMLConnector/Internal.hs

@ -2,6 +2,7 @@ @@ -2,6 +2,7 @@
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE ScopedTypeVariables #-}
module TXMLConnector.Internal
(
@ -30,6 +31,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', @@ -30,6 +31,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar',
tryReadTMVar, writeTVar)
import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
readTBQueue, writeTBQueue)
import Control.Exception
import Control.Monad (forM_, void, when)
import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM
@ -37,12 +39,10 @@ import Data.Functor.Identity (Identity (..)) @@ -37,12 +39,10 @@ import Data.Functor.Identity (Identity (..))
import Data.Maybe (catMaybes, fromMaybe, mapMaybe)
import qualified Data.Text as T
import Debug.EventCounters (emitEvent)
import Debug.Trace
import qualified Deque.Strict as D
import SlaveThread (fork)
import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem),
Element (elName),
QName (qName))
import System.IO
import TickTable (TickTable, insertTick)
import Transaq (AllTradesTrade,
AllTradesTradeB (..), Candle,
@ -84,9 +84,12 @@ import Transaq (AllTradesTrade, @@ -84,9 +84,12 @@ import Transaq (AllTradesTrade,
TransaqResponseC (fromXml),
UnfilledAction (..),
kCandleKindId, kPeriod, state)
import Transaq.Parsing (parseTransaqResponsesFromText)
import TXML (MonadTXML, freeCallback,
initialize, sendCommand,
setCallback)
import Xeno.Errors
import Xeno.Types
import ATrade.Broker.Backend (BrokerBackendNotification (..))
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
@ -207,7 +210,7 @@ workThread = do @@ -207,7 +210,7 @@ workThread = do
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
Right _ -> do
queue <- asks transaqQueue
rc' <- setCallback (parseAndWrite queue)
rc' <- setCallback (parseAndWrite' queue)
case rc' of
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
Just cb -> do
@ -230,28 +233,12 @@ workThread = do @@ -230,28 +233,12 @@ workThread = do
parseTransaqLogLevel _ = TXML.Info
parseAndWrite :: TBQueue TransaqResponse -> T.Text -> IO Bool
parseAndWrite queue xml = do
let parsed = mapMaybe parseContent $ parseXML xml
let parsed = parseTransaqResponsesFromText xml
atomically $ mapM_ (writeTBQueue queue) parsed
pure True
parseContent :: Content -> Maybe TransaqResponse
parseContent (Elem el) = parseElement
where
parseElement = case qName $ elName el of
"candles" -> TransaqResponseCandles <$> fromXml el
"server_status" -> TransaqResponseServerStatus <$> fromXml el
"markets" -> TransaqResponseMarkets <$> fromXml el
"candlekinds" -> TransaqResponseCandleKinds <$> fromXml el
"securities" -> TransaqResponseSecurities <$> fromXml el
"sec_info" -> TransaqResponseSecInfo <$> fromXml el
"quotations" -> TransaqResponseQuotations <$> fromXml el
"alltrades" -> TransaqResponseAllTrades <$> fromXml el
"quotes" -> TransaqResponseQuotes <$> fromXml el
"orders" -> TransaqResponseOrders <$> fromXml el
"trades" -> TransaqResponseTrades <$> fromXml el
"result" -> TransaqResponseResult <$> fromXml el
_ -> Nothing
parseContent _ = Nothing
parseAndWrite' :: TBQueue TransaqResponse -> T.Text -> IO Bool
parseAndWrite' queue xml = parseAndWrite queue xml `catch`
(\(ex :: XenoException) -> printExceptions "" [ex] >> print (T.take 50 xml) >> pure False)
handleTransaqData :: (MonadIO m,
MonadReader Env m,
@ -414,7 +401,7 @@ handleConnected = do @@ -414,7 +401,7 @@ handleConnected = do
MainQueuePingServer -> do
maybeServerStatus <- sendCommand $ toXml CommandServerStatus
case maybeServerStatus of
Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
Left serverStatusRaw -> case parseTransaqResponsesFromText serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> do
pure $ Just StageConnection
_ -> do
@ -446,8 +433,8 @@ handleConnected = do @@ -446,8 +433,8 @@ handleConnected = do
v <- sendCommand . toXml $ cmd
case v of
Left result -> do
case headMay (parseXML result) >>= parseContent of
Just (TransaqResponseResult (ResponseSuccess (Just transactionId'))) -> do
case parseTransaqResponsesFromText result of
((TransaqResponseResult (ResponseSuccess (Just transactionId'))):_) -> do
brState <- asks brokerState
respVar <- asks responseVar
liftIO $ atomically $ do
@ -463,7 +450,7 @@ handleConnected = do @@ -463,7 +450,7 @@ handleConnected = do
_ -> pure ()
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId'
Just (TransaqResponseResult (ResponseFailure err)) -> do
((TransaqResponseResult (ResponseFailure err)):_) -> do
brState <- asks brokerState
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)

24
src/Transaq/Parsing.hs

@ -9,6 +9,7 @@ @@ -9,6 +9,7 @@
{-# LANGUAGE MultiWayIf #-}
{-# LANGUAGE OverloadedRecordDot #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE StandaloneDeriving #-}
{-# LANGUAGE TypeFamilies #-}
@ -17,12 +18,14 @@ module Transaq.Parsing @@ -17,12 +18,14 @@ module Transaq.Parsing
ParsingProcess(..),
ParsingContext(..),
defaultProcess,
parseTransaqResponses
parseTransaqResponses,
parseTransaqResponsesFromText
) where
import Barbies.Bare (Bare, Covered)
import Control.Applicative (many)
import Control.Error.Util (hush)
import Control.Exception
import Control.Monad (void, when)
import Control.Monad.ST (ST, runST)
import Control.Monad.State (MonadState, State, execState,
@ -33,6 +36,7 @@ import Data.Attoparsec.Text (Parser, char, decimal, many', @@ -33,6 +36,7 @@ import Data.Attoparsec.Text (Parser, char, decimal, many',
skipSpace)
import qualified Data.ByteString as BS
import Data.ByteString.Char8 (readInteger)
import qualified Data.ByteString.Lazy as BL
import Data.Char (isSpace)
import Data.Functor.Identity (Identity)
import Data.Int (Int64)
@ -43,6 +47,8 @@ import Data.STRef.Strict (STRef, modifySTRef', newSTRef, @@ -43,6 +47,8 @@ import Data.STRef.Strict (STRef, modifySTRef', newSTRef,
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8, decodeUtf8With)
import Data.Text.Encoding.Error (lenientDecode)
import Data.Text.Lazy (fromStrict)
import Data.Text.Lazy.Encoding (encodeUtf8)
import Data.Time (fromGregorian)
import Data.Time.Clock (UTCTime (UTCTime))
import Data.Void (Void)
@ -431,12 +437,24 @@ newtype ParsingContext a = ParsingContext { unParsingContext :: State [XmlStream @@ -431,12 +437,24 @@ newtype ParsingContext a = ParsingContext { unParsingContext :: State [XmlStream
type ParsingProcess = Process (ParsingContext ())
parseTransaqResponsesFromText :: T.Text -> [TransaqResponse]
parseTransaqResponsesFromText = parseTransaqResponses . BL.toStrict . encodeUtf8 . fromStrict
parseTransaqResponses :: BS.ByteString -> [TransaqResponse]
parseTransaqResponses bs =
let stream = filter (not . isWhitespaceText) . reverse $ execState (unParsingContext $ process defaultProcess bs) [] in
case runST $ runParserT (many txmlParser) "" stream of
case runST $ runParserT (many txmlParserWrapper) "" stream of
Left err -> []
Right result -> result
Right result -> catMaybes result
where
txmlParserWrapper = (Just <$> txmlParser) <|> (skipTag >> pure Nothing)
skipTag :: ParsecT String [XmlStreamEvent] (ST s) (Maybe TransaqResponse)
skipTag = do
x <- satisfy isOpenTag
case x of
XmlOpen tagname -> ignoreTag tagname >> pure Nothing
_ -> customFailure "Expected tag open"
txmlParser :: ParsecT String [XmlStreamEvent] (ST s) TransaqResponse
txmlParser = do

Loading…
Cancel
Save