From b766eeae61136e0d8780c98cc488d9dec838be40 Mon Sep 17 00:00:00 2001 From: Denis Tereshkin Date: Mon, 12 Dec 2016 11:33:54 +0700 Subject: [PATCH] Various tweaks --- app/Main.hs | 30 ++++++++++--------- src/Broker/PaperBroker.hs | 41 +++++++++++++------------- src/Broker/QuikBroker.hs | 6 ++-- src/Broker/QuikBroker/Trans2QuikApi.hs | 6 ++-- src/Network/Telegram.hs | 8 ++--- src/QuoteSource/DataImport.hs | 8 ++--- src/System/Win32/DDE.hs | 35 +++++++++++----------- 7 files changed, 67 insertions(+), 67 deletions(-) diff --git a/app/Main.hs b/app/Main.hs index 7f996b9..783b9ab 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -4,15 +4,13 @@ module Main where import System.IO import QuoteSource.DataImport -import Control.Concurrent hiding (readChan) +import Control.Concurrent hiding (readChan, writeChan) import Control.Monad import Control.Exception import Control.Monad.IO.Class import Data.IORef import Graphics.UI.Gtk hiding (Action, backspace) import Control.Concurrent.BoundedChan -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import ATrade.Types import QuoteSource.TableParsers.AllParamsTableParser import QuoteSource.TableParser @@ -40,6 +38,9 @@ import Control.Monad.Trans.Except import Broker.QuikBroker.Trans2QuikApi import Network.Telegram +import Network.Connection +import Network.HTTP.Client +import Network.HTTP.Client.TLS data TableConfig = TableConfig { parserId :: String, @@ -104,15 +105,14 @@ parseConfig = withObject "object" $ \obj -> do tableName = tn, tableParams = params } -forkBoundedChan :: Int -> TBQueue Tick -> IO (ThreadId, TBQueue Tick, TBQueue QuoteSourceServerData) +forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData) forkBoundedChan size source = do - sink <- atomically $ newTBQueue size - sinkQss <- atomically $ newTBQueue size + sink <- newBoundedChan size + sinkQss <- newBoundedChan size tid <- forkIO $ forever $ do - v <- atomically $ readTBQueue source - atomically $ do - writeTBQueue sink v - writeTBQueue sinkQss (QSSTick v) + v <- readChan source + writeChan sink v + writeChan sinkQss (QSSTick v) return (tid, sink, sinkQss) @@ -133,15 +133,17 @@ main = do config <- readConfig "quik-connector.config.json" infoM "main" "Config loaded" - chan <- atomically $ newTBQueue 1000 + chan <- newBoundedChan 10000 infoM "main" "Starting data import server" dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade" - (forkId, c1, c2) <- forkBoundedChan 1000 chan + (forkId, c1, c2) <- forkBoundedChan 10000 chan broker <- mkPaperBroker c1 1000000 ["demo"] - eitherBrokerQ <- runExceptT $ mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) (Just (telegramToken config, telegramChatId config)) - tgCtx <- mkTelegramContext (telegramToken config) + man <- newManager (mkManagerSettings (TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }) Nothing) + infoM "main" "Http manager created" + eitherBrokerQ <- runExceptT $ mkQuikBroker man (dllPath config) (quikPath config) (quikAccounts config) (Just (telegramToken config, telegramChatId config)) + tgCtx <- mkTelegramContext man (telegramToken config) sendMessage tgCtx (telegramChatId config) "Goldmine-Quik connector started" case eitherBrokerQ of Left errmsg -> warningM "main" $ "Can't load quik broker: " ++ T.unpack errmsg diff --git a/src/Broker/PaperBroker.hs b/src/Broker/PaperBroker.hs index ee1aac0..b0b21e8 100644 --- a/src/Broker/PaperBroker.hs +++ b/src/Broker/PaperBroker.hs @@ -10,8 +10,6 @@ module Broker.PaperBroker ( import Control.DeepSeq import Data.Hashable import Data.Bits -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import ATrade.Types import Data.IORef import qualified Data.HashMap.Strict as M @@ -21,6 +19,7 @@ import ATrade.Broker.Server import Data.Time.Clock import Data.Decimal import Control.Monad +import Control.Concurrent.BoundedChan import Control.Concurrent hiding (readChan) import System.Log.Logger @@ -38,9 +37,9 @@ data PaperBrokerState = PaperBrokerState { notificationCallback :: Maybe (Notification -> IO ()) } -mkPaperBroker :: TBQueue Tick -> Decimal -> [T.Text] -> IO BrokerInterface +mkPaperBroker :: BoundedChan Tick -> Decimal -> [T.Text] -> IO BrokerInterface mkPaperBroker tickChan startCash accounts = do - state <- atomically $ newTVar PaperBrokerState { + state <- newIORef PaperBrokerState { pbTid = Nothing, tickMap = M.empty, orders = M.empty, @@ -48,7 +47,7 @@ mkPaperBroker tickChan startCash accounts = do notificationCallback = Nothing } tid <- forkIO $ brokerThread tickChan state - atomically $ modifyTVar' state (\s -> s { pbTid = Just tid }) + atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ())) return BrokerInterface { accounts = accounts, @@ -57,18 +56,18 @@ mkPaperBroker tickChan startCash accounts = do cancelOrder = pbCancelOrder state, stopBroker = pbDestroyBroker state } -brokerThread :: TBQueue Tick -> TVar PaperBrokerState -> IO () -brokerThread chan state = forever $ atomically $ do - tick <- readTBQueue chan - modifyTVar' state (\s -> s { tickMap = M.insert (makeKey tick) tick $! tickMap s }) +brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO () +brokerThread chan state = forever $ do + tick <- readChan chan + atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ())) where makeKey !tick = TickMapKey (security $! tick) (datatype tick) -pbSetNotificationCallback :: TVar PaperBrokerState -> Maybe (Notification -> IO ()) -> IO() -pbSetNotificationCallback state callback = atomically $ modifyTVar' state (\s -> s { notificationCallback = callback } ) +pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO() +pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) ) -pbSubmitOrder :: TVar PaperBrokerState -> Order -> IO () +pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO () pbSubmitOrder state order = do infoM "PaperBroker" $ "Submitted order: " ++ show order case orderPrice order of @@ -79,14 +78,14 @@ pbSubmitOrder state order = do where executeMarketOrder state order = do - tm <- atomically $ tickMap <$> readTVar state + tm <- tickMap <$> readIORef state case M.lookup key tm of Nothing -> let newOrder = order { orderState = OrderError } in - atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s }) + atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ())) Just tick -> let newOrder = order { orderState = Executed } tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do - atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}) + atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ())) debugM "PaperBroker" $ "Executed: " ++ show newOrder ts <- getCurrentTime maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts @@ -102,7 +101,7 @@ pbSubmitOrder state order = do key = TickMapKey (orderSecurity order) (orderDatatype order) maybeCall proj state arg = do - cb <- atomically $ proj <$> readTVar state + cb <- proj <$> readIORef state case cb of Just callback -> callback arg Nothing -> return () @@ -121,16 +120,16 @@ pbSubmitOrder state order = do tradeSignalId = orderSignalId order } -pbCancelOrder :: TVar PaperBrokerState -> OrderId -> IO Bool +pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool pbCancelOrder state order = undefined -pbDestroyBroker :: TVar PaperBrokerState -> IO () +pbDestroyBroker :: IORef PaperBrokerState -> IO () pbDestroyBroker state = do - maybeTid <- atomically $ pbTid <$> readTVar state + maybeTid <- pbTid <$> readIORef state case maybeTid of Just tid -> killThread tid Nothing -> return () -pbGetOrder :: TVar PaperBrokerState -> OrderId -> IO (Maybe Order) -pbGetOrder state oid = atomically $ M.lookup oid . orders <$> readTVar state +pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order) +pbGetOrder state oid = M.lookup oid . orders <$> readIORef state diff --git a/src/Broker/QuikBroker.hs b/src/Broker/QuikBroker.hs index 68db91b..16b0706 100644 --- a/src/Broker/QuikBroker.hs +++ b/src/Broker/QuikBroker.hs @@ -62,15 +62,15 @@ messageThread tgCtx chatId msgChan = forever $ do Nothing -> threadDelay 500000 -mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> Maybe (T.Text, T.Text) -> ExceptT T.Text IO BrokerInterface -mkQuikBroker dllPath quikPath accs tgParams = do +mkQuikBroker :: Manager -> FilePath -> FilePath -> [T.Text] -> Maybe (T.Text, T.Text) -> ExceptT T.Text IO BrokerInterface +mkQuikBroker man dllPath quikPath accs tgParams = do q <- mkQuik dllPath quikPath msgChan <- liftIO $ newBoundedChan 100 msgTid <- liftIO $ case tgParams of Nothing -> return Nothing Just (tgToken, chatId) -> do - tgCtx <- mkTelegramContext tgToken + tgCtx <- mkTelegramContext man tgToken tid <- forkIO $ messageThread tgCtx chatId msgChan return $ Just tid diff --git a/src/Broker/QuikBroker/Trans2QuikApi.hs b/src/Broker/QuikBroker/Trans2QuikApi.hs index ebbffcb..b55f489 100644 --- a/src/Broker/QuikBroker/Trans2QuikApi.hs +++ b/src/Broker/QuikBroker/Trans2QuikApi.hs @@ -429,7 +429,7 @@ mkQuik dllpath quikpath = do orderCallback = orcb', tradeCallback = tradecb' }, ()))) - tid <- liftIO (forkIO $ watchdog quikpath state) + tid <- liftIO (forkOS $ watchdog quikpath state) liftIO $ atomicModifyIORef' state (\s -> (s { watchdogTid = tid }, ())) liftIO $ debugM "Quik" "mkQuik done" return state @@ -522,7 +522,7 @@ watchdog quikpath state = do tradecb <- tradeCallback <$> readIORef state alloca (\errorCode -> - allocaBytes 1024 (\errorMsg -> do + allocaBytes 2048 (\errorMsg -> do err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024 if err /= ecSuccess @@ -544,7 +544,7 @@ watchdog quikpath state = do case res of Left err -> warningM "Quik.Watchdog" $ "Unable to set callbacks: " ++ show err Right _ -> debugM "Quik.Watchdog" "Callbacks are set")) - threadDelay 5000000)) + threadDelay 1000)) throwIfErr :: IO LONG -> ExceptT T.Text IO () throwIfErr action = do diff --git a/src/Network/Telegram.hs b/src/Network/Telegram.hs index d0e9c83..3f333b4 100644 --- a/src/Network/Telegram.hs +++ b/src/Network/Telegram.hs @@ -3,7 +3,8 @@ module Network.Telegram ( mkTelegramContext, - sendMessage + sendMessage, + Manager ) where import Control.Monad @@ -22,9 +23,8 @@ data TelegramContext = TelegramContext { httpMan :: Manager } -mkTelegramContext :: T.Text -> IO TelegramContext -mkTelegramContext token = do - man <- newManager (mkManagerSettings (TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }) Nothing) +mkTelegramContext :: Manager -> T.Text -> IO TelegramContext +mkTelegramContext man token = do return TelegramContext { httpMan = man, tgToken = token } diff --git a/src/QuoteSource/DataImport.hs b/src/QuoteSource/DataImport.hs index 2a56239..d499e5b 100644 --- a/src/QuoteSource/DataImport.hs +++ b/src/QuoteSource/DataImport.hs @@ -7,8 +7,6 @@ module QuoteSource.DataImport ) where import Control.Concurrent.BoundedChan -import Control.Concurrent.STM -import Control.Concurrent.STM.TBQueue import Control.Monad.State.Strict import ATrade.Types import Data.IORef @@ -22,7 +20,7 @@ import qualified Data.Map as M data ServerState = ServerState { appName :: String, parsers :: IORef (M.Map String TableParserInstance), - tickChannel :: TBQueue Tick + tickChannel :: BoundedChan Tick } ddeCallback :: ServerState -> String -> (Int, Int, [XlData]) -> IO Bool @@ -34,12 +32,12 @@ ddeCallback state topic table = do let stateWithTimeHint = giveTimestampHint myParser timeHint let (ticks, newState) = runState (parseXlTable table) stateWithTimeHint modifyIORef' (parsers state) (\s -> newState `seq` s `seq` M.insert topic (MkTableParser newState) s) - mapM_ (atomically . writeTBQueue (tickChannel state)) ticks + mapM_ (writeChan (tickChannel state)) ticks return True _ -> return False -initDataImportServer :: [TableParserInstance] -> TBQueue Tick -> String -> IO (ServerState, IORef DdeState) +initDataImportServer :: [TableParserInstance] -> BoundedChan Tick -> String -> IO (ServerState, IORef DdeState) initDataImportServer parsers tickChan applicationName = do parsers <- newIORef $ M.fromList $ map (\(MkTableParser p) -> (getTableId p, MkTableParser p)) parsers let s = ServerState { appName = applicationName, parsers = parsers, tickChannel = tickChan } diff --git a/src/System/Win32/DDE.hs b/src/System/Win32/DDE.hs index 280ff7b..760cc4b 100644 --- a/src/System/Win32/DDE.hs +++ b/src/System/Win32/DDE.hs @@ -31,7 +31,7 @@ import Control.Monad import Data.Bits import Data.Binary.Get import Data.Typeable -import Data.ByteString hiding (map) +import Data.ByteString hiding (map, putStrLn) import Data.IORef import System.Win32.XlParser import System.Win32.DLL @@ -65,31 +65,31 @@ ddeCpWinAnsi = 1004 instance Exception DdeException -foreign import WINDOWS_CCONV unsafe "windows.h DdeInitializeW" +foreign import WINDOWS_CCONV "windows.h DdeInitializeW" ddeInitialize :: LPDWORD -> FunPtr DdeCallback -> DWORD -> DWORD -> IO CUInt -foreign import WINDOWS_CCONV unsafe "windows.h DdeUninitialize" +foreign import WINDOWS_CCONV "windows.h DdeUninitialize" ddeUninitialize :: DWORD -> IO BOOL -foreign import WINDOWS_CCONV unsafe "windows.h DdeCreateStringHandleW" +foreign import WINDOWS_CCONV "windows.h DdeCreateStringHandleW" ddeCreateStringHandle :: DWORD -> LPSTR -> CInt -> IO HANDLE -foreign import WINDOWS_CCONV unsafe "windows.h DdeFreeStringHandleW" +foreign import WINDOWS_CCONV "windows.h DdeFreeStringHandleW" ddeFreeStringHandle :: DWORD -> LPSTR -> IO HANDLE -foreign import WINDOWS_CCONV unsafe "windows.h DdeNameService" +foreign import WINDOWS_CCONV "windows.h DdeNameService" ddeNameService :: DWORD -> HANDLE -> HANDLE -> CInt -> IO HANDLE -foreign import WINDOWS_CCONV unsafe "windows.h DdeCmpStringHandles" +foreign import WINDOWS_CCONV "windows.h DdeCmpStringHandles" ddeCmpStringHandles :: HANDLE -> HANDLE -> IO CInt -foreign import WINDOWS_CCONV unsafe "windows.h DdeQueryStringW" +foreign import WINDOWS_CCONV "windows.h DdeQueryStringW" ddeQueryString :: DWORD -> HANDLE -> CString -> DWORD -> CInt -> IO DWORD -foreign import WINDOWS_CCONV unsafe "windows.h DdeAccessData" +foreign import WINDOWS_CCONV "windows.h DdeAccessData" ddeAccessData :: HANDLE -> LPDWORD -> IO (Ptr CUChar) -foreign import WINDOWS_CCONV unsafe "windows.h DdeUnaccessData" +foreign import WINDOWS_CCONV "windows.h DdeUnaccessData" ddeUnaccessData :: HANDLE -> IO () foreign import WINDOWS_CCONV "wrapper" @@ -133,13 +133,14 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2 maybeTopic <- queryString myDdeState 256 hsz1 case maybeTopic of Nothing -> return ddeResultFalse - Just topic -> withDdeData hData (\xlData -> case runGetOrFail xlParser $ BL.fromStrict xlData of - Left (_, _, errmsg) -> return ddeResultFalse - Right (_, _, table) -> do - rc <- (dataCallback myDdeState) topic table - return $ if rc - then ddeResultAck - else ddeResultFalse ) + Just topic -> withDdeData hData (\xlData -> do + case runGetOrFail xlParser $ BL.fromStrict xlData of + Left (_, _, errmsg) -> return ddeResultFalse + Right (_, _, table) -> do + rc <- (dataCallback myDdeState) topic table + return $ if rc + then ddeResultAck + else ddeResultFalse ) initializeDde :: String -> String -> DdeDataCallback -> IO (IORef DdeState) initializeDde appName topic callback = alloca (\instancePtr -> do