{-# LANGUAGE DuplicateRecordFields      #-}
{-# LANGUAGE FlexibleContexts           #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses      #-}

-- | Connector between the Transaq TXML library and the rest of the
-- application: it owns a worker thread that connects to the server, parses
-- incoming XML messages, forwards ticks and ticker info, and serves history
-- and order requests issued through 'makeRequest'.
module TXMLConnector
  (
    start
  , TXMLConnector.stop
  , Request(..)
  , HistoryRequest(..)
  , Response(..)
  , HistoryResponse(..)
  , makeRequest
  , TXMLConnectorHandle
  , makeBrokerBackend
  ) where

import ATrade.Logging (Message, Severity (..), log, logWith)
import Colog (HasLog (getLogAction, setLogAction),
              LogAction (LogAction, unLogAction))
import Config (SubscriptionConfig (SubscriptionConfig),
               TransaqConnectorConfig (..), transaqHost, transaqLogLevel,
               transaqLogPath, transaqLogin, transaqPassword, transaqPort)
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', newEmptyTMVar,
                               newEmptyTMVarIO, newTVarIO, orElse, putTMVar,
                               readTMVar, readTVar, readTVarIO, takeTMVar,
                               tryPutTMVar, tryReadTMVar, writeTVar)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, readTBQueue,
                                       writeTBQueue)
import Control.Monad (forever, void, when)
import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM
import Data.Maybe (mapMaybe)
import qualified Data.Text as T
import qualified Deque.Strict as D
import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem), Element (elName), QName (qName))
import Transaq (AllTradesTrade (..), Candle (..), ClientData (..),
                CommandChangePass (..), CommandConnect (..),
                CommandDisconnect (CommandDisconnect),
                CommandGetHistoryData (CommandGetHistoryData),
                CommandServerStatus (..), CommandSubscribe (..),
                ConnectionState (Disconnected), Language (LanguageEn),
                MarketInfo (..), OrderNotification (..), OrderStatus (..),
                Quotation (..), ResponseAllTrades (ResponseAllTrades),
                ResponseCandleKinds (ResponseCandleKinds),
                ResponseCandles (..), ResponseCandlesStatus (StatusPending),
                ResponseClient (ResponseClient),
                ResponseMarkets (ResponseMarkets),
                ResponseOrders (ResponseOrders),
                ResponseQuotations (ResponseQuotations),
                ResponseQuotes (ResponseQuotes), ResponseResult (..),
                ResponseSecurities (ResponseSecurities),
                ResponseTrades (ResponseTrades), Security (..),
                SecurityId (..), TradeNotification (..),
                TransaqCommand (toXml), TransaqResponse (..),
                TransaqResponse (..), TransaqResponseC (fromXml),
                UnfilledAction (..), kCandleKindId, kPeriod, state)
import TXML (LogLevel, freeCallback, initialize, sendCommand, setCallback)

import ATrade.Broker.Backend (BrokerBackend (..),
                              BrokerBackendNotification (..))
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types (Bar (..), BarTimeframe (unBarTimeframe),
                     DataType (BestBid, BestOffer, LastTradePrice),
                     Order (..), OrderId, OrderPrice (..), OrderState (..),
                     Tick (..), TickerId, Trade (..), fromDouble, toDouble)
import qualified ATrade.Types as AT
import Colog.Monad (WithLog)
import Control.Applicative ((<|>))
import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar)
import Control.Error (headMay)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks)
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, diffUTCTime, getCurrentTime)
import GHC.Exts (IsList (..))
import Prelude hiding (log)
import TickerInfoServer (TickerInfo (..), TickerInfoServerHandle,
                         putTickerInfo)
import qualified Transaq
import qualified TXML

-- | Connection settings bundle (login, password, host/port and TXML logging
-- options).
data ConnectionParams =
  ConnectionParams
  {
    cpLogin    :: T.Text
  , cpPassword :: T.Text
  , cpHost     :: T.Text
  , cpPort     :: Int
  , cpLogPath  :: T.Text
  , cpLogLevel :: LogLevel
  }
  deriving (Show, Eq, Ord)

-- | Parameters of a history (candles) request.
data HistoryRequest =
  HistoryRequest
  {
    hrTickerId  :: TickerId     -- ^ Ticker in @board#seccode@ form (see 'parseSecurityId')
  , hrTimeframe :: BarTimeframe -- ^ Looked up in the candle kind map to get the period id
  , hrCount     :: Int          -- ^ Passed as @count@ of 'CommandGetHistoryData'
  , hrReset     :: Bool         -- ^ Passed as @reset@ of 'CommandGetHistoryData'
  }
  deriving (Show, Eq, Ord)

-- | Requests accepted by 'makeRequest'.
data Request =
    RequestHistory HistoryRequest
  | RequestSubmitOrder Order
  | RequestCancelOrder OrderId
  deriving (Show, Eq)

-- | Replies delivered to the caller of 'makeRequest'.
data Response =
    ResponseHistory HistoryResponse
  | ResponseOrderSubmitted
  | ResponseOrderCancelled
  | ResponseTimeout

-- | Bars collected for a history request.
data HistoryResponse =
  HistoryResponse
  {
    hrBars     :: [Bar]
  , hrMoreData :: Bool
  }
  deriving (Show, Eq)

-- | Handle returned by 'start'; shares its variables with the worker thread.
data TXMLConnectorHandle =
  TXMLConnectorHandle
  {
    threadId              :: ThreadId
  , notificationQueue     :: TBQueue TransaqResponse
  , hRequestVar           :: TMVar Request
  , hResponseVar          :: TMVar (TMVar Response)
  , hRequestTimestamp     :: TVar UTCTime
  , hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ()))
  , hRunVar               :: TMVar ()
  }

-- | State of the worker's connection state machine (see 'workThread').
data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown
  deriving (Eq, Show, Ord)

-- | Events multiplexed by the worker while in 'StageConnected'.
data MainQueueData =
    MainQueueTransaqData TransaqResponse
  | MainQueueRequest Request
  | MainQueuePingServer
  | MainQueueShutdown
  deriving (Eq, Show)

-- | Key of the last-tick table: ticker plus data type.
data TickKey = TickKey TickerId DataType
  deriving (Show, Ord, Eq)

-- | Identifier an order is tracked by: the transaction id returned on
-- submission, or the order number seen in later order updates.
data TransactionId =
    TransactionId Int64
  | ExchangeOrderId Int64
  deriving (Show, Ord, Eq)

-- | Mutable broker-side bookkeeping shared between the worker and the handle.
data BrokerState =
  BrokerState
  {
    bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId)
  , bsNotificationCallback  :: TVar (Maybe (BrokerBackendNotification -> IO ()))
  , bsOrderMap              :: TVar (M.Map OrderId Order)
  , bsPendingOrders         :: TVar (D.Deque Order)
  }

-- | Environment of the worker thread.
data Env =
  Env
  {
    qssChannel       :: BoundedChan QuoteSourceServerData
  , tisHandle        :: TickerInfoServerHandle
  , requestVar       :: TMVar Request
  , responseVar      :: TMVar (TMVar Response)
  , requestTimestamp :: TVar UTCTime
  , currentCandles   :: TVar [Candle]
  , tickMap          :: TVar (M.Map TickKey Tick)
  , transaqQueue     :: TBQueue TransaqResponse
  , logger           :: LogAction IO Message
  , config           :: TransaqConnectorConfig
  , serverConnected  :: TVar ConnectionStage
  , candleKindMap    :: TVar (M.Map Int Int)
  , brokerState      :: BrokerState
  , runVar           :: TMVar ()
  , timerVar         :: TMVar ()
  }

-- | Worker monad: 'ReaderT' over 'Env' on top of 'IO'.
newtype App a = App { unApp :: ReaderT Env IO a }
  deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env)

instance HasLog Env Message App where
  getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) }
  -- Replacing the logger is intentionally not supported: the new action is
  -- ignored and the environment is returned unchanged.
  setLogAction _ env = env

-- | Allocate all shared state, fork the worker thread running 'workThread'
-- and return a handle to it.
start ::
     LogAction IO Message
  -> TransaqConnectorConfig
  -> BoundedChan QuoteSourceServerData
  -> TickerInfoServerHandle
  -> IO TXMLConnectorHandle
start logger config qssChannel tisH = do
  logWith logger Info "TXMLConnector" "Starting"
  notificationQueue <- atomically $ newTBQueue 50000
  tickTable <- newTVarIO M.empty
  requestVar <- newEmptyTMVarIO
  responseVar <- newEmptyTMVarIO
  currentCandles <- newTVarIO []
  serverConnected <- liftIO $ newTVarIO StageConnection
  candleKindMap <- newTVarIO M.empty
  requestTimestamp <- getCurrentTime >>= newTVarIO
  orderMap <- newTVarIO M.empty
  notificationCallback <- newTVarIO Nothing
  orderTransactionIdMap <- newTVarIO BM.empty
  pendingOrders <- newTVarIO (fromList [])
  runVar <- newEmptyTMVarIO
  timerVar <- newEmptyTMVarIO
  let brokerState =
        BrokerState
        {
          bsOrderTransactionIdMap = orderTransactionIdMap
        , bsNotificationCallback = notificationCallback
        , bsOrderMap = orderMap
        , bsPendingOrders = pendingOrders
        }
  let env =
        Env
        {
          qssChannel = qssChannel
        , tisHandle = tisH
        , requestVar = requestVar
        , responseVar = responseVar
        , requestTimestamp = requestTimestamp
        , currentCandles = currentCandles
        , tickMap = tickTable
        , transaqQueue = notificationQueue
        , logger = logger
        , config = config
        , serverConnected = serverConnected
        , candleKindMap = candleKindMap
        , brokerState = brokerState
        , runVar = runVar
        , timerVar = timerVar
        }
  threadId <- forkIO $ (runReaderT . unApp) workThread env
  return $ TXMLConnectorHandle
    {
      threadId = threadId
    , notificationQueue = notificationQueue
    , hRequestVar = requestVar
    , hResponseVar = responseVar
    , hRequestTimestamp = requestTimestamp
    , hNotificationCallback = notificationCallback
    , hRunVar = runVar
    }

-- | Ask the worker thread to shut down by filling its run variable.
--
-- Uses 'tryPutTMVar' so that calling 'stop' more than once (or while the
-- worker is not in the connected stage, the only stage that consumes the
-- variable) never blocks the caller.
stop :: TXMLConnectorHandle -> IO ()
stop h = void . atomically $ tryPutTMVar (hRunVar h) ()

-- | Body of the worker thread.
--
-- Initializes TXML, installs the XML callback, forks a one-second timer
-- thread and then loops over the connection state machine until the stage
-- becomes 'StageShutdown', after which the callback is released.
workThread :: App ()
workThread = do
  cfg <- asks config
  rc <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg)
  case rc of
    Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML: " <> str
    Right _ -> do
      queue <- asks transaqQueue
      logger' <- asks logger
      mbCallback <- liftIO $ setCallback (parseAndWrite queue logger')
      case mbCallback of
        Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
        Just cb -> do
          serverConnectionState <- asks serverConnected
          timerVar' <- asks timerVar
          -- Timer thread: once per second raise the timer flag (if it is not
          -- already raised) until the worker reaches StageShutdown.
          void $ liftIO $ forkIO $ whileM $ do
            threadDelay 1000000
            void . liftIO . atomically $ tryPutTMVar timerVar' ()
            connStatus <- liftIO . readTVarIO $ serverConnectionState
            pure $ connStatus /= StageShutdown

          -- Main state machine loop.
          void $ whileM $ do
            connStatus <- liftIO . readTVarIO $ serverConnectionState
            case connStatus of
              StageConnection -> handleUnconnected
              StageGetInfo    -> handleGetInfo
              StageConnected  -> handleConnected
              StageShutdown   -> pure ()
            pure $ connStatus /= StageShutdown

          liftIO $ freeCallback cb
  where
    -- Map the numeric log level from the config to a TXML log level;
    -- anything other than 1 or 3 means Info.
    parseTransaqLogLevel 1 = TXML.Warning
    parseTransaqLogLevel 3 = TXML.Debug
    parseTransaqLogLevel _ = TXML.Info

    -- Parse a raw XML chunk and enqueue every recognized message.
    -- The logger argument is currently unused. Always returns True.
    parseAndWrite queue _logger xml = do
      let parsed = mapMaybe parseContent $ parseXML xml
      mapM_ (writeToQueue queue) parsed
      pure True

    parseContent (Elem el) = parseElement el
    parseContent _         = Nothing

    -- Dispatch on the element name; unknown elements are dropped.
    -- NOTE(review): no branch here produces TransaqResponseClient, although
    -- handleGetInfo matches on it.
    parseElement el = case qName $ elName el of
      "candles"       -> TransaqResponseCandles <$> fromXml el
      "server_status" -> TransaqResponseServerStatus <$> fromXml el
      "markets"       -> TransaqResponseMarkets <$> fromXml el
      "candlekinds"   -> TransaqResponseCandleKinds <$> fromXml el
      "securities"    -> TransaqResponseSecurities <$> fromXml el
      "sec_info"      -> TransaqResponseSecInfo <$> fromXml el
      "quotations"    -> TransaqResponseQuotations <$> fromXml el
      "alltrades"     -> TransaqResponseAllTrades <$> fromXml el
      "quotes"        -> TransaqResponseQuotes <$> fromXml el
      "orders"        -> TransaqResponseOrders <$> fromXml el
      "trades"        -> TransaqResponseTrades <$> fromXml el
      "result"        -> TransaqResponseResult <$> fromXml el
      _               -> Nothing

    writeToQueue queue resp = atomically $ writeTBQueue queue resp

    -- One iteration of the connected stage: wait for the first available
    -- event (incoming data, client request, shutdown signal, timer tick, in
    -- that priority order), handle it, then check for a request timeout.
    handleConnected :: App ()
    handleConnected = do
      serverConn <- asks serverConnected
      rqVar <- asks requestVar
      runVar' <- asks runVar
      queue <- asks transaqQueue
      timerVar' <- asks timerVar
      item <- liftIO . atomically $
        (MainQueueTransaqData <$> readTBQueue queue)
          `orElse` (MainQueueRequest <$> takeTMVar rqVar)
          `orElse` (takeTMVar runVar' >> pure MainQueueShutdown)
          `orElse` (takeTMVar timerVar' >> pure MainQueuePingServer)
      case item of
        MainQueueShutdown -> liftIO $ atomically $ writeTVar serverConn StageShutdown
        MainQueuePingServer -> do
          maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus
          case maybeServerStatus of
            Left serverStatusRaw -> do
              logger' <- asks logger
              void $ liftIO $ parseAndWrite queue logger' serverStatusRaw
            Right () -> pure ()
        MainQueueTransaqData transaqData -> handleTransaqData transaqData
        MainQueueRequest (RequestHistory request) -> handleHistoryRequest request
        MainQueueRequest (RequestSubmitOrder order) -> handleSubmitOrder order
        -- NOTE(review): cancellation is not implemented; the requester only
        -- gets ResponseTimeout from checkRequestTimeout.
        MainQueueRequest (RequestCancelOrder _) ->
          log Warning "TXMLConnector.WorkThread" "Order cancellation is not implemented, request ignored"
      checkRequestTimeout

    -- Handle a parsed server message while connected.
    handleTransaqData :: TransaqResponse -> App ()
    handleTransaqData transaqData = do
      tm <- asks tickMap
      case transaqData of
        TransaqResponseAllTrades (ResponseAllTrades trades) -> do
          qssChan <- asks qssChannel
          let ticks = fmap allTradeToTick trades
          forM_ ticks (liftIO . writeChan qssChan . QSSTick)
          forM_ ticks (insertToTickMap tm)
        TransaqResponseQuotations (ResponseQuotations quotations) -> do
          qssChan <- asks qssChannel
          now <- liftIO getCurrentTime
          let ticks = concatMap (quotationToTicks now) quotations
          forM_ ticks (liftIO . writeChan qssChan . QSSTick)
          forM_ ticks (insertToTickMap tm)
        TransaqResponseCandles respCandle -> do
          resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
          log Debug "TXMLConnector.WorkThread" $
            "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
          case resp of
            Just tmvar -> do
              cur <- asks currentCandles
              if cStatus respCandle == StatusPending
                -- More chunks will follow: accumulate.
                then liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c)
                -- Final chunk: reply with everything accumulated so far.
                else liftIO $ atomically $ do
                  candles <- readTVar cur
                  putTMVar tmvar $ ResponseHistory $ HistoryResponse
                    {
                      hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles)
                    , hrMoreData = False
                    }
            _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
        TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder
        TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade
        _ -> pure ()

    -- Reset the candle accumulator and send a history request to the server.
    -- NOTE(review): on the two failure paths below nothing is written to the
    -- response variable, so the requester waits for ResponseTimeout.
    handleHistoryRequest :: HistoryRequest -> App ()
    handleHistoryRequest request = do
      cur <- asks currentCandles
      liftIO $ atomically $ writeTVar cur []
      maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
      case maybeCk of
        Just candleKindId ->
          case parseSecurityId (hrTickerId request) of
            Just secId -> void $ liftIO . sendCommand $
              toXml CommandGetHistoryData
                {
                  security = secId
                , periodId = candleKindId
                , count = hrCount request
                , reset = hrReset request
                }
            _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
        _ -> log Warning "TXMLConnector.WorkThread" $
          "Invalid candlekind requested: " <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)

    -- Send a new order command and process the immediate result.
    -- NOTE(review): only the success path answers the requester; every other
    -- path leaves it waiting for ResponseTimeout.
    handleSubmitOrder :: Order -> App ()
    handleSubmitOrder order =
      case mkNewOrderCommand order of
        Nothing -> log Warning "TXMLConnector.WorkThread" $
          "Unable to build order command for order: " <> (T.pack . show) (orderId order)
        Just cmd -> do
          brState <- asks brokerState
          let notifyOrderState st = do
                maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
                forM_ maybeCb $ \cb -> liftIO . cb $ BackendOrderNotification (orderId order) st
          v <- liftIO . sendCommand . toXml $ cmd
          case v of
            Left result ->
              case headMay (parseXML result) >>= parseContent of
                Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
                  respVar <- asks responseVar
                  liftIO $ atomically $ do
                    modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
                    modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId))
                    resp <- readTMVar respVar
                    putTMVar resp ResponseOrderSubmitted
                  notifyOrderState Submitted
                  log Debug "TXMLConnector.WorkThread" $
                    "Inserting orderid: " <> (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
                Just (TransaqResponseResult (ResponseFailure err)) -> do
                  log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
                  notifyOrderState Rejected
                _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
            Right _ -> log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"

    -- Seconds to wait for a reply before answering ResponseTimeout.
    requestTimeoutSeconds = 10

    -- Translate a trade message into a broker trade notification, if the
    -- order it belongs to is known.
    handleTrade transaqTrade = do
      brState <- asks brokerState
      trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
      maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
      orderMap <- liftIO $ readTVarIO (bsOrderMap brState)
      case maybeCb of
        Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of
          Just oid -> case M.lookup oid orderMap of
            Just order -> do
              let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order)
              log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif
              liftIO $ cb notif
            -- The order id is known but the order itself is not stored.
            _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
          -- No order id is associated with the trade's order number.
          _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade
        Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!"

    fromTransaqTrade transaqTrade order =
      Trade
      {
        tradeOrderId = orderId order
      , tradePrice = fromDouble (tPrice transaqTrade)
      , tradeQuantity = fromIntegral $ tQuantity transaqTrade
      , tradeVolume = fromDouble $ tValue transaqTrade
      , tradeVolumeCurrency = ""
      , tradeOperation = fromDirection (tBuysell transaqTrade)
      , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade
      , tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade
      , tradeTimestamp = tTimestamp transaqTrade
      , tradeCommission = fromDouble $ tComission transaqTrade
      , tradeSignalId = orderSignalId order
      }

    fromDirection Transaq.Buy  = AT.Buy
    fromDirection Transaq.Sell = AT.Sell

    -- Translate an order update into a broker order notification. The order
    -- is looked up by its order number first and by its transaction id
    -- second; once found, the order number is recorded for later lookups.
    handleOrder orderUpdate = do
      brState <- asks brokerState
      trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
      maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
      let exchangeId = ExchangeOrderId (oOrderNo orderUpdate)
      case maybeCb of
        Just cb ->
          case BM.lookupR exchangeId trIdMap
                 <|> BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of
            Just oid -> do
              let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate)
              log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif
              liftIO $ atomically $ do
                m <- readTVar (bsOrderTransactionIdMap brState)
                when (BM.notMemberR exchangeId m) $
                  modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid exchangeId)
              liftIO $ cb notif
            _ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification"
        Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification"

    orderStateFromTransaq orderUpdate =
      case oStatus orderUpdate of
        OrderActive     -> Submitted
        OrderCancelled  -> Cancelled
        OrderDenied     -> Rejected
        OrderDisabled   -> Rejected
        OrderExpired    -> Cancelled
        OrderFailed     -> Rejected
        OrderForwarding -> Unsubmitted
        OrderInactive   -> OrderError
        OrderMatched    -> Executed
        OrderRefused    -> Rejected
        OrderRemoved    -> Rejected
        OrderWait       -> Unsubmitted
        OrderWatching   -> Unsubmitted
        _               -> OrderError

    -- If a request is still outstanding after the timeout has elapsed since
    -- its timestamp, answer it with ResponseTimeout.
    checkRequestTimeout = do
      now <- liftIO getCurrentTime
      tsVar <- asks requestTimestamp
      ts <- liftIO $ readTVarIO tsVar
      when (now `diffUTCTime` ts >= requestTimeoutSeconds) $ do
        resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
        case resp of
          Just tmvar -> do
            log Warning "TXMLConnector.WorkThread" "Request timeout"
            liftIO . atomically . putTMVar tmvar $ ResponseTimeout
          _ -> pure ()

    -- One iteration of the info-gathering stage: block on the next server
    -- message and process it. A "connected" server status moves the state
    -- machine to StageConnected; "disconnected"/"error" move it back to
    -- StageConnection.
    handleGetInfo :: App ()
    handleGetInfo = do
      queue <- asks transaqQueue
      cfg <- asks config
      item <- liftIO . atomically $ readTBQueue queue
      conn <- asks serverConnected
      case item of
        TransaqResponseServerStatus serverStatus -> do
          log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus
          case state serverStatus of
            Transaq.Disconnected -> do
              log Warning "TXMLConnector.WorkThread" "Server disconnected"
              liftIO . atomically $ writeTVar conn StageConnection
            Transaq.Connected -> do
              log Info "TXMLConnector.WorkThread" "Server connected"
              -- FIXME(review): this sends a change-password command with a
              -- hard-coded new password on every successful connect. It looks
              -- like leftover debug code and leaks a credential in source;
              -- confirm and remove or move to configuration.
              void $ liftIO . sendCommand $ toXml $ CommandChangePass (transaqPassword cfg) "goobaka12"
              liftIO . atomically $ writeTVar conn StageConnected
              v <- makeSubscriptions cfg
              case v of
                Left errmsg -> do
                  log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
                  void $ liftIO . sendCommand $ toXml CommandDisconnect
                Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done"
            Transaq.Error errmsg -> do
              log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg
              liftIO . atomically $ writeTVar conn StageConnection
        TransaqResponseResult result ->
          log Info "TXMLConnector.WorkThread" $ "Incoming result: " <> (T.pack . show) result
        -- TODO: handle order response
        TransaqResponseCandles candles ->
          log Debug "TXMLConnector.WorkThread" $
            "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
        -- TODO: Pass to qhp
        TransaqResponseMarkets (ResponseMarkets markets) -> do
          log Debug "TXMLConnector.WorkThread" "Incoming markets:"
          forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
        -- TODO: Pass to qtis
        TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
          ckMap <- asks candleKindMap
          log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
          forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
        TransaqResponseSecurities (ResponseSecurities securities) -> do
          tisH <- asks tisHandle
          let tickerInfos = securityToTickerInfo <$> securities
          log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
          forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
          forM_ tickerInfos (liftIO . putTickerInfo tisH)
        TransaqResponseSecInfo secInfo ->
          log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo: " <> (T.pack . show) secInfo
        -- TODO: Pass to qtis
        TransaqResponseClient (ResponseClient clientData) -> do
          log Debug "TXMLConnector.WorkThread" $
            "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
        _ -> pure ()

    -- One iteration of the unconnected stage: send the connect command. On
    -- failure wait ten seconds before the next attempt; on success advance
    -- to StageGetInfo.
    handleUnconnected :: App ()
    handleUnconnected = do
      cfg <- asks config
      log Debug "TXMLConnector.WorkThread" "Sending connect command"
      v <- liftIO . sendCommand .
        toXml $ CommandConnect
        {
          login = transaqLogin cfg,
          password = transaqPassword cfg,
          host = transaqHost cfg,
          port = transaqPort cfg,
          language = LanguageEn,
          autopos = False,
          micexRegisters = True,
          milliseconds = True,
          utcTime = True,
          proxy = (),
          rqDelay = Nothing,
          sessionTimeout = Nothing,
          requestTimeout = Nothing,
          pushULimits = Nothing,
          pushPosEquity = Nothing
        }
      case v of
        Left err -> do
          log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]"
          liftIO $ threadDelay (1000 * 1000 * 10)
        Right _ -> do
          log Warning "TXMLConnector.WorkThread" "Connected"
          conn <- asks serverConnected
          liftIO . atomically $ writeTVar conn StageGetInfo

    -- Subscribe to everything listed in the configuration.
    makeSubscriptions config =
      liftIO . sendCommand . toXml $
        CommandSubscribe
        {
          alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config),
          quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config),
          quotes = fmap subscriptionToSecurityId (quotesSubscriptions config)
        }

    subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code

    -- Remember the latest tick per (ticker, data type).
    insertToTickMap tm tick = liftIO . atomically $ modifyTVar' tm (M.insert (TickKey (security tick) (datatype tick)) tick)

-- | Convert an all-trades entry into a 'LastTradePrice' tick.
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
  Tick
  {
    security = attBoard att <> "#" <> attSecCode att,
    datatype = LastTradePrice,
    timestamp = attTimestamp att,
    value = fromDouble $ attPrice att,
    volume = fromIntegral $ attQuantity att
  }

-- | Convert a quotation into a 'BestBid' and a 'BestOffer' tick, both stamped
-- with the supplied time and carrying the quotation's quantity as volume.
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks ts q =
  [ mkTick BestBid (qBid q)
  , mkTick BestOffer (qOffer q)
  ]
  where
    tickerId = qBoard q <> "#" <> qSeccode q
    mkTick dt px =
      Tick
      {
        security = tickerId,
        datatype = dt,
        timestamp = ts,
        value = fromDouble px,
        volume = fromIntegral $ qQuantity q
      }

-- | Build ticker info (@board#seccode@, lot size, tick size) from a security.
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec =
  TickerInfo
  {
    tiTicker = sBoard sec <> "#" <> sSeccode sec
  , tiLotSize = sLotSize sec
  , tiTickSize = sMinStep sec
  }

-- | Split a @board#seccode@ ticker id at the first @#@.
-- Returns 'Nothing' when the id contains no @#@.
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId tickerId = mkSecurityId <$> T.findIndex (== '#') tickerId
  where
    mkSecurityId ix = SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId)

-- | Inverse of 'parseSecurityId': render a security id as @board#seccode@.
makeTickerId :: SecurityId -> TickerId
makeTickerId sec = board sec <> "#" <> seccode sec

-- | Split an account id at the first @#@ into the two parts around it.
-- Returns 'Nothing' when the id contains no @#@.
parseAccountId :: T.Text -> Maybe (T.Text, T.Text)
parseAccountId accId = splitAtIx <$> T.findIndex (== '#') accId
  where
    splitAtIx ix = (T.take ix accId, T.drop (ix + 1) accId)

-- | Hand a request to the worker thread and block until it is answered.
--
-- The request timestamp is recorded so that the worker can reply with
-- 'ResponseTimeout' when nothing else answers in time. Only one request is in
-- flight at a time: a concurrent caller blocks until the response slot is
-- free again.
makeRequest :: TXMLConnectorHandle -> Request -> IO Response
makeRequest h request = do
  now <- getCurrentTime
  replyVar <- atomically $ do
    replyVar <- newEmptyTMVar
    writeTVar (hRequestTimestamp h) now
    putTMVar (hResponseVar h) replyVar
    putTMVar (hRequestVar h) request
    pure replyVar
  -- Wait for the reply and release the response slot in one transaction.
  atomically $ do
    void $ takeTMVar (hResponseVar h)
    takeTMVar replyVar

-- | Build a new-order command for the given order.
--
-- Returns 'Nothing' when the security id or the account id cannot be parsed
-- (no @#@ separator) or when the order price is neither 'Market' nor 'Limit'.
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder
mkNewOrderCommand order = do
  secId <- parseSecurityId (orderSecurity order)
  (clientCode, union) <- parseAccountId (orderAccountId order)
  (orderPx, isMarket) <- case orderPrice order of
    Market   -> Just (0, True)
    Limit px -> Just (toDouble px, False)
    _        -> Nothing
  pure Transaq.CommandNewOrder
    {
      security = secId
    , client = clientCode
    , unionCode = union
    , price = orderPx
    , quantity = fromInteger $ orderQuantity order
    , buysell = toDirection $ orderOperation order
    , bymarket = isMarket
    , brokerRef = T.empty
    , unfilled = UnfilledPutInQueue
    , usecredit = False
    , nosplit = False
    }
  where
    toDirection AT.Buy  = Transaq.Buy
    toDirection AT.Sell = Transaq.Sell

-- | Convert a candle of the given security into a bar.
candleToBar :: SecurityId -> Candle -> Bar
candleToBar sec candle =
  Bar
  {
    barSecurity = makeTickerId sec
  , barTimestamp = cTimestamp candle
  , barOpen = fromDouble (cOpen candle)
  , barHigh = fromDouble (cHigh candle)
  , barLow = fromDouble (cLow candle)
  , barClose = fromDouble (cClose candle)
  , barVolume = fromIntegral $ cVolume candle
  }

-- | Submit an order through the connector, discarding the response.
brSubmitOrder :: TXMLConnectorHandle -> Order -> IO ()
brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order)

-- | Request cancellation of an order, discarding the response.
brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO ()
brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid)

-- | Install (or clear) the callback that receives broker notifications.
brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO ()
brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb

-- | Wrap a connector handle into a 'BrokerBackend' serving a single account.
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend
makeBrokerBackend h account =
  BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h)