diff --git a/src/HistoryProviderServer.hs b/src/HistoryProviderServer.hs index 97af53f..7c1ba51 100644 --- a/src/HistoryProviderServer.hs +++ b/src/HistoryProviderServer.hs @@ -53,11 +53,10 @@ import System.ZMQ4 (Context, Router (Router), bind, sendMulti, socket, withSocket) import TickerInfoServer (TickerInfoServerHandle, getAllTickers) -import TXMLConnector (HistoryRequest (..), +import TXMLConnector (TXMLConnectorHandle, makeRequest) +import TXMLConnector.Internal (HistoryRequest (..), HistoryResponse (..), - Request (..), Response (..), - TXMLConnectorHandle, hrBars, - makeRequest) + Request (..), Response (..)) data HistoryProviderServerHandle = HistoryProviderServerHandle diff --git a/src/Linux/TXML.hs b/src/Linux/TXML.hs index eaa4b01..71233df 100644 --- a/src/Linux/TXML.hs +++ b/src/Linux/TXML.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} module Linux.TXML ( @@ -6,19 +8,15 @@ module Linux.TXML , sendCommand , setCallback , freeCallback - , Callback - , LogLevel(..) ) where import qualified Data.Text as T +import TXML (Callback (..), FreeableCallback (..), LogLevel (..)) -data LogLevel = - Debug - | Info - | Warning - deriving (Show, Eq, Ord) +data StubCallback = StubCallback -data Callback = Callback +instance (Monad m) => FreeableCallback m StubCallback where + freeCallback _ = pure () initialize :: FilePath -> LogLevel -> IO (Either T.Text ()) initialize _ _ = return (Right ()) @@ -29,9 +27,6 @@ uninitialize = return (Right ()) sendCommand :: T.Text -> IO (Either T.Text ()) sendCommand _ = return (Right ()) -setCallback :: (T.Text -> IO Bool) -> IO (Maybe Callback) -setCallback _ = return . Just $ Callback - -freeCallback :: Callback -> IO () -freeCallback _ = return () +setCallback :: (Monad m) => (T.Text -> IO Bool) -> IO (Maybe (Callback m)) +setCallback _ = return . Just $ MkCallback StubCallback diff --git a/src/TXML.hs b/src/TXML.hs index 75a5036..3018eb0 100644 --- a/src/TXML.hs +++ b/src/TXML.hs @@ -1,18 +1,34 @@ -{-# LANGUAGE CPP #-} +{-# LANGUAGE CPP #-} +{-# LANGUAGE ExistentialQuantification #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} module TXML ( - initialize - , uninitialize - , sendCommand - , setCallback - , freeCallback - , Callback + MonadTXML(..) + , TXML.Callback(..) , LogLevel(..) + , FreeableCallback(..) ) where -#if defined(mingw32_HOST_OS) -import Win32.TXML -#else -import Linux.TXML -#endif +import qualified Data.Text as T + +data LogLevel = + Debug + | Info + | Warning + deriving (Show, Eq, Ord) + +class (Monad m) => FreeableCallback m a where + freeCallback :: a -> m() + +data Callback m = forall callback. FreeableCallback m callback => MkCallback callback + +instance (Monad m) => FreeableCallback m (Callback m) where + freeCallback = freeCallback + +class (Monad m) => MonadTXML m where + initialize :: FilePath -> LogLevel -> m (Either T.Text ()) + uninitialize :: m (Either T.Text ()) + sendCommand :: T.Text -> m (Either T.Text ()) + setCallback :: (T.Text -> IO Bool) -> m (Maybe (Callback IO)) diff --git a/src/TXMLConnector.hs b/src/TXMLConnector.hs index 133987e..ae413f0 100644 --- a/src/TXMLConnector.hs +++ b/src/TXMLConnector.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE CPP #-} {-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-} @@ -8,12 +8,8 @@ module TXMLConnector ( start , TXMLConnector.stop - , Request(..) - , HistoryRequest(..) - , Response(..) - , HistoryResponse(..) - , makeRequest , TXMLConnectorHandle + , makeRequest , makeBrokerBackend ) where @@ -37,7 +33,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, newTBQueue, readTBQueue, writeTBQueue) -import Control.Monad (forever, void, when) +import Control.Monad (forM_, forever, void, when) import Control.Monad.Extra (whileM) import qualified Data.Bimap as BM import Data.Maybe (mapMaybe) @@ -83,9 +79,9 @@ import Transaq (AllTradesTrade (..), TransaqResponseC (fromXml), UnfilledAction (..), kCandleKindId, kPeriod, state) -import TXML (LogLevel, freeCallback, - initialize, sendCommand, - setCallback) +import TXML (LogLevel, MonadTXML, + freeCallback, initialize, + sendCommand, setCallback) import ATrade.Broker.Backend (BrokerBackend (..), BrokerBackendNotification (..)) @@ -104,7 +100,6 @@ import Control.Applicative ((<|>)) import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.STM.TMVar (TMVar) import Control.Error (headMay) -import Control.Monad (forM_) import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader.Class (MonadReader, asks) @@ -122,6 +117,16 @@ import TickerInfoServer (TickerInfo (..), putTickerInfo) import qualified Transaq import qualified TXML +import TXMLConnector.Internal (BrokerState (..), + ConnectionStage (..), Env (..), + Request (..), Response (..), + workThread) + +#if defined(mingw32_HOST_OS) +import qualified Win32.TXML as TXMLImpl +#else +import qualified Linux.TXML as TXMLImpl +#endif data ConnectionParams = ConnectionParams @@ -135,35 +140,6 @@ data ConnectionParams = } deriving (Show, Eq, Ord) -data HistoryRequest = - HistoryRequest - { - hrTickerId :: TickerId - , hrTimeframe :: BarTimeframe - , hrCount :: Int - , hrReset :: Bool - } deriving (Show, Eq, Ord) - -data Request = - RequestHistory HistoryRequest - | RequestSubmitOrder Order - | RequestCancelOrder OrderId - deriving (Show, Eq) - -data Response = - ResponseHistory HistoryResponse - | ResponseOrderSubmitted - | ResponseOrderCancelled - | ResponseTimeout - -data HistoryResponse = - HistoryResponse - { - hrBars :: [Bar] - , hrMoreData :: Bool - } - deriving (Show, Eq) - data TXMLConnectorHandle = TXMLConnectorHandle { @@ -176,58 +152,15 @@ data TXMLConnectorHandle = , hRunVar :: TMVar () } -data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown - deriving (Eq, Show, Ord) - -instance FSMState ConnectionStage where - isTerminalState StageShutdown = True - isTerminalState _ = False - -data MainQueueData = - MainQueueTransaqData TransaqResponse - | MainQueueRequest Request - | MainQueuePingServer - | MainQueueShutdown - deriving (Eq, Show) - -data TransactionId = - TransactionId Int64 - | ExchangeOrderId Int64 - deriving (Show, Ord, Eq) - -data BrokerState = - BrokerState - { - bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId) - , bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) - , bsOrderMap :: TVar (M.Map OrderId Order) - , bsPendingOrders :: TVar (D.Deque Order) - } - - -data Env = - Env - { - qssChannel :: BoundedChan QuoteSourceServerData - , tisHandle :: TickerInfoServerHandle - , requestVar :: TMVar Request - , responseVar :: TMVar (TMVar Response) - , requestTimestamp :: TVar UTCTime - , currentCandles :: TVar [Candle] - , tickMap :: TickTable - , transaqQueue :: TBQueue TransaqResponse - , logger :: LogAction IO Message - , config :: TransaqConnectorConfig - , serverConnected :: TVar ConnectionStage - , candleKindMap :: TVar (M.Map Int Int) - , brokerState :: BrokerState - , runVar :: TMVar () - , timerVar :: TMVar () - } - newtype App a = App { unApp :: ReaderT Env IO a } deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env) +instance MonadTXML App where + initialize path loglevel = liftIO $ TXMLImpl.initialize path loglevel + uninitialize = liftIO TXMLImpl.uninitialize + sendCommand = liftIO . TXMLImpl.sendCommand + setCallback = liftIO . TXMLImpl.setCallback + instance HasLog Env Message App where getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } setLogAction _ env = env -- fuck it @@ -296,432 +229,19 @@ start logger config qssChannel tisH = do stop :: TXMLConnectorHandle -> IO () stop h = atomically $ putTMVar (hRunVar h) () -workThread :: App () -workThread = do - cfg <- asks config - rc <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg) - case rc of - Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str - Right _ -> do - queue <- asks transaqQueue - logger' <- asks logger - rc <- liftIO $ setCallback (parseAndWrite queue logger') - case rc of - Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" - Just cb -> do - serverConnectionState <- asks serverConnected - timerVar' <- asks timerVar - void $ liftIO $ forkIO $ whileM $ do - threadDelay 1000000 - void . liftIO . atomically $ tryPutTMVar timerVar' () - connStatus <- liftIO . readTVarIO $ serverConnectionState - pure $ connStatus /= StageShutdown - - fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected) - , (StageGetInfo, FSMCallback handleGetInfo) - , (StageConnected, FSMCallback handleConnected)] - runFsm fsm - liftIO $ freeCallback cb - where - parseTransaqLogLevel 1 = TXML.Warning - parseTransaqLogLevel 3 = TXML.Debug - parseTransaqLogLevel _ = TXML.Info - parseAndWrite queue logger xml = do - let parsed = mapMaybe parseContent $ parseXML xml - mapM_ (writeToQueue queue) parsed - pure True - parseContent (Elem el) = parseElement el - parseContent _ = Nothing - parseElement el = case qName $ elName el of - "candles" -> TransaqResponseCandles <$> fromXml el - "server_status" -> TransaqResponseServerStatus <$> fromXml el - "markets" -> TransaqResponseMarkets <$> fromXml el - "candlekinds" -> TransaqResponseCandleKinds <$> fromXml el - "securities" -> TransaqResponseSecurities <$> fromXml el - "sec_info" -> TransaqResponseSecInfo <$> fromXml el - "quotations" -> TransaqResponseQuotations <$> fromXml el - "alltrades" -> TransaqResponseAllTrades <$> fromXml el - "quotes" -> TransaqResponseQuotes <$> fromXml el - "orders" -> TransaqResponseOrders <$> fromXml el - "trades" -> TransaqResponseTrades <$> fromXml el - "result" -> TransaqResponseResult <$> fromXml el - _ -> Nothing - writeToQueue queue resp = atomically $ writeTBQueue queue resp - handleConnected :: App (Maybe ConnectionStage) - handleConnected = do - checkRequestTimeout - - serverConn <- asks serverConnected - rqVar <- asks requestVar - runVar' <- asks runVar - queue <- asks transaqQueue - timerVar' <- asks timerVar - item <- liftIO . atomically $ - (readTMVar runVar' >> pure MainQueueShutdown) `orElse` - (MainQueueTransaqData <$> readTBQueue queue) `orElse` - (MainQueueRequest <$> takeTMVar rqVar) `orElse` - (takeTMVar timerVar' >> pure MainQueuePingServer) - case item of - MainQueueShutdown -> pure $ Just StageShutdown - MainQueuePingServer -> do - maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus - case maybeServerStatus of - Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of - ((TransaqResponseResult (ResponseFailure _)):_) -> do - pure $ Just StageConnection - _ -> do - log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw - pure Nothing - Right () -> pure Nothing - MainQueueTransaqData transaqData -> do - tm <- asks tickMap - case transaqData of - TransaqResponseAllTrades (ResponseAllTrades trades) -> do - qssChan <- asks qssChannel - let ticks = fmap allTradeToTick trades - forM_ ticks (liftIO . writeChan qssChan . QSSTick) - forM_ ticks (insertToTickMap tm) - pure Nothing - TransaqResponseQuotations (ResponseQuotations quotations) -> do - qssChan <- asks qssChannel - now <- liftIO getCurrentTime - let ticks = concatMap (quotationToTicks now) quotations - forM_ ticks (liftIO . writeChan qssChan . QSSTick) - forM_ ticks (insertToTickMap tm) - pure Nothing - TransaqResponseCandles respCandle -> do - resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar - log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) - case resp of - Just !tmvar -> if cStatus respCandle == StatusPending - then do - cur <- asks currentCandles - liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c) - else do - cur <- asks currentCandles - liftIO $ atomically $ do - candles <- readTVar cur - putTMVar tmvar $ ResponseHistory $ HistoryResponse - { - hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) - , hrMoreData = False - } - _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" - pure Nothing - TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing - TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing - _ -> pure Nothing - MainQueueRequest (RequestHistory request) -> do - cur <- asks currentCandles - liftIO $ atomically $ writeTVar cur [] - maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO) - case maybeCk of - Just candleKindId -> do - case parseSecurityId (hrTickerId request) of - Just secId -> void $ liftIO . sendCommand $ - toXml CommandGetHistoryData - { - security = secId - , periodId = candleKindId - , count = hrCount request - , reset = hrReset request - } - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request - _ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request) - pure Nothing - MainQueueRequest (RequestSubmitOrder order) -> do - case mkNewOrderCommand order of - Just cmd -> do - v <- liftIO . sendCommand . toXml $ cmd - case v of - Left result -> do - case headMay (parseXML result) >>= parseContent of - Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do - brState <- asks brokerState - respVar <- asks responseVar - liftIO $ atomically $ do - modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) - modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId)) - resp <- readTMVar respVar - putTMVar resp ResponseOrderSubmitted - maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) - case maybeCb of - Just cb -> do - let notif = BackendOrderNotification (orderId order) Submitted - liftIO $ cb notif - _ -> pure () - log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> - (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId - Just (TransaqResponseResult (ResponseFailure err)) -> do - brState <- asks brokerState - log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err - maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) - case maybeCb of - Just cb -> do - let notif = BackendOrderNotification (orderId order) Rejected - liftIO $ cb notif - _ -> pure () - _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" - pure Nothing - Right _ -> do - log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" - pure Nothing - _ -> pure Nothing - _ -> pure Nothing - - requestTimeout = 10 - - handleTrade transaqTrade = do - brState <- asks brokerState - trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) - maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) - orderMap <- liftIO $ readTVarIO (bsOrderMap brState) - case maybeCb of - Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of - Just oid -> case M.lookup oid orderMap of - Just order -> do - let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order) - log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif - liftIO $ cb notif - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade - _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade - Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!" - - fromTransaqTrade transaqTrade order = - Trade - { - tradeOrderId = orderId order - , tradePrice = fromDouble (tPrice transaqTrade) - , tradeQuantity = fromIntegral $ tQuantity transaqTrade - , tradeVolume = fromDouble $ tValue transaqTrade - , tradeVolumeCurrency = "" - , tradeOperation = fromDirection (tBuysell transaqTrade) - , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade - , tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade - , tradeTimestamp = tTimestamp transaqTrade - , tradeCommission = fromDouble $ tComission transaqTrade - , tradeSignalId = orderSignalId order - } - - fromDirection Transaq.Buy = AT.Buy - fromDirection Transaq.Sell = AT.Sell - - handleOrder orderUpdate = do - brState <- asks brokerState - trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) - maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) - case maybeCb of - Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|> - BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of - Just oid -> do - let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate) - log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif - liftIO $ atomically $ do - m <- readTVar (bsOrderTransactionIdMap brState) - when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do - modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate)) - liftIO $ cb notif - _ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification" - Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification" - - orderStateFromTransaq orderUpdate = - case oStatus orderUpdate of - OrderActive -> Submitted - OrderCancelled -> Cancelled - OrderDenied -> Rejected - OrderDisabled -> Rejected - OrderExpired -> Cancelled - OrderFailed -> Rejected - OrderForwarding -> Unsubmitted - OrderInactive -> OrderError - OrderMatched -> Executed - OrderRefused -> Rejected - OrderRemoved -> Rejected - OrderWait -> Unsubmitted - OrderWatching -> Unsubmitted - _ -> OrderError - - checkRequestTimeout = do - now <- liftIO getCurrentTime - tsVar <- asks requestTimestamp - ts <- liftIO $ readTVarIO tsVar - when (now `diffUTCTime` ts >= requestTimeout) $ do - resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar - case resp of - Just tmvar -> do - log Warning "TXMLConnector.WorkThread" "Request timeout" - liftIO . atomically . putTMVar tmvar $ ResponseTimeout - _ -> pure () - - handleGetInfo :: App (Maybe ConnectionStage) - handleGetInfo = do - queue <- asks transaqQueue - cfg <- asks config - item <- liftIO . atomically $ readTBQueue queue - conn <- asks serverConnected - case item of - TransaqResponseServerStatus serverStatus -> do - log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus - case state serverStatus of - Transaq.Disconnected -> do - log Warning "TXMLConnector.WorkThread" "Server disconnected" - pure $ Just StageConnection - Transaq.Connected -> do - log Info "TXMLConnector.WorkThread" "Server connected" - void $ liftIO . sendCommand $ toXml $ - CommandChangePass (transaqPassword cfg) "goobaka12" - v <- makeSubscriptions cfg - case v of - Left errmsg -> do - log Warning "TXMLConnector.WorkThread" "Unable to subscribe" - void $ liftIO . sendCommand $ toXml CommandDisconnect - pure $ Just StageConnection - Right _ -> do - log Info "TXMLConnector.WorkThread" "Subscriptions done" - pure $ Just StageConnected - Transaq.Error errmsg -> do - log Warning "TXMLConnector.WorkThread" "Connection error" - liftIO . atomically $ writeTVar conn StageConnection - void $ liftIO $ sendCommand $ toXml $ CommandDisconnect - pure $ Just StageConnection - TransaqResponseResult result -> do - log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result - pure Nothing - TransaqResponseCandles candles -> do - log Debug "TXMLConnector.WorkThread" $ - "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles) - pure Nothing - TransaqResponseMarkets (ResponseMarkets markets) -> do - log Debug "TXMLConnector.WorkThread" "Incoming markets:" - forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) - pure Nothing - TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do - ckMap <- asks candleKindMap - log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds - forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k))) - pure Nothing - TransaqResponseSecurities (ResponseSecurities securities) -> do - tisH <- asks tisHandle - let tickerInfos = securityToTickerInfo <$> securities - log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities - forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) - forM_ tickerInfos (liftIO . putTickerInfo tisH) - pure Nothing - TransaqResponseSecInfo secInfo -> do - log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo - pure Nothing - TransaqResponseClient (ResponseClient clientData) -> do - log Debug "TXMLConnector.WorkThread" $ - "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData) - pure Nothing - _ -> pure Nothing - - handleUnconnected :: App (Maybe ConnectionStage) - handleUnconnected = do - cfg <- asks config - log Debug "TXMLConnector.WorkThread" "Sending connect command" - v <- liftIO . sendCommand . toXml $ cmdConnect cfg - case v of - Left _ -> do - log Warning "TXMLConnector.WorkThread" "Unable to connect" - liftIO $ do - void $ sendCommand $ toXml CommandDisconnect - threadDelay reconnectionDelay - queue <- asks transaqQueue - void $ liftIO $ atomically $ flushTBQueue queue - pure Nothing - Right _ -> do - log Info "TXMLConnector.WorkThread" "Connected" - pure $ Just StageGetInfo - - makeSubscriptions config = liftIO . sendCommand . toXml $ cmdSubscription config - - subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code - insertToTickMap tickMap tick = insertTick tickMap tick - - reconnectionDelay = 1000 * 1000 * 10 - - cmdSubscription config = - CommandSubscribe - { - alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), - quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config), - quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) - } - cmdConnect cfg = CommandConnect - { - login = transaqLogin cfg, - password = transaqPassword cfg, - host = transaqHost cfg, - port = transaqPort cfg, - language = LanguageEn, - autopos = False, - micexRegisters = True, - milliseconds = True, - utcTime = True, - proxy = (), - rqDelay = Nothing, - sessionTimeout = Nothing, - requestTimeout = Nothing, - pushULimits = Nothing, - pushPosEquity = Nothing - } - -allTradeToTick :: AllTradesTrade -> Tick -allTradeToTick att = - Tick - { - security = attBoard att <> "#" <> attSecCode att, - datatype = LastTradePrice, - timestamp = attTimestamp att, - value = fromDouble $ attPrice att, - volume = fromIntegral $ attQuantity att - } - -quotationToTicks :: UTCTime -> Quotation -> [Tick] -quotationToTicks timestamp q = - let security = qBoard q <> "#" <> qSeccode q in - [ - Tick - { - security = security, - datatype = BestBid, - timestamp = timestamp, - value = fromDouble $ qBid q, - volume = fromIntegral $ qQuantity q - }, - Tick - { - security = security, - datatype = BestOffer, - timestamp = timestamp, - value = fromDouble $ qOffer q, - volume = fromIntegral $ qQuantity q - }] - -securityToTickerInfo :: Security -> TickerInfo -securityToTickerInfo sec = - TickerInfo - { - tiTicker = sBoard sec <> "#" <> sSeccode sec - , tiLotSize = sLotSize sec - , tiTickSize = sMinStep sec - } - -parseSecurityId :: TickerId -> Maybe SecurityId -parseSecurityId tickerId = case T.findIndex (== '#') tickerId of - Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId) - Nothing -> Nothing +brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () +brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) -makeTickerId :: SecurityId -> TickerId -makeTickerId sec = board sec <> "#" <> seccode sec +brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO () +brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid) -parseAccountId :: T.Text -> Maybe (T.Text, T.Text) -parseAccountId accId = case T.findIndex (== '#') accId of - Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId) - Nothing -> Nothing +brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO () +brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb +makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend +makeBrokerBackend h account = + BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) makeRequest :: TXMLConnectorHandle -> Request -> IO Response makeRequest h request = do @@ -735,73 +255,3 @@ makeRequest h request = do atomically $ do void $ takeTMVar (hResponseVar h) takeTMVar resp - -mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder -mkNewOrderCommand order = - case parseSecurityId (orderSecurity order) of - Just secId -> - case parseAccountId (orderAccountId order) of - Just (client, union) -> do - case orderPrice order of - Market -> Just $ Transaq.CommandNewOrder - { - security = secId - , client = client - , unionCode = union - , price = 0 - , quantity = fromInteger $ orderQuantity order - , buysell = toDirection $ orderOperation order - , bymarket = True - , brokerRef = T.empty - , unfilled = UnfilledPutInQueue - , usecredit = False - , nosplit = False - } - Limit price -> Just $ Transaq.CommandNewOrder - { - security = secId - , client = client - , unionCode = union - , price = toDouble price - , quantity = fromInteger $ orderQuantity order - , buysell = toDirection $ orderOperation order - , bymarket = False - , brokerRef = T.empty - , unfilled = UnfilledPutInQueue - , usecredit = False - , nosplit = False - } - _ -> Nothing - _ -> Nothing - _ -> Nothing - where - toDirection AT.Buy = Transaq.Buy - toDirection AT.Sell = Transaq.Sell - - -candleToBar :: SecurityId -> Candle -> Bar -candleToBar sec candle = - Bar - { - barSecurity = makeTickerId sec - , barTimestamp = cTimestamp candle - , barOpen = fromDouble (cOpen candle) - , barHigh = fromDouble (cHigh candle) - , barLow = fromDouble (cLow candle) - , barClose = fromDouble (cClose candle) - , barVolume = fromIntegral $ cVolume candle - } - -brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () -brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) - -brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO () -brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid) - -brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO () -brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb - -makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend -makeBrokerBackend h account = - BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) - diff --git a/src/TXMLConnector/Internal.hs b/src/TXMLConnector/Internal.hs new file mode 100644 index 0000000..05b7699 --- /dev/null +++ b/src/TXMLConnector/Internal.hs @@ -0,0 +1,702 @@ +{-# LANGUAGE BangPatterns #-} +{-# LANGUAGE DuplicateRecordFields #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE GeneralizedNewtypeDeriving #-} +{-# LANGUAGE MultiParamTypeClasses #-} + +module TXMLConnector.Internal + ( + workThread + , Env(..) + , Request(..) + , Response(..) + , BrokerState(..) + , ConnectionStage(..) + , HistoryRequest(..) + , HistoryResponse(..) + ) where + +import ATrade.Logging (Message, Severity (..), log, + logWith) +import Colog (HasLog (getLogAction, setLogAction), + LogAction (LogAction, unLogAction)) +import Config (SubscriptionConfig (SubscriptionConfig), + TransaqConnectorConfig (..), + transaqHost, transaqLogLevel, + transaqLogPath, transaqLogin, + transaqPassword, transaqPort) +import Control.Concurrent (ThreadId, forkIO, threadDelay) +import Control.Concurrent.STM (TVar, atomically, modifyTVar', + newEmptyTMVar, newEmptyTMVarIO, + newTVarIO, orElse, putTMVar, + readTMVar, readTVar, + readTVarIO, takeTMVar, + tryPutTMVar, tryReadTMVar, + writeTVar) +import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, + newTBQueue, readTBQueue, + writeTBQueue) +import Control.Monad (forever, void, when) +import Control.Monad.Extra (whileM) +import qualified Data.Bimap as BM +import Data.Maybe (mapMaybe) +import qualified Data.Text as T +import qualified Deque.Strict as D +import Text.XML.Light.Input (parseXML) +import Text.XML.Light.Types (Content (Elem), + Element (elName), + QName (qName)) +import TickTable (TickTable, insertTick, + lookupTick, newTickTable) +import Transaq (AllTradesTrade (..), + Candle (..), ClientData (..), + CommandChangePass (..), + CommandConnect (..), + CommandDisconnect (CommandDisconnect), + CommandGetHistoryData (CommandGetHistoryData), + CommandServerStatus (..), + CommandSubscribe (..), + ConnectionState (Disconnected), + Language (LanguageEn), + MarketInfo (..), + OrderNotification (..), + OrderStatus (..), + Quotation (..), + ResponseAllTrades (ResponseAllTrades), + ResponseCandleKinds (ResponseCandleKinds), + ResponseCandles (..), + ResponseCandlesStatus (StatusPending), + ResponseClient (ResponseClient), + ResponseMarkets (ResponseMarkets), + ResponseOrders (ResponseOrders), + ResponseQuotations (ResponseQuotations), + ResponseQuotes (ResponseQuotes), + ResponseResult (..), + ResponseSecurities (ResponseSecurities), + ResponseTrades (ResponseTrades), + Security (..), SecurityId (..), + TradeNotification (..), + TransaqCommand (toXml), + TransaqResponse (..), + TransaqResponse (..), + TransaqResponseC (fromXml), + UnfilledAction (..), + kCandleKindId, kPeriod, state) +import TXML (LogLevel, MonadTXML, + freeCallback, initialize, + sendCommand, setCallback) + +import ATrade.Broker.Backend (BrokerBackend (..), + BrokerBackendNotification (..)) +import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) +import ATrade.Types (Bar (..), + BarTimeframe (unBarTimeframe), + DataType (BestBid, BestOffer, LastTradePrice), + Order (..), OrderId, + OrderPrice (..), + OrderState (..), Tick (..), + TickerId, Trade (..), + fromDouble, toDouble) +import qualified ATrade.Types as AT +import Colog.Monad (WithLog) +import Control.Applicative ((<|>)) +import Control.Concurrent.BoundedChan (BoundedChan, writeChan) +import Control.Concurrent.STM.TMVar (TMVar) +import Control.Error (headMay) +import Control.Monad (forM_) +import Control.Monad.IO.Class (MonadIO (liftIO)) +import Control.Monad.Reader (ReaderT (runReaderT)) +import Control.Monad.Reader.Class (MonadReader, asks) +import Data.Int (Int64) +import qualified Data.Map.Strict as M +import Data.Time.Clock (UTCTime, diffUTCTime, + getCurrentTime) +import FSM (FSMCallback (..), + FSMState (isTerminalState), + makeFsm, runFsm) +import GHC.Exts (IsList (..)) +import Prelude hiding (log) +import TickerInfoServer (TickerInfo (..), + TickerInfoServerHandle, + putTickerInfo) +import qualified Transaq +import qualified TXML + +data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown + deriving (Eq, Show, Ord) + +instance FSMState ConnectionStage where + isTerminalState StageShutdown = True + isTerminalState _ = False + +data Env = + Env + { + qssChannel :: BoundedChan QuoteSourceServerData + , tisHandle :: TickerInfoServerHandle + , requestVar :: TMVar Request + , responseVar :: TMVar (TMVar Response) + , requestTimestamp :: TVar UTCTime + , currentCandles :: TVar [Candle] + , tickMap :: TickTable + , transaqQueue :: TBQueue TransaqResponse + , logger :: LogAction IO Message + , config :: TransaqConnectorConfig + , serverConnected :: TVar ConnectionStage + , candleKindMap :: TVar (M.Map Int Int) + , brokerState :: BrokerState + , runVar :: TMVar () + , timerVar :: TMVar () + } + +data MainQueueData = + MainQueueTransaqData TransaqResponse + | MainQueueRequest Request + | MainQueuePingServer + | MainQueueShutdown + deriving (Eq, Show) + +data TransactionId = + TransactionId Int64 + | ExchangeOrderId Int64 + deriving (Show, Ord, Eq) + +data BrokerState = + BrokerState + { + bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId) + , bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) + , bsOrderMap :: TVar (M.Map OrderId Order) + , bsPendingOrders :: TVar (D.Deque Order) + } + +data HistoryRequest = + HistoryRequest + { + hrTickerId :: TickerId + , hrTimeframe :: BarTimeframe + , hrCount :: Int + , hrReset :: Bool + } deriving (Show, Eq, Ord) + +data Request = + RequestHistory HistoryRequest + | RequestSubmitOrder Order + | RequestCancelOrder OrderId + deriving (Show, Eq) + +data Response = + ResponseHistory HistoryResponse + | ResponseOrderSubmitted + | ResponseOrderCancelled + | ResponseTimeout + +data HistoryResponse = + HistoryResponse + { + hrBars :: [Bar] + , hrMoreData :: Bool + } + deriving (Show, Eq) + +newtype RespCallback = + RespCallback { unCallback :: T.Text -> IO Bool } + +workThread :: (MonadIO m, + MonadReader Env m, + MonadTXML m, + HasLog Env Message m) => m () +workThread = do + cfg <- asks config + rc <- initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg) + case rc of + Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str + Right _ -> do + queue <- asks transaqQueue + logger' <- asks logger + rc <- setCallback (parseAndWrite queue) + case rc of + Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" + Just cb -> do + serverConnectionState <- asks serverConnected + timerVar' <- asks timerVar + void $ liftIO $ forkIO $ whileM $ do + threadDelay 1000000 + void . liftIO . atomically $ tryPutTMVar timerVar' () + connStatus <- liftIO . readTVarIO $ serverConnectionState + pure $ connStatus /= StageShutdown + + fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected) + , (StageGetInfo, FSMCallback handleGetInfo) + , (StageConnected, FSMCallback handleConnected)] + runFsm fsm + liftIO $ freeCallback cb + where + parseTransaqLogLevel 1 = TXML.Warning + parseTransaqLogLevel 3 = TXML.Debug + parseTransaqLogLevel _ = TXML.Info + parseAndWrite :: TBQueue TransaqResponse -> T.Text -> IO Bool + parseAndWrite queue xml = do + let parsed = mapMaybe parseContent $ parseXML xml + atomically $ mapM_ (writeTBQueue queue) parsed + pure True + +parseContent :: Content -> Maybe TransaqResponse +parseContent (Elem el) = parseElement + where + parseElement = case qName $ elName el of + "candles" -> TransaqResponseCandles <$> fromXml el + "server_status" -> TransaqResponseServerStatus <$> fromXml el + "markets" -> TransaqResponseMarkets <$> fromXml el + "candlekinds" -> TransaqResponseCandleKinds <$> fromXml el + "securities" -> TransaqResponseSecurities <$> fromXml el + "sec_info" -> TransaqResponseSecInfo <$> fromXml el + "quotations" -> TransaqResponseQuotations <$> fromXml el + "alltrades" -> TransaqResponseAllTrades <$> fromXml el + "quotes" -> TransaqResponseQuotes <$> fromXml el + "orders" -> TransaqResponseOrders <$> fromXml el + "trades" -> TransaqResponseTrades <$> fromXml el + "result" -> TransaqResponseResult <$> fromXml el + _ -> Nothing +parseContent _ = Nothing + + +handleConnected :: (MonadIO m, + MonadReader Env m, + MonadTXML m, + HasLog Env Message m) => m (Maybe ConnectionStage) +handleConnected = do + checkRequestTimeout + + rqVar <- asks requestVar + runVar' <- asks runVar + queue <- asks transaqQueue + timerVar' <- asks timerVar + item <- liftIO . atomically $ + (readTMVar runVar' >> pure MainQueueShutdown) `orElse` + (MainQueueTransaqData <$> readTBQueue queue) `orElse` + (MainQueueRequest <$> takeTMVar rqVar) `orElse` + (takeTMVar timerVar' >> pure MainQueuePingServer) + case item of + MainQueueShutdown -> pure $ Just StageShutdown + MainQueuePingServer -> do + maybeServerStatus <- sendCommand $ toXml CommandServerStatus + case maybeServerStatus of + Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of + ((TransaqResponseResult (ResponseFailure _)):_) -> do + pure $ Just StageConnection + _ -> do + log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw + pure Nothing + Right () -> pure Nothing + MainQueueTransaqData transaqData -> do + tm <- asks tickMap + case transaqData of + TransaqResponseAllTrades (ResponseAllTrades trades) -> do + qssChan <- asks qssChannel + let ticks = fmap allTradeToTick trades + forM_ ticks (liftIO . writeChan qssChan . QSSTick) + forM_ ticks (insertToTickMap tm) + pure Nothing + TransaqResponseQuotations (ResponseQuotations quotations) -> do + qssChan <- asks qssChannel + now <- liftIO getCurrentTime + let ticks = concatMap (quotationToTicks now) quotations + forM_ ticks (liftIO . writeChan qssChan . QSSTick) + forM_ ticks (insertToTickMap tm) + pure Nothing + TransaqResponseCandles respCandle -> do + resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar + log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) + case resp of + Just !tmvar -> if cStatus respCandle == StatusPending + then do + cur <- asks currentCandles + liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c) + else do + cur <- asks currentCandles + liftIO $ atomically $ do + candles <- readTVar cur + putTMVar tmvar $ ResponseHistory $ HistoryResponse + { + hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) + , hrMoreData = False + } + _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" + pure Nothing + TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing + TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing + _ -> pure Nothing + MainQueueRequest (RequestHistory request) -> do + cur <- asks currentCandles + liftIO $ atomically $ writeTVar cur [] + maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO) + case maybeCk of + Just candleKindId -> do + case parseSecurityId (hrTickerId request) of + Just secId -> void $ sendCommand $ toXml CommandGetHistoryData + { + security = secId + , periodId = candleKindId + , count = hrCount request + , reset = hrReset request + } + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request + _ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request) + pure Nothing + MainQueueRequest (RequestSubmitOrder order) -> do + case mkNewOrderCommand order of + Just cmd -> do + v <- sendCommand . toXml $ cmd + case v of + Left result -> do + case headMay (parseXML result) >>= parseContent of + Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do + brState <- asks brokerState + respVar <- asks responseVar + liftIO $ atomically $ do + modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) + modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId)) + resp <- readTMVar respVar + putTMVar resp ResponseOrderSubmitted + maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) + case maybeCb of + Just cb -> do + let notif = BackendOrderNotification (orderId order) Submitted + liftIO $ cb notif + _ -> pure () + log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> + (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId + Just (TransaqResponseResult (ResponseFailure err)) -> do + brState <- asks brokerState + log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err + maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) + case maybeCb of + Just cb -> do + let notif = BackendOrderNotification (orderId order) Rejected + liftIO $ cb notif + _ -> pure () + _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" + pure Nothing + Right _ -> do + log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" + pure Nothing + _ -> pure Nothing + _ -> pure Nothing + + where + requestTimeout = 10 + + handleTrade transaqTrade = do + brState <- asks brokerState + trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) + maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) + orderMap <- liftIO $ readTVarIO (bsOrderMap brState) + case maybeCb of + Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of + Just oid -> case M.lookup oid orderMap of + Just order -> do + let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order) + log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif + liftIO $ cb notif + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade + _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade + Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!" + + fromTransaqTrade transaqTrade order = + Trade + { + tradeOrderId = orderId order + , tradePrice = fromDouble (tPrice transaqTrade) + , tradeQuantity = fromIntegral $ tQuantity transaqTrade + , tradeVolume = fromDouble $ tValue transaqTrade + , tradeVolumeCurrency = "" + , tradeOperation = fromDirection (tBuysell transaqTrade) + , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade + , tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade + , tradeTimestamp = tTimestamp transaqTrade + , tradeCommission = fromDouble $ tComission transaqTrade + , tradeSignalId = orderSignalId order + } + + fromDirection Transaq.Buy = AT.Buy + fromDirection Transaq.Sell = AT.Sell + + handleOrder orderUpdate = do + brState <- asks brokerState + trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) + maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) + case maybeCb of + Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|> + BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of + Just oid -> do + let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate) + log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif + liftIO $ atomically $ do + m <- readTVar (bsOrderTransactionIdMap brState) + when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do + modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate)) + liftIO $ cb notif + _ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification" + Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification" + + orderStateFromTransaq orderUpdate = + case oStatus orderUpdate of + OrderActive -> Submitted + OrderCancelled -> Cancelled + OrderDenied -> Rejected + OrderDisabled -> Rejected + OrderExpired -> Cancelled + OrderFailed -> Rejected + OrderForwarding -> Unsubmitted + OrderInactive -> OrderError + OrderMatched -> Executed + OrderRefused -> Rejected + OrderRemoved -> Rejected + OrderWait -> Unsubmitted + OrderWatching -> Unsubmitted + _ -> OrderError + + checkRequestTimeout = do + now <- liftIO getCurrentTime + tsVar <- asks requestTimestamp + ts <- liftIO $ readTVarIO tsVar + when (now `diffUTCTime` ts >= requestTimeout) $ do + resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar + case resp of + Just tmvar -> do + log Warning "TXMLConnector.WorkThread" "Request timeout" + liftIO . atomically . putTMVar tmvar $ ResponseTimeout + _ -> pure () + + insertToTickMap = insertTick + +handleGetInfo :: (MonadIO m, + MonadReader Env m, + MonadTXML m, + HasLog Env Message m) => m (Maybe ConnectionStage) +handleGetInfo = do + queue <- asks transaqQueue + cfg <- asks config + item <- liftIO . atomically $ readTBQueue queue + conn <- asks serverConnected + case item of + TransaqResponseServerStatus serverStatus -> do + log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus + case state serverStatus of + Transaq.Disconnected -> do + log Warning "TXMLConnector.WorkThread" "Server disconnected" + pure $ Just StageConnection + Transaq.Connected -> do + log Info "TXMLConnector.WorkThread" "Server connected" + void $ sendCommand $ toXml $ + CommandChangePass (transaqPassword cfg) "goobaka12" + v <- makeSubscriptions cfg + case v of + Left _ -> do + log Warning "TXMLConnector.WorkThread" "Unable to subscribe" + void $ sendCommand $ toXml CommandDisconnect + pure $ Just StageConnection + Right _ -> do + log Info "TXMLConnector.WorkThread" "Subscriptions done" + pure $ Just StageConnected + Transaq.Error _ -> do + log Warning "TXMLConnector.WorkThread" "Connection error" + liftIO . atomically $ writeTVar conn StageConnection + void $ sendCommand $ toXml $ CommandDisconnect + pure $ Just StageConnection + TransaqResponseResult result -> do + log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result + pure Nothing + TransaqResponseCandles candles -> do + log Debug "TXMLConnector.WorkThread" $ + "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles) + pure Nothing + TransaqResponseMarkets (ResponseMarkets markets) -> do + log Debug "TXMLConnector.WorkThread" "Incoming markets:" + forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) + pure Nothing + TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do + ckMap <- asks candleKindMap + log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds + forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k))) + pure Nothing + TransaqResponseSecurities (ResponseSecurities securities) -> do + tisH <- asks tisHandle + let tickerInfos = securityToTickerInfo <$> securities + log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities + forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) + forM_ tickerInfos (liftIO . putTickerInfo tisH) + pure Nothing + TransaqResponseSecInfo secInfo -> do + log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo + pure Nothing + TransaqResponseClient (ResponseClient clientData) -> do + log Debug "TXMLConnector.WorkThread" $ + "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData) + pure Nothing + _ -> pure Nothing + where + makeSubscriptions config = sendCommand . toXml $ cmdSubscription config + cmdSubscription config = + CommandSubscribe + { + alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), + quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config), + quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) + } + subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code + + +handleUnconnected :: (MonadIO m, + MonadReader Env m, + MonadTXML m, + HasLog Env Message m) => m (Maybe ConnectionStage) +handleUnconnected = do + cfg <- asks config + log Debug "TXMLConnector.WorkThread" "Sending connect command" + v <- sendCommand . toXml $ cmdConnect cfg + case v of + Left _ -> do + log Warning "TXMLConnector.WorkThread" "Unable to connect" + void $ sendCommand $ toXml CommandDisconnect + liftIO $ threadDelay reconnectionDelay + queue <- asks transaqQueue + void $ liftIO $ atomically $ flushTBQueue queue + pure Nothing + Right _ -> do + log Info "TXMLConnector.WorkThread" "Connected" + pure $ Just StageGetInfo + where + reconnectionDelay = 1000 * 1000 * 10 + + cmdConnect cfg = CommandConnect + { + login = transaqLogin cfg, + password = transaqPassword cfg, + host = transaqHost cfg, + port = transaqPort cfg, + language = LanguageEn, + autopos = False, + micexRegisters = True, + milliseconds = True, + utcTime = True, + proxy = (), + rqDelay = Nothing, + sessionTimeout = Nothing, + requestTimeout = Nothing, + pushULimits = Nothing, + pushPosEquity = Nothing + } + + + +allTradeToTick :: AllTradesTrade -> Tick +allTradeToTick att = + Tick + { + security = attBoard att <> "#" <> attSecCode att, + datatype = LastTradePrice, + timestamp = attTimestamp att, + value = fromDouble $ attPrice att, + volume = fromIntegral $ attQuantity att + } + +quotationToTicks :: UTCTime -> Quotation -> [Tick] +quotationToTicks timestamp q = + let security = qBoard q <> "#" <> qSeccode q in + [ + Tick + { + security = security, + datatype = BestBid, + timestamp = timestamp, + value = fromDouble $ qBid q, + volume = fromIntegral $ qQuantity q + }, + Tick + { + security = security, + datatype = BestOffer, + timestamp = timestamp, + value = fromDouble $ qOffer q, + volume = fromIntegral $ qQuantity q + }] + +securityToTickerInfo :: Security -> TickerInfo +securityToTickerInfo sec = + TickerInfo + { + tiTicker = sBoard sec <> "#" <> sSeccode sec + , tiLotSize = sLotSize sec + , tiTickSize = sMinStep sec + } + +parseSecurityId :: TickerId -> Maybe SecurityId +parseSecurityId tickerId = case T.findIndex (== '#') tickerId of + Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId) + Nothing -> Nothing + +makeTickerId :: SecurityId -> TickerId +makeTickerId sec = board sec <> "#" <> seccode sec + +parseAccountId :: T.Text -> Maybe (T.Text, T.Text) +parseAccountId accId = case T.findIndex (== '#') accId of + Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId) + Nothing -> Nothing + + +mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder +mkNewOrderCommand order = + case parseSecurityId (orderSecurity order) of + Just secId -> + case parseAccountId (orderAccountId order) of + Just (client, union) -> do + case orderPrice order of + Market -> Just $ Transaq.CommandNewOrder + { + security = secId + , client = client + , unionCode = union + , price = 0 + , quantity = fromInteger $ orderQuantity order + , buysell = toDirection $ orderOperation order + , bymarket = True + , brokerRef = T.empty + , unfilled = UnfilledPutInQueue + , usecredit = False + , nosplit = False + } + Limit price -> Just $ Transaq.CommandNewOrder + { + security = secId + , client = client + , unionCode = union + , price = toDouble price + , quantity = fromInteger $ orderQuantity order + , buysell = toDirection $ orderOperation order + , bymarket = False + , brokerRef = T.empty + , unfilled = UnfilledPutInQueue + , usecredit = False + , nosplit = False + } + _ -> Nothing + _ -> Nothing + _ -> Nothing + where + toDirection AT.Buy = Transaq.Buy + toDirection AT.Sell = Transaq.Sell + +candleToBar :: SecurityId -> Candle -> Bar +candleToBar sec candle = + Bar + { + barSecurity = makeTickerId sec + , barTimestamp = cTimestamp candle + , barOpen = fromDouble (cOpen candle) + , barHigh = fromDouble (cHigh candle) + , barLow = fromDouble (cLow candle) + , barClose = fromDouble (cClose candle) + , barVolume = fromIntegral $ cVolume candle + } diff --git a/src/Win32/TXML.hs b/src/Win32/TXML.hs index d9050d8..ff74595 100644 --- a/src/Win32/TXML.hs +++ b/src/Win32/TXML.hs @@ -1,3 +1,5 @@ +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE MultiParamTypeClasses #-} module Win32.TXML ( @@ -10,6 +12,7 @@ module Win32.TXML , LogLevel(..) ) where +import Control.Monad.IO.Class (MonadIO (..)) import qualified Data.ByteString.Char8 as BS import qualified Data.Text as T import Data.Text.Encoding @@ -17,6 +20,8 @@ import Data.Text.Encoding.Error import Foreign.C.String import Foreign.C.Types import Foreign.Ptr +import TXML (Callback (..), FreeableCallback (..), + LogLevel (..)) foreign import ccall "Initialize" c_Initialize :: CString -> CInt -> IO CString foreign import ccall "UnInitialize" c_UnInitialize :: IO CString @@ -28,13 +33,10 @@ foreign import ccall "FreeMemory" c_FreeMemory :: CString -> IO CBool foreign import ccall "wrapper" createCallbackPtr :: (CString -> IO CBool) -> IO (FunPtr (CString -> IO CBool)) -data LogLevel = - Debug - | Info - | Warning - deriving (Show, Eq, Ord) +newtype WrappedCallback = WrappedCallback { unCallback :: FunPtr (CString -> IO CBool)} -newtype Callback = Callback { unCallback :: FunPtr (CString -> IO CBool)} +instance (MonadIO m) => FreeableCallback m WrappedCallback where + freeCallback = liftIO . freeHaskellFunPtr . unCallback logLevelToInt :: LogLevel -> CInt logLevelToInt Debug = 3 @@ -78,7 +80,7 @@ sendCommand cmdData = do BS.useAsCString (encodeUtf8 cmdData) $ \fpcstr -> c_SendCommand fpcstr >>= rawStringToResult -setCallback :: (T.Text -> IO Bool) -> IO (Maybe Callback) +setCallback :: (T.Text -> IO Bool) -> IO (Maybe (Callback IO)) setCallback callback = do wrappedCallback <- createCallbackPtr (\x -> do packed <- BS.packCString x @@ -87,7 +89,7 @@ setCallback callback = do packed)) ret <- c_SetCallback wrappedCallback if ret /= 0 - then return . Just . Callback $ wrappedCallback + then return . Just . MkCallback $ WrappedCallback wrappedCallback else do freeHaskellFunPtr wrappedCallback return Nothing @@ -95,6 +97,3 @@ setCallback callback = do boolToCBool False = 0 boolToCBool True = 1 -freeCallback :: Callback -> IO () -freeCallback = freeHaskellFunPtr . unCallback - diff --git a/test/Test/TXMLConnector.hs b/test/Test/TXMLConnector.hs new file mode 100644 index 0000000..78d099d --- /dev/null +++ b/test/Test/TXMLConnector.hs @@ -0,0 +1,5 @@ + +module Test.TXMLConnector + ( + ) where + diff --git a/transaq-connector.cabal b/transaq-connector.cabal index 56c7746..e79e2d8 100644 --- a/transaq-connector.cabal +++ b/transaq-connector.cabal @@ -25,10 +25,12 @@ executable transaq-connector , Version , TXML , TXMLConnector + , TXMLConnector.Internal , TickTable , FSM default-extensions: OverloadedStrings , MultiWayIf + , MultiParamTypeClasses default-language: Haskell2010 build-depends: base >= 4.7 && < 5 , dhall @@ -78,7 +80,8 @@ test-suite transaq-connector-test main-is: Spec.hs other-modules: Test.TickTable , Test.FSM - + , TXMLConnector + , TXMLConnector.Internal , FSM , TickTable @@ -114,4 +117,4 @@ test-suite transaq-connector-test , network-uri default-extensions: OverloadedStrings , MultiWayIf - + , MultiParamTypeClasses