Browse Source

emit statistics to statsd

master
Denis Tereshkin 2 years ago
parent
commit
15949922d3
  1. 2
      src/Config.hs
  2. 22
      src/Main.hs
  3. 15
      src/TXMLConnector/Internal.hs
  4. 9
      src/TickerInfoServer.hs
  5. 1
      stack.yaml
  6. 4
      transaq-connector.cabal

2
src/Config.hs

@ -35,6 +35,8 @@ data TransaqConnectorConfig = TransaqConnectorConfig {
tradesinkDashboard :: T.Text, tradesinkDashboard :: T.Text,
gotifyUri :: T.Text, gotifyUri :: T.Text,
gotifyToken :: T.Text, gotifyToken :: T.Text,
statsdHost :: T.Text,
statsdPort :: Int,
allTradesSubscriptions :: [SubscriptionConfig], allTradesSubscriptions :: [SubscriptionConfig],
quotationsSubscriptions :: [SubscriptionConfig], quotationsSubscriptions :: [SubscriptionConfig],
quotesSubscriptions :: [SubscriptionConfig] quotesSubscriptions :: [SubscriptionConfig]

22
src/Main.hs

@ -18,20 +18,27 @@ import Colog (LogAction, cfilter,
import Colog.Actions (logTextHandle) import Colog.Actions (logTextHandle)
import Config (TransaqConnectorConfig (..), import Config (TransaqConnectorConfig (..),
loadConfig) loadConfig)
import Control.Concurrent (threadDelay) import Control.Concurrent (killThread,
threadDelay)
import Control.Concurrent.BoundedChan (newBoundedChan) import Control.Concurrent.BoundedChan (newBoundedChan)
import Control.Exception (bracket) import Control.Exception (bracket)
import Control.Monad (forever, void) import Control.Monad (forever, void)
import Control.Monad.IO.Class (MonadIO) import Control.Monad.IO.Class (MonadIO)
import qualified Data.Text as T import qualified Data.Text as T
import Data.Version (showVersion) import Data.Version (showVersion)
import Debug.EventCounters (initEventCounters) import Debug.EventCounters (emitEvent,
initEventCounters)
import HistoryProviderServer (withHistoryProviderServer) import HistoryProviderServer (withHistoryProviderServer)
import Network.URI (parseURI) import Network.URI (parseURI)
import Prelude hiding (log) import Prelude hiding (log)
import System.IO (Handle, import System.IO (Handle,
IOMode (AppendMode), IOMode (AppendMode),
withFile) withFile)
import System.Metrics (newStore)
import System.Remote.Monitoring.Statsd (StatsdOptions (..),
defaultStatsdOptions,
forkStatsd,
statsdThreadId)
import System.ZMQ4 (withContext) import System.ZMQ4 (withContext)
import TickerInfoServer (withTickerInfoServer) import TickerInfoServer (withTickerInfoServer)
import qualified TXMLConnector as Connector import qualified TXMLConnector as Connector
@ -49,8 +56,11 @@ parseLoglevel _ = Trace
main :: IO () main :: IO ()
main = do main = do
initEventCounters
cfg <- loadConfig "transaq-connector.dhall" cfg <- loadConfig "transaq-connector.dhall"
store <- newStore
initEventCounters store
statsdThread <- forkStatsd (defaultStatsdOptions { host = statsdHost cfg, port = statsdPort cfg, prefix = "transaq_connector"}) store
let loglevel = parseLoglevel (logLevel cfg) let loglevel = parseLoglevel (logLevel cfg)
withFile "transaq-connector.log" AppendMode $ \logH -> do withFile "transaq-connector.log" AppendMode $ \logH -> do
let logger = mkLogger logH loglevel let logger = mkLogger logH loglevel
@ -86,7 +96,9 @@ main = do
log Info "main" "Stopping TXMLConnector" log Info "main" "Stopping TXMLConnector"
Connector.stop txml) $ \_ -> do Connector.stop txml) $ \_ -> do
withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do withHistoryProviderServer ctx (historyProviderEndpoint cfg) txml tisH logger id $ \_ -> do
forever $ threadDelay 1000000 forever $ do
threadDelay 200000
emitEvent "main_loop"
log Info "main" "Shutting down" log Info "main" "Shutting down"
killThread $ statsdThreadId statsdThread

15
src/TXMLConnector/Internal.hs

@ -114,7 +114,7 @@ import GHC.Exts (IsList (..))
import Prelude hiding (log) import Prelude hiding (log)
import TickerInfoServer (TickerInfo (..), import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle, TickerInfoServerHandle,
putTickerInfo) getTickerInfo, putTickerInfo)
import qualified Transaq import qualified Transaq
import qualified TXML import qualified TXML
@ -312,20 +312,26 @@ handleTransaqData transaqData = do
Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of
Just oid -> case M.lookup oid orderMap of Just oid -> case M.lookup oid orderMap of
Just order -> do Just order -> do
let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order) tisH <- asks tisHandle
let tickerId' = tBoard transaqTrade <> "#" <> tSecCode transaqTrade
maybeTickerInfo <- liftIO $ getTickerInfo tickerId' tisH
let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order maybeTickerInfo)
log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif
liftIO $ cb notif liftIO $ cb notif
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!" Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!"
fromTransaqTrade transaqTrade order = fromTransaqTrade transaqTrade order maybeTickerInfo =
let vol = case maybeTickerInfo of
Just tickerInfo -> (tPrice transaqTrade / tiTickSize tickerInfo * tiTickPrice tickerInfo)
Nothing -> tPrice transaqTrade in
Trade Trade
{ {
tradeOrderId = orderId order tradeOrderId = orderId order
, tradePrice = fromDouble (tPrice transaqTrade) , tradePrice = fromDouble (tPrice transaqTrade)
, tradeQuantity = fromIntegral $ tQuantity transaqTrade , tradeQuantity = fromIntegral $ tQuantity transaqTrade
, tradeVolume = fromDouble $ tValue transaqTrade , tradeVolume = fromDouble vol
, tradeVolumeCurrency = "" , tradeVolumeCurrency = ""
, tradeOperation = fromDirection (tBuysell transaqTrade) , tradeOperation = fromDirection (tBuysell transaqTrade)
, tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade
@ -633,6 +639,7 @@ securityToTickerInfo sec =
tiTicker = sBoard sec <> "#" <> sSeccode sec tiTicker = sBoard sec <> "#" <> sSeccode sec
, tiLotSize = sLotSize sec , tiLotSize = sLotSize sec
, tiTickSize = sMinStep sec , tiTickSize = sMinStep sec
, tiTickPrice = sPointCost sec
} }
parseSecurityId :: TickerId -> Maybe SecurityId parseSecurityId :: TickerId -> Maybe SecurityId

9
src/TickerInfoServer.hs

@ -26,7 +26,7 @@ import Data.Aeson (FromJSON (parseJSON),
ToJSON (toJSON), decode, ToJSON (toJSON), decode,
eitherDecode, encode, object, eitherDecode, encode, object,
withObject) withObject)
import Data.Aeson.Types ((.:), (.=)) import Data.Aeson.Types ((.!=), (.:), (.:?), (.=))
import qualified Data.ByteString.Lazy as BL import qualified Data.ByteString.Lazy as BL
import Data.List.NonEmpty (NonEmpty ((:|))) import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
@ -44,6 +44,7 @@ data TickerInfo =
tiTicker :: TickerId tiTicker :: TickerId
, tiLotSize :: Int , tiLotSize :: Int
, tiTickSize :: Double , tiTickSize :: Double
, tiTickPrice :: Double
} deriving (Show, Eq, Ord) } deriving (Show, Eq, Ord)
instance FromJSON TickerInfo where instance FromJSON TickerInfo where
@ -51,12 +52,14 @@ instance FromJSON TickerInfo where
TickerInfo <$> TickerInfo <$>
obj .: "ticker" <*> obj .: "ticker" <*>
obj .: "lot_size" <*> obj .: "lot_size" <*>
obj .: "tick_size") obj .: "tick_size" <*>
obj .:? "tick_price" .!= 1)
instance ToJSON TickerInfo where instance ToJSON TickerInfo where
toJSON ti = object [ "ticker" .= tiTicker ti, toJSON ti = object [ "ticker" .= tiTicker ti,
"lot_size" .= tiLotSize ti, "lot_size" .= tiLotSize ti,
"tick_size" .= tiTickSize ti ] "tick_size" .= tiTickSize ti,
"tick_price" .= tiTickPrice ti]
newtype TickerInfoRequest = newtype TickerInfoRequest =
TickerInfoRequest TickerInfoRequest

1
stack.yaml

@ -33,6 +33,7 @@ packages:
- ../libatrade - ../libatrade
- ../zeromq4-haskell-zap - ../zeromq4-haskell-zap
- ../eventcounters - ../eventcounters
- ../ekg-statsd
# Dependency packages to be pulled from upstream that are not in the resolver. # Dependency packages to be pulled from upstream that are not in the resolver.
# These entries can reference officially published versions as well as # These entries can reference officially published versions as well as

4
transaq-connector.cabal

@ -57,6 +57,8 @@ executable transaq-connector
, bimap , bimap
, deque , deque
, network-uri , network-uri
, ekg-statsd
, ekg-core
extra-lib-dirs: lib extra-lib-dirs: lib
ghc-options: -Wall ghc-options: -Wall
-Wcompat -Wcompat
@ -115,6 +117,8 @@ test-suite transaq-connector-test
, bimap , bimap
, deque , deque
, network-uri , network-uri
, ekg-statsd
, ekg-core
default-extensions: OverloadedStrings default-extensions: OverloadedStrings
, MultiWayIf , MultiWayIf
, MultiParamTypeClasses , MultiParamTypeClasses

Loading…
Cancel
Save