From fba2f78fa1ad45829bbc8a2f9d38397b27f72706 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Sat, 2 Mar 2024 15:48:23 +0700 Subject: [PATCH] Parse rest of the messages via xeno --- src/TXMLConnector/Internal.hs | 45 +++++++++++++---------------------- src/Transaq/Parsing.hs | 24 ++++++++++++++++--- 2 files changed, 37 insertions(+), 32 deletions(-) diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs index 50b51db..e1cd9b3 100644 --- a/src/TXMLConnector/Internal.hs +++ b/src/TXMLConnector/Internal.hs @@ -2,6 +2,7 @@ {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE MultiParamTypeClasses #-} +{-# LANGUAGE ScopedTypeVariables #-} module TXMLConnector.Internal ( @@ -30,6 +31,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', tryReadTMVar, writeTVar) import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, readTBQueue, writeTBQueue) +import Control.Exception import Control.Monad (forM_, void, when) import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM @@ -37,12 +39,10 @@ import Data.Functor.Identity (Identity (..)) import Data.Maybe (catMaybes, fromMaybe, mapMaybe) import qualified Data.Text as T import Debug.EventCounters (emitEvent) +import Debug.Trace import qualified Deque.Strict as D import SlaveThread (fork) -import Text.XML.Light.Input (parseXML) -import Text.XML.Light.Types (Content (Elem), - Element (elName), - QName (qName)) +import System.IO import TickTable (TickTable, insertTick) import Transaq (AllTradesTrade, AllTradesTradeB (..), Candle, @@ -84,9 +84,12 @@ import Transaq (AllTradesTrade, TransaqResponseC (fromXml), UnfilledAction (..), kCandleKindId, kPeriod, state) +import Transaq.Parsing (parseTransaqResponsesFromText) import TXML (MonadTXML, freeCallback, initialize, sendCommand, setCallback) +import Xeno.Errors +import Xeno.Types import ATrade.Broker.Backend (BrokerBackendNotification (..)) import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) @@ -207,7 +210,7 @@ workThread = do Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str Right _ -> do queue <- asks transaqQueue - rc' <- setCallback (parseAndWrite queue) + rc' <- setCallback (parseAndWrite' queue) case rc' of Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Just cb -> do @@ -230,28 +233,12 @@ workThread = do parseTransaqLogLevel _ = TXML.Info parseAndWrite :: TBQueue TransaqResponse -> T.Text -> IO Bool parseAndWrite queue xml = do - let parsed = mapMaybe parseContent $ parseXML xml + let parsed = parseTransaqResponsesFromText xml atomically $ mapM_ (writeTBQueue queue) parsed pure True - -parseContent :: Content -> Maybe TransaqResponse -parseContent (Elem el) = parseElement - where - parseElement = case qName $ elName el of - "candles" -> TransaqResponseCandles <$> fromXml el - "server_status" -> TransaqResponseServerStatus <$> fromXml el - "markets" -> TransaqResponseMarkets <$> fromXml el - "candlekinds" -> TransaqResponseCandleKinds <$> fromXml el - "securities" -> TransaqResponseSecurities <$> fromXml el - "sec_info" -> TransaqResponseSecInfo <$> fromXml el - "quotations" -> TransaqResponseQuotations <$> fromXml el - "alltrades" -> TransaqResponseAllTrades <$> fromXml el - "quotes" -> TransaqResponseQuotes <$> fromXml el - "orders" -> TransaqResponseOrders <$> fromXml el - "trades" -> TransaqResponseTrades <$> fromXml el - "result" -> TransaqResponseResult <$> fromXml el - _ -> Nothing -parseContent _ = Nothing + parseAndWrite' :: TBQueue TransaqResponse -> T.Text -> IO Bool + parseAndWrite' queue xml = parseAndWrite queue xml `catch` + (\(ex :: XenoException) -> printExceptions "" [ex] >> print (T.take 50 xml) >> pure False) handleTransaqData :: (MonadIO m, MonadReader Env m, @@ -414,7 +401,7 @@ handleConnected = do MainQueuePingServer -> do maybeServerStatus <- sendCommand $ toXml CommandServerStatus case maybeServerStatus of - Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of + Left serverStatusRaw -> case parseTransaqResponsesFromText serverStatusRaw of ((TransaqResponseResult (ResponseFailure _)):_) -> do pure $ Just StageConnection _ -> do @@ -446,8 +433,8 @@ handleConnected = do v <- sendCommand . toXml $ cmd case v of Left result -> do - case headMay (parseXML result) >>= parseContent of - Just (TransaqResponseResult (ResponseSuccess (Just transactionId'))) -> do + case parseTransaqResponsesFromText result of + ((TransaqResponseResult (ResponseSuccess (Just transactionId'))):_) -> do brState <- asks brokerState respVar <- asks responseVar liftIO $ atomically $ do @@ -463,7 +450,7 @@ handleConnected = do _ -> pure () log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId' - Just (TransaqResponseResult (ResponseFailure err)) -> do + ((TransaqResponseResult (ResponseFailure err)):_) -> do brState <- asks brokerState log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) diff --git a/src/Transaq/Parsing.hs b/src/Transaq/Parsing.hs index 4b1aadc..17bcc21 100644 --- a/src/Transaq/Parsing.hs +++ b/src/Transaq/Parsing.hs @@ -9,6 +9,7 @@ {-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} +{-# LANGUAGE ScopedTypeVariables #-} {-# LANGUAGE StandaloneDeriving #-} {-# LANGUAGE TypeFamilies #-} @@ -17,12 +18,14 @@ module Transaq.Parsing ParsingProcess(..), ParsingContext(..), defaultProcess, - parseTransaqResponses + parseTransaqResponses, + parseTransaqResponsesFromText ) where import Barbies.Bare (Bare, Covered) import Control.Applicative (many) import Control.Error.Util (hush) +import Control.Exception import Control.Monad (void, when) import Control.Monad.ST (ST, runST) import Control.Monad.State (MonadState, State, execState, @@ -33,6 +36,7 @@ import Data.Attoparsec.Text (Parser, char, decimal, many', skipSpace) import qualified Data.ByteString as BS import Data.ByteString.Char8 (readInteger) +import qualified Data.ByteString.Lazy as BL import Data.Char (isSpace) import Data.Functor.Identity (Identity) import Data.Int (Int64) @@ -43,6 +47,8 @@ import Data.STRef.Strict (STRef, modifySTRef', newSTRef, import qualified Data.Text as T import Data.Text.Encoding (decodeUtf8, decodeUtf8With) import Data.Text.Encoding.Error (lenientDecode) +import Data.Text.Lazy (fromStrict) +import Data.Text.Lazy.Encoding (encodeUtf8) import Data.Time (fromGregorian) import Data.Time.Clock (UTCTime (UTCTime)) import Data.Void (Void) @@ -431,12 +437,24 @@ newtype ParsingContext a = ParsingContext { unParsingContext :: State [XmlStream type ParsingProcess = Process (ParsingContext ()) +parseTransaqResponsesFromText :: T.Text -> [TransaqResponse] +parseTransaqResponsesFromText = parseTransaqResponses . BL.toStrict . encodeUtf8 . fromStrict + parseTransaqResponses :: BS.ByteString -> [TransaqResponse] parseTransaqResponses bs = let stream = filter (not . isWhitespaceText) . reverse $ execState (unParsingContext $ process defaultProcess bs) [] in - case runST $ runParserT (many txmlParser) "" stream of + case runST $ runParserT (many txmlParserWrapper) "" stream of Left err -> [] - Right result -> result + Right result -> catMaybes result + where + txmlParserWrapper = (Just <$> txmlParser) <|> (skipTag >> pure Nothing) + skipTag :: ParsecT String [XmlStreamEvent] (ST s) (Maybe TransaqResponse) + skipTag = do + x <- satisfy isOpenTag + case x of + XmlOpen tagname -> ignoreTag tagname >> pure Nothing + _ -> customFailure "Expected tag open" + txmlParser :: ParsecT String [XmlStreamEvent] (ST s) TransaqResponse txmlParser = do