You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
741 lines
33 KiB
741 lines
33 KiB
{-# LANGUAGE DuplicateRecordFields #-} |
|
{-# LANGUAGE FlexibleContexts #-} |
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
{-# LANGUAGE MultiParamTypeClasses #-} |
|
{-# LANGUAGE OverloadedLabels #-} |
|
{-# LANGUAGE RecordWildCards #-} |
|
{-# LANGUAGE TypeApplications #-} |
|
|
|
module TXMLConnector |
|
( |
|
start |
|
, Request(..) |
|
, HistoryRequest(..) |
|
, Response(..) |
|
, HistoryResponse(..) |
|
, makeRequest |
|
, TXMLConnectorHandle |
|
, makeBrokerBackend |
|
) where |
|
|
|
import ATrade.Logging (Message, Severity (..), log, |
|
logWith) |
|
import Colog (HasLog (getLogAction, setLogAction), |
|
LogAction (LogAction, unLogAction)) |
|
import Config (SubscriptionConfig (SubscriptionConfig), |
|
TransaqConnectorConfig (..), |
|
transaqHost, transaqLogLevel, |
|
transaqLogPath, transaqLogin, |
|
transaqPassword, transaqPort) |
|
import Control.Concurrent (ThreadId, forkIO, threadDelay) |
|
import Control.Concurrent.STM (TVar, atomically, modifyTVar', |
|
newEmptyTMVar, newEmptyTMVarIO, |
|
newTVarIO, orElse, putTMVar, |
|
readTMVar, readTVar, |
|
readTVarIO, takeTMVar, |
|
tryReadTMVar, writeTVar) |
|
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, |
|
readTBQueue, writeTBQueue) |
|
import Control.Monad (forever, void, when) |
|
import qualified Data.Bimap as BM |
|
import Data.Maybe (mapMaybe) |
|
import qualified Data.Text as T |
|
import qualified Deque.Strict as D |
|
import Text.XML.Light.Input (parseXML) |
|
import Text.XML.Light.Types (Content (Elem), |
|
Element (elName), |
|
QName (qName)) |
|
import Transaq (AllTradesTrade (..), |
|
Candle (..), ClientData (..), |
|
CommandChangePass (..), |
|
CommandConnect (..), |
|
CommandDisconnect (CommandDisconnect), |
|
CommandGetHistoryData (CommandGetHistoryData), |
|
CommandSubscribe (..), |
|
ConnectionState (Disconnected), |
|
Language (LanguageEn), |
|
MarketInfo (..), |
|
OrderNotification (..), |
|
OrderStatus (..), |
|
Quotation (..), |
|
ResponseAllTrades (ResponseAllTrades), |
|
ResponseCandleKinds (ResponseCandleKinds), |
|
ResponseCandles (..), |
|
ResponseCandlesStatus (StatusPending), |
|
ResponseClient (ResponseClient), |
|
ResponseMarkets (ResponseMarkets), |
|
ResponseOrders (ResponseOrders), |
|
ResponseQuotations (ResponseQuotations), |
|
ResponseQuotes (ResponseQuotes), |
|
ResponseResult (..), |
|
ResponseSecurities (ResponseSecurities), |
|
ResponseTrades (ResponseTrades), |
|
Security (..), SecurityId (..), |
|
TradeNotification (..), |
|
TransaqCommand (toXml), |
|
TransaqResponse (..), |
|
TransaqResponse (..), |
|
TransaqResponseC (fromXml), |
|
UnfilledAction (..), |
|
kCandleKindId, kPeriod, state) |
|
import TXML (LogLevel, freeCallback, |
|
initialize, sendCommand, |
|
setCallback) |
|
|
|
import ATrade.Broker.Backend (BrokerBackend (..), |
|
BrokerBackendNotification (..)) |
|
import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) |
|
import ATrade.Types (Bar (..), |
|
BarTimeframe (unBarTimeframe), |
|
DataType (BestBid, BestOffer, LastTradePrice), |
|
Order (..), OrderId, |
|
OrderPrice (..), |
|
OrderState (..), Tick (..), |
|
TickerId, Trade (..), |
|
fromDouble, toDouble) |
|
import qualified ATrade.Types as AT |
|
import Colog.Monad (WithLog) |
|
import Control.Applicative ((<|>)) |
|
import Control.Concurrent.BoundedChan (BoundedChan, writeChan) |
|
import Control.Concurrent.STM.TMVar (TMVar) |
|
import Control.Error (headMay) |
|
import Control.Monad (forM_) |
|
import Control.Monad.IO.Class (MonadIO (liftIO)) |
|
import Control.Monad.Reader (ReaderT (runReaderT)) |
|
import Control.Monad.Reader.Class (MonadReader, asks) |
|
import Data.Int (Int64) |
|
import qualified Data.Map.Strict as M |
|
import Data.Time.Clock (UTCTime, diffUTCTime, |
|
getCurrentTime) |
|
import GHC.Exts (IsList (..)) |
|
import Prelude hiding (log) |
|
import TickerInfoServer (TickerInfo (..), |
|
TickerInfoServerHandle, |
|
putTickerInfo) |
|
import qualified Transaq |
|
import qualified TXML |
|
|
|
data ConnectionParams = |
|
ConnectionParams |
|
{ |
|
cpLogin :: T.Text |
|
, cpPassword :: T.Text |
|
, cpHost :: T.Text |
|
, cpPort :: Int |
|
, cpLogPath :: T.Text |
|
, cpLogLevel :: LogLevel |
|
} |
|
deriving (Show, Eq, Ord) |
|
|
|
data HistoryRequest = |
|
HistoryRequest |
|
{ |
|
hrTickerId :: TickerId |
|
, hrTimeframe :: BarTimeframe |
|
, hrCount :: Int |
|
, hrReset :: Bool |
|
} deriving (Show, Eq, Ord) |
|
|
|
data Request = |
|
RequestHistory HistoryRequest |
|
| RequestSubmitOrder Order |
|
| RequestCancelOrder OrderId |
|
deriving (Show, Eq) |
|
|
|
data Response = |
|
ResponseHistory HistoryResponse |
|
| ResponseOrderSubmitted |
|
| ResponseOrderCancelled |
|
| ResponseTimeout |
|
|
|
data HistoryResponse = |
|
HistoryResponse |
|
{ |
|
hrBars :: [Bar] |
|
, hrMoreData :: Bool |
|
} |
|
deriving (Show, Eq) |
|
|
|
data TXMLConnectorHandle = |
|
TXMLConnectorHandle |
|
{ |
|
threadId :: ThreadId |
|
, notificationQueue :: TBQueue TransaqResponse |
|
, hRequestVar :: TMVar Request |
|
, hResponseVar :: TMVar (TMVar Response) |
|
, hRequestTimestamp :: TVar UTCTime |
|
, hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) |
|
} |
|
|
|
data ConnectionStage = StageConnection | StageGetInfo | StageConnected |
|
deriving (Eq, Show, Ord) |
|
|
|
data MainQueueData = |
|
MainQueueTransaqData TransaqResponse |
|
| MainQueueRequest Request |
|
deriving (Eq, Show) |
|
|
|
data TickKey = TickKey TickerId DataType |
|
deriving (Show, Ord, Eq) |
|
|
|
data TransactionId = |
|
TransactionId Int64 |
|
| ExchangeOrderId Int64 |
|
deriving (Show, Ord, Eq) |
|
|
|
data BrokerState = |
|
BrokerState |
|
{ |
|
bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId) |
|
, bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ())) |
|
, bsOrderMap :: TVar (M.Map OrderId Order) |
|
, bsPendingOrders :: TVar (D.Deque Order) |
|
} |
|
|
|
data Env = |
|
Env |
|
{ |
|
qssChannel :: BoundedChan QuoteSourceServerData |
|
, tisHandle :: TickerInfoServerHandle |
|
, requestVar :: TMVar Request |
|
, responseVar :: TMVar (TMVar Response) |
|
, requestTimestamp :: TVar UTCTime |
|
, currentCandles :: TVar [Candle] |
|
, tickMap :: TVar (M.Map TickKey Tick) |
|
, transaqQueue :: TBQueue TransaqResponse |
|
, logger :: LogAction IO Message |
|
, config :: TransaqConnectorConfig |
|
, serverConnected :: TVar ConnectionStage |
|
, candleKindMap :: TVar (M.Map Int Int) |
|
, brokerState :: BrokerState |
|
} |
|
|
|
newtype App a = App { unApp :: ReaderT Env IO a } |
|
deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env) |
|
|
|
instance HasLog Env Message App where |
|
getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } |
|
setLogAction _ env = env -- fuck it |
|
|
|
start :: |
|
LogAction IO Message |
|
-> TransaqConnectorConfig |
|
-> BoundedChan QuoteSourceServerData |
|
-> TickerInfoServerHandle |
|
-> IO TXMLConnectorHandle |
|
start logger config qssChannel tisH = do |
|
logWith logger Info "TXMLConnector" "Starting" |
|
notificationQueue <- atomically $ newTBQueue 50000 |
|
tickTable <- newTVarIO M.empty |
|
requestVar <- newEmptyTMVarIO |
|
responseVar <- newEmptyTMVarIO |
|
currentCandles <- newTVarIO [] |
|
serverConnected <- liftIO $ newTVarIO StageConnection |
|
candleKindMap <- newTVarIO M.empty |
|
requestTimestamp <- getCurrentTime >>= newTVarIO |
|
orderMap <- newTVarIO M.empty |
|
notificationCallback <- newTVarIO Nothing |
|
orderTransactionIdMap <- newTVarIO BM.empty |
|
pendingOrders <- newTVarIO (fromList []) |
|
let brokerState = |
|
BrokerState |
|
{ |
|
bsOrderTransactionIdMap = orderTransactionIdMap |
|
, bsNotificationCallback = notificationCallback |
|
, bsOrderMap = orderMap |
|
, bsPendingOrders = pendingOrders |
|
} |
|
let env = |
|
Env |
|
{ |
|
qssChannel = qssChannel |
|
, tisHandle = tisH |
|
, requestVar = requestVar |
|
, responseVar = responseVar |
|
, requestTimestamp = requestTimestamp |
|
, currentCandles = currentCandles |
|
, tickMap = tickTable |
|
, transaqQueue = notificationQueue |
|
, logger = logger |
|
, config = config |
|
, serverConnected = serverConnected |
|
, candleKindMap = candleKindMap |
|
, brokerState = brokerState |
|
} |
|
threadId <- forkIO $ (runReaderT . unApp) workThread env |
|
return $ TXMLConnectorHandle |
|
{ |
|
threadId = threadId |
|
, notificationQueue = notificationQueue |
|
, hRequestVar = requestVar |
|
, hResponseVar = responseVar |
|
, hRequestTimestamp = requestTimestamp |
|
, hNotificationCallback = notificationCallback |
|
} |
|
|
|
workThread :: App () |
|
workThread = do |
|
cfg <- asks config |
|
rc <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg) |
|
case rc of |
|
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str |
|
Right _ -> do |
|
queue <- asks transaqQueue |
|
logger' <- asks logger |
|
rc <- liftIO $ setCallback (parseAndWrite queue logger') |
|
case rc of |
|
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" |
|
Just cb -> do |
|
void $ forever $ do |
|
connStatus <- asks serverConnected >>= (liftIO . readTVarIO) |
|
case connStatus of |
|
StageConnection -> handleUnconnected |
|
StageGetInfo -> handleGetInfo |
|
StageConnected -> handleConnected |
|
liftIO $ freeCallback cb |
|
where |
|
parseTransaqLogLevel 1 = TXML.Warning |
|
parseTransaqLogLevel 3 = TXML.Debug |
|
parseTransaqLogLevel _ = TXML.Info |
|
parseAndWrite queue logger xml = do |
|
let parsed = mapMaybe parseContent $ parseXML xml |
|
logWith logger Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed |
|
mapM_ (writeToQueue queue) parsed |
|
pure True |
|
parseContent (Elem el) = parseElement el |
|
parseContent _ = Nothing |
|
parseElement el = case qName $ elName el of |
|
"candles" -> TransaqResponseCandles <$> fromXml el |
|
"server_status" -> TransaqResponseServerStatus <$> fromXml el |
|
"markets" -> TransaqResponseMarkets <$> fromXml el |
|
"candlekinds" -> TransaqResponseCandleKinds <$> fromXml el |
|
"securities" -> TransaqResponseSecurities <$> fromXml el |
|
"sec_info" -> TransaqResponseSecInfo <$> fromXml el |
|
"quotations" -> TransaqResponseQuotations <$> fromXml el |
|
"alltrades" -> TransaqResponseAllTrades <$> fromXml el |
|
"quotes" -> TransaqResponseQuotes <$> fromXml el |
|
"orders" -> TransaqResponseOrders <$> fromXml el |
|
"trades" -> TransaqResponseTrades <$> fromXml el |
|
"result" -> TransaqResponseResult <$> fromXml el |
|
_ -> Nothing |
|
writeToQueue queue resp = atomically $ writeTBQueue queue resp |
|
handleConnected :: App () |
|
handleConnected = do |
|
rqVar <- asks requestVar |
|
queue <- asks transaqQueue |
|
item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse` |
|
(MainQueueRequest <$> takeTMVar rqVar) |
|
case item of |
|
MainQueueTransaqData transaqData -> do |
|
tm <- asks tickMap |
|
case transaqData of |
|
TransaqResponseAllTrades (ResponseAllTrades trades) -> do |
|
qssChan <- asks qssChannel |
|
let ticks = fmap allTradeToTick trades |
|
forM_ ticks (liftIO . writeChan qssChan . QSSTick) |
|
forM_ ticks (insertToTickMap tm) |
|
TransaqResponseQuotations (ResponseQuotations quotations) -> do |
|
qssChan <- asks qssChannel |
|
now <- liftIO getCurrentTime |
|
let ticks = concatMap (quotationToTicks now) quotations |
|
forM_ ticks (liftIO . writeChan qssChan . QSSTick) |
|
forM_ ticks (insertToTickMap tm) |
|
TransaqResponseCandles respCandle -> do |
|
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar |
|
log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle) |
|
case resp of |
|
Just tmvar -> if cStatus respCandle == StatusPending |
|
then do |
|
cur <- asks currentCandles |
|
liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c) |
|
else do |
|
cur <- asks currentCandles |
|
liftIO $ atomically $ do |
|
candles <- readTVar cur |
|
putTMVar tmvar $ ResponseHistory $ HistoryResponse |
|
{ |
|
hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles) |
|
, hrMoreData = False |
|
} |
|
_ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" |
|
TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder |
|
TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade |
|
_ -> pure () |
|
MainQueueRequest (RequestHistory request) -> do |
|
cur <- asks currentCandles |
|
liftIO $ atomically $ writeTVar cur [] |
|
maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO) |
|
case maybeCk of |
|
Just candleKindId -> do |
|
case parseSecurityId (hrTickerId request) of |
|
Just secId -> void $ liftIO . sendCommand $ |
|
toXml CommandGetHistoryData |
|
{ |
|
security = secId |
|
, periodId = candleKindId |
|
, count = hrCount request |
|
, reset = hrReset request |
|
} |
|
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request |
|
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request) |
|
MainQueueRequest (RequestSubmitOrder order) -> do |
|
case mkNewOrderCommand order of |
|
Just cmd -> do |
|
v <- liftIO . sendCommand . toXml $ cmd |
|
case v of |
|
Left result -> do |
|
case headMay (parseXML result) >>= parseContent of |
|
Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do |
|
brState <- asks brokerState |
|
respVar <- asks responseVar |
|
liftIO $ atomically $ do |
|
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order) |
|
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId)) |
|
resp <- readTMVar respVar |
|
putTMVar resp ResponseOrderSubmitted |
|
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <> |
|
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId |
|
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result" |
|
Right _ -> do |
|
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing" |
|
_ -> pure () |
|
_ -> pure () |
|
|
|
checkRequestTimeout |
|
|
|
requestTimeout = 10 |
|
|
|
handleTrade transaqTrade = do |
|
brState <- asks brokerState |
|
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) |
|
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) |
|
orderMap <- liftIO $ readTVarIO (bsOrderMap brState) |
|
case maybeCb of |
|
Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of |
|
Just oid -> case M.lookup oid orderMap of |
|
Just order -> do |
|
let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order) |
|
log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif |
|
liftIO $ cb notif |
|
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade |
|
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade |
|
Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!" |
|
|
|
fromTransaqTrade transaqTrade order = |
|
Trade |
|
{ |
|
tradeOrderId = orderId order |
|
, tradePrice = fromDouble (tPrice transaqTrade) |
|
, tradeQuantity = fromIntegral $ tQuantity transaqTrade |
|
, tradeVolume = fromDouble $ tValue transaqTrade |
|
, tradeVolumeCurrency = "" |
|
, tradeOperation = fromDirection (tBuysell transaqTrade) |
|
, tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade |
|
, tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade |
|
, tradeTimestamp = tTimestamp transaqTrade |
|
, tradeCommission = fromDouble $ tComission transaqTrade |
|
, tradeSignalId = orderSignalId order |
|
} |
|
|
|
fromDirection Transaq.Buy = AT.Buy |
|
fromDirection Transaq.Sell = AT.Sell |
|
|
|
handleOrder orderUpdate = do |
|
brState <- asks brokerState |
|
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState) |
|
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState) |
|
case maybeCb of |
|
Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|> |
|
BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of |
|
Just oid -> do |
|
let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate) |
|
log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif |
|
liftIO $ atomically $ do |
|
m <- readTVar (bsOrderTransactionIdMap brState) |
|
when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do |
|
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate)) |
|
liftIO $ cb notif |
|
_ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification" |
|
Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification" |
|
|
|
orderStateFromTransaq orderUpdate = |
|
case oStatus orderUpdate of |
|
OrderActive -> Submitted |
|
OrderCancelled -> Cancelled |
|
OrderDenied -> Rejected |
|
OrderDisabled -> Rejected |
|
OrderExpired -> Cancelled |
|
OrderFailed -> Rejected |
|
OrderForwarding -> Unsubmitted |
|
OrderInactive -> OrderError |
|
OrderMatched -> Executed |
|
OrderRefused -> Rejected |
|
OrderRemoved -> Rejected |
|
OrderWait -> Unsubmitted |
|
OrderWatching -> Unsubmitted |
|
_ -> OrderError |
|
|
|
checkRequestTimeout = do |
|
now <- liftIO getCurrentTime |
|
tsVar <- asks requestTimestamp |
|
ts <- liftIO $ readTVarIO tsVar |
|
when (now `diffUTCTime` ts >= requestTimeout) $ do |
|
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar |
|
case resp of |
|
Just tmvar -> do |
|
log Warning "TXMLConnector.WorkThread" "Request timeout" |
|
liftIO . atomically . putTMVar tmvar $ ResponseTimeout |
|
_ -> pure () |
|
|
|
handleGetInfo :: App () |
|
handleGetInfo = do |
|
queue <- asks transaqQueue |
|
cfg <- asks config |
|
item <- liftIO . atomically $ readTBQueue queue |
|
conn <- asks serverConnected |
|
case item of |
|
TransaqResponseServerStatus serverStatus -> |
|
case state serverStatus of |
|
Transaq.Disconnected -> do |
|
log Warning "TXMLConnector.WorkThread" "Server disconnected" |
|
liftIO . atomically $ writeTVar conn StageConnection |
|
Transaq.Connected -> do |
|
log Info "TXMLConnector.WorkThread" "Server connected" |
|
void $ liftIO . sendCommand $ toXml $ |
|
CommandChangePass (transaqPassword cfg) "goobaka12" |
|
liftIO . atomically $ writeTVar conn StageConnected |
|
v <- makeSubscriptions cfg |
|
case v of |
|
Left errmsg -> do |
|
log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg |
|
void $ liftIO . sendCommand $ toXml CommandDisconnect |
|
Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" |
|
Transaq.Error errmsg -> do |
|
log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg |
|
liftIO . atomically $ writeTVar conn StageConnection |
|
TransaqResponseResult result -> |
|
log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result |
|
-- TODO: handle order response |
|
TransaqResponseCandles candles -> |
|
log Debug "TXMLConnector.WorkThread" $ |
|
"Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles) |
|
-- TODO: Pass to qhp |
|
TransaqResponseMarkets (ResponseMarkets markets) -> do |
|
log Debug "TXMLConnector.WorkThread" "Incoming markets:" |
|
forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) |
|
-- TODO: Pass to qtis |
|
TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do |
|
ckMap <- asks candleKindMap |
|
log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds |
|
forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k))) |
|
TransaqResponseSecurities (ResponseSecurities securities) -> do |
|
tisH <- asks tisHandle |
|
let tickerInfos = securityToTickerInfo <$> securities |
|
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities |
|
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) |
|
forM_ tickerInfos (liftIO . putTickerInfo tisH) |
|
TransaqResponseSecInfo secInfo -> |
|
log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo |
|
-- TODO: Pass to qtis |
|
TransaqResponseClient (ResponseClient clientData) -> do |
|
log Debug "TXMLConnector.WorkThread" $ |
|
"Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData) |
|
_ -> pure () |
|
handleUnconnected :: App () |
|
handleUnconnected = do |
|
cfg <- asks config |
|
log Debug "TXMLConnector.WorkThread" "Sending connect command" |
|
v <- liftIO . sendCommand . |
|
toXml $ CommandConnect |
|
{ |
|
login = transaqLogin cfg, |
|
password = transaqPassword cfg, |
|
host = transaqHost cfg, |
|
port = transaqPort cfg, |
|
language = LanguageEn, |
|
autopos = False, |
|
micexRegisters = True, |
|
milliseconds = True, |
|
utcTime = True, |
|
proxy = (), |
|
rqDelay = Nothing, |
|
sessionTimeout = Nothing, |
|
requestTimeout = Nothing, |
|
pushULimits = Nothing, |
|
pushPosEquity = Nothing |
|
} |
|
case v of |
|
Left err -> do |
|
log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" |
|
liftIO $ threadDelay (1000 * 1000 * 10) |
|
Right _ -> do |
|
conn <- asks serverConnected |
|
liftIO . atomically $ writeTVar conn StageGetInfo |
|
-- item <- atomically $ readTBQueue queue |
|
-- case item of |
|
-- TransaqResponseServerStatus status -> do |
|
-- case state status of |
|
-- Transaq.Error errmsg -> do |
|
-- log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> errmsg |
|
-- void $ sendCommand $ toXml CommandDisconnect |
|
-- threadDelay (10 * 1000 * 1000) |
|
-- Transaq.Connected -> do |
|
-- atomically $ writeTVar serverConnected StageGetInfo |
|
-- -- v <- makeSubscriptions config |
|
-- -- case v of |
|
-- -- Left errmsg -> do |
|
-- -- log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg |
|
-- -- void $ sendCommand $ toXml CommandDisconnect |
|
-- -- Right _ -> |
|
-- Transaq.Disconnected -> do |
|
-- log Warning "TXMLConnector.WorkThread" "Unable to connect (disconnected)" |
|
-- threadDelay (10 * 1000 * 1000) |
|
-- other -> do |
|
-- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other |
|
-- threadDelay (1000 * 1000) |
|
makeSubscriptions config = |
|
liftIO . sendCommand . toXml $ |
|
CommandSubscribe |
|
{ |
|
alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), |
|
quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config), |
|
quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) |
|
} |
|
subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code |
|
insertToTickMap tickMap tick = liftIO . atomically $ modifyTVar' tickMap (M.insert (TickKey (security tick) (datatype tick)) tick) |
|
|
|
allTradeToTick :: AllTradesTrade -> Tick |
|
allTradeToTick att = |
|
Tick |
|
{ |
|
security = attBoard att <> "#" <> attSecCode att, |
|
datatype = LastTradePrice, |
|
timestamp = attTimestamp att, |
|
value = fromDouble $ attPrice att, |
|
volume = fromIntegral $ attQuantity att |
|
} |
|
|
|
quotationToTicks :: UTCTime -> Quotation -> [Tick] |
|
quotationToTicks timestamp q = |
|
let security = qBoard q <> "#" <> qSeccode q in |
|
[ |
|
Tick |
|
{ |
|
security = security, |
|
datatype = BestBid, |
|
timestamp = timestamp, |
|
value = fromDouble $ qBid q, |
|
volume = fromIntegral $ qQuantity q |
|
}, |
|
Tick |
|
{ |
|
security = security, |
|
datatype = BestOffer, |
|
timestamp = timestamp, |
|
value = fromDouble $ qOffer q, |
|
volume = fromIntegral $ qQuantity q |
|
}] |
|
|
|
securityToTickerInfo :: Security -> TickerInfo |
|
securityToTickerInfo sec = |
|
TickerInfo |
|
{ |
|
tiTicker = sBoard sec <> "#" <> sSeccode sec |
|
, tiLotSize = sLotSize sec |
|
, tiTickSize = sMinStep sec |
|
} |
|
|
|
parseSecurityId :: TickerId -> Maybe SecurityId |
|
parseSecurityId tickerId = case T.findIndex (== '#') tickerId of |
|
Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId) |
|
Nothing -> Nothing |
|
|
|
makeTickerId :: SecurityId -> TickerId |
|
makeTickerId sec = board sec <> "#" <> seccode sec |
|
|
|
parseAccountId :: T.Text -> Maybe (T.Text, T.Text) |
|
parseAccountId accId = case T.findIndex (== '#') accId of |
|
Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId) |
|
Nothing -> Nothing |
|
|
|
|
|
makeRequest :: TXMLConnectorHandle -> Request -> IO Response |
|
makeRequest h request = do |
|
now <- getCurrentTime |
|
resp <- atomically $ do |
|
resp <- newEmptyTMVar |
|
writeTVar (hRequestTimestamp h) now |
|
putTMVar (hResponseVar h) resp |
|
putTMVar (hRequestVar h) request |
|
pure resp |
|
atomically $ do |
|
void $ takeTMVar (hResponseVar h) |
|
takeTMVar resp |
|
|
|
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder |
|
mkNewOrderCommand order = |
|
case parseSecurityId (orderSecurity order) of |
|
Just secId -> |
|
case parseAccountId (orderAccountId order) of |
|
Just (client, union) -> do |
|
case orderPrice order of |
|
Market -> Just $ Transaq.CommandNewOrder |
|
{ |
|
security = secId |
|
, client = client |
|
, unionCode = union |
|
, price = 0 |
|
, quantity = fromInteger $ orderQuantity order |
|
, buysell = toDirection $ orderOperation order |
|
, bymarket = True |
|
, brokerRef = T.empty |
|
, unfilled = UnfilledPutInQueue |
|
, usecredit = False |
|
, nosplit = False |
|
} |
|
Limit price -> Just $ Transaq.CommandNewOrder |
|
{ |
|
security = secId |
|
, client = client |
|
, unionCode = union |
|
, price = toDouble price |
|
, quantity = fromInteger $ orderQuantity order |
|
, buysell = toDirection $ orderOperation order |
|
, bymarket = False |
|
, brokerRef = T.empty |
|
, unfilled = UnfilledPutInQueue |
|
, usecredit = False |
|
, nosplit = False |
|
} |
|
_ -> Nothing |
|
where |
|
toDirection AT.Buy = Transaq.Buy |
|
toDirection AT.Sell = Transaq.Sell |
|
|
|
|
|
candleToBar :: SecurityId -> Candle -> Bar |
|
candleToBar sec candle = |
|
Bar |
|
{ |
|
barSecurity = makeTickerId sec |
|
, barTimestamp = cTimestamp candle |
|
, barOpen = fromDouble (cOpen candle) |
|
, barHigh = fromDouble (cHigh candle) |
|
, barLow = fromDouble (cLow candle) |
|
, barClose = fromDouble (cClose candle) |
|
, barVolume = fromIntegral $ cVolume candle |
|
} |
|
|
|
brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () |
|
brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) |
|
|
|
brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO () |
|
brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid) |
|
|
|
brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO () |
|
brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb |
|
|
|
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend |
|
makeBrokerBackend h account = |
|
BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (pure ()) |
|
|
|
|