From c16d323d21ca937beebfd5aa030dd24bbebdcba3 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 16 Oct 2017 08:46:39 +0700 Subject: [PATCH] Got rid of Data.Decimal (using libatrade-0.3.0.0) --- app/Config.hs | 6 +++ app/Main.hs | 43 ++++++++++++------- quik-connector.cabal | 4 +- src/Broker/PaperBroker.hs | 13 +++--- src/Broker/QuikBroker.hs | 5 +-- src/QuoteSource/PipeReader.hs | 20 ++++----- .../TableParsers/AllParamsTableParser.hs | 11 +++-- 7 files changed, 57 insertions(+), 45 deletions(-) diff --git a/app/Config.hs b/app/Config.hs index 326d97d..8065406 100644 --- a/app/Config.hs +++ b/app/Config.hs @@ -22,6 +22,8 @@ data TableConfig = TableConfig { data Config = Config { quotesourceEndpoint :: String, + pipeReaderQsEndpoint :: Maybe String, + tickPipePath :: Maybe String, brokerserverEndpoint :: String, whitelist :: [T.Text], blacklist :: [T.Text], @@ -46,6 +48,8 @@ readConfig fname = do parseConfig :: Value -> Parser Config parseConfig = withObject "object" $ \obj -> do qse <- obj .: "quotesource-endpoint" + qsePipe <- obj .:? "quotesource-endpoint-pipe-reader" + pipePath <- obj .:? "pipe-reader-path" bse <- obj .: "brokerserver-endpoint" whitelist' <- obj .:? "whitelist" .!= [] blacklist' <- obj .:? "blacklist" .!= [] @@ -61,6 +65,8 @@ parseConfig = withObject "object" $ \obj -> do tgChatId <- obj .: "telegram-chatid" accs <- V.toList <$> obj .: "accounts" return Config { quotesourceEndpoint = qse, + pipeReaderQsEndpoint = qsePipe, + tickPipePath = pipePath, brokerserverEndpoint = bse, whitelist = whitelist', blacklist = blacklist', diff --git a/app/Main.hs b/app/Main.hs index f285608..c636a7e 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE OverloadedStrings, OverloadedLabels #-} +{-# LANGUAGE OverloadedStrings, OverloadedLabels, LambdaCase #-} module Main where import System.IO @@ -6,7 +6,7 @@ import System.IO import QuoteSource.DataImport import Control.Concurrent hiding (readChan, writeChan) import Control.Monad -import Control.Exception +import Control.Exception.Safe import Control.Error.Util import qualified GI.Gtk as Gtk import Data.GI.Base @@ -14,6 +14,7 @@ import Control.Concurrent.BoundedChan import ATrade.Types import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParser +import QuoteSource.PipeReader import ATrade.QuoteSource.Server import ATrade.Broker.TradeSinks.ZMQTradeSink @@ -96,22 +97,32 @@ main = do let serverParams = defaultServerSecurityParams { sspDomain = Just "global", sspCertificate = serverCert } - withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do - withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do - bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config)) stopQuoteSourceServer (\_ -> do - bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do - void $ Gtk.init Nothing - window <- new Gtk.Window [ #title := "Quik connector" ] - void $ on window #destroy Gtk.mainQuit - #showAll window - Gtk.main) - infoM "main" "BRS down") - debugM "main" "QS done") - debugM "main" "TGTS done") - debugM "main" "ZMQTS done") - debugM "main" "ZAP done") + bracket (forkIO $ pipeReaderThread ctx config) killThread (\_ -> do + withZMQTradeSink ctx (tradeSink config) (\zmqTradeSink -> do + withTelegramTradeSink (telegramToken config) (telegramChatId config) (\telegramTradeSink -> do + bracket (startQuoteSourceServer c2 ctx (T.pack $ quotesourceEndpoint config)) stopQuoteSourceServer (\_ -> do + bracket (startBrokerServer [brokerP, brokerQ] ctx (T.pack $ brokerserverEndpoint config) [telegramTradeSink, zmqTradeSink] serverParams) stopBrokerServer (\_ -> do + void $ Gtk.init Nothing + window <- new Gtk.Window [ #title := "Quik connector" ] + void $ on window #destroy Gtk.mainQuit + #showAll window + Gtk.main) + infoM "main" "BRS down") + debugM "main" "QS done") + debugM "main" "TGTS done") + debugM "main" "ZMQTS done") + debugM "main" "ZAP done")) void $ timeout 1000000 $ killThread forkId infoM "main" "Main thread done" + where + pipeReaderThread ctx config = + case (tickPipePath config, pipeReaderQsEndpoint config) of + (Just pipe, Just qsep) -> do + tickChan <- newBoundedChan 10000 + bracket (startPipeReader (T.pack pipe) tickChan) stopPipeReader (\_ -> do + bracket (startQuoteSourceServer tickChan ctx (T.pack qsep)) stopQuoteSourceServer (\_ -> threadDelay 1000000)) + _ -> return () + loadCertificatesFromDirectory :: FilePath -> IO [CurveCertificate] loadCertificatesFromDirectory filepath = do diff --git a/quik-connector.cabal b/quik-connector.cabal index d11e15b..7690527 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -32,7 +32,6 @@ library , data-binary-ieee754 , bytestring , text - , Decimal , time , vector , containers @@ -47,7 +46,7 @@ library , aeson , cond , scientific - , libatrade + , libatrade == 0.3.0.0 , deepseq , errors , split @@ -100,6 +99,7 @@ executable quik-connector-exe , connection , directory , errors + , safe-exceptions default-language: Haskell2010 other-modules: Config -- extra-libraries: "user32" diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index a2ea8b3..4766200 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -18,7 +18,6 @@ import qualified Data.Text as T import ATrade.Broker.Protocol import ATrade.Broker.Server import Data.Time.Clock -import Data.Decimal import Data.Maybe import Control.Monad import Control.Concurrent.BoundedChan @@ -35,7 +34,7 @@ data PaperBrokerState = PaperBrokerState { pbTid :: Maybe ThreadId, tickMap :: M.Map TickMapKey Tick, orders :: M.Map OrderId Order, - cash :: ! Decimal, + cash :: !Price, notificationCallback :: Maybe (Notification -> IO ()), pendingOrders :: [Order], @@ -53,7 +52,7 @@ data PaperBrokerState = PaperBrokerState { hourMin :: Integer -> Integer -> DiffTime hourMin h m = fromIntegral $ h * 3600 + m * 60 -mkPaperBroker :: BoundedChan Tick -> Decimal -> [T.Text] -> IO BrokerInterface +mkPaperBroker :: BoundedChan Tick -> Price -> [T.Text] -> IO BrokerInterface mkPaperBroker tickChan startCash accounts = do state <- newIORef PaperBrokerState { pbTid = Nothing, @@ -109,13 +108,13 @@ executePendingOrders tick state = do else return Nothing executeLimitAt price order = case orderOperation order of - Buy -> if (datatype tick == Price && price > value tick && value tick > 0) || (datatype tick == BestOffer && price > value tick && value tick > 0) + Buy -> if (datatype tick == LastTradePrice && price > value tick && value tick > 0) || (datatype tick == BestOffer && price > value tick && value tick > 0) then do debugM "PaperBroker" $ "[1]Executing: pending limit order: " ++ show (security tick) ++ "/" ++ show (orderSecurity order) executeAtTick state order $ tick { value = price } return $ Just $ orderId order else return Nothing - Sell -> if (datatype tick == Price && price < value tick && value tick > 0) || (datatype tick == BestBid && price < value tick && value tick > 0) + Sell -> if (datatype tick == LastTradePrice && price < value tick && value tick > 0) || (datatype tick == BestBid && price < value tick && value tick > 0) then do debugM "PaperBroker" $ "[2]Executing: pending limit order: " ++ show (security tick) ++ "/" ++ show (orderSecurity order) executeAtTick state order $ tick { value = price } @@ -130,7 +129,7 @@ mkTrade tick order timestamp = Trade { tradeOrderId = orderId order, tradePrice = value tick, tradeQuantity = orderQuantity order, - tradeVolume = realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick, + tradeVolume = fromInteger (orderQuantity order) * value tick, tradeVolumeCurrency = "TEST", tradeOperation = orderOperation order, tradeAccount = orderAccountId order, @@ -146,7 +145,7 @@ maybeCall proj state arg = do executeAtTick state order tick = do let newOrder = order { orderState = Executed } - let tradeVolume = realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick + let tradeVolume = fromInteger (orderQuantity order) * value tick atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ())) debugM "PaperBroker" $ "Executed: " ++ show newOrder ++ "; at tick: " ++ show tick ts <- getCurrentTime diff --git a/src/Broker/QuikBroker.hs b/src/Broker/QuikBroker.hs index 3a8b8a5..d893a70 100644 --- a/src/Broker/QuikBroker.hs +++ b/src/Broker/QuikBroker.hs @@ -11,7 +11,6 @@ import ATrade.Broker.Server import Broker.QuikBroker.Trans2QuikApi hiding (tradeAccount) -import Data.Decimal import Data.IORef import Data.List.Split import qualified Data.List as L @@ -206,9 +205,9 @@ qbTradeCallback state quiktrade = do where tradeFor order = Trade { tradeOrderId = orderId order, - tradePrice = realFracToDecimal 10 $ qtPrice quiktrade, + tradePrice = fromDouble $ qtPrice quiktrade, tradeQuantity = qtQuantity quiktrade, - tradeVolume = realFracToDecimal 10 $ qtVolume quiktrade, + tradeVolume = fromDouble $ qtVolume quiktrade, tradeVolumeCurrency = T.pack $ qtVolumeCurrency quiktrade, tradeOperation = if qtSell quiktrade then Sell else Buy, tradeAccount = orderAccountId order, diff --git a/src/QuoteSource/PipeReader.hs b/src/QuoteSource/PipeReader.hs index 4c412b3..99d1249 100644 --- a/src/QuoteSource/PipeReader.hs +++ b/src/QuoteSource/PipeReader.hs @@ -10,7 +10,6 @@ import Data.IORef import qualified Data.Text as T import qualified Data.Map.Strict as M import qualified Data.HashSet as HS -import Data.Decimal import Data.Time.Clock import Data.Time.Calendar import ATrade.Types @@ -33,8 +32,7 @@ import Data.Attoparsec.Text import Data.Conduit import qualified Data.Conduit.List as CL import Data.Conduit.Attoparsec - -fromDouble = realFracToDecimal 10 +import ATrade.QuoteSource.Server data PipeReaderHandle = PipeReaderHandle { @@ -82,18 +80,18 @@ line2TickConduit = do m <- liftIO $ readIORef volumeMap case M.lookup tickerId m of Just vol -> - if | vol < voltoday -> yieldTick tickerId Price ts (fromDouble last) (voltoday - vol) - | vol > voltoday -> yieldTick tickerId Price ts (fromDouble last) vol + if | vol < voltoday -> yieldTick tickerId LastTradePrice ts (fromDouble last) (voltoday - vol) + | vol > voltoday -> yieldTick tickerId LastTradePrice ts (fromDouble last) vol | otherwise -> return () - Nothing -> yieldTick tickerId Price ts (fromDouble last) 1 + Nothing -> yieldTick tickerId LastTradePrice ts (fromDouble last) 1 liftIO $ atomicModifyIORef' volumeMap (\m -> (M.insert tickerId voltoday m, ())) AllTradeLine tickerId flags price volume ts -> do liftIO $ writeIORef lastTimestamp ts if - | flags == 1 -> yieldTick tickerId Price ts (fromDouble price) (-volume) - | flags == 2 -> yieldTick tickerId Price ts (fromDouble price) volume + | flags == 1 -> yieldTick tickerId LastTradePrice ts (fromDouble price) (-volume) + | flags == 2 -> yieldTick tickerId LastTradePrice ts (fromDouble price) volume | otherwise -> return () liftIO $ atomicModifyIORef' ignoreCPSet (\s -> (HS.insert tickerId s, ())) @@ -105,10 +103,10 @@ line2TickConduit = do value = val, volume = vol } -chanSink :: BoundedChan a -> Sink a IO () -chanSink chan = awaitForever (\t -> liftIO $ writeChan chan t) +chanSink :: BoundedChan QuoteSourceServerData -> Sink Tick IO () +chanSink chan = awaitForever (\t -> liftIO $ writeChan chan (QSSTick t)) -startPipeReader :: T.Text -> BoundedChan Tick -> IO PipeReaderHandle +startPipeReader :: T.Text -> BoundedChan QuoteSourceServerData -> IO PipeReaderHandle startPipeReader pipeName tickChan = do f <- createFile (T.unpack pipeName) gENERIC_READ 0 Nothing oPEN_EXISTING 0 Nothing when (f == iNVALID_HANDLE_VALUE) $ error $ "Unable to open pipe: " ++ T.unpack pipeName diff --git a/src/QuoteSource/TableParsers/AllParamsTableParser.hs b/src/QuoteSource/TableParsers/AllParamsTableParser.hs index 09d6729..b4c8ad2 100644 --- a/src/QuoteSource/TableParsers/AllParamsTableParser.hs +++ b/src/QuoteSource/TableParsers/AllParamsTableParser.hs @@ -7,10 +7,9 @@ module QuoteSource.TableParsers.AllParamsTableParser ( import qualified Data.Map.Strict as M import QuoteSource.TableParser -import ATrade.Types +import ATrade.Types as AT import System.Win32.XlParser import Data.Tuple -import Data.Decimal import Control.Monad.State.Strict import Control.DeepSeq import Data.Time.Clock @@ -43,7 +42,7 @@ columnCodes = M.fromList [ columnToDataType :: TableColumn -> DataType columnToDataType x - | x == CPrice = Price + | x == CPrice = LastTradePrice | x == CBestBid = BestBid | x == CBestAsk = BestOffer | x == CTotalSupply = TotalSupply @@ -106,7 +105,7 @@ parseWithSchema sch (width, height, cells) = do security = force $ securityName classCode ticker, datatype = columnToDataType columnType, timestamp = ts, - value = force $ realFracToDecimal 10 value, + value = fromDouble value, volume = 0 } _ -> return Nothing @@ -121,9 +120,9 @@ parseWithSchema sch (width, height, cells) = do ts <- gets timestampHint return $ Just Tick { security = force $ securityName classCode ticker, - datatype = Price, + datatype = LastTradePrice, timestamp = ts, - value = force $ realFracToDecimal 10 value, + value = fromDouble value, volume = tickVolume} else return Nothing