diff --git a/app/Main.hs b/app/Main.hs index 576ec49..bd7fb7c 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -37,6 +37,8 @@ import qualified Data.Text as T import Control.Monad.Trans.Except import Broker.QuikBroker.Trans2QuikApi +import Network.Telegram + data TableConfig = TableConfig { parserId :: String, tableName :: String, @@ -50,7 +52,9 @@ data Config = Config { quikPath :: String, dllPath :: String, quikAccounts :: [T.Text], - tradeSink :: T.Text + tradeSink :: T.Text, + telegramToken :: T.Text, + telegramChatId :: T.Text } deriving (Show) readConfig :: String -> IO Config @@ -70,6 +74,8 @@ parseConfig = withObject "object" $ \obj -> do qp <- obj .: "quik-path" dp <- obj .: "dll-path" trsink <- obj .: "trade-sink" + tgToken <- obj .: "telegram-token" + tgChatId <- obj .: "telegram-chatid" accs <- V.toList <$> obj .: "accounts" return Config { quotesourceEndpoint = qse, brokerserverEndpoint = bse, @@ -77,7 +83,9 @@ parseConfig = withObject "object" $ \obj -> do quikPath = qp, dllPath = dp, quikAccounts = fmap T.pack accs, - tradeSink = trsink } + tradeSink = trsink, + telegramToken = tgToken, + telegramChatId = tgChatId } where parseTables :: Value -> Parser [TableConfig] parseTables = withArray "array" $ \arr -> mapM parseTableConfig (V.toList arr) @@ -121,7 +129,9 @@ main = do (forkId, c1, c2) <- forkBoundedChan 1000 chan broker <- mkPaperBroker c1 1000000 ["demo"] - eitherBrokerQ <- runExceptT $ mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) + eitherBrokerQ <- runExceptT $ mkQuikBroker (dllPath config) (quikPath config) (quikAccounts config) (Just (telegramToken config, telegramChatId config)) + tgCtx <- mkTelegramContext (telegramToken config) + sendMessage tgCtx (telegramChatId config) "Goldmine-Quik connector started" case eitherBrokerQ of Left errmsg -> warningM "main" $ "Can't load quik broker: " ++ T.unpack errmsg Right brokerQ -> diff --git a/quik-connector.cabal b/quik-connector.cabal index 8d7f621..9f5f015 100644 --- a/quik-connector.cabal +++ b/quik-connector.cabal @@ -21,6 +21,7 @@ library , Broker.PaperBroker , Broker.QuikBroker , Broker.QuikBroker.Trans2QuikApi + , Network.Telegram ghc-options: -Wincomplete-patterns build-depends: base >= 4.7 && < 5 , Win32 @@ -54,6 +55,11 @@ library , conduit , stm , stm-conduit + , http-client + , http-client-tls + , utf8-string + , connection + , text-format default-language: Haskell2010 extra-libraries: "user32" other-modules: System.Win32.XlParser @@ -79,6 +85,10 @@ executable quik-connector-exe , libatrade , transformers , stm + , http-client + , http-client-tls + , utf8-string + , connection default-language: Haskell2010 extra-libraries: "user32" diff --git a/src/Broker/QuikBroker.hs b/src/Broker/QuikBroker.hs index 087cf5b..68db91b 100644 --- a/src/Broker/QuikBroker.hs +++ b/src/Broker/QuikBroker.hs @@ -18,11 +18,18 @@ import qualified Data.List as L import qualified Data.Map as M import qualified Data.Bimap as BM import qualified Data.Text as T +import qualified Data.Text.Lazy as TL +import Data.Text.Format +import Control.Monad +import Control.Concurrent +import Control.Concurrent.BoundedChan import Control.Monad.Trans.Except import Control.Monad.IO.Class import System.Log.Logger +import Network.Telegram + import Safe type QuikOrderId = Integer @@ -33,7 +40,9 @@ data QuikBrokerState = QuikBrokerState { orderMap :: M.Map OrderId Order, orderIdMap :: BM.Bimap QuikOrderId OrderId, trans2orderid :: M.Map Integer Order, - transIdCounter :: Integer + transIdCounter :: Integer, + messageChan :: BoundedChan T.Text, + messageTid :: Maybe ThreadId } nextTransId state = atomicModifyIORef' state (\s -> (s { transIdCounter = transIdCounter s + 1 }, transIdCounter s)) @@ -44,17 +53,36 @@ maybeCall proj state arg = do Just callback -> callback arg Nothing -> return () -mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> ExceptT T.Text IO BrokerInterface -mkQuikBroker dllPath quikPath accs = do +messageThread tgCtx chatId msgChan = forever $ do + maybeMsg <- tryReadChan msgChan + case maybeMsg of + Just msg -> do + sendMessage tgCtx chatId msg + warningM "Quik.Telegram" $ "Telegram message sent: " ++ T.unpack msg + Nothing -> threadDelay 500000 + + +mkQuikBroker :: FilePath -> FilePath -> [T.Text] -> Maybe (T.Text, T.Text) -> ExceptT T.Text IO BrokerInterface +mkQuikBroker dllPath quikPath accs tgParams = do q <- mkQuik dllPath quikPath + msgChan <- liftIO $ newBoundedChan 100 + msgTid <- liftIO $ case tgParams of + Nothing -> return Nothing + Just (tgToken, chatId) -> do + tgCtx <- mkTelegramContext tgToken + tid <- forkIO $ messageThread tgCtx chatId msgChan + return $ Just tid + state <- liftIO $ newIORef QuikBrokerState { notificationCallback = Nothing, quik = q, orderMap = M.empty, orderIdMap = BM.empty, trans2orderid = M.empty, - transIdCounter = 1 + transIdCounter = 1, + messageChan = msgChan, + messageTid = msgTid } setCallbacks q (qbTransactionCallback state) (qbOrderCallback state) (qbTradeCallback state) @@ -78,6 +106,7 @@ qbSubmitOrder state order = do case makeTransactionString transId order of Just transStr -> do rc <- quikSendTransaction q transStr + debugM "Quik" $ "Sending transaction string: " ++ transStr case rc of Left errmsg -> warningM "Quik" $ "Unable to send transaction: " ++ T.unpack errmsg Right _ -> debugM "Quik" $ "Order submitted: " ++ show order @@ -193,7 +222,10 @@ qbTradeCallback state quiktrade = do idMap <- orderIdMap <$> readIORef state debugM "Quik" $ "Trade: " ++ show quiktrade case BM.lookup (qtOrderId quiktrade) idMap >>= flip M.lookup orders of - Just order -> maybeCall notificationCallback state (TradeNotification $ tradeFor order) + Just order -> do + msgChan <- messageChan <$> readIORef state + tryWriteChan msgChan $ TL.toStrict $ format "Trade: {} of {} at {} for account {}" (show (tradeOperation (tradeFor order)), orderSecurity order, qtPrice quiktrade, orderAccountId order) + maybeCall notificationCallback state (TradeNotification $ tradeFor order) Nothing -> warningM "Quik" $ "Incoming trade for unknown order: " ++ show quiktrade where tradeFor order = Trade { diff --git a/src/Broker/QuikBroker/Trans2QuikApi.hs b/src/Broker/QuikBroker/Trans2QuikApi.hs index b3eae1f..ebbffcb 100644 --- a/src/Broker/QuikBroker/Trans2QuikApi.hs +++ b/src/Broker/QuikBroker/Trans2QuikApi.hs @@ -442,6 +442,7 @@ defaultConnectionCb state event errorCode infoMessage defaultTransactionReplyCb :: IORef Quik -> LONG -> LONG -> LONG -> DWORD -> CLLong -> LPSTR -> CIntPtr -> IO () defaultTransactionReplyCb state transactionResult errorCode replyCode transId orderNum replyMessage replyDesc = do + debugM "Quik" $ "Transaction cb:" ++ show transactionResult ++ "/" ++ show errorCode ++ "/" ++ show replyCode maybecb <- hlTransactionCallback <$> readIORef state case maybecb of Just cb -> cb (transactionResult == ecSuccess) (toInteger transId) (toInteger orderNum) @@ -449,6 +450,7 @@ defaultTransactionReplyCb state transactionResult errorCode replyCode transId or defaultOrderCb :: IORef Quik -> LONG -> DWORD -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> LONG -> CIntPtr -> IO () defaultOrderCb state mode transId dnumber classCode secCode price balance value sell status desc = do + debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber ++ "/" ++ show transId orders <- handledOrders <$> readIORef state when (mode == 0) $ do maybecb <- hlOrderCallback <$> readIORef state @@ -474,6 +476,7 @@ defaultOrderCb state mode transId dnumber classCode secCode price balance value defaultTradeCb :: IORef Quik -> LONG -> CLLong -> CLLong -> LPSTR -> LPSTR -> CDouble -> CLLong -> CDouble -> LONG -> CIntPtr -> IO () defaultTradeCb state mode dnumber orderNum classCode secCode price qty value sell desc = do + debugM "Quik" $ "Trade cb: " ++ show mode ++ "/" ++ show dnumber trades <- handledTrades <$> readIORef state when (mode == 0 && dnumber `S.notMember` trades) $ do atomicModifyIORef' state (\s -> (s { handledTrades = S.insert dnumber (handledTrades s) }, ())) @@ -520,13 +523,6 @@ watchdog quikpath state = do alloca (\errorCode -> allocaBytes 1024 (\errorMsg -> do - withCString "" (\emptyStr -> do - (runExceptT $ do - throwIfErr $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024 - throwIfErr $ subscribeOrders api emptyStr emptyStr - liftIO $ startOrders api orcb - throwIfErr $ subscribeTrades api emptyStr emptyStr - liftIO $ startTrades api tradecb)) err <- setConnectionStatusCallback api conncb errorCode errorMsg 1024 if err /= ecSuccess @@ -536,7 +532,18 @@ watchdog quikpath state = do when (conn == ecDllNotConnected) $ withCString quikpath (\path -> do err <- connect api path errorCode errorMsg 1024 - when (err /= ecSuccess) $ warningM "Quik.Watchdog" $ "Unable to connect: " ++ show err) + if err /= ecSuccess + then warningM "Quik.Watchdog" $ "Unable to connect: " ++ show err + else withCString "" (\emptyStr -> do + res <- (runExceptT $ do + throwIfErr $ setTransactionsReplyCallback api transcb errorCode errorMsg 1024 + throwIfErr $ subscribeOrders api emptyStr emptyStr + liftIO $ startOrders api orcb + throwIfErr $ subscribeTrades api emptyStr emptyStr + liftIO $ startTrades api tradecb) + case res of + Left err -> warningM "Quik.Watchdog" $ "Unable to set callbacks: " ++ show err + Right _ -> debugM "Quik.Watchdog" "Callbacks are set")) threadDelay 5000000)) throwIfErr :: IO LONG -> ExceptT T.Text IO () diff --git a/src/Network/Telegram.hs b/src/Network/Telegram.hs new file mode 100644 index 0000000..d0e9c83 --- /dev/null +++ b/src/Network/Telegram.hs @@ -0,0 +1,35 @@ +{-# LANGUAGE OverloadedStrings #-} + +module Network.Telegram +( + mkTelegramContext, + sendMessage +) where + +import Control.Monad + +import Network.Connection +import Network.HTTP.Client +import Network.HTTP.Client.TLS + +import qualified Data.Text as T +import qualified Data.ByteString.UTF8 as BU8 +import Data.Aeson +import Data.Aeson.Types + +data TelegramContext = TelegramContext { + tgToken :: T.Text, + httpMan :: Manager +} + +mkTelegramContext :: T.Text -> IO TelegramContext +mkTelegramContext token = do + man <- newManager (mkManagerSettings (TLSSettingsSimple { settingDisableCertificateValidation = True, settingDisableSession = False, settingUseServerName = False }) Nothing) + return TelegramContext { httpMan = man, tgToken = token } + + +sendMessage :: TelegramContext -> T.Text -> T.Text -> IO () +sendMessage ctx chatId text = do + req <- parseUrl $ "https://api.telegram.org/bot" ++ (T.unpack $ tgToken ctx) ++ "/sendMessage" + void $ withResponse (req { method = "POST", requestHeaders = [("Content-Type", BU8.fromString "application/json")], requestBody = (RequestBodyLBS . encode) (object ["chat_id" .= chatId, "text" .= text]) }) (httpMan ctx) (\resp -> brConsume (responseBody resp)) + diff --git a/stack.yaml b/stack.yaml index e501ad5..b7747a5 100644 --- a/stack.yaml +++ b/stack.yaml @@ -15,7 +15,7 @@ # resolver: # name: custom-snapshot # location: "./custom-snapshot.yaml" -resolver: lts-7.0 +resolver: lts-7.7 # User packages to be built. # Various formats can be used as shown in the example below. @@ -40,7 +40,7 @@ packages: - '../libatrade' # Dependency packages to be pulled from upstream that are not in the resolver # (e.g., acme-missiles-0.3) -extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1"] +extra-deps: [ "datetime-0.3.1", "cond-0.4.1.1" ] # Override default flag values for local packages and extra-deps flags: {}