Browse Source

Various tweaks

master
Denis Tereshkin 9 years ago
parent
commit
b766eeae61
  1. 30
      app/Main.hs
  2. 41
      src/Broker/PaperBroker.hs
  3. 6
      src/Broker/QuikBroker.hs
  4. 6
      src/Broker/QuikBroker/Trans2QuikApi.hs
  5. 8
      src/Network/Telegram.hs
  6. 8
      src/QuoteSource/DataImport.hs
  7. 23
      src/System/Win32/DDE.hs

30
app/Main.hs

@ -4,15 +4,13 @@ module Main where @@ -4,15 +4,13 @@ module Main where
import System.IO
import QuoteSource.DataImport
import Control.Concurrent hiding (readChan)
import Control.Concurrent hiding (readChan, writeChan)
import Control.Monad
import Control.Exception
import Control.Monad.IO.Class
import Data.IORef
import Graphics.UI.Gtk hiding (Action, backspace)
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import ATrade.Types
import QuoteSource.TableParsers.AllParamsTableParser
import QuoteSource.TableParser
@ -40,6 +38,9 @@ import Control.Monad.Trans.Except @@ -40,6 +38,9 @@ import Control.Monad.Trans.Except
import Broker.QuikBroker.Trans2QuikApi
import Network.Telegram
import Network.Connection
import Network.HTTP.Client
import Network.HTTP.Client.TLS
data TableConfig = TableConfig {
parserId :: String,
@ -104,15 +105,14 @@ parseConfig = withObject "object" $ \obj -> do @@ -104,15 +105,14 @@ parseConfig = withObject "object" $ \obj -> do
tableName = tn,
tableParams = params }
forkBoundedChan :: Int -> TBQueue Tick -> IO (ThreadId, TBQueue Tick, TBQueue QuoteSourceServerData)
forkBoundedChan :: Int -> BoundedChan Tick -> IO (ThreadId, BoundedChan Tick, BoundedChan QuoteSourceServerData)
forkBoundedChan size source = do
sink <- atomically $ newTBQueue size
sinkQss <- atomically $ newTBQueue size
sink <- newBoundedChan size
sinkQss <- newBoundedChan size
tid <- forkIO $ forever $ do
v <- atomically $ readTBQueue source
atomically $ do
writeTBQueue sink v
writeTBQueue sinkQss (QSSTick v)
v <- readChan source
writeChan sink v
writeChan sinkQss (QSSTick v)
return (tid, sink, sinkQss)
@ -133,15 +133,17 @@ main = do @@ -133,15 +133,17 @@ main = do
config <- readConfig "quik-connector.config.json"
infoM "main" "Config loaded"
chan <- atomically $ newTBQueue 1000
chan <- newBoundedChan 10000
infoM "main" "Starting data import server"
dis <- initDataImportServer [MkTableParser $ mkAllParamsTableParser "allparams"] chan "atrade"
(forkId, c1, c2) <- forkBoundedChan 1000 chan
(forkId, c1, c2) <- forkBoundedChan 10000 chan
broker <- mkPaperBroker c1 1000000 ["demo"]
eitherBrokerQ <- runExceptT $ mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) (Just (telegramToken config, telegramChatId config))
tgCtx <- mkTelegramContext (telegramToken config)
man <- newManager (mkManagerSettings (TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }) Nothing)
infoM "main" "Http manager created"
eitherBrokerQ <- runExceptT $ mkQuikBroker man (dllPath config) (quikPath config) (quikAccounts config) (Just (telegramToken config, telegramChatId config))
tgCtx <- mkTelegramContext man (telegramToken config)
sendMessage tgCtx (telegramChatId config) "Goldmine-Quik connector started"
case eitherBrokerQ of
Left errmsg -> warningM "main" $ "Can't load quik broker: " ++ T.unpack errmsg

41
src/Broker/PaperBroker.hs

@ -10,8 +10,6 @@ module Broker.PaperBroker ( @@ -10,8 +10,6 @@ module Broker.PaperBroker (
import Control.DeepSeq
import Data.Hashable
import Data.Bits
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import ATrade.Types
import Data.IORef
import qualified Data.HashMap.Strict as M
@ -21,6 +19,7 @@ import ATrade.Broker.Server @@ -21,6 +19,7 @@ import ATrade.Broker.Server
import Data.Time.Clock
import Data.Decimal
import Control.Monad
import Control.Concurrent.BoundedChan
import Control.Concurrent hiding (readChan)
import System.Log.Logger
@ -38,9 +37,9 @@ data PaperBrokerState = PaperBrokerState { @@ -38,9 +37,9 @@ data PaperBrokerState = PaperBrokerState {
notificationCallback :: Maybe (Notification -> IO ())
}
mkPaperBroker :: TBQueue Tick -> Decimal -> [T.Text] -> IO BrokerInterface
mkPaperBroker :: BoundedChan Tick -> Decimal -> [T.Text] -> IO BrokerInterface
mkPaperBroker tickChan startCash accounts = do
state <- atomically $ newTVar PaperBrokerState {
state <- newIORef PaperBrokerState {
pbTid = Nothing,
tickMap = M.empty,
orders = M.empty,
@ -48,7 +47,7 @@ mkPaperBroker tickChan startCash accounts = do @@ -48,7 +47,7 @@ mkPaperBroker tickChan startCash accounts = do
notificationCallback = Nothing }
tid <- forkIO $ brokerThread tickChan state
atomically $ modifyTVar' state (\s -> s { pbTid = Just tid })
atomicModifyIORef' state (\s -> (s { pbTid = Just tid }, ()))
return BrokerInterface {
accounts = accounts,
@ -57,18 +56,18 @@ mkPaperBroker tickChan startCash accounts = do @@ -57,18 +56,18 @@ mkPaperBroker tickChan startCash accounts = do
cancelOrder = pbCancelOrder state,
stopBroker = pbDestroyBroker state }
brokerThread :: TBQueue Tick -> TVar PaperBrokerState -> IO ()
brokerThread chan state = forever $ atomically $ do
tick <- readTBQueue chan
modifyTVar' state (\s -> s { tickMap = M.insert (makeKey tick) tick $! tickMap s })
brokerThread :: BoundedChan Tick -> IORef PaperBrokerState -> IO ()
brokerThread chan state = forever $ do
tick <- readChan chan
atomicModifyIORef' state (\s -> (s { tickMap = M.insert (makeKey tick) tick $! tickMap s }, ()))
where
makeKey !tick = TickMapKey (security $! tick) (datatype tick)
pbSetNotificationCallback :: TVar PaperBrokerState -> Maybe (Notification -> IO ()) -> IO()
pbSetNotificationCallback state callback = atomically $ modifyTVar' state (\s -> s { notificationCallback = callback } )
pbSetNotificationCallback :: IORef PaperBrokerState -> Maybe (Notification -> IO ()) -> IO()
pbSetNotificationCallback state callback = atomicModifyIORef' state (\s -> (s { notificationCallback = callback }, ()) )
pbSubmitOrder :: TVar PaperBrokerState -> Order -> IO ()
pbSubmitOrder :: IORef PaperBrokerState -> Order -> IO ()
pbSubmitOrder state order = do
infoM "PaperBroker" $ "Submitted order: " ++ show order
case orderPrice order of
@ -79,14 +78,14 @@ pbSubmitOrder state order = do @@ -79,14 +78,14 @@ pbSubmitOrder state order = do
where
executeMarketOrder state order = do
tm <- atomically $ tickMap <$> readTVar state
tm <- tickMap <$> readIORef state
case M.lookup key tm of
Nothing -> let newOrder = order { orderState = OrderError } in
atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s })
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s }, ()))
Just tick -> let newOrder = order { orderState = Executed }
tradeVolume = (realFracToDecimal 10 (fromIntegral $ orderQuantity order) * value tick) in do
atomically $ modifyTVar' state (\s -> s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume})
atomicModifyIORef' state (\s -> (s { orders = M.insert (orderId order) newOrder $ orders s , cash = cash s - tradeVolume}, ()))
debugM "PaperBroker" $ "Executed: " ++ show newOrder
ts <- getCurrentTime
maybeCall notificationCallback state $ TradeNotification $ mkTrade tick order ts
@ -102,7 +101,7 @@ pbSubmitOrder state order = do @@ -102,7 +101,7 @@ pbSubmitOrder state order = do
key = TickMapKey (orderSecurity order) (orderDatatype order)
maybeCall proj state arg = do
cb <- atomically $ proj <$> readTVar state
cb <- proj <$> readIORef state
case cb of
Just callback -> callback arg
Nothing -> return ()
@ -121,16 +120,16 @@ pbSubmitOrder state order = do @@ -121,16 +120,16 @@ pbSubmitOrder state order = do
tradeSignalId = orderSignalId order }
pbCancelOrder :: TVar PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder :: IORef PaperBrokerState -> OrderId -> IO Bool
pbCancelOrder state order = undefined
pbDestroyBroker :: TVar PaperBrokerState -> IO ()
pbDestroyBroker :: IORef PaperBrokerState -> IO ()
pbDestroyBroker state = do
maybeTid <- atomically $ pbTid <$> readTVar state
maybeTid <- pbTid <$> readIORef state
case maybeTid of
Just tid -> killThread tid
Nothing -> return ()
pbGetOrder :: TVar PaperBrokerState -> OrderId -> IO (Maybe Order)
pbGetOrder state oid = atomically $ M.lookup oid . orders <$> readTVar state
pbGetOrder :: IORef PaperBrokerState -> OrderId -> IO (Maybe Order)
pbGetOrder state oid = M.lookup oid . orders <$> readIORef state

6
src/Broker/QuikBroker.hs

@ -62,15 +62,15 @@ messageThread tgCtx chatId msgChan = forever $ do @@ -62,15 +62,15 @@ messageThread tgCtx chatId msgChan = forever $ do
Nothing -> threadDelay 500000
mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> Maybe (T.Text, T.Text) -> ExceptT T.Text IO BrokerInterface
mkQuikBroker dllPath quikPath accs tgParams = do
mkQuikBroker :: Manager -> FilePath -> FilePath -> [T.Text] -> Maybe (T.Text, T.Text) -> ExceptT T.Text IO BrokerInterface
mkQuikBroker man dllPath quikPath accs tgParams = do
q <- mkQuik dllPath quikPath
msgChan <- liftIO $ newBoundedChan 100
msgTid <- liftIO $ case tgParams of
Nothing -> return Nothing
Just (tgToken, chatId) -> do
tgCtx <- mkTelegramContext tgToken
tgCtx <- mkTelegramContext man tgToken
tid <- forkIO $ messageThread tgCtx chatId msgChan
return $ Just tid

6
src/Broker/QuikBroker/Trans2QuikApi.hs

@ -429,7 +429,7 @@ mkQuik dllpath quikpath = do @@ -429,7 +429,7 @@ mkQuik dllpath quikpath = do
orderCallback = orcb',
tradeCallback = tradecb' }, ())))
tid <- liftIO (forkIO $ watchdog quikpath state)
tid <- liftIO (forkOS $ watchdog quikpath state)
liftIO $ atomicModifyIORef' state (\s -> (s { watchdogTid = tid }, ()))
liftIO $ debugM "Quik" "mkQuik done"
return state
@ -522,7 +522,7 @@ watchdog quikpath state = do @@ -522,7 +522,7 @@ watchdog quikpath state = do
tradecb <- tradeCallback <$> readIORef state
alloca (\errorCode ->
allocaBytes 1024 (\errorMsg -> do
allocaBytes 2048 (\errorMsg -> do
err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024
if err /= ecSuccess
@ -544,7 +544,7 @@ watchdog quikpath state = do @@ -544,7 +544,7 @@ watchdog quikpath state = do
case res of
Left err -> warningM "Quik.Watchdog" $ "Unable to set callbacks: " ++ show err
Right _ -> debugM "Quik.Watchdog" "Callbacks are set"))
threadDelay 5000000))
threadDelay 1000))
throwIfErr :: IO LONG -> ExceptT T.Text IO ()
throwIfErr action = do

8
src/Network/Telegram.hs

@ -3,7 +3,8 @@ @@ -3,7 +3,8 @@
module Network.Telegram
(
mkTelegramContext,
sendMessage
sendMessage,
Manager
) where
import Control.Monad
@ -22,9 +23,8 @@ data TelegramContext = TelegramContext { @@ -22,9 +23,8 @@ data TelegramContext = TelegramContext {
httpMan :: Manager
}
mkTelegramContext :: T.Text -> IO TelegramContext
mkTelegramContext token = do
man <- newManager (mkManagerSettings (TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }) Nothing)
mkTelegramContext :: Manager -> T.Text -> IO TelegramContext
mkTelegramContext man token = do
return TelegramContext { httpMan = man, tgToken = token }

8
src/QuoteSource/DataImport.hs

@ -7,8 +7,6 @@ module QuoteSource.DataImport @@ -7,8 +7,6 @@ module QuoteSource.DataImport
) where
import Control.Concurrent.BoundedChan
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Monad.State.Strict
import ATrade.Types
import Data.IORef
@ -22,7 +20,7 @@ import qualified Data.Map as M @@ -22,7 +20,7 @@ import qualified Data.Map as M
data ServerState = ServerState {
appName :: String,
parsers :: IORef (M.Map String TableParserInstance),
tickChannel :: TBQueue Tick
tickChannel :: BoundedChan Tick
}
ddeCallback :: ServerState -> String -> (Int, Int, [XlData]) -> IO Bool
@ -34,12 +32,12 @@ ddeCallback state topic table = do @@ -34,12 +32,12 @@ ddeCallback state topic table = do
let stateWithTimeHint = giveTimestampHint myParser timeHint
let (ticks, newState) = runState (parseXlTable table) stateWithTimeHint
modifyIORef' (parsers state) (\s -> newState `seq` s `seq` M.insert topic (MkTableParser newState) s)
mapM_ (atomically . writeTBQueue (tickChannel state)) ticks
mapM_ (writeChan (tickChannel state)) ticks
return True
_ -> return False
initDataImportServer :: [TableParserInstance] -> TBQueue Tick -> String -> IO (ServerState, IORef DdeState)
initDataImportServer :: [TableParserInstance] -> BoundedChan Tick -> String -> IO (ServerState, IORef DdeState)
initDataImportServer parsers tickChan applicationName = do
parsers <- newIORef $ M.fromList $ map (\(MkTableParser p) -> (getTableId p, MkTableParser p)) parsers
let s = ServerState { appName = applicationName, parsers = parsers, tickChannel = tickChan }

23
src/System/Win32/DDE.hs

@ -31,7 +31,7 @@ import Control.Monad @@ -31,7 +31,7 @@ import Control.Monad
import Data.Bits
import Data.Binary.Get
import Data.Typeable
import Data.ByteString hiding (map)
import Data.ByteString hiding (map, putStrLn)
import Data.IORef
import System.Win32.XlParser
import System.Win32.DLL
@ -65,31 +65,31 @@ ddeCpWinAnsi = 1004 @@ -65,31 +65,31 @@ ddeCpWinAnsi = 1004
instance Exception DdeException
foreign import WINDOWS_CCONV unsafe "windows.h DdeInitializeW"
foreign import WINDOWS_CCONV "windows.h DdeInitializeW"
ddeInitialize :: LPDWORD -> FunPtr DdeCallback -> DWORD -> DWORD -> IO CUInt
foreign import WINDOWS_CCONV unsafe "windows.h DdeUninitialize"
foreign import WINDOWS_CCONV "windows.h DdeUninitialize"
ddeUninitialize :: DWORD -> IO BOOL
foreign import WINDOWS_CCONV unsafe "windows.h DdeCreateStringHandleW"
foreign import WINDOWS_CCONV "windows.h DdeCreateStringHandleW"
ddeCreateStringHandle :: DWORD -> LPSTR -> CInt -> IO HANDLE
foreign import WINDOWS_CCONV unsafe "windows.h DdeFreeStringHandleW"
foreign import WINDOWS_CCONV "windows.h DdeFreeStringHandleW"
ddeFreeStringHandle :: DWORD -> LPSTR -> IO HANDLE
foreign import WINDOWS_CCONV unsafe "windows.h DdeNameService"
foreign import WINDOWS_CCONV "windows.h DdeNameService"
ddeNameService :: DWORD -> HANDLE -> HANDLE -> CInt -> IO HANDLE
foreign import WINDOWS_CCONV unsafe "windows.h DdeCmpStringHandles"
foreign import WINDOWS_CCONV "windows.h DdeCmpStringHandles"
ddeCmpStringHandles :: HANDLE -> HANDLE -> IO CInt
foreign import WINDOWS_CCONV unsafe "windows.h DdeQueryStringW"
foreign import WINDOWS_CCONV "windows.h DdeQueryStringW"
ddeQueryString :: DWORD -> HANDLE -> CString -> DWORD -> CInt -> IO DWORD
foreign import WINDOWS_CCONV unsafe "windows.h DdeAccessData"
foreign import WINDOWS_CCONV "windows.h DdeAccessData"
ddeAccessData :: HANDLE -> LPDWORD -> IO (Ptr CUChar)
foreign import WINDOWS_CCONV unsafe "windows.h DdeUnaccessData"
foreign import WINDOWS_CCONV "windows.h DdeUnaccessData"
ddeUnaccessData :: HANDLE -> IO ()
foreign import WINDOWS_CCONV "wrapper"
@ -133,7 +133,8 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2 @@ -133,7 +133,8 @@ ddeCallback state msgType format hConv hsz1 hsz2 hData dwData1 dwData2
maybeTopic <- queryString myDdeState 256 hsz1
case maybeTopic of
Nothing -> return ddeResultFalse
Just topic -> withDdeData hData (\xlData -> case runGetOrFail xlParser $ BL.fromStrict xlData of
Just topic -> withDdeData hData (\xlData -> do
case runGetOrFail xlParser $ BL.fromStrict xlData of
Left (_, _, errmsg) -> return ddeResultFalse
Right (_, _, table) -> do
rc <- (dataCallback myDdeState) topic table

Loading…
Cancel
Save