diff --git a/app/Config.hs b/app/Config.hs index e2d2f80..3d94802 100644 --- a/app/Config.hs +++ b/app/Config.hs @@ -23,22 +23,23 @@ data TableConfig = TableConfig { } deriving (Show) data Config = Config { - quotesourceEndpoint :: String, - qtisEndpoint :: String, - pipeReaderQsEndpoint :: Maybe String, - tickPipePath :: Maybe String, - brokerserverEndpoint :: String, - whitelist :: [T.Text], - blacklist :: [T.Text], - brokerServerCertPath :: Maybe FilePath, - brokerClientCertificateDir :: Maybe FilePath, - tables :: [TableConfig], - quikPath :: String, - dllPath :: String, - quikAccounts :: [T.Text], - tradeSink :: T.Text, - tradeSink2 :: T.Text, - commissions :: [CommissionConfig] + quotesourceEndpoint :: String, + qtisEndpoint :: String, + pipeReaderQsEndpoint :: Maybe String, + tickPipePath :: Maybe String, + brokerserverEndpoint :: String, + brokerNotificationsEndpoint :: String, + whitelist :: [T.Text], + blacklist :: [T.Text], + brokerServerCertPath :: Maybe FilePath, + brokerClientCertificateDir :: Maybe FilePath, + tables :: [TableConfig], + quikPath :: String, + dllPath :: String, + quikAccounts :: [T.Text], + tradeSink :: T.Text, + tradeSink2 :: T.Text, + commissions :: [CommissionConfig] } deriving (Show) readConfig :: String -> IO Config @@ -55,6 +56,7 @@ parseConfig = withObject "object" $ \obj -> do qsePipe <- obj .:? "quotesource-endpoint-pipe-reader" pipePath <- obj .:? "pipe-reader-path" bse <- obj .: "brokerserver-endpoint" + bsne <- obj .: "brokerserver-notifications-endpoint" whitelist' <- obj .:? "whitelist" .!= [] blacklist' <- obj .:? "blacklist" .!= [] serverCert <- obj .:? "broker_server_certificate" @@ -73,6 +75,7 @@ parseConfig = withObject "object" $ \obj -> do pipeReaderQsEndpoint = qsePipe, tickPipePath = pipePath, brokerserverEndpoint = bse, + brokerNotificationsEndpoint = bsne, whitelist = whitelist', blacklist = blacklist', brokerServerCertPath = serverCert, diff --git a/app/Main.hs b/app/Main.hs index ff8e649..d6f4a4e 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -14,8 +14,10 @@ import Control.Concurrent.BoundedChan import Control.Error.Util import Control.Exception.Safe import Control.Monad +import Control.Monad.IO.Class (MonadIO) import Data.GI.Base import qualified GI.Gtk as Gtk +import Prelude hiding (log) import QuoteSource.DataImport import QuoteSource.PipeReader import QuoteSource.TableParser @@ -27,10 +29,6 @@ import Broker.PaperBroker import Broker.QuikBroker import System.Directory -import System.Log.Formatter -import System.Log.Handler (setFormatter) -import System.Log.Handler.Simple -import System.Log.Logger import System.Timeout import System.ZMQ4 import System.ZMQ4.ZAP @@ -41,6 +39,13 @@ import Data.Version import ATrade (libatrade_gitrev, libatrade_version) +import ATrade.Logging (Message, Severity (Debug, Info, Warning), + fmtMessage, + logWith) +import Colog (LogAction, + logTextStdout, + (>$<)) +import Colog.Actions (logTextHandle) import Config import TickTable (mkTickTable) import Version @@ -58,83 +63,87 @@ forkBoundedChan size sourceChan = do return (tid, sink1, sink2, sinkQss) - -initLogging :: IO () -initLogging = do - handler <- streamHandler stderr DEBUG >>= - (\x -> return $ - setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) - fhandler <- fileHandler "quik-connector.log" DEBUG >>= - (\x -> return $ - setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) - - hSetBuffering stderr LineBuffering - updateGlobalLogger rootLoggerName (setLevel DEBUG) - updateGlobalLogger rootLoggerName (setHandlers [handler, fhandler]) +mkLogger :: (MonadIO m) => Handle -> LogAction m Message +mkLogger h = fmtMessage >$< (logTextStdout <> logTextHandle h) main :: IO () main = do - initLogging - infoM "main" $ "Starting quik-connector-" ++ T.unpack quikConnectorVersionText ++ "; libatrade-" ++ showVersion libatrade_version ++ "(" ++ libatrade_gitrev ++ ")" - infoM "main" "Loading config" - config <- readConfig "quik-connector.config.json" - - infoM "main" "Config loaded" - chan <- newBoundedChan 10000 - infoM "main" "Starting data import server" - _ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" - - (forkId, c0, c1, c2) <- forkBoundedChan 10000 chan - - withContext (\ctx -> do - tickTable <- mkTickTable c0 ctx (T.pack $ qtisEndpoint config) - brokerQ <- mkQuikBroker tickTable (dllPath config) (quikPath config) (quikAccounts config) (commissions config) - brokerP <- mkPaperBroker tickTable c1 1000000 ["demo"] (commissions config) - withZapHandler ctx (\zap -> do - zapSetWhitelist zap "global" $ whitelist config - zapSetBlacklist zap "global" $ blacklist config - - case brokerClientCertificateDir config of - Just certFile -> do - certs <- loadCertificatesFromDirectory certFile - forM_ certs (\cert -> zapAddClientCertificate zap "global" cert) - Nothing -> return () - - serverCert <- case brokerServerCertPath config of - Just certFile -> do - eitherCert <- loadCertificateFromFile certFile - case eitherCert of - Left errorMessage -> do - warningM "main" $ "Unable to load server certificate: " ++ errorMessage - return Nothing - Right cert -> return $ Just cert - Nothing -> return Nothing - let serverParams = defaultServerSecurityParams { sspDomain = Just "global", - sspCertificate = serverCert } - - bracket (forkIO $ pipeReaderThread ctx config c2) killThread (\_ -> do - withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do - withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do - bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do - bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [zmqTradeSink2, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do - void $ Gtk.init Nothing - window <- new Gtk.Window [ #title := "Quik connector" ] - void $ on window #destroy Gtk.mainQuit - #showAll window - Gtk.main) - infoM "main" "BRS down") - debugM "main" "QS done") - debugM "main" "TGTS done") - debugM "main" "ZMQTS done") - debugM "main" "ZAP done")) - void $ timeout 1000000 $ killThread forkId - infoM "main" "Main thread done" + withFile "quik-connector.log" AppendMode $ \logH -> do + let logger = mkLogger logH + let log = (logWith logger) + log Info "main" $ "Starting quik-connector-" <> + quikConnectorVersionText <> + "; libatrade-" <> + (T.pack . showVersion) libatrade_version <> + "(" <> + T.pack libatrade_gitrev <> + ")" + log Info "main" "Loading config" + config <- readConfig "quik-connector.config.json" + + log Info "main" "Config loaded" + chan <- newBoundedChan 10000 + log Info "main" "Starting data import server" + _ <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" + + (forkId, c0, c1, c2) <- forkBoundedChan 10000 chan + + withContext (\ctx -> do + tickTable <- mkTickTable c0 ctx (T.pack $ qtisEndpoint config) + brokerQ <- mkQuikBroker tickTable (dllPath config) (quikPath config) (quikAccounts config) (commissions config) logger + brokerP <- mkPaperBroker tickTable c1 1000000 ["demo"] (commissions config) logger + withZapHandler ctx (\zap -> do + zapSetWhitelist zap "global" $ whitelist config + zapSetBlacklist zap "global" $ blacklist config + + case brokerClientCertificateDir config of + Just certFile -> do + certs <- loadCertificatesFromDirectory certFile + forM_ certs (\cert -> zapAddClientCertificate zap "global" cert) + Nothing -> return () + + serverCert <- case brokerServerCertPath config of + Just certFile -> do + eitherCert <- loadCertificateFromFile certFile + case eitherCert of + Left errorMessage -> do + log Warning "main" $ "Unable to load server certificate: " <> T.pack errorMessage + return Nothing + Right cert -> return $ Just cert + Nothing -> return Nothing + let serverParams = defaultServerSecurityParams { sspDomain = Just "global", + sspCertificate = serverCert } + + bracket (forkIO $ pipeReaderThread ctx config c2 logger) killThread (\_ -> do + withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do + withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do + bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) quoteSourceServerSecurityParams) stopQuoteSourceServer (\_ -> do + bracket (startBrokerServer + [brokerP, brokerQ] + ctx + (T.pack $ brokerserverEndpoint config) + (T.pack $ brokerNotificationsEndpoint config) + [zmqTradeSink2, zmqTradeSink] + serverParams + logger) stopBrokerServer (\_ -> do + void $ Gtk.init Nothing + window <- new Gtk.Window [ #title := "Quik connector" ] + void $ on window #destroy Gtk.mainQuit + #showAll window + Gtk.main) + log Info "main" "BRS down") + log Debug "main" "QS done") + log Debug "main" "TGTS done") + log Debug "main" "ZMQTS done") + log Debug "main" "ZAP done")) + void $ timeout 1000000 $ killThread forkId + log Info "main" "Main thread done" where - pipeReaderThread ctx config qsdataChan = + pipeReaderThread ctx config qsdataChan logger = case pipeReaderQsEndpoint config of Just qsep -> do - infoM "main" $ "QS: " ++ qsep - bracket (startPipeReader ctx (T.pack qsep) qsdataChan) stopPipeReader (\_ -> forever $ threadDelay 1000000) + logWith logger Info "main" $ "QS: " <> T.pack qsep + bracket (startPipeReader ctx (T.pack qsep) qsdataChan logger) stopPipeReader (\_ -> forever $ threadDelay 1000000) _ -> return () quoteSourceServerSecurityParams = defaultServerSecurityParams { sspDomain = Just "global" } diff --git a/quik-connector.cabal b/quik-connector.cabal index 16d7c11..8ca30b2 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -42,14 +42,13 @@ library , transformers , datetime , BoundedChan - , hslogger , zeromq4-haskell , hashable , unordered-containers , aeson , cond , scientific - , libatrade >= 0.9 && < 0.10 + , libatrade >= 0.12 && < 0.13 , deepseq , errors , split @@ -63,13 +62,15 @@ library , http-client-tls , utf8-string , connection - , text-format , monad-loops , extra , incremental-parser , attoparsec , safe-exceptions , iconv + , th-printf + , co-log + , co-log-core default-language: Haskell2010 -- extra-libraries: "user32" other-modules: System.Win32.XlParser @@ -86,7 +87,6 @@ executable quik-connector-exe , haskell-gi-base , gi-gtk , BoundedChan - , hslogger , aeson , bytestring , unordered-containers @@ -106,6 +106,8 @@ executable quik-connector-exe , safe-exceptions , iconv , th-printf + , co-log + , co-log-core default-language: Haskell2010 other-modules: Config , Version diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index 9fe0a20..b721917 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -1,4 +1,5 @@ {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} {-# LANGUAGE Strict #-} module Broker.PaperBroker ( @@ -6,10 +7,15 @@ module Broker.PaperBroker ( mkPaperBroker ) where +import ATrade.Broker.Backend import ATrade.Broker.Protocol import ATrade.Broker.Server +import ATrade.Logging (Message, Severity (..), + logWith) import ATrade.Quotes.QTIS import ATrade.Types +import Colog (LogAction) +import Commissions (CommissionConfig (..)) import Control.Concurrent hiding (readChan, writeChan) import Control.Concurrent.BoundedChan import Control.Monad @@ -20,11 +26,10 @@ import qualified Data.List as L import qualified Data.Map.Strict as M import Data.Maybe import qualified Data.Text as T +import qualified Data.Text.Lazy as TL import Data.Time.Clock -import System.Log.Logger +import Language.Haskell.Printf (t) import System.ZMQ4 - -import Commissions (CommissionConfig (..)) import TickTable (TickKey (..), TickTableH, getTick, getTickerInfo) @@ -33,7 +38,7 @@ data PaperBrokerState = PaperBrokerState { tickTable :: TickTableH, orders :: M.Map OrderId Order, cash :: !Price, - notificationCallback :: Maybe (Notification -> IO ()), + notificationCallback :: Maybe (BrokerBackendNotification -> IO ()), pendingOrders :: [Order], fortsClassCodes :: [T.Text], @@ -45,14 +50,15 @@ data PaperBrokerState = PaperBrokerState { postMarketStartTime :: DiffTime, postMarketFixTime :: DiffTime, postMarketCloseTime :: DiffTime, - commissions :: [CommissionConfig] + commissions :: [CommissionConfig], + logger :: LogAction IO Message } hourMin :: Integer -> Integer -> DiffTime hourMin h m = fromIntegral $ h * 3600 + m * 60 -mkPaperBroker :: TickTableH -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface -mkPaperBroker tickTableH tickChan startCash accounts comms = do +mkPaperBroker :: TickTableH -> BoundedChan Tick -> Price -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend +mkPaperBroker tickTableH tickChan startCash accounts comms l = do state <- newIORef PaperBrokerState { pbTid = Nothing, tickTable = tickTableH, @@ -68,18 +74,19 @@ mkPaperBroker tickTableH tickChan startCash accounts comms = do postMarketStartTime = hourMin 15 40, postMarketFixTime = hourMin 15 45, postMarketCloseTime = hourMin 15 50, - commissions = comms + commissions = comms, + logger = l } tid <- forkIO $ brokerThread tickChan state atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ())) - return BrokerInterface { + return BrokerBackend { accounts = accounts, setNotificationCallback = pbSetNotificationCallback state, submitOrder = pbSubmitOrder state, - cancelOrder = pbCancelOrder state, - stopBroker = pbDestroyBroker state } + cancelOrder = void . pbCancelOrder state, + stop = pbDestroyBroker state } brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO () @@ -101,7 +108,7 @@ executePendingOrders tick state = do then case orderPrice order of Market -> do - debugM "PaperBroker" "Executing: pending market order" + log Debug "PaperBroker" "Executing: pending market order" executeAtTick state order tick return $ Just $ orderId order Limit price -> @@ -109,22 +116,27 @@ executePendingOrders tick state = do _ -> return Nothing else return Nothing + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt + executeLimitAt price order = case orderOperation order of - Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) || (datatype tick == BestOffer && price > value tick && value tick > 0) + Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) || + (datatype tick == BestOffer && price > value tick && value tick > 0) then do - debugM "PaperBroker" $ "[1]Executing: pending limit order: " ++ show (security tick) ++ "/" ++ show (orderSecurity order) + log Debug "PaperBroker" $ TL.toStrict $ [t|[1]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order) executeAtTick state order $ tick { value = price } return $ Just $ orderId order else return Nothing Sell -> if (datatype tick == LastTradePrice && price < value tick && value tick > 0) || (datatype tick == BestBid && price < value tick && value tick > 0) then do - debugM "PaperBroker" $ "[2]Executing: pending limit order: " ++ show (security tick) ++ "/" ++ show (orderSecurity order) + log Debug "PaperBroker" $ TL.toStrict $ [t|[2]Executing: pending limit order: %Q/%Q|] (security tick) (orderSecurity order) executeAtTick state order $ tick { value = price } return $ Just $ orderId order else return Nothing -pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO() +pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (BrokerBackendNotification -> IO ()) -> IO() pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) ) mkTrade :: TickerInfo -> Tick -> Order -> UTCTime -> Maybe CommissionConfig -> Trade @@ -157,10 +169,10 @@ executeAtTick state order tick = do comm <- L.find (\comdef -> comPrefix comdef `T.isPrefixOf` security tick) . commissions <$> readIORef state let tradeVolume = fromInteger (orderQuantity order) * value tick * fromInteger (tiLotSize tickerInfo) atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ())) - debugM "PaperBroker" $ "Executed: " ++ show newOrder ++ "; at tick: " ++ show tick + log Debug "PaperBroker" $ TL.toStrict $ [t|Executed: %? at tick: %?|] newOrder tick ts <- getCurrentTime - maybeCall notificationCallback state $ TradeNotification $ mkTrade tickerInfo tick order ts comm - maybeCall notificationCallback state $ OrderNotification (orderId order) Executed + maybeCall notificationCallback state $ BackendTradeNotification $ mkTrade tickerInfo tick order ts comm + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Executed where obtainTickerInfo tickerId = do table <- tickTable <$> readIORef state @@ -170,16 +182,20 @@ executeAtTick state order tick = do _ -> return TickerInfo { tiTicker = tickerId, tiLotSize = 1, tiTickSize = 1 } + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt + rejectOrder state order = do let newOrder = order { orderState = Rejected } in atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) - maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted - maybeCall notificationCallback state $ OrderNotification (orderId order) Rejected + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Rejected pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO () pbSubmitOrder state order = do - infoM "PaperBroker" $ "Submitted order: " ++ show order + log Info "PaperBroker" $ "Submitted order: " <> (T.pack . show) order case orderPrice order of Market -> executeMarketOrder state order Limit price -> submitLimitOrder price state order @@ -187,6 +203,9 @@ pbSubmitOrder state order = do StopMarket trigger -> submitStopMarketOrder state order where + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt executeMarketOrder state order = do tm <- tickTable <$> readIORef state tickMb <- getTick tm key @@ -200,25 +219,26 @@ pbSubmitOrder state order = do else do tm <- tickTable <$> readIORef state tickMb <- getTick tm key - debugM "PaperBroker" $ "Limit order submitted, looking up: " ++ show key + log Debug "PaperBroker" $ "Limit order submitted, looking up: " <> (T.pack . show) key case tickMb of Nothing -> do let newOrder = order { orderState = Submitted } atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) - maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted Just tick -> do marketOpenTime' <- marketOpenTime <$> readIORef state - if (((orderOperation order == Buy) && (value tick < price)) || ((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime')) + if (((orderOperation order == Buy) && (value tick < price)) || + ((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime')) then do - maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted executeAtTick state order tick else do let newOrder = order { orderState = Submitted } atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , pendingOrders = newOrder : pendingOrders s}, ())) - maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted + maybeCall notificationCallback state $ BackendOrderNotification (orderId order) Submitted - submitStopOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order - submitStopMarketOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order + submitStopOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order + submitStopMarketOrder _ _ = log Warning "PaperBroker" $ "Not implemented: Submitted order: " <> (T.pack . show) order orderDatatype = case orderOperation order of Buy -> BestOffer @@ -230,7 +250,7 @@ pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool pbCancelOrder state oid = do atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\o -> orderId o /= oid) (pendingOrders s), orders = M.adjustWithKey (\_ v -> v { orderState = Cancelled }) oid (orders s) }, ())) - maybeCall notificationCallback state $ OrderNotification oid Cancelled + maybeCall notificationCallback state $ BackendOrderNotification oid Cancelled return True pbDestroyBroker :: IORef PaperBrokerState -> IO () @@ -240,8 +260,3 @@ pbDestroyBroker state = do Just tid -> killThread tid Nothing -> return () -{- -pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order) -pbGetOrder state oid = M.lookup oid . orders <$> readIORef state --} - diff --git a/src/Broker/QuikBroker.hs b/src/Broker/QuikBroker.hs index 09d36c3..3bcf79d 100644 --- a/src/Broker/QuikBroker.hs +++ b/src/Broker/QuikBroker.hs @@ -1,49 +1,55 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE MultiWayIf #-} {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE MultiWayIf #-} -{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE QuasiQuotes #-} module Broker.QuikBroker ( mkQuikBroker ) where -import ATrade.Types -import ATrade.Broker.Protocol -import ATrade.Broker.Server -import ATrade.Quotes.QTIS (TickerInfo(..)) +import ATrade.Broker.Backend +import ATrade.Broker.Protocol +import ATrade.Broker.Server +import ATrade.Quotes.QTIS (TickerInfo (..)) +import ATrade.Types -import Broker.QuikBroker.Trans2QuikApi hiding (tradeAccount) +import Broker.QuikBroker.Trans2QuikApi hiding (logger, tradeAccount) -import Data.IORef -import Data.List.Split -import qualified Data.List as L -import qualified Data.Map as M -import qualified Data.Bimap as BM -import qualified Data.Text as T -import qualified Data.Text.Lazy as TL -import Data.Text.Format +import qualified Data.Bimap as BM +import Data.IORef +import qualified Data.List as L +import Data.List.Split +import qualified Data.Map as M +import qualified Data.Text as T +import qualified Data.Text.Lazy as TL -import Control.Monad -import Control.Concurrent -import Control.Concurrent.BoundedChan -import Control.Monad.Trans.Except -import Control.Monad.IO.Class -import System.Log.Logger +import ATrade.Logging (Message, Severity (..), + logWith) +import Colog (LogAction) +import Control.Concurrent +import Control.Concurrent.BoundedChan +import Control.Monad +import Control.Monad.IO.Class +import Control.Monad.Trans.Except +import Language.Haskell.Printf (t) -import Safe +import Safe -import Commissions (CommissionConfig(..)) -import TickTable (TickTableH, getTick, getTickerInfo, TickKey(..)) +import Commissions (CommissionConfig (..)) +import TickTable (TickKey (..), TickTableH, + getTick, getTickerInfo) type QuikOrderId = Integer data QuikBrokerState = QuikBrokerState { - notificationCallback :: Maybe (Notification -> IO ()), - quik :: IORef Quik, - orderMap :: M.Map OrderId Order, - orderIdMap :: BM.Bimap QuikOrderId OrderId, - trans2orderid :: M.Map Integer Order, - transIdCounter :: Integer, - tickTable :: TickTableH + notificationCallback :: Maybe (BrokerBackendNotification -> IO ()), + quik :: IORef Quik, + orderMap :: M.Map OrderId Order, + orderIdMap :: BM.Bimap QuikOrderId OrderId, + trans2orderid :: M.Map Integer Order, + transIdCounter :: Integer, + tickTable :: TickTableH, + logger :: LogAction IO Message } nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s)) @@ -52,11 +58,11 @@ maybeCall proj state arg = do cb <- proj <$> readIORef state case cb of Just callback -> callback arg - Nothing -> return () + Nothing -> return () -mkQuikBroker :: TickTableH -> FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> IO BrokerInterface -mkQuikBroker tt dllPath quikPath accs comms = do - q <- mkQuik dllPath quikPath +mkQuikBroker :: TickTableH -> FilePath -> FilePath -> [T.Text] -> [CommissionConfig] -> LogAction IO Message -> IO BrokerBackend +mkQuikBroker tt dllPath quikPath accs comms l = do + q <- mkQuik dllPath quikPath l msgChan <- newBoundedChan 100 @@ -67,17 +73,18 @@ mkQuikBroker tt dllPath quikPath accs comms = do orderIdMap = BM.empty, trans2orderid = M.empty, transIdCounter = 1, - tickTable = tt + tickTable = tt, + logger = l } setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state comms) - return BrokerInterface { + return BrokerBackend { accounts = accs, setNotificationCallback = qbSetNotificationCallback state, submitOrder = qbSubmitOrder state, - cancelOrder = qbCancelOrder state, - stopBroker = qbStopBroker state + cancelOrder = void . qbCancelOrder state, + stop = qbStopBroker state } qbSetNotificationCallback state maybecb = atomicModifyIORef' state (\s -> (s { @@ -88,24 +95,28 @@ qbSubmitOrder state order = do transId <- nextTransId state atomicModifyIORef' state (\s -> (s { trans2orderid = M.insert transId order (trans2orderid s) }, ())) - debugM "Quik" "Getting ticktable" + log Debug "Quik" "Getting ticktable" tt <- tickTable <$> readIORef state - debugM "Quik" "Getting tickerinfo from ticktable" + log Debug "Quik" "Getting tickerinfo from ticktable" tickerInfoMb <- getTickerInfo tt (orderSecurity order) - debugM "Quik" "Getting liquid ticks" + log Debug "Quik" "Getting liquid ticks" liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid)) - debugM "Quik" "Obtained" + log Debug "Quik" "Obtained" case (tickerInfoMb, liquidTickMb) of - (Just !tickerInfo, Just !liquidTick) -> + (Just !tickerInfo, Just !liquidTick) -> case makeTransactionString tickerInfo liquidTick transId order of Just transStr -> do rc <- quikSendTransaction q transStr - debugM "Quik" $ "Sending transaction string: " ++ transStr + log Debug "Quik" $ "Sending transaction string: " <> T.pack transStr case rc of - Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg - Right _ -> debugM "Quik" $ "Order submitted: " ++ show order - Nothing -> warningM "Quik" $ "Unable to compose transaction string: " ++ show order - _ -> warningM "Quik" $ "Unable to obtain data: " ++ show tickerInfoMb ++ "/" ++ show liquidTickMb + Left errmsg -> log Warning "Quik" $ "Unable to send transaction: " <> errmsg + Right _ -> log Debug "Quik" $ "Order submitted: " <> (T.pack . show) order + Nothing -> log Warning "Quik" $ "Unable to compose transaction string: " <> (T.pack . show) order + _ -> log Warning "Quik" $ TL.toStrict $ [t|Unable to obtain data: %?/%?|] tickerInfoMb liquidTickMb + where + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt qbCancelOrder state orderid = do @@ -118,10 +129,14 @@ qbCancelOrder state orderid = do Just transString -> do rc <- quikSendTransaction q transString case rc of - Left errmsg -> warningM "Quik" ("Unable to send transaction: " ++ T.unpack errmsg) >> return False - Right _ -> debugM "Quik" ("Order cancelled: " ++ show orderid) >> return True - Nothing -> warningM "Quik" ("Unable to compose transaction string: " ++ show orderid) >> return False - _ -> warningM "Quik" ("Got request to cancel unknown order: " ++ show orderid) >> return False + Left errmsg -> log Warning "Quik" ("Unable to send transaction: " <> errmsg) >> return False + Right _ -> log Debug "Quik" ("Order cancelled: " <> (T.pack . show) orderid) >> return True + Nothing -> log Warning "Quik" ("Unable to compose transaction string: " <> (T.pack . show) orderid) >> return False + _ -> log Warning "Quik" ("Got request to cancel unknown order: " <> (T.pack . show) orderid) >> return False + where + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt qbStopBroker state = return () @@ -139,11 +154,11 @@ makeTransactionString tickerInfo liquidTick transId order = _ -> Nothing where orderTypeCode = case orderPrice order of - Market -> "L" + Market -> "L" Limit _ -> "L" - _ -> "X" + _ -> "X" operationCode = case orderOperation order of - Buy -> "B" + Buy -> "B" Sell -> "S" classcode = headMay . splitOn "#" . T.unpack $ orderSecurity order seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order @@ -179,7 +194,7 @@ qbTransactionCallback state success transactionId orderNum = do newOrder <- if success then registerOrder orderNum $ order { orderState = Unsubmitted } else registerOrder orderNum $ order { orderState = Rejected } - maybeCall notificationCallback state (OrderNotification (orderId newOrder) (orderState newOrder)) + maybeCall notificationCallback state (BackendOrderNotification (orderId newOrder) (orderState newOrder)) Nothing -> return () where @@ -190,7 +205,7 @@ qbTransactionCallback state success transactionId orderNum = do qbOrderCallback state quikorder = do orders <- orderMap <$> readIORef state idMap <- orderIdMap <$> readIORef state - debugM "Quik" $ "Order: " ++ show quikorder + log Debug "Quik" $ "Order: " <> (T.pack . show) quikorder case BM.lookup (qoOrderId quikorder) idMap >>= flip M.lookup orders of Just order -> do updatedOrder <- if | qoStatus quikorder /= 1 && qoStatus quikorder /= 2 -> @@ -201,8 +216,8 @@ qbOrderCallback state quikorder = do submitted order | qoStatus quikorder == 2 -> cancelled order - maybeCall notificationCallback state (OrderNotification (orderId updatedOrder) (orderState updatedOrder)) - Nothing -> warningM "Quik" $ "Unknown order: state callback called: " ++ show quikorder + maybeCall notificationCallback state (BackendOrderNotification (orderId updatedOrder) (orderState updatedOrder)) + Nothing -> log Warning "Quik" $ "Unknown order: state callback called: " <> (T.pack . show) quikorder where updateOrder :: Order -> IO Order @@ -214,15 +229,19 @@ qbOrderCallback state quikorder = do submitted order = updateOrder $ order { orderState = Submitted } cancelled order = updateOrder $ order { orderState = Cancelled } + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt + qbTradeCallback state comms quiktrade = do orders <- orderMap <$> readIORef state idMap <- orderIdMap <$> readIORef state - debugM "Quik" $ "Trade: " ++ show quiktrade + log Debug "Quik" $ "Trade: " <> (T.pack . show) quiktrade case BM.lookup (qtOrderId quiktrade) idMap >>= flip M.lookup orders of Just order -> do - debugM "Quik" $ "Found comm: " ++ show (L.find (\x -> comPrefix x `T.isPrefixOf` orderSecurity order) comms) - maybeCall notificationCallback state (TradeNotification $ tradeFor order) - Nothing -> warningM "Quik" $ "Incoming trade for unknown order: " ++ show quiktrade + log Debug "Quik" $ "Found comm: " <> (T.pack . show) (L.find (\x -> comPrefix x `T.isPrefixOf` orderSecurity order) comms) + maybeCall notificationCallback state (BackendTradeNotification $ tradeFor order) + Nothing -> log Warning "Quik" $ "Incoming trade for unknown order: " <> (T.pack . show) quiktrade where tradeFor order = Trade { tradeOrderId = orderId order, @@ -241,3 +260,6 @@ qbTradeCallback state comms quiktrade = do Just com -> vol * fromDouble (0.01 * comPercentage com) + fromDouble (comFixed com) * fromIntegral qty Nothing -> 0 + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt diff --git a/src/Broker/QuikBroker/Trans2QuikApi.hs b/src/Broker/QuikBroker/Trans2QuikApi.hs index e1c1efe..dc26265 100644 --- a/src/Broker/QuikBroker/Trans2QuikApi.hs +++ b/src/Broker/QuikBroker/Trans2QuikApi.hs @@ -1,5 +1,6 @@ {-# LANGUAGE DeriveDataTypeable #-} {-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE QuasiQuotes #-} module Broker.QuikBroker.Trans2QuikApi ( Trans2QuikApi(..), @@ -12,7 +13,9 @@ module Broker.QuikBroker.Trans2QuikApi ( quikSendTransaction ) where +import ATrade.Logging (Message, Severity (..), logWith) import Codec.Text.IConv +import Colog (LogAction) import Control.Concurrent import Control.Error.Util import Control.Exception.Safe @@ -26,6 +29,7 @@ import Data.Ratio import qualified Data.Set as S import qualified Data.Text as T import Data.Text.Encoding +import qualified Data.Text.Lazy as TL import Data.Time.Calendar import Data.Time.Clock import Data.Typeable @@ -33,7 +37,8 @@ import Foreign import Foreign.C.String import Foreign.C.Types import Foreign.Marshal.Array -import System.Log.Logger +import Language.Haskell.Printf (t) +import Prelude hiding (log) import System.Win32.DLL import System.Win32.Types @@ -393,7 +398,9 @@ data Quik = Quik { watchdogTid :: ThreadId, handledTrades :: S.Set CLLong, - handledOrders :: S.Set QuikOrder + handledOrders :: S.Set QuikOrder, + + logger :: LogAction IO Message } quikSendTransaction :: IORef Quik -> String -> IO (Either T.Text ()) @@ -417,11 +424,12 @@ setCallbacks quik transCb orCb tradeCb = atomicModifyIORef' quik (\s -> hlTradeCallback = Just tradeCb }, ())) -mkQuik :: FilePath -> FilePath -> IO (IORef Quik) -mkQuik dllpath quikpath = do +mkQuik :: FilePath -> FilePath -> LogAction IO Message -> IO (IORef Quik) +mkQuik dllpath quikpath l = do api <- loadQuikApi dllpath - debugM "Quik" "Dll loaded" + let log = logWith l + log Debug "Quik" "Dll loaded" myTid <- myThreadId state <- newIORef Quik { quikApi = api, @@ -432,7 +440,8 @@ mkQuik dllpath quikpath = do hlOrderCallback = Nothing, hlTradeCallback = Nothing, handledTrades = S.empty, - handledOrders = S.empty } + handledOrders = S.empty, + logger = l } conncb' <- mkConnectionStatusCallback (defaultConnectionCb state) transcb' <- mkTransactionsReplyCallback (defaultTransactionReplyCb state) @@ -446,25 +455,29 @@ mkQuik dllpath quikpath = do tid <- forkIO $ watchdog quikpath state atomicModifyIORef' state (\s -> (s { watchdogTid = tid }, ())) - debugM "Quik" "mkQuik done" + log Debug "Quik" "mkQuik done" return state defaultConnectionCb :: IORef Quik -> LONG -> LONG -> LPSTR -> IO () defaultConnectionCb state event errorCode infoMessage - | event == ecQuikConnected = infoM "Quik" "Quik connected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = True }, ()) ) - | event == ecQuikDisconnected = infoM "Quik" "Quik disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = False }, ()) ) - | event == ecDllConnected = infoM "Quik" "DLL connected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) ) - | event == ecDllDisconnected = infoM "Quik" "DLL disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) ) - | otherwise = debugM "Quik" $ "Connection event: " ++ show event + | event == ecQuikConnected = log Info "Quik" "Quik connected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = True }, ()) ) + | event == ecQuikDisconnected = log Info "Quik" "Quik disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToServer = False }, ()) ) + | event == ecDllConnected = log Info "Quik" "DLL connected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) ) + | event == ecDllDisconnected = log Info "Quik" "DLL disconnected" >> atomicModifyIORef' state (\s -> (s { connectedToDll = True }, ()) ) + | otherwise = log Debug "Quik" $ "Connection event: " <> (T.pack . show) event + where + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt defaultTransactionReplyCb :: IORef Quik -> LONG -> LONG -> LONG -> DWORD -> CLLong -> LPSTR -> CIntPtr -> IO () defaultTransactionReplyCb state transactionResult errorCode replyCode transId orderNum replyMessage replyDesc = do - debugM "Quik" $ "Transaction cb:" ++ show transactionResult ++ "/" ++ show errorCode ++ "/" ++ show replyCode + log Debug "Quik" $ TL.toStrict $ [t|Transaction cb: %d/%d/%d|] transactionResult errorCode replyCode when (replyMessage /= nullPtr) $ do s <- convert "CP1251" "UTF-8" . BL.fromStrict <$> BS.packCString replyMessage case decodeUtf8' (BL.toStrict s) of - Left _ -> warningM "Quik" "Unable to decode utf-8" - Right msg -> debugM "Quik" $ "Transaction cb message:" ++ T.unpack msg + Left _ -> log Warning "Quik" "Unable to decode utf-8" + Right msg -> log Debug "Quik" $ "Transaction cb message:" <> msg maybecb <- hlTransactionCallback <$> readIORef state case maybecb of @@ -472,10 +485,13 @@ defaultTransactionReplyCb state transactionResult errorCode replyCode transId or Nothing -> return () where rcInsufficientFunds = 4 + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt defaultOrderCb :: IORef Quik -> LONG -> DWORD -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> LONG -> CIntPtr -> IO () defaultOrderCb state mode transId dnumber classCode secCode price balance value sell status desc = do - debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber ++ "/" ++ show transId + log Debug "Quik" $ TL.toStrict $ [t|Trade cb: %d/%d/%d|] mode dnumber transId orders <- handledOrders <$> readIORef state when (mode == 0) $ do maybecb <- hlOrderCallback <$> readIORef state @@ -487,21 +503,24 @@ defaultOrderCb state mode transId dnumber classCode secCode price balance value case maybecb of Just cb -> cb order Nothing -> return () - where - mkOrder :: String -> String -> QuikOrder - mkOrder sclass ssec = QuikOrder { - qoTransId = toInteger transId, - qoOrderId = toInteger dnumber, - qoTicker = sclass ++ "#" ++ ssec, - qoPrice = toDouble price, - qoBalance = toInteger balance, - qoSell = sell == 1, - qoStatus = fromIntegral status - } + where + mkOrder :: String -> String -> QuikOrder + mkOrder sclass ssec = QuikOrder { + qoTransId = toInteger transId, + qoOrderId = toInteger dnumber, + qoTicker = sclass ++ "#" ++ ssec, + qoPrice = toDouble price, + qoBalance = toInteger balance, + qoSell = sell == 1, + qoStatus = fromIntegral status + } + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt defaultTradeCb :: IORef Quik -> LONG -> CLLong -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> CIntPtr -> IO () defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sell desc = do - debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber + log Debug "Quik" $ TL.toStrict $ [t|Trade cb: %d/%d|] mode dnumber trades <- handledTrades <$> readIORef state when (mode == 0 && dnumber `S.notMember` trades) $ do atomicModifyIORef' state (\s -> (s { handledTrades = S.insert dnumber (handledTrades s) }, ())) @@ -517,8 +536,8 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel currency <- tradeCurrency api desc >>= peekCString cb (trade ssec sclass ymd hms us currency) Nothing -> return () - where - trade ssec sclass ymd hms us currency = QuikTrade { + where + trade ssec sclass ymd hms us currency = QuikTrade { qtOrderId = toInteger orderNum, qtTicker = sclass ++ "#" ++ ssec, qtPrice = toDouble price, @@ -528,8 +547,8 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel qtVolumeCurrency = currency, qtTimestamp = adjustTimestamp $ mkTimestamp ymd hms us } - adjustTimestamp = addUTCTime (-3 * 3600) -- MSK -> UTC - mkTimestamp ymd hms us = UTCTime (fromGregorian y mon d) (fromInteger (h * 3600 + m * 60 + s) + fromRational (us % 1000000)) + adjustTimestamp = addUTCTime (-3 * 3600) -- MSK -> UTC + mkTimestamp ymd hms us = UTCTime (fromGregorian y mon d) (fromInteger (h * 3600 + m * 60 + s) + fromRational (us % 1000000)) where y = ymd `div` 10000 mon = fromEnum $ (ymd `mod` 10000) `div` 100 @@ -537,6 +556,9 @@ defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sel h = hms `div` 10000 m = (hms `mod` 10000) `div` 100 s = hms `mod` 100 + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt watchdog :: FilePath -> IORef Quik -> IO () @@ -552,23 +574,27 @@ watchdog quikpath state = do err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024 if err /= ecSuccess - then warningM "Quik.Watchdog" $ "Error: " ++ show err + then log Warning "Quik.Watchdog" $ TL.toStrict $ [t|Error: %d|] err else forever $ do conn <- connectedToDll <$> readIORef state handle - (\(QuikException errmsg rc) -> warningM "Quik.Watchdog" $ (T.unpack errmsg) ++ " (" ++ show rc ++ ")") $ - unless conn $ - withCString quikpath (\path -> do - err <- connect api path errorCode errorMsg 1024 - if err /= ecSuccess && err /= ecAlreadyConnectedToQuik - then warningM "Quik.Watchdog" $ "Unable to connect: " ++ show err - else withCString "" (\emptyStr -> do - throwIfErr "setTransactionsReplyCallback returned error" $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024 - throwIfErr "subscribeOrders returned error" $ subscribeOrders api emptyStr emptyStr - startOrders api orcb - throwIfErr "subscribeTrades returned error" $ subscribeTrades api emptyStr emptyStr - startTrades api tradecb)) + (\(QuikException errmsg rc) -> log Warning "Quik.Watchdog" $ TL.toStrict $ [t|%Q (%d)|] errmsg rc) $ + unless conn $ + withCString quikpath (\path -> do + err <- connect api path errorCode errorMsg 1024 + if err /= ecSuccess && err /= ecAlreadyConnectedToQuik + then log Debug "Quik.Watchdog" $ "Unable to connect: " <> (T.pack . show) err + else withCString "" (\emptyStr -> do + throwIfErr "setTransactionsReplyCallback returned error" $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024 + throwIfErr "subscribeOrders returned error" $ subscribeOrders api emptyStr emptyStr + startOrders api orcb + throwIfErr "subscribeTrades returned error" $ subscribeTrades api emptyStr emptyStr + startTrades api tradecb)) threadDelay 10000000)) + where + log sev comp txt = do + l <- logger <$> readIORef state + logWith l sev comp txt throwIfErr :: T.Text -> IO LONG -> IO () throwIfErr errmsg action = do diff --git a/src/QuoteSource/PipeReader.hs b/src/QuoteSource/PipeReader.hs index 8737af5..98995aa 100644 --- a/src/QuoteSource/PipeReader.hs +++ b/src/QuoteSource/PipeReader.hs @@ -6,8 +6,11 @@ module QuoteSource.PipeReader ( stopPipeReader ) where +import ATrade.Logging (Message, Severity (..), + logWith) import ATrade.QuoteSource.Server import ATrade.Types +import Colog (LogAction) import Control.Applicative import Control.Concurrent hiding (readChan, writeChan, writeList2Chan, yield) @@ -35,7 +38,6 @@ import Data.Time.Clock import Foreign.Marshal.Alloc import Safe import System.IO -import System.Log.Logger (debugM, warningM) import System.ZMQ4 @@ -46,10 +48,10 @@ data PipeReaderHandle = } deriving (Eq) -zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> Source IO [B.ByteString] -zmqSocketConduit ep sock running' = do +zmqSocketConduit :: (Subscriber a, Receiver a) => T.Text -> Socket a -> IORef Bool -> LogAction IO Message -> Source IO [B.ByteString] +zmqSocketConduit ep sock running' logger = do liftIO $ do - debugM "PipeReader" $ "Connecting to: " ++ T.unpack ep + logWith logger Info "PipeReader" $ "Connecting to: " <> ep connect sock (T.unpack ep) subscribe sock B.empty lastHeartbeat <- liftIO $ getCurrentTime >>= newIORef @@ -59,7 +61,7 @@ zmqSocketConduit ep sock running' = do bs <- liftIO $ receiveMulti sock when ((not . null $ bs) && (head bs == "SYSTEM#HEARTBEAT")) $ liftIO $ getCurrentTime >>= writeIORef lastHeartbeat yield bs - zmqSocketConduit ep sock running' + zmqSocketConduit ep sock running' logger where notTimeout hb = do now <- liftIO $ getCurrentTime @@ -80,16 +82,16 @@ chanSink chan = awaitForever (\t -> do liftIO $ writeChan chan t) -startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle -startPipeReader ctx pipeEndpoint tickChan = do - debugM "PipeReader" $ "Trying to open pipe: " ++ T.unpack pipeEndpoint +startPipeReader :: Context -> T.Text -> BoundedChan QuoteSourceServerData -> LogAction IO Message -> IO PipeReaderHandle +startPipeReader ctx pipeEndpoint tickChan logger = do + logWith logger Debug "PipeReader" $ "Trying to open pipe: " <> pipeEndpoint s <- socket ctx Sub - debugM "PipeReader" "Pipe opened" + logWith logger Info "PipeReader" "Pipe opened" running' <- newIORef True tid <- forkIO $ readerThread s running' return PipeReaderHandle { prThreadId = tid, running = running' } where - readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running') =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan + readerThread s running' = runConduit $ (zmqSocketConduit pipeEndpoint s running' logger) =$= parseBarConduit =$= qssdataConduit =$= chanSink tickChan stopPipeReader :: PipeReaderHandle -> IO () stopPipeReader h = killThread (prThreadId h) >> writeIORef (running h) False diff --git a/src/System/Win32/DDE.hs b/src/System/Win32/DDE.hs index 128d6b8..725113e 100644 --- a/src/System/Win32/DDE.hs +++ b/src/System/Win32/DDE.hs @@ -40,7 +40,6 @@ import Foreign import Foreign.C.Types import Foreign.C.String import Foreign.Marshal.Array -import System.Log.Logger (debugM, warningM) import qualified Data.ByteString.Lazy as BL @@ -122,7 +121,6 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2 handleConnect state hsz1 hsz2 = do myDdeState <- readIORef state maybeAppName <- queryString myDdeState 256 hsz2 - debugM "DDE" $ "Handle connect:" ++ show maybeAppName case maybeAppName of Just incomingAppName -> do if incomingAppName == appName myDdeState @@ -140,7 +138,6 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2 Just topic -> withDdeData hData (\xlData -> do case runGetOrFail xlParser $ BL.fromStrict xlData of Left (_, _, errmsg) -> do - warningM "DDE" $ "Parsing error: " ++ show errmsg return ddeResultFalse Right (_, _, table) -> do rc <- (dataCallback myDdeState) topic table diff --git a/src/TickTable.hs b/src/TickTable.hs index 14ce8d8..f6403ad 100644 --- a/src/TickTable.hs +++ b/src/TickTable.hs @@ -24,8 +24,6 @@ import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef) import qualified Data.Map.Strict as M import qualified Data.Text as T -import System.Log.Logger (debugM) - import System.ZMQ4 (Context) data TickKey = TickKey TickerId DataType @@ -62,7 +60,6 @@ mkTickTable chan ctx qtisEndpoint = do qtisThread r qtisChan ctx qtisEndpoint = forever $ do threadDelay 1000000 requests <- readListFromChan qtisChan - debugM "TickTable" $ "Requested info for tickers: " ++ show requests ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests) forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ()))) diff --git a/stack.yaml b/stack.yaml index 6c3c208..270a7b2 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: lts-17.14 +resolver: lts-18.18 # User packages to be built. # Various formats can be used as shown in the example below. @@ -39,13 +39,19 @@ packages: - '.' - '../libatrade' - '../zeromq4-haskell-zap' -- '../iconv' +- '../iconv-0.4.1.3' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1", "gi-gtk-3.0.23", "gi-gdk-3.0.16", "gi-gdkpixbuf-2.0.16", "gi-gio-2.0.18", "gi-pango-1.0.16", "text-format-0.3.2", "th-printf-0.5.1"] +extra-deps: +- datetime-0.3.1 +- cond-0.4.1.1 +- co-log-0.4.0.1@sha256:3d4c17f37693c80d1aa2c41669bc3438fac3e89dc5f479e57d79bc3ddc4dfcc5,5087 +- ansi-terminal-0.10.3@sha256:e2fbcef5f980dc234c7ad8e2fa433b0e8109132c9e643bc40ea5608cd5697797,3226 # Override default flag values for local packages and extra-deps -flags: {} +flags: + mintty: + Win32-2-13-1: false # Extra package databases containing global packages extra-package-dbs: []