{-# LANGUAGE CPP #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} module TXMLConnector ( start , TXMLConnector.stop , TXMLConnectorHandle , makeRequest , makeBrokerBackend ) where import ATrade.Logging (Message, Severity (..), log, logWith) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Config (SubscriptionConfig (SubscriptionConfig), TransaqConnectorConfig (..), transaqHost, transaqLogLevel, transaqLogPath, transaqLogin, transaqPassword, transaqPort) import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent.STM (TVar, atomically, modifyTVar', newEmptyTMVar, newEmptyTMVarIO, newTVarIO, orElse, putTMVar, readTMVar, readTVar, readTVarIO, takeTMVar, tryPutTMVar, tryReadTMVar, writeTVar) import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, newTBQueue, readTBQueue, writeTBQueue) import Control.Monad (forM_, forever, void, when) import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM import Data.Maybe (mapMaybe) import qualified Data.Text as T import qualified Deque.Strict as D import Text.XML.Light.Input (parseXML) import Text.XML.Light.Types (Content (Elem), Element (elName), QName (qName)) import TickTable (TickTable, insertTick, lookupTick, newTickTable) import Transaq (AllTradesTrade (..), Candle (..), ClientData (..), CommandChangePass (..), CommandConnect (..), CommandDisconnect (CommandDisconnect), CommandGetHistoryData (CommandGetHistoryData), CommandServerStatus (..), CommandSubscribe (..), ConnectionState (Disconnected), Language (LanguageEn), MarketInfo (..), OrderNotification (..), OrderStatus (..), Quotation (..), ResponseAllTrades (ResponseAllTrades), ResponseCandleKinds (ResponseCandleKinds), ResponseCandles (..), ResponseCandlesStatus (StatusPending), ResponseClient (ResponseClient), ResponseMarkets (ResponseMarkets), ResponseOrders (ResponseOrders), ResponseQuotations (ResponseQuotations), ResponseQuotes (ResponseQuotes), ResponseResult (..), ResponseSecurities (ResponseSecurities), ResponseTrades (ResponseTrades), Security (..), SecurityId (..), TradeNotification (..), TransaqCommand (toXml), TransaqResponse (..), TransaqResponse (..), TransaqResponseC (fromXml), UnfilledAction (..), kCandleKindId, kPeriod, state) import TXML (LogLevel, MonadTXML, freeCallback, initialize, sendCommand, setCallback) import ATrade.Broker.Backend (BrokerBackend (..), BrokerBackendNotification (..)) import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) import ATrade.Types (Bar (..), BarTimeframe (unBarTimeframe), DataType (BestBid, BestOffer, LastTradePrice), Order (..), OrderId, OrderPrice (..), OrderState (..), Tick (..), TickerId, Trade (..), fromDouble, toDouble) import qualified ATrade.Types as AT import Colog.Monad (WithLog) import Control.Applicative ((<|>)) import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.STM.TMVar (TMVar) import Control.Error (headMay) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader.Class (MonadReader, asks) import Data.Int (Int64) import qualified Data.Map.Strict as M import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime) import FSM (FSMCallback (..), FSMState (isTerminalState), makeFsm, runFsm) import GHC.Exts (IsList (..)) import Prelude hiding (log) import TickerInfoServer (TickerInfo (..), TickerInfoServerHandle, putTickerInfo) import qualified Transaq import qualified TXML import TXMLConnector.Internal (BrokerState (..), ConnectionStage (..), Env (..), Request (..), Response (..), workThread) #if defined(mingw32_HOST_OS) import qualified Win32.TXML as TXMLImpl #else import qualified Linux.TXML as TXMLImpl #endif data ConnectionParams = ConnectionParams { cpLogin :: T.Text , cpPassword :: T.Text , cpHost :: T.Text , cpPort :: Int , cpLogPath :: T.Text , cpLogLevel :: LogLevel } deriving (Show, Eq, Ord) data TXMLConnectorHandle = TXMLConnectorHandle { threadId :: ThreadId , notificationQueue :: TBQueue TransaqResponse , hRequestVar :: TMVar Request , hResponseVar :: TMVar (TMVar Response) , hRequestTimestamp :: TVar UTCTime , hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) , hRunVar :: TMVar () } newtype App a = App { unApp :: ReaderT Env IO a } deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env) instance MonadTXML App where initialize path loglevel = liftIO $ TXMLImpl.initialize path loglevel uninitialize = liftIO TXMLImpl.uninitialize sendCommand = liftIO . TXMLImpl.sendCommand setCallback = liftIO . TXMLImpl.setCallback instance HasLog Env Message App where getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } setLogAction _ env = env -- fuck it start :: LogAction IO Message -> TransaqConnectorConfig -> BoundedChan QuoteSourceServerData -> TickerInfoServerHandle -> IO TXMLConnectorHandle start logger config qssChannel tisH = do logWith logger Info "TXMLConnector" "Starting" notificationQueue <- atomically $ newTBQueue 50000 tickTable <- newTickTable requestVar <- newEmptyTMVarIO responseVar <- newEmptyTMVarIO currentCandles <- newTVarIO [] serverConnected <- liftIO $ newTVarIO StageConnection candleKindMap <- newTVarIO M.empty requestTimestamp <- getCurrentTime >>= newTVarIO orderMap <- newTVarIO M.empty notificationCallback <- newTVarIO Nothing orderTransactionIdMap <- newTVarIO BM.empty pendingOrders <- newTVarIO (fromList []) runVar <- newEmptyTMVarIO timerVar <- newEmptyTMVarIO let brokerState = BrokerState { bsOrderTransactionIdMap = orderTransactionIdMap , bsNotificationCallback = notificationCallback , bsOrderMap = orderMap , bsPendingOrders = pendingOrders } let env = Env { qssChannel = qssChannel , tisHandle = tisH , requestVar = requestVar , responseVar = responseVar , requestTimestamp = requestTimestamp , currentCandles = currentCandles , tickMap = tickTable , transaqQueue = notificationQueue , logger = logger , config = config , serverConnected = serverConnected , candleKindMap = candleKindMap , brokerState = brokerState , runVar = runVar , timerVar = timerVar } threadId <- forkIO $ (runReaderT . unApp) workThread env return $ TXMLConnectorHandle { threadId = threadId , notificationQueue = notificationQueue , hRequestVar = requestVar , hResponseVar = responseVar , hRequestTimestamp = requestTimestamp , hNotificationCallback = notificationCallback , hRunVar = runVar } stop :: TXMLConnectorHandle -> IO () stop h = atomically $ putTMVar (hRunVar h) () brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO () brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid) brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO () brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend makeBrokerBackend h account = BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) makeRequest :: TXMLConnectorHandle -> Request -> IO Response makeRequest h request = do now <- getCurrentTime resp <- atomically $ do resp <- newEmptyTMVar writeTVar (hRequestTimestamp h) now putTMVar (hResponseVar h) resp putTMVar (hRequestVar h) request pure resp atomically $ do void $ takeTMVar (hResponseVar h) takeTMVar resp