Browse Source

Telegram support

master
Denis Tereshkin 9 years ago
parent
commit
4b5db76ca9
  1. 16
      app/Main.hs
  2. 10
      quik-connector.cabal
  3. 42
      src/Broker/QuikBroker.hs
  4. 23
      src/Broker/QuikBroker/Trans2QuikApi.hs
  5. 35
      src/Network/Telegram.hs
  6. 4
      stack.yaml

16
app/Main.hs

@ -37,6 +37,8 @@ import qualified Data.Text as T
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Broker.QuikBroker.Trans2QuikApi import Broker.QuikBroker.Trans2QuikApi
import Network.Telegram
data TableConfig = TableConfig { data TableConfig = TableConfig {
parserId :: String, parserId :: String,
tableName :: String, tableName :: String,
@ -50,7 +52,9 @@ data Config = Config {
quikPath :: String, quikPath :: String,
dllPath :: String, dllPath :: String,
quikAccounts :: [T.Text], quikAccounts :: [T.Text],
tradeSink :: T.Text tradeSink :: T.Text,
telegramToken :: T.Text,
telegramChatId :: T.Text
} deriving (Show) } deriving (Show)
readConfig :: String -> IO Config readConfig :: String -> IO Config
@ -70,6 +74,8 @@ parseConfig = withObject "object" $ \obj -> do
qp <- obj .: "quik-path" qp <- obj .: "quik-path"
dp <- obj .: "dll-path" dp <- obj .: "dll-path"
trsink <- obj .: "trade-sink" trsink <- obj .: "trade-sink"
tgToken <- obj .: "telegram-token"
tgChatId <- obj .: "telegram-chatid"
accs <- V.toList <$> obj .: "accounts" accs <- V.toList <$> obj .: "accounts"
return Config { quotesourceEndpoint = qse, return Config { quotesourceEndpoint = qse,
brokerserverEndpoint = bse, brokerserverEndpoint = bse,
@ -77,7 +83,9 @@ parseConfig = withObject "object" $ \obj -> do
quikPath = qp, quikPath = qp,
dllPath = dp, dllPath = dp,
quikAccounts = fmap T.pack accs, quikAccounts = fmap T.pack accs,
tradeSink = trsink } tradeSink = trsink,
telegramToken = tgToken,
telegramChatId = tgChatId }
where where
parseTables :: Value -> Parser [TableConfig] parseTables :: Value -> Parser [TableConfig]
parseTables = withArray "array" $ \arr -> mapM parseTableConfig (V.toList arr) parseTables = withArray "array" $ \arr -> mapM parseTableConfig (V.toList arr)
@ -121,7 +129,9 @@ main = do
(forkId, c1, c2) <- forkBoundedChan 1000 chan (forkId, c1, c2) <- forkBoundedChan 1000 chan
broker <- mkPaperBroker c1 1000000 ["demo"] broker <- mkPaperBroker c1 1000000 ["demo"]
eitherBrokerQ <- runExceptT $ mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) eitherBrokerQ <- runExceptT $ mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) (Just (telegramToken config, telegramChatId config))
tgCtx <- mkTelegramContext (telegramToken config)
sendMessage tgCtx (telegramChatId config) "Goldmine-Quik connector started"
case eitherBrokerQ of case eitherBrokerQ of
Left errmsg -> warningM "main" $ "Can't load quik broker: " ++ T.unpack errmsg Left errmsg -> warningM "main" $ "Can't load quik broker: " ++ T.unpack errmsg
Right brokerQ -> Right brokerQ ->

10
quik-connector.cabal

@ -21,6 +21,7 @@ library
, Broker.PaperBroker , Broker.PaperBroker
, Broker.QuikBroker , Broker.QuikBroker
, Broker.QuikBroker.Trans2QuikApi , Broker.QuikBroker.Trans2QuikApi
, Network.Telegram
ghc-options: -Wincomplete-patterns ghc-options: -Wincomplete-patterns
build-depends: base >= 4.7 && < 5 build-depends: base >= 4.7 && < 5
, Win32 , Win32
@ -54,6 +55,11 @@ library
, conduit , conduit
, stm , stm
, stm-conduit , stm-conduit
, http-client
, http-client-tls
, utf8-string
, connection
, text-format
default-language: Haskell2010 default-language: Haskell2010
extra-libraries: "user32" extra-libraries: "user32"
other-modules: System.Win32.XlParser other-modules: System.Win32.XlParser
@ -79,6 +85,10 @@ executable quik-connector-exe
, libatrade , libatrade
, transformers , transformers
, stm , stm
, http-client
, http-client-tls
, utf8-string
, connection
default-language: Haskell2010 default-language: Haskell2010
extra-libraries: "user32" extra-libraries: "user32"

42
src/Broker/QuikBroker.hs

@ -18,11 +18,18 @@ import qualified Data.List as L
import qualified Data.Map as M import qualified Data.Map as M
import qualified Data.Bimap as BM import qualified Data.Bimap as BM
import qualified Data.Text as T import qualified Data.Text as T
import qualified Data.Text.Lazy as TL
import Data.Text.Format
import Control.Monad
import Control.Concurrent
import Control.Concurrent.BoundedChan
import Control.Monad.Trans.Except import Control.Monad.Trans.Except
import Control.Monad.IO.Class import Control.Monad.IO.Class
import System.Log.Logger import System.Log.Logger
import Network.Telegram
import Safe import Safe
type QuikOrderId = Integer type QuikOrderId = Integer
@ -33,7 +40,9 @@ data QuikBrokerState = QuikBrokerState {
orderMap :: M.Map OrderId Order, orderMap :: M.Map OrderId Order,
orderIdMap :: BM.Bimap QuikOrderId OrderId, orderIdMap :: BM.Bimap QuikOrderId OrderId,
trans2orderid :: M.Map Integer Order, trans2orderid :: M.Map Integer Order,
transIdCounter :: Integer transIdCounter :: Integer,
messageChan :: BoundedChan T.Text,
messageTid :: Maybe ThreadId
} }
nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s)) nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s))
@ -44,17 +53,36 @@ maybeCall proj state arg = do
Just callback -> callback arg Just callback -> callback arg
Nothing -> return () Nothing -> return ()
mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> ExceptT T.Text IO BrokerInterface messageThread tgCtx chatId msgChan = forever $ do
mkQuikBroker dllPath quikPath accs = do maybeMsg <- tryReadChan msgChan
case maybeMsg of
Just msg -> do
sendMessage tgCtx chatId msg
warningM "Quik.Telegram" $ "Telegram message sent: " ++ T.unpack msg
Nothing -> threadDelay 500000
mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> Maybe (T.Text, T.Text) -> ExceptT T.Text IO BrokerInterface
mkQuikBroker dllPath quikPath accs tgParams = do
q <- mkQuik dllPath quikPath q <- mkQuik dllPath quikPath
msgChan <- liftIO $ newBoundedChan 100
msgTid <- liftIO $ case tgParams of
Nothing -> return Nothing
Just (tgToken, chatId) -> do
tgCtx <- mkTelegramContext tgToken
tid <- forkIO $ messageThread tgCtx chatId msgChan
return $ Just tid
state <- liftIO $ newIORef QuikBrokerState { state <- liftIO $ newIORef QuikBrokerState {
notificationCallback = Nothing, notificationCallback = Nothing,
quik = q, quik = q,
orderMap = M.empty, orderMap = M.empty,
orderIdMap = BM.empty, orderIdMap = BM.empty,
trans2orderid = M.empty, trans2orderid = M.empty,
transIdCounter = 1 transIdCounter = 1,
messageChan = msgChan,
messageTid = msgTid
} }
setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state) setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state)
@ -78,6 +106,7 @@ qbSubmitOrder state order = do
case makeTransactionString transId order of case makeTransactionString transId order of
Just transStr -> do Just transStr -> do
rc <- quikSendTransaction q transStr rc <- quikSendTransaction q transStr
debugM "Quik" $ "Sending transaction string: " ++ transStr
case rc of case rc of
Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg
Right _ -> debugM "Quik" $ "Order submitted: " ++ show order Right _ -> debugM "Quik" $ "Order submitted: " ++ show order
@ -193,7 +222,10 @@ qbTradeCallback state quiktrade = do
idMap <- orderIdMap <$> readIORef state idMap <- orderIdMap <$> readIORef state
debugM "Quik" $ "Trade: " ++ show quiktrade debugM "Quik" $ "Trade: " ++ show quiktrade
case BM.lookup (qtOrderId quiktrade) idMap >>= flip M.lookup orders of case BM.lookup (qtOrderId quiktrade) idMap >>= flip M.lookup orders of
Just order -> maybeCall notificationCallback state (TradeNotification $ tradeFor order) Just order -> do
msgChan <- messageChan <$> readIORef state
tryWriteChan msgChan $ TL.toStrict $ format "Trade: {} of {} at {} for account {}" (show (tradeOperation (tradeFor order)), orderSecurity order, qtPrice quiktrade, orderAccountId order)
maybeCall notificationCallback state (TradeNotification $ tradeFor order)
Nothing -> warningM "Quik" $ "Incoming trade for unknown order: " ++ show quiktrade Nothing -> warningM "Quik" $ "Incoming trade for unknown order: " ++ show quiktrade
where where
tradeFor order = Trade { tradeFor order = Trade {

23
src/Broker/QuikBroker/Trans2QuikApi.hs

@ -442,6 +442,7 @@ defaultConnectionCb state event errorCode infoMessage
defaultTransactionReplyCb :: IORef Quik -> LONG -> LONG -> LONG -> DWORD -> CLLong -> LPSTR -> CIntPtr -> IO () defaultTransactionReplyCb :: IORef Quik -> LONG -> LONG -> LONG -> DWORD -> CLLong -> LPSTR -> CIntPtr -> IO ()
defaultTransactionReplyCb state transactionResult errorCode replyCode transId orderNum replyMessage replyDesc = do defaultTransactionReplyCb state transactionResult errorCode replyCode transId orderNum replyMessage replyDesc = do
debugM "Quik" $ "Transaction cb:" ++ show transactionResult ++ "/" ++ show errorCode ++ "/" ++ show replyCode
maybecb <- hlTransactionCallback <$> readIORef state maybecb <- hlTransactionCallback <$> readIORef state
case maybecb of case maybecb of
Just cb -> cb (transactionResult == ecSuccess) (toInteger transId) (toInteger orderNum) Just cb -> cb (transactionResult == ecSuccess) (toInteger transId) (toInteger orderNum)
@ -449,6 +450,7 @@ defaultTransactionReplyCb state transactionResult errorCode replyCode transId or
defaultOrderCb :: IORef Quik -> LONG -> DWORD -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> LONG -> CIntPtr -> IO () defaultOrderCb :: IORef Quik -> LONG -> DWORD -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> LONG -> CIntPtr -> IO ()
defaultOrderCb state mode transId dnumber classCode secCode price balance value sell status desc = do defaultOrderCb state mode transId dnumber classCode secCode price balance value sell status desc = do
debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber ++ "/" ++ show transId
orders <- handledOrders <$> readIORef state orders <- handledOrders <$> readIORef state
when (mode == 0) $ do when (mode == 0) $ do
maybecb <- hlOrderCallback <$> readIORef state maybecb <- hlOrderCallback <$> readIORef state
@ -474,6 +476,7 @@ defaultOrderCb state mode transId dnumber classCode secCode price balance value
defaultTradeCb :: IORef Quik -> LONG -> CLLong -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> CIntPtr -> IO () defaultTradeCb :: IORef Quik -> LONG -> CLLong -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> CIntPtr -> IO ()
defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sell desc = do defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sell desc = do
debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber
trades <- handledTrades <$> readIORef state trades <- handledTrades <$> readIORef state
when (mode == 0 && dnumber `S.notMember` trades) $ do when (mode == 0 && dnumber `S.notMember` trades) $ do
atomicModifyIORef' state (\s -> (s { handledTrades = S.insert dnumber (handledTrades s) }, ())) atomicModifyIORef' state (\s -> (s { handledTrades = S.insert dnumber (handledTrades s) }, ()))
@ -520,13 +523,6 @@ watchdog quikpath state = do
alloca (\errorCode -> alloca (\errorCode ->
allocaBytes 1024 (\errorMsg -> do allocaBytes 1024 (\errorMsg -> do
withCString "" (\emptyStr -> do
(runExceptT $ do
throwIfErr $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024
throwIfErr $ subscribeOrders api emptyStr emptyStr
liftIO $ startOrders api orcb
throwIfErr $ subscribeTrades api emptyStr emptyStr
liftIO $ startTrades api tradecb))
err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024 err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024
if err /= ecSuccess if err /= ecSuccess
@ -536,7 +532,18 @@ watchdog quikpath state = do
when (conn == ecDllNotConnected) $ when (conn == ecDllNotConnected) $
withCString quikpath (\path -> do withCString quikpath (\path -> do
err <- connect api path errorCode errorMsg 1024 err <- connect api path errorCode errorMsg 1024
when (err /= ecSuccess) $ warningM "Quik.Watchdog" $ "Unable to connect: " ++ show err) if err /= ecSuccess
then warningM "Quik.Watchdog" $ "Unable to connect: " ++ show err
else withCString "" (\emptyStr -> do
res <- (runExceptT $ do
throwIfErr $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024
throwIfErr $ subscribeOrders api emptyStr emptyStr
liftIO $ startOrders api orcb
throwIfErr $ subscribeTrades api emptyStr emptyStr
liftIO $ startTrades api tradecb)
case res of
Left err -> warningM "Quik.Watchdog" $ "Unable to set callbacks: " ++ show err
Right _ -> debugM "Quik.Watchdog" "Callbacks are set"))
threadDelay 5000000)) threadDelay 5000000))
throwIfErr :: IO LONG -> ExceptT T.Text IO () throwIfErr :: IO LONG -> ExceptT T.Text IO ()

35
src/Network/Telegram.hs

@ -0,0 +1,35 @@
{-# LANGUAGE OverloadedStrings #-}
module Network.Telegram
(
mkTelegramContext,
sendMessage
) where
import Control.Monad
import Network.Connection
import Network.HTTP.Client
import Network.HTTP.Client.TLS
import qualified Data.Text as T
import qualified Data.ByteString.UTF8 as BU8
import Data.Aeson
import Data.Aeson.Types
data TelegramContext = TelegramContext {
tgToken :: T.Text,
httpMan :: Manager
}
mkTelegramContext :: T.Text -> IO TelegramContext
mkTelegramContext token = do
man <- newManager (mkManagerSettings (TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }) Nothing)
return TelegramContext { httpMan = man, tgToken = token }
sendMessage :: TelegramContext -> T.Text -> T.Text -> IO ()
sendMessage ctx chatId text = do
req <- parseUrl $ "https://api.telegram.org/bot" ++ (T.unpack $ tgToken ctx) ++ "/sendMessage"
void $ withResponse (req { method = "POST", requestHeaders = [("Content-Type", BU8.fromString "application/json")], requestBody = (RequestBodyLBS . encode) (object ["chat_id" .= chatId, "text" .= text]) }) (httpMan ctx) (\resp -> brConsume (responseBody resp))

4
stack.yaml

@ -15,7 +15,7 @@
# resolver: # resolver:
# name: custom-snapshot # name: custom-snapshot
# location: "./custom-snapshot.yaml" # location: "./custom-snapshot.yaml"
resolver: lts-7.0 resolver: lts-7.7
# User packages to be built. # User packages to be built.
# Various formats can be used as shown in the example below. # Various formats can be used as shown in the example below.
@ -40,7 +40,7 @@ packages:
- '../libatrade' - '../libatrade'
# Dependency packages to be pulled from upstream that are not in the resolver # Dependency packages to be pulled from upstream that are not in the resolver
# (e.g., acme-missiles-0.3) # (e.g., acme-missiles-0.3)
extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1"] extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1" ]
# Override default flag values for local packages and extra-deps # Override default flag values for local packages and extra-deps
flags: {} flags: {}

Loading…
Cancel
Save