diff --git a/app/Config.hs b/app/Config.hs index 18d465b..e2d2f80 100644 --- a/app/Config.hs +++ b/app/Config.hs @@ -1,44 +1,44 @@ -{-# LANGUAGE OverloadedStrings, OverloadedLabels #-} +{-# LANGUAGE OverloadedLabels #-} +{-# LANGUAGE OverloadedStrings #-} module Config ( TableConfig(..), Config(..), readConfig -) where +) where -import Commissions (CommissionConfig) -import Data.Aeson -import Data.Aeson.Types +import Commissions (CommissionConfig) +import Data.Aeson +import Data.Aeson.Types import qualified Data.ByteString.Lazy as BL -import qualified Data.HashMap.Strict as HM -import qualified Data.Vector as V -import qualified Data.Text as T +import qualified Data.HashMap.Strict as HM +import qualified Data.Text as T +import qualified Data.Vector as V data TableConfig = TableConfig { - parserId :: String, - tableName :: String, + parserId :: String, + tableName :: String, tableParams :: Value } deriving (Show) data Config = Config { - quotesourceEndpoint :: String, - qtisEndpoint :: String, - pipeReaderQsEndpoint :: Maybe String, - tickPipePath :: Maybe String, - brokerserverEndpoint :: String, - whitelist :: [T.Text], - blacklist :: [T.Text], - brokerServerCertPath :: Maybe FilePath, + quotesourceEndpoint :: String, + qtisEndpoint :: String, + pipeReaderQsEndpoint :: Maybe String, + tickPipePath :: Maybe String, + brokerserverEndpoint :: String, + whitelist :: [T.Text], + blacklist :: [T.Text], + brokerServerCertPath :: Maybe FilePath, brokerClientCertificateDir :: Maybe FilePath, - tables :: [TableConfig], - quikPath :: String, - dllPath :: String, - quikAccounts :: [T.Text], - tradeSink :: T.Text, - telegramToken :: T.Text, - telegramChatId :: T.Text, - commissions :: [CommissionConfig] + tables :: [TableConfig], + quikPath :: String, + dllPath :: String, + quikAccounts :: [T.Text], + tradeSink :: T.Text, + tradeSink2 :: T.Text, + commissions :: [CommissionConfig] } deriving (Show) readConfig :: String -> IO Config @@ -46,7 +46,7 @@ readConfig fname = do content <- BL.readFile fname case decode content >>= parseMaybe parseConfig of Just config -> return config - Nothing -> error "Unable to load config" + Nothing -> error "Unable to load config" parseConfig :: Value -> Parser Config parseConfig = withObject "object" $ \obj -> do @@ -60,13 +60,12 @@ parseConfig = withObject "object" $ \obj -> do serverCert <- obj .:? "broker_server_certificate" clientCerts <- obj .:? "broker_client_certificates" rt <- case HM.lookup "tables" obj of - Just v -> parseTables v + Just v -> parseTables v Nothing -> fail "Expected tables array" qp <- obj .: "quik-path" dp <- obj .: "dll-path" trsink <- obj .: "trade-sink" - tgToken <- obj .: "telegram-token" - tgChatId <- obj .: "telegram-chatid" + trsink2 <- obj .: "trade-sink2" commissionsConfig <- obj .: "commissions" accs <- V.toList <$> obj .: "accounts" return Config { quotesourceEndpoint = qse, @@ -83,8 +82,7 @@ parseConfig = withObject "object" $ \obj -> do dllPath = dp, quikAccounts = fmap T.pack accs, tradeSink = trsink, - telegramToken = tgToken, - telegramChatId = tgChatId, + tradeSink2 = trsink2, commissions = commissionsConfig } where parseTables :: Value -> Parser [TableConfig] @@ -95,7 +93,7 @@ parseConfig = withObject "object" $ \obj -> do pid <- obj .: "parser-id" tn <- obj .: "table-name" params <- case HM.lookup "params" obj of - Just x -> return x + Just x -> return x Nothing -> return $ Object HM.empty return TableConfig { parserId = pid, diff --git a/app/Main.hs b/app/Main.hs index a4b96a7..c76ac9c 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -1,42 +1,46 @@ -{-# LANGUAGE OverloadedStrings, OverloadedLabels, LambdaCase #-} +{-# LANGUAGE LambdaCase #-} +{-# LANGUAGE OverloadedLabels #-} +{-# LANGUAGE OverloadedStrings #-} + module Main where -import System.IO - -import QuoteSource.DataImport -import Control.Concurrent hiding (readChan, writeChan) -import Control.Monad -import Control.Exception.Safe -import Control.Error.Util -import qualified GI.Gtk as Gtk -import Data.GI.Base -import Control.Concurrent.BoundedChan -import ATrade.Types -import QuoteSource.TableParsers.AllParamsTableParser -import QuoteSource.TableParser -import QuoteSource.PipeReader -import ATrade.QuoteSource.Server - -import ATrade.Broker.TradeSinks.ZMQTradeSink -import ATrade.Broker.TradeSinks.TelegramTradeSink -import ATrade.Broker.Server -import Broker.PaperBroker -import Broker.QuikBroker - -import System.Directory -import System.Timeout -import System.Log.Logger -import System.Log.Handler.Simple -import System.Log.Handler (setFormatter) -import System.Log.Formatter -import System.ZMQ4 -import System.ZMQ4.ZAP - -import qualified Data.Text as T -import Data.Maybe - -import Config -import TickTable (mkTickTable) +import System.IO + +import ATrade.QuoteSource.Server +import ATrade.Types +import Control.Concurrent hiding (readChan, + writeChan) +import Control.Concurrent.BoundedChan +import Control.Error.Util +import Control.Exception.Safe +import Control.Monad +import Data.GI.Base +import qualified GI.Gtk as Gtk +import QuoteSource.DataImport +import QuoteSource.PipeReader +import QuoteSource.TableParser +import QuoteSource.TableParsers.AllParamsTableParser + +import ATrade.Broker.Server +import ATrade.Broker.TradeSinks.ZMQTradeSink +import Broker.PaperBroker +import Broker.QuikBroker + +import System.Directory +import System.Log.Formatter +import System.Log.Handler (setFormatter) +import System.Log.Handler.Simple +import System.Log.Logger +import System.Timeout +import System.ZMQ4 +import System.ZMQ4.ZAP + +import Data.Maybe +import qualified Data.Text as T + +import Config +import TickTable (mkTickTable) +import Version forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan size sourceChan = do @@ -57,14 +61,18 @@ initLogging = do handler <- streamHandler stderr DEBUG >>= (\x -> return $ setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) + fhandler <- fileHandler "quik-connector.log" DEBUG >>= + (\x -> return $ + setFormatter x (simpleLogFormatter "$utcTime\t {$loggername} <$prio> -> $msg")) hSetBuffering stderr LineBuffering updateGlobalLogger rootLoggerName (setLevel DEBUG) - updateGlobalLogger rootLoggerName (setHandlers [handler]) + updateGlobalLogger rootLoggerName (setHandlers [handler, fhandler]) main :: IO () main = do initLogging + infoM "main" $ "Starting quik-connector-" ++ T.unpack quikConnectorVersionText infoM "main" "Loading config" config <- readConfig "quik-connector.config.json" @@ -103,9 +111,9 @@ main = do bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do - withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do + withZMQTradeSink ctx (tradeSink2 config) (\zmqTradeSink2 -> do bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config) (Just "global")) stopQuoteSourceServer (\_ -> do - bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do + bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [zmqTradeSink2, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do void $ Gtk.init Nothing window <- new Gtk.Window [ #title := "Quik connector" ] void $ on window #destroy Gtk.mainQuit @@ -126,7 +134,7 @@ main = do bracket (startPipeReader (T.pack pipe) tickChan) stopPipeReader (\_ -> do bracket (startQuoteSourceServer tickChan ctx (T.pack qsep) (Just "global")) stopQuoteSourceServer (\_ -> threadDelay 1000000)) _ -> return () - + loadCertificatesFromDirectory :: FilePath -> IO [CurveCertificate] loadCertificatesFromDirectory filepath = do diff --git a/app/Version.hs b/app/Version.hs new file mode 100644 index 0000000..17736b0 --- /dev/null +++ b/app/Version.hs @@ -0,0 +1,20 @@ +{-# LANGUAGE QuasiQuotes #-} + +module Version + ( + quikConnectorVersion, + quikConnectorVersionText + ) where + +import qualified Data.Text as T +import Text.Printf.TH + +quikConnectorVersion :: (Int, Int, Int, Int) +quikConnectorVersion = (0, 2, 3, 0) + +quikConnectorVersionText :: T.Text +quikConnectorVersionText = + [st|%d.%d.%d.%d|] v1 v2 v3 v4 + where + (v1, v2, v3, v4) = quikConnectorVersion + diff --git a/quik-connector.cabal b/quik-connector.cabal index 83ced18..39262bd 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -1,5 +1,5 @@ name: quik-connector -version: 0.2.1.0 +version: 0.2.3.0 synopsis: Atrade-Quik Connector application description: Please see README.md homepage: https://github.com/asakul/quik-connector @@ -49,7 +49,7 @@ library , aeson , cond , scientific - , libatrade == 0.4.0.0 + , libatrade == 0.7.0.0 , deepseq , errors , split @@ -105,8 +105,10 @@ executable quik-connector-exe , errors , safe-exceptions , iconv + , th-printf default-language: Haskell2010 other-modules: Config + , Version -- extra-libraries: "user32" test-suite quik-connector-test diff --git a/src/ATrade/Quotes/QTIS.hs b/src/ATrade/Quotes/QTIS.hs index 04c19de..4bba973 100644 --- a/src/ATrade/Quotes/QTIS.hs +++ b/src/ATrade/Quotes/QTIS.hs @@ -7,19 +7,19 @@ module ATrade.Quotes.QTIS qtisGetTickersInfo' ) where -import ATrade.Types -import Control.Monad -import Data.Aeson -import Data.Maybe +import ATrade.Types +import Control.Monad +import Data.Aeson import qualified Data.ByteString.Char8 as BC8 -import qualified Data.ByteString.Lazy as BL -import qualified Data.Text as T -import System.ZMQ4 -import System.Log.Logger +import qualified Data.ByteString.Lazy as BL +import Data.Maybe +import qualified Data.Text as T +import System.Log.Logger +import System.ZMQ4 data TickerInfo = TickerInfo { - tiTicker :: T.Text, - tiLotSize :: Integer, + tiTicker :: T.Text, + tiLotSize :: Integer, tiTickSize :: Price } deriving (Show, Eq) @@ -47,6 +47,7 @@ qtisGetTickersInfo ctx endpoint tickers = debugM "QTIS" $ "Requesting: " ++ T.unpack tickerId send sock [] $ BL.toStrict (tickerRequest tickerId) response <- receiveMulti sock + debugM "QTIS" $ show response let r = parseResponse response debugM "QTIS" $ "Got response: " ++ show r return r)) @@ -57,4 +58,4 @@ qtisGetTickersInfo ctx endpoint tickers = then decode $ BL.fromStrict payload else Nothing parseResponse _ = Nothing - + diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index 8bd1e96..9fe0a20 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -1,51 +1,51 @@ {-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE BangPatterns #-} -{-# LANGUAGE Strict #-} +{-# LANGUAGE Strict #-} module Broker.PaperBroker ( PaperBrokerState, mkPaperBroker ) where -import Data.Hashable -import Data.Bits -import ATrade.Types -import Data.IORef -import qualified Data.List as L -import qualified Data.Map.Strict as M -import qualified Data.Text as T -import ATrade.Broker.Protocol -import ATrade.Broker.Server -import Data.Time.Clock -import Data.Maybe -import Control.Monad -import Control.Concurrent.BoundedChan -import Control.Concurrent hiding (readChan, writeChan) -import System.Log.Logger -import ATrade.Quotes.QTIS -import System.ZMQ4 - -import Commissions (CommissionConfig(..)) -import TickTable (TickTableH, TickKey(..), getTick, getTickerInfo) +import ATrade.Broker.Protocol +import ATrade.Broker.Server +import ATrade.Quotes.QTIS +import ATrade.Types +import Control.Concurrent hiding (readChan, writeChan) +import Control.Concurrent.BoundedChan +import Control.Monad +import Data.Bits +import Data.Hashable +import Data.IORef +import qualified Data.List as L +import qualified Data.Map.Strict as M +import Data.Maybe +import qualified Data.Text as T +import Data.Time.Clock +import System.Log.Logger +import System.ZMQ4 + +import Commissions (CommissionConfig (..)) +import TickTable (TickKey (..), TickTableH, + getTick, getTickerInfo) data PaperBrokerState = PaperBrokerState { - pbTid :: Maybe ThreadId, - tickTable :: TickTableH, - orders :: M.Map OrderId Order, - cash :: !Price, - notificationCallback :: Maybe (Notification -> IO ()), - pendingOrders :: [Order], - - fortsClassCodes :: [T.Text], + pbTid :: Maybe ThreadId, + tickTable :: TickTableH, + orders :: M.Map OrderId Order, + cash :: !Price, + notificationCallback :: Maybe (Notification -> IO ()), + pendingOrders :: [Order], + + fortsClassCodes :: [T.Text], fortsOpenTimeIntervals :: [(DiffTime, DiffTime)], - auctionableClassCodes :: [T.Text], - premarketStartTime :: DiffTime, - marketOpenTime :: DiffTime, - postMarketStartTime :: DiffTime, - postMarketFixTime :: DiffTime, - postMarketCloseTime :: DiffTime, - commissions :: [CommissionConfig] + auctionableClassCodes :: [T.Text], + premarketStartTime :: DiffTime, + marketOpenTime :: DiffTime, + postMarketStartTime :: DiffTime, + postMarketFixTime :: DiffTime, + postMarketCloseTime :: DiffTime, + commissions :: [CommissionConfig] } hourMin :: Integer -> Integer -> DiffTime @@ -80,8 +80,8 @@ mkPaperBroker tickTableH tickChan startCash accounts comms = do submitOrder = pbSubmitOrder state, cancelOrder = pbCancelOrder state, stopBroker = pbDestroyBroker state } - - + + brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO () brokerThread chan state = forever $ do tick <- readChan chan @@ -90,9 +90,11 @@ brokerThread chan state = forever $ do executePendingOrders tick state executePendingOrders tick state = do + marketOpenTime' <- marketOpenTime <$> readIORef state po <- pendingOrders <$> readIORef state - executedIds <- catMaybes <$> mapM execute po - atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\order -> orderId order `L.notElem` executedIds) (pendingOrders s)}, ())) + when (utctDayTime (timestamp tick) >= marketOpenTime') $ do + executedIds <- catMaybes <$> mapM execute po + atomicModifyIORef' state (\s -> (s { pendingOrders = L.filter (\order -> orderId order `L.notElem` executedIds) (pendingOrders s)}, ())) where execute order = if security tick == orderSecurity order @@ -102,7 +104,7 @@ executePendingOrders tick state = do debugM "PaperBroker" "Executing: pending market order" executeAtTick state order tick return $ Just $ orderId order - Limit price -> + Limit price -> executeLimitAt price order _ -> return Nothing else return Nothing @@ -147,7 +149,7 @@ maybeCall proj state arg = do cb <- proj <$> readIORef state case cb of Just callback -> callback arg - Nothing -> return () + Nothing -> return () executeAtTick state order tick = do let newOrder = order { orderState = Executed } @@ -179,8 +181,8 @@ pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO () pbSubmitOrder state order = do infoM "PaperBroker" $ "Submitted order: " ++ show order case orderPrice order of - Market -> executeMarketOrder state order - Limit price -> submitLimitOrder price state order + Market -> executeMarketOrder state order + Limit price -> submitLimitOrder price state order Stop price trigger -> submitStopOrder state order StopMarket trigger -> submitStopMarketOrder state order @@ -204,8 +206,9 @@ pbSubmitOrder state order = do let newOrder = order { orderState = Submitted } atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted - Just tick -> - if ((orderOperation order == Buy) && (value tick < price)) || ((orderOperation order == Sell) && (value tick > price)) + Just tick -> do + marketOpenTime' <- marketOpenTime <$> readIORef state + if (((orderOperation order == Buy) && (value tick < price)) || ((orderOperation order == Sell) && (value tick > price)) && (utctDayTime (timestamp tick) >= marketOpenTime')) then do maybeCall notificationCallback state $ OrderNotification (orderId order) Submitted executeAtTick state order tick @@ -218,7 +221,7 @@ pbSubmitOrder state order = do submitStopMarketOrder _ _ = warningM "PaperBroker" $ "Not implemented: Submitted order: " ++ show order orderDatatype = case orderOperation order of - Buy -> BestOffer + Buy -> BestOffer Sell -> BestBid key = TickKey (orderSecurity order) orderDatatype @@ -235,7 +238,7 @@ pbDestroyBroker state = do maybeTid <- pbTid <$> readIORef state case maybeTid of Just tid -> killThread tid - Nothing -> return () + Nothing -> return () {- pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order) diff --git a/src/Broker/QuikBroker.hs b/src/Broker/QuikBroker.hs index 19fd0b3..09d36c3 100644 --- a/src/Broker/QuikBroker.hs +++ b/src/Broker/QuikBroker.hs @@ -1,5 +1,6 @@ {-# LANGUAGE OverloadedStrings #-} {-# LANGUAGE MultiWayIf #-} +{-# LANGUAGE BangPatterns #-} module Broker.QuikBroker ( mkQuikBroker @@ -87,11 +88,15 @@ qbSubmitOrder state order = do transId <- nextTransId state atomicModifyIORef' state (\s -> (s { trans2orderid = M.insert transId order (trans2orderid s) }, ())) + debugM "Quik" "Getting ticktable" tt <- tickTable <$> readIORef state + debugM "Quik" "Getting tickerinfo from ticktable" tickerInfoMb <- getTickerInfo tt (orderSecurity order) + debugM "Quik" "Getting liquid ticks" liquidTickMb <- getTick tt (TickKey (orderSecurity order) (if orderOperation order == Buy then BestOffer else BestBid)) + debugM "Quik" "Obtained" case (tickerInfoMb, liquidTickMb) of - (Just tickerInfo, Just liquidTick) -> + (Just !tickerInfo, Just !liquidTick) -> case makeTransactionString tickerInfo liquidTick transId order of Just transStr -> do rc <- quikSendTransaction q transStr @@ -144,8 +149,8 @@ makeTransactionString tickerInfo liquidTick transId order = seccode = (`atMay` 1) . splitOn "#" . T.unpack $ orderSecurity order price = case orderPrice order of Market -> if orderOperation order == Buy - then removeTrailingZeros . show $ value liquidTick - 10 * tiTickSize tickerInfo - else removeTrailingZeros . show $ value liquidTick + 10 * tiTickSize tickerInfo + then removeTrailingZeros . show $ value liquidTick + 10 * tiTickSize tickerInfo + else removeTrailingZeros . show $ value liquidTick - 10 * tiTickSize tickerInfo Limit p -> removeTrailingZeros . show $ p _ -> "0" removeTrailingZeros v = if '.' `L.elem` v then L.dropWhileEnd (== '.') . L.dropWhileEnd (== '0') $ v else v diff --git a/src/TickTable.hs b/src/TickTable.hs index 6237d29..14ce8d8 100644 --- a/src/TickTable.hs +++ b/src/TickTable.hs @@ -16,7 +16,7 @@ import Control.Concurrent (forkIO, ThreadId, threadDelay) import Control.Concurrent.BoundedChan (BoundedChan, newBoundedChan, readChan, tryReadChan, writeChan) import Control.Concurrent.MVar (newEmptyMVar) -import Control.Monad (forM_, when, void) +import Control.Monad (forM_, when, void, forever) import Data.Maybe (catMaybes, isNothing) import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef) @@ -24,19 +24,22 @@ import Data.IORef (IORef, newIORef, atomicModifyIORef', readIORef) import qualified Data.Map.Strict as M import qualified Data.Text as T +import System.Log.Logger (debugM) + import System.ZMQ4 (Context) data TickKey = TickKey TickerId DataType deriving (Show, Ord, Eq) data TickTable = TickTable { - ticks :: M.Map TickKey Tick, - tickerInfo :: M.Map TickerId TickerInfo + ticks :: !(M.Map TickKey Tick), + tickerInfo :: !(M.Map TickerId TickerInfo) } type TickTableH = IORef TickTable data QTISThreadRequest = RequestTickerInfo TickerId | Shutdown + deriving (Show, Eq) mkTickTable :: BoundedChan Tick -> Context -> T.Text -> IO (IORef TickTable) mkTickTable chan ctx qtisEndpoint = do @@ -48,7 +51,7 @@ mkTickTable chan ctx qtisEndpoint = do void $ forkIO $ tickTableThread qtisChan r shutdownMVar qtisTid return r where - tickTableThread qtisChan r shutdownMVar qtisTid = do + tickTableThread qtisChan r shutdownMVar qtisTid = forever $ do t <- readChan chan atomicModifyIORef' r (\s -> (s { ticks = M.insert (TickKey (security t) (datatype t)) t $! ticks s }, ())) when (datatype t == LastTradePrice) $ do @@ -56,9 +59,10 @@ mkTickTable chan ctx qtisEndpoint = do when (isNothing $ M.lookup (security t) infoMap) $ writeChan qtisChan $ RequestTickerInfo (security t) - qtisThread r qtisChan ctx qtisEndpoint = do + qtisThread r qtisChan ctx qtisEndpoint = forever $ do threadDelay 1000000 requests <- readListFromChan qtisChan + debugM "TickTable" $ "Requested info for tickers: " ++ show requests ti <- qtisGetTickersInfo ctx qtisEndpoint (catMaybes $ fmap requestToTicker requests) forM_ ti (\newInfo -> atomicModifyIORef' r (\s -> (s { tickerInfo = M.insert (tiTicker newInfo) newInfo $! tickerInfo s }, ()))) diff --git a/stack.yaml b/stack.yaml index f77f424..9d353a6 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: lts-8.18 +resolver: lts-12.9 # User packages to be built. # Various formats can be used as shown in the example below. @@ -42,7 +42,7 @@ packages: - '../iconv' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1"] +extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1", "gi-gtk-3.0.23", "gi-gdk-3.0.16", "gi-gdkpixbuf-2.0.16", "gi-gio-2.0.18", "gi-pango-1.0.16", "text-format-0.3.2", "th-printf-0.5.1"] # Override default flag values for local packages and extra-deps flags: {}