{-# LANGUAGE CPP #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-} module TXMLConnector ( start , TXMLConnector.stop , TXMLConnectorHandle , makeRequest , makeBrokerBackend ) where import ATrade.Broker.Backend (BrokerBackend (..), BrokerBackendNotification (..)) import ATrade.Logging (Message, Severity (..), log, logWith) import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) import ATrade.Types (Order, OrderId) import Colog (HasLog (getLogAction, setLogAction), LogAction (LogAction, unLogAction)) import Config (SubscriptionConfig (SubscriptionConfig), TransaqConnectorConfig (..), transaqHost, transaqLogLevel, transaqLogPath, transaqLogin, transaqPassword, transaqPort) import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent.BoundedChan (BoundedChan) import Control.Concurrent.STM (TVar, atomically, modifyTVar', newEmptyTMVar, newEmptyTMVarIO, newTVarIO, orElse, putTMVar, readTMVar, readTVar, readTVarIO, takeTMVar, tryPutTMVar, tryReadTMVar, writeTVar) import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, newTBQueue, readTBQueue, writeTBQueue) import Control.Concurrent.STM.TMVar (TMVar) import Control.Monad (forM_, forever, void, when) import Control.Monad.Extra (whileM) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader.Class (MonadReader) import qualified Data.Bimap as BM import qualified Data.Map.Strict as M import Data.Maybe (mapMaybe) import qualified Data.Text as T import Data.Time.Clock (UTCTime, getCurrentTime) import qualified Deque.Strict as D import GHC.Exts (IsList (..)) import Prelude hiding (log) import Text.XML.Light.Input (parseXML) import Text.XML.Light.Types (Content (Elem), Element (elName), QName (qName)) import TickerInfoServer (TickerInfoServerHandle) import TickTable (newTickTable) import Transaq (TransaqResponse) import TXML (LogLevel, MonadTXML, initialize, sendCommand, setCallback) import qualified TXML import TXMLConnector.Internal (BrokerState (..), ConnectionStage (..), Env (..), Request (..), Response (..), workThread) #if defined(mingw32_HOST_OS) import qualified Win32.TXML as TXMLImpl #else import qualified Linux.TXML as TXMLImpl #endif data ConnectionParams = ConnectionParams { cpLogin :: T.Text , cpPassword :: T.Text , cpHost :: T.Text , cpPort :: Int , cpLogPath :: T.Text , cpLogLevel :: LogLevel } deriving (Show, Eq, Ord) data TXMLConnectorHandle = TXMLConnectorHandle { threadId :: ThreadId , notificationQueue :: TBQueue TransaqResponse , hRequestVar :: TMVar Request , hResponseVar :: TMVar (TMVar Response) , hRequestTimestamp :: TVar UTCTime , hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) , hRunVar :: TMVar () } newtype App a = App { unApp :: ReaderT Env IO a } deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env) instance MonadTXML App where initialize path loglevel = liftIO $ TXMLImpl.initialize path loglevel uninitialize = liftIO TXMLImpl.uninitialize sendCommand = liftIO . TXMLImpl.sendCommand setCallback = liftIO . TXMLImpl.setCallback instance HasLog Env Message App where getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } setLogAction _ env = env -- fuck it start :: LogAction IO Message -> TransaqConnectorConfig -> BoundedChan QuoteSourceServerData -> TickerInfoServerHandle -> IO TXMLConnectorHandle start logger' config' qssChannel' tisH = do logWith logger' Info "TXMLConnector" "Starting" notificationQueue' <- atomically $ newTBQueue 50000 tickTable <- newTickTable requestVar' <- newEmptyTMVarIO responseVar' <- newEmptyTMVarIO currentCandles' <- newTVarIO [] serverConnected' <- liftIO $ newTVarIO StageConnection candleKindMap' <- newTVarIO M.empty requestTimestamp' <- getCurrentTime >>= newTVarIO orderMap <- newTVarIO M.empty notificationCallback <- newTVarIO Nothing orderTransactionIdMap <- newTVarIO BM.empty pendingOrders <- newTVarIO (fromList []) runVar' <- newEmptyTMVarIO timerVar' <- newEmptyTMVarIO let brokerState' = BrokerState { bsOrderTransactionIdMap = orderTransactionIdMap , bsNotificationCallback = notificationCallback , bsOrderMap = orderMap , bsPendingOrders = pendingOrders } let env = Env { qssChannel = qssChannel' , tisHandle = tisH , requestVar = requestVar' , responseVar = responseVar' , requestTimestamp = requestTimestamp' , currentCandles = currentCandles' , tickMap = tickTable , transaqQueue = notificationQueue' , logger = logger' , config = config' , serverConnected = serverConnected' , candleKindMap = candleKindMap' , brokerState = brokerState' , runVar = runVar' , timerVar = timerVar' } workThreadId <- forkIO $ (runReaderT . unApp) workThread env return $ TXMLConnectorHandle { threadId = workThreadId , notificationQueue = notificationQueue' , hRequestVar = requestVar' , hResponseVar = responseVar' , hRequestTimestamp = requestTimestamp' , hNotificationCallback = notificationCallback , hRunVar = runVar' } stop :: TXMLConnectorHandle -> IO () stop h = atomically $ putTMVar (hRunVar h) () brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO () brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid) brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO () brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend makeBrokerBackend h accountId = BrokerBackend [accountId] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) makeRequest :: TXMLConnectorHandle -> Request -> IO Response makeRequest h request = do now <- getCurrentTime resp <- atomically $ do resp <- newEmptyTMVar writeTVar (hRequestTimestamp h) now putTMVar (hResponseVar h) resp putTMVar (hRequestVar h) request pure resp atomically $ do void $ takeTMVar (hResponseVar h) takeTMVar resp