From de643d3297f84125ca3e150cbbf9b17f6394182f Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Thu, 3 Jul 2025 11:13:56 +0700 Subject: [PATCH] Fix warnings --- src/Commissions.hs | 5 ++-- src/Config.hs | 40 ++++++++++++++++++++++++--- src/FSM.hs | 5 +--- src/Main.hs | 19 ++++++------- src/PaperBroker.hs | 8 ++---- src/TXMLConnector/Internal.hs | 43 +++++++++++++++++++---------- src/TickerInfoServer.hs | 11 ++++---- src/Transaq.hs | 52 ++++++++++++++--------------------- src/Transaq/Parsing.hs | 23 +++++++--------- transaq-connector.cabal | 6 ++-- 10 files changed, 118 insertions(+), 94 deletions(-) diff --git a/src/Commissions.hs b/src/Commissions.hs index 8c3fd0e..5fcfc8b 100644 --- a/src/Commissions.hs +++ b/src/Commissions.hs @@ -1,5 +1,4 @@ -{-# LANGUAGE DeriveGeneric #-} -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE DeriveGeneric #-} module Commissions ( CommissionConfig(..) @@ -7,7 +6,7 @@ module Commissions ( import qualified Data.Text as T import Dhall -import GHC.Generics +import GHC.Generics () data CommissionConfig = CommissionConfig { comPrefix :: T.Text, diff --git a/src/Config.hs b/src/Config.hs index c318bcd..323e981 100644 --- a/src/Config.hs +++ b/src/Config.hs @@ -1,4 +1,6 @@ -{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE DeriveGeneric #-} +{-# LANGUAGE NamedFieldPuns #-} +{-# LANGUAGE RecordWildCards #-} module Config ( @@ -7,9 +9,13 @@ module Config loadConfig, ) where +import ATrade.Logging (Severity (..)) import Commissions -import qualified Data.Text as T -import Dhall (FromDhall (autoWith), auto, expected, inputFile) +import qualified Data.Text as T +import Dhall (Decoder (..), FromDhall (autoWith), auto, + expected, inputFile, typeError) +import Dhall.Core (Expr (..), FieldSelection (..)) +import qualified Dhall.Map import GHC.Generics data SubscriptionConfig = SubscriptionConfig T.Text T.Text @@ -18,7 +24,8 @@ data SubscriptionConfig = SubscriptionConfig T.Text T.Text instance FromDhall SubscriptionConfig data TransaqConnectorConfig = TransaqConnectorConfig { - logLevel :: Int, + baseLogLevel :: Severity, + componentsLogLevel :: [(T.Text, Severity)], quotesourceEndpoint :: T.Text, brokerEndpoint :: T.Text, brokerNotificationsEndpoint :: T.Text, @@ -49,3 +56,28 @@ instance FromDhall TransaqConnectorConfig loadConfig :: FilePath -> IO TransaqConnectorConfig loadConfig = inputFile auto + +instance FromDhall Severity where + autoWith _ = Decoder {..} + where + extract expr@(Field _ FieldSelection{ fieldSelectionLabel }) = + case fieldSelectionLabel of + "Trace" -> pure Trace + "Debug" -> pure Debug + "Info" -> pure Info + "Warning" -> pure Warning + "Error" -> pure Error + _ -> typeError expected expr + extract expr = typeError expected expr + + expected = pure + (Union + (Dhall.Map.fromList + [ ("Trace", Nothing) + , ("Debug", Nothing) + , ("Info", Nothing) + , ("Warning", Nothing) + , ("Error", Nothing) + ] + ) + ) diff --git a/src/FSM.hs b/src/FSM.hs index 0416f04..157810b 100644 --- a/src/FSM.hs +++ b/src/FSM.hs @@ -38,10 +38,7 @@ runFsm fsm = whileM $ do Nothing -> pure (not . isTerminalState $ currentState) Nothing -> pure False -makeFsm :: (MonadIO m1, - MonadIO m, - FSMState a, - Ord a) => a -> [(a, FSMCallback m a)] -> m1 (FSM a m) +makeFsm :: (MonadIO m1, Ord a) => a -> [(a, FSMCallback m a)] -> m1 (FSM a m) makeFsm initialState handlers = do currentState <- liftIO $ newTVarIO initialState pure $ FSM currentState (M.fromList handlers) diff --git a/src/Main.hs b/src/Main.hs index 6589a49..121a8dc 100644 --- a/src/Main.hs +++ b/src/Main.hs @@ -28,6 +28,7 @@ import Control.Concurrent.BoundedChan (BoundedChan, import Control.Exception (bracket) import Control.Monad (forever, void) import Control.Monad.IO.Class (MonadIO) +import qualified Data.Map as M import qualified Data.Text as T import Data.Time.Clock.POSIX (getPOSIXTime) import Data.Version (showVersion) @@ -52,15 +53,14 @@ import qualified TXMLConnector as Connector import Version (transaqConnectorVersionText, transaqConnector_gitrev) -mkLogger :: (MonadIO m) => Handle -> Severity -> LogAction m Message -mkLogger h sev = cfilter (\m -> msgSeverity m >= sev) (fmtMessage >$< (logTextStdout <> logTextHandle h)) -parseLoglevel :: Int -> Severity -parseLoglevel 0 = Error -parseLoglevel 1 = Warning -parseLoglevel 2 = Info -parseLoglevel 3 = Debug -parseLoglevel _ = Trace +mkLogger :: (MonadIO m) => Severity -> M.Map T.Text Severity -> Handle -> LogAction m Message +mkLogger sev loglevels h = cfilter checkLoglevel (fmtMessage >$< (logTextStdout <> logTextHandle h)) + where + checkLoglevel msg = + case M.lookup (msgComponent msg) loglevels of + Just level -> msgSeverity msg >= level + Nothing -> msgSeverity msg >= sev forkQssChannel :: BoundedChan QuoteSourceServerData @@ -85,9 +85,8 @@ main = do initEventCounters store statsdThread <- forkStatsd (defaultStatsdOptions { host = statsdHost cfg, port = statsdPort cfg, prefix = "transaq_connector"}) store - let loglevel = parseLoglevel (logLevel cfg) withFile "transaq-connector.log" AppendMode $ \logH -> do - let logger = mkLogger logH loglevel + let logger = mkLogger (baseLogLevel cfg) (M.fromList $ componentsLogLevel cfg) logH let log = logWith logger log Info "main" $ "Starting transaq-connector-" <> transaqConnectorVersionText <> diff --git a/src/PaperBroker.hs b/src/PaperBroker.hs index 86de782..889bed6 100644 --- a/src/PaperBroker.hs +++ b/src/PaperBroker.hs @@ -8,8 +8,6 @@ module PaperBroker ( ) where import ATrade.Broker.Backend -import ATrade.Broker.Protocol -import ATrade.Broker.Server import ATrade.Logging (Message, Severity (..), logWith) import ATrade.Types @@ -18,7 +16,6 @@ import Commissions (CommissionConfig (..)) import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent.BoundedChan import Control.Monad -import Data.Bits import Data.IORef import qualified Data.List as L import qualified Data.Map.Strict as M @@ -28,7 +25,6 @@ import qualified Data.Text.Lazy as TL import Data.Time.Clock import Debug.EventCounters (emitEvent) import Language.Haskell.Printf (t) -import System.ZMQ4 import TickerInfoServer import TickTable (TickTable, lookupTick) @@ -56,7 +52,7 @@ hourMin :: Integer -> Integer -> DiffTime hourMin h m = fromIntegral $ h * 3600 + m * 60 mkPaperBroker :: TickTable -> TickerInfoServerHandle -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend -mkPaperBroker tickTableH tisH tickChan startCash accounts comms l = do +mkPaperBroker tickTableH tisHandle tickChan startCash accounts comms l = do state <- newIORef PaperBrokerState { pbTid = Nothing, tickTable = tickTableH, @@ -74,7 +70,7 @@ mkPaperBroker tickTableH tisH tickChan startCash accounts comms l = do postMarketCloseTime = hourMin 15 50, commissions = comms, logger = l, - tisH = tisH + tisH = tisHandle } tid <- forkIO $ brokerThread tickChan state tickTableH diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs index 6bf10a5..b16d265 100644 --- a/src/TXMLConnector/Internal.hs +++ b/src/TXMLConnector/Internal.hs @@ -1,6 +1,7 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GADTs #-} {-# LANGUAGE MultiParamTypeClasses #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -32,7 +33,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, readTBQueue, writeTBQueue) import Control.Exception -import Control.Monad (forM_, void, when) +import Control.Monad (forM_, unless, void, when) import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM import Data.Maybe (catMaybes, fromMaybe) @@ -104,6 +105,7 @@ import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader.Class (MonadReader, asks) import Data.Int (Int64) import qualified Data.Map.Strict as M +import qualified Data.Set as S import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) import FSM (FSMCallback (..), @@ -141,6 +143,7 @@ data Env = , brokerState :: BrokerState , runVar :: TMVar () , timerVar :: TMVar () + , processedTrades :: TVar (S.Set Int64) } data MainQueueData = @@ -297,19 +300,31 @@ handleTransaqData transaqData = do trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) orderMap <- liftIO $ readTVarIO (bsOrderMap brState) - case maybeCb of - Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of - Just oid -> case M.lookup oid orderMap of - Just order -> do - tisH <- asks tisHandle - let tickerId' = tBoard transaqTrade <> "#" <> tSecCode transaqTrade - maybeTickerInfo <- liftIO $ getTickerInfo tickerId' tisH - let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order maybeTickerInfo) - log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif - liftIO $ cb notif - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade - Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!" + isAlreadyProcessed <- checkIfTradeIsAlreadyProcessed transaqTrade + unless isAlreadyProcessed $ + case maybeCb of + Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of + Just oid -> case M.lookup oid orderMap of + Just order -> do + tisH <- asks tisHandle + let tickerId' = tBoard transaqTrade <> "#" <> tSecCode transaqTrade + maybeTickerInfo <- liftIO $ getTickerInfo tickerId' tisH + let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order maybeTickerInfo) + log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif + liftIO $ cb notif + addTradeToProcessed transaqTrade + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade + Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!" + + checkIfTradeIsAlreadyProcessed trade = do + trades <- asks processedTrades + set <- liftIO $ readTVarIO trades + pure $ S.member (tTradeNo trade) set + + addTradeToProcessed trade = do + trades <- asks processedTrades + liftIO $ atomically $ modifyTVar' trades (S.insert $ tTradeNo trade) fromTransaqTrade :: TradeNotification -> Order -> Maybe TickerInfo -> Trade diff --git a/src/TickerInfoServer.hs b/src/TickerInfoServer.hs index 250dc62..fef2088 100644 --- a/src/TickerInfoServer.hs +++ b/src/TickerInfoServer.hs @@ -14,7 +14,7 @@ module TickerInfoServer import ATrade.Logging (Message, Severity (Debug, Warning), logWith) -import ATrade.Types (Tick, TickerId, security) +import ATrade.Types (TickerId) import Colog (LogAction) import Control.Concurrent (ThreadId) import Control.Concurrent.STM (TVar, atomically, newTVarIO, @@ -23,20 +23,19 @@ import Control.Concurrent.STM.TVar (modifyTVar', writeTVar) import Control.Exception (bracket) import Control.Monad.Extra (whileM) import Data.Aeson (FromJSON (parseJSON), - ToJSON (toJSON), decode, - eitherDecode, encode, object, - withObject) + ToJSON (toJSON), eitherDecode, + encode, object, withObject) import Data.Aeson.Types ((.!=), (.:), (.:?), (.=)) import qualified Data.ByteString.Lazy as BL import Data.List.NonEmpty (NonEmpty ((:|))) import qualified Data.Map.Strict as M import qualified Data.Text as T -import Data.Text.Encoding (decodeUtf8With, encodeUtf8) +import Data.Text.Encoding (decodeUtf8With) import Data.Text.Encoding.Error (replace) import Prelude hiding (log) import SlaveThread (fork) import System.ZMQ4 (Context, Router (Router), bind, - connect, receiveMulti, sendMulti, + receiveMulti, sendMulti, withSocket) data TickerInfo = diff --git a/src/Transaq.hs b/src/Transaq.hs index 9874413..1bd78f4 100644 --- a/src/Transaq.hs +++ b/src/Transaq.hs @@ -1,6 +1,5 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveAnyClass #-} -{-# LANGUAGE DeriveFunctor #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DerivingStrategies #-} {-# LANGUAGE DuplicateRecordFields #-} @@ -76,36 +75,27 @@ module Transaq import Barbies import Barbies.Bare -import Control.Applicative ((<|>)) -import Control.Error.Util (hush) -import Control.Monad (void, when) -import Control.Monad.State (State (..), gets, modify) -import Control.Monad.State.Class (MonadState (..)) -import Data.Attoparsec.Text (Parser, char, decimal, many', - maybeResult, parse, parseOnly, - skipSpace) -import qualified Data.ByteString as BS -import Data.ByteString.Char8 (readInteger) -import Data.Decimal (DecimalRaw (..)) -import Data.Functor.Identity (Identity (..)) -import Data.Int (Int64) -import Data.Maybe (catMaybes, fromMaybe, mapMaybe, - maybeToList) -import qualified Data.Text as T -import Data.Text.Encoding (decodeUtf8With) -import Data.Text.Encoding.Error (lenientDecode) -import Data.Time (fromGregorian) -import Data.Time.Clock (UTCTime (UTCTime)) +import Control.Applicative ((<|>)) +import Control.Error.Util (hush) +import Control.Monad (void) +import Data.Attoparsec.Text (Parser, char, decimal, many', parseOnly, + skipSpace) +import Data.Decimal (DecimalRaw (..)) +import Data.Functor.Identity (Identity (..)) +import Data.Int (Int64) +import Data.Maybe (catMaybes, fromMaybe, mapMaybe, + maybeToList) +import qualified Data.Text as T +import Data.Time (fromGregorian) +import Data.Time.Clock (UTCTime (UTCTime)) import GHC.Generics -import Text.Read (readMaybe) -import Text.XML.Light (Attr (..), CData (cdData), - Element (elName), Node (..), - QName (..), elChildren, findAttr, - findChild, onlyText, strContent, - unode) -import Text.XML.Light.Output (showElement) -import Text.XML.Light.Types (Element (elContent), blank_name) -import Xeno.SAX (Process (..)) +import Text.Read (readMaybe) +import Text.XML.Light (Attr (..), CData (cdData), + Element (elName), Node (..), QName (..), + elChildren, findAttr, findChild, + onlyText, strContent, unode) +import Text.XML.Light.Output (showElement) +import Text.XML.Light.Types (Element (elContent), blank_name) data Language = LanguageRu | LanguageEn deriving (Show, Eq, Ord) @@ -483,7 +473,7 @@ instance TransaqResponseC Element (ResponseCandlesB Bare f) where } :: CandleB Bare f) instance TransaqResponseC T.Text (ResponseCandlesB Bare f) where - fromXml txt = undefined + fromXml _ = undefined data ConnectionState = Connected diff --git a/src/Transaq/Parsing.hs b/src/Transaq/Parsing.hs index 8b3cffd..bb21a39 100644 --- a/src/Transaq/Parsing.hs +++ b/src/Transaq/Parsing.hs @@ -1,12 +1,10 @@ {-# LANGUAGE BangPatterns #-} {-# LANGUAGE DeriveGeneric #-} {-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE DisambiguateRecordFields #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleInstances #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} -{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedRecordDot #-} {-# LANGUAGE RecordWildCards #-} {-# LANGUAGE ScopedTypeVariables #-} @@ -57,9 +55,8 @@ import GHC.Generics (Generic) import Text.Megaparsec (MonadParsec (takeWhileP), Parsec (..), ParsecT, anySingle, customFailure, lookAhead, oneOf, - parse, runParserT, satisfy, single, - try, unexpected, (<|>)) -import Text.Megaparsec (optional) + optional, parse, runParserT, satisfy, + single, try, unexpected, (<|>)) import qualified Text.Megaparsec.Error as ME import Text.Megaparsec.Stream (Stream (..)) import Text.Read (readMaybe) @@ -444,7 +441,7 @@ parseTransaqResponses :: BS.ByteString -> [TransaqResponse] parseTransaqResponses bs = let stream = filter (not . isWhitespaceText) . reverse $ execState (unParsingContext $ process defaultProcess bs) [] in case runST $ runParserT (many txmlParserWrapper) "" stream of - Left err -> [] + Left _ -> [] Right result -> catMaybes result where txmlParserWrapper = (Just <$> txmlParser) <|> (skipTag >> pure Nothing) @@ -495,15 +492,15 @@ txmlParser = do parseResult refResult = do attr <- takeWhileP Nothing isAttr mapM_ (parseResultAttr refResult) attr - void . single $ (XmlOpenEnd "result") + void . single $ XmlOpenEnd "result" t <- anySingle case t of XmlOpen "message" -> do _ <- takeWhileP Nothing isAttr - void . single $ (XmlOpenEnd "message") + void . single $ XmlOpenEnd "message" (XmlText txt) <- satisfy isText - void . single $ (XmlClose "message") - void . single $ (XmlClose "result") + void . single $ XmlClose "message" + void . single $ XmlClose "result" return . TransaqResponseResult $ ResponseFailure txt XmlClose "result" -> do maybeRes <- lift $ readSTRef refResult @@ -515,9 +512,9 @@ txmlParser = do parseResultAttr refResult (XmlAttr "success" "true") = lift $ writeSTRef refResult (Just $ ResponseSuccess Nothing) parseResultAttr refResult (XmlAttr "success" "false") = lift $ writeSTRef refResult (Just $ ResponseFailure "") parseResultAttr refResult attr@(XmlAttr "transactionid" trIdStr) = do - case (readMaybe (T.unpack trIdStr)) :: Maybe Int64 of - t@(Just trId) -> lift $ writeSTRef refResult (Just $ ResponseSuccess t) - Nothing -> unexpected $ ME.Tokens $ NE.singleton attr + case readMaybe (T.unpack trIdStr) :: Maybe Int64 of + t@(Just _) -> lift $ writeSTRef refResult (Just $ ResponseSuccess t) + Nothing -> unexpected $ ME.Tokens $ NE.singleton attr parseResultAttr _ _ = return () parseClient :: STRef s ClientDataPartial -> ParsecT String [XmlStreamEvent] (ST s) TransaqResponse diff --git a/transaq-connector.cabal b/transaq-connector.cabal index c09eb1e..ccc4f86 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -39,7 +39,7 @@ executable transaq-connector build-depends: base >= 4.7 && < 5 , dhall , eventcounters - , libatrade == 0.16.0.0 + , libatrade == 0.17.0.0 , text , transformers , co-log @@ -79,7 +79,7 @@ executable transaq-connector -Wmissing-home-modules -Wpartial-fields -Wredundant-constraints - -threaded -rtsopts -with-rtsopts=-N + -threaded -rtsopts if os(windows) extra-lib-dirs: lib extra-libraries: txmlconnector64 @@ -162,7 +162,7 @@ benchmark parsing-benchmark , criterion , dhall , eventcounters - , libatrade == 0.15.0.0 + , libatrade == 0.17.0.0 , text , transformers , co-log