You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

782 lines
34 KiB

{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-}
3 years ago
module TXMLConnector
(
3 years ago
start
, TXMLConnector.stop
3 years ago
, Request(..)
, HistoryRequest(..)
, Response(..)
, HistoryResponse(..)
, makeRequest
, TXMLConnectorHandle
, makeBrokerBackend
3 years ago
) where
import ATrade.Logging (Message, Severity (..), log,
3 years ago
logWith)
import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction))
3 years ago
import Config (SubscriptionConfig (SubscriptionConfig),
TransaqConnectorConfig (..),
transaqHost, transaqLogLevel,
transaqLogPath, transaqLogin,
transaqPassword, transaqPort)
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar',
3 years ago
newEmptyTMVar, newEmptyTMVarIO,
newTVarIO, orElse, putTMVar,
readTMVar, readTVar,
readTVarIO, takeTMVar,
tryPutTMVar, tryReadTMVar,
writeTVar)
3 years ago
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue,
readTBQueue, writeTBQueue)
import Control.Monad (forever, void, when)
import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM
3 years ago
import Data.Maybe (mapMaybe)
import qualified Data.Text as T
import qualified Deque.Strict as D
3 years ago
import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem),
Element (elName),
QName (qName))
import Transaq (AllTradesTrade (..),
Candle (..), ClientData (..),
3 years ago
CommandChangePass (..),
3 years ago
CommandConnect (..),
CommandDisconnect (CommandDisconnect),
CommandGetHistoryData (CommandGetHistoryData),
CommandServerStatus (..),
3 years ago
CommandSubscribe (..),
ConnectionState (Disconnected),
Language (LanguageEn),
MarketInfo (..),
OrderNotification (..),
OrderStatus (..),
3 years ago
Quotation (..),
ResponseAllTrades (ResponseAllTrades),
ResponseCandleKinds (ResponseCandleKinds),
3 years ago
ResponseCandles (..),
ResponseCandlesStatus (StatusPending),
ResponseClient (ResponseClient),
3 years ago
ResponseMarkets (ResponseMarkets),
ResponseOrders (ResponseOrders),
3 years ago
ResponseQuotations (ResponseQuotations),
ResponseQuotes (ResponseQuotes),
ResponseResult (..),
3 years ago
ResponseSecurities (ResponseSecurities),
ResponseTrades (ResponseTrades),
3 years ago
Security (..), SecurityId (..),
TradeNotification (..),
3 years ago
TransaqCommand (toXml),
TransaqResponse (..),
TransaqResponse (..),
TransaqResponseC (fromXml),
UnfilledAction (..),
kCandleKindId, kPeriod, state)
3 years ago
import TXML (LogLevel, freeCallback,
initialize, sendCommand,
setCallback)
import ATrade.Broker.Backend (BrokerBackend (..),
BrokerBackendNotification (..))
3 years ago
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
3 years ago
import ATrade.Types (Bar (..),
BarTimeframe (unBarTimeframe),
DataType (BestBid, BestOffer, LastTradePrice),
Order (..), OrderId,
OrderPrice (..),
OrderState (..), Tick (..),
TickerId, Trade (..),
fromDouble, toDouble)
import qualified ATrade.Types as AT
import Colog.Monad (WithLog)
3 years ago
import Control.Applicative ((<|>))
3 years ago
import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
3 years ago
import Control.Concurrent.STM.TMVar (TMVar)
3 years ago
import Control.Error (headMay)
3 years ago
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks)
import Data.Int (Int64)
3 years ago
import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, diffUTCTime,
getCurrentTime)
import GHC.Exts (IsList (..))
import Prelude hiding (log)
3 years ago
import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle,
putTickerInfo)
3 years ago
import qualified Transaq
import qualified TXML
-- | Connection settings record: credentials, endpoint and logging setup.
--
-- Nothing in the visible part of this module constructs or reads this type.
data ConnectionParams = ConnectionParams
  { cpLogin    :: T.Text
  , cpPassword :: T.Text
  , cpHost     :: T.Text
  , cpPort     :: Int
  , cpLogPath  :: T.Text
  , cpLogLevel :: LogLevel
  }
  deriving (Show, Eq, Ord)
3 years ago
-- | Parameters of a historical-bars request.
data HistoryRequest = HistoryRequest
  { hrTickerId  :: TickerId
    -- ^ Ticker to query; split into a security id with 'parseSecurityId'.
  , hrTimeframe :: BarTimeframe
    -- ^ Bar timeframe; resolved to a candle kind id through the candle-kind map.
  , hrCount     :: Int
    -- ^ Forwarded as the @count@ field of the history command.
  , hrReset     :: Bool
    -- ^ Forwarded as the @reset@ field of the history command.
  }
  deriving (Show, Eq, Ord)
-- | Requests a client can hand to the connector through 'makeRequest'.
data Request
  = RequestHistory HistoryRequest
    -- ^ Fetch historical bars.
  | RequestSubmitOrder Order
    -- ^ Submit a new order.
  | RequestCancelOrder OrderId
    -- ^ Cancel an order. The worker's request handling visible in this
    -- module has no dedicated branch for it.
  deriving (Show, Eq)
3 years ago
-- | Replies delivered to the caller of 'makeRequest'.
data Response
  = ResponseHistory HistoryResponse
    -- ^ Bars collected for a history request.
  | ResponseOrderSubmitted
    -- ^ The order was accepted and a transaction id was recorded for it.
  | ResponseOrderCancelled
    -- ^ Not produced anywhere in the visible part of this module.
  | ResponseTimeout
    -- ^ No reply was produced within the request timeout.
3 years ago
-- | Payload of a successful history request.
data HistoryResponse = HistoryResponse
  { hrBars     :: [Bar]
    -- ^ Bars converted from the received candles.
  , hrMoreData :: Bool
    -- ^ The worker always sets this to 'False' in the visible code.
  }
  deriving (Show, Eq)
3 years ago
-- | Handle returned by 'start'; it shares its variables with the worker
-- thread's environment.
data TXMLConnectorHandle = TXMLConnectorHandle
  { threadId              :: ThreadId
    -- ^ Id of the forked worker thread.
  , notificationQueue     :: TBQueue TransaqResponse
    -- ^ Queue the TXML callback writes parsed messages into.
  , hRequestVar           :: TMVar Request
    -- ^ Slot for the pending request.
  , hResponseVar          :: TMVar (TMVar Response)
    -- ^ Slot holding the reply variable of the pending request.
  , hRequestTimestamp     :: TVar UTCTime
    -- ^ Time the latest request was issued; used for timeout detection.
  , hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ()))
    -- ^ Callback receiving order and trade notifications, if one is set.
  , hRunVar               :: TMVar ()
    -- ^ Filled by 'stop' to request shutdown.
  }
-- | Stage of the worker's connection state machine.
data ConnectionStage
  = StageConnection -- ^ Not connected; a connect command has to be sent.
  | StageGetInfo    -- ^ Connect command sent; waiting for server status and reference data.
  | StageConnected  -- ^ Normal operation.
  | StageShutdown   -- ^ Shutdown requested; the worker loops terminate.
  deriving (Eq, Show, Ord)
-- | One unit of work for the connected-stage loop, merged from all of its
-- event sources.
data MainQueueData
  = MainQueueTransaqData TransaqResponse -- ^ Message parsed from the TXML callback.
  | MainQueueRequest Request             -- ^ Client request.
  | MainQueuePingServer                  -- ^ Timer tick: query the server status.
  | MainQueueShutdown                    -- ^ Shutdown was requested.
  deriving (Eq, Show)
3 years ago
-- | Key of the latest-tick table: one entry per ticker and data type.
data TickKey
  = TickKey TickerId DataType
  deriving (Show, Ord, Eq)
-- | Identifier an order is tracked under on the Transaq side.
data TransactionId
  = TransactionId Int64
    -- ^ Transaction id returned when the order was submitted.
  | ExchangeOrderId Int64
    -- ^ Order number taken from order/trade notifications.
  deriving (Show, Ord, Eq)
-- | Mutable state of the broker side of the connector.
data BrokerState = BrokerState
  { bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId)
    -- ^ Two-way mapping between local order ids and Transaq identifiers.
  , bsNotificationCallback  :: TVar (Maybe (BrokerBackendNotification -> IO ()))
    -- ^ Receiver of order/trade notifications, if one is installed.
  , bsOrderMap              :: TVar (M.Map OrderId Order)
    -- ^ Submitted orders by local order id.
  , bsPendingOrders         :: TVar (D.Deque Order)
    -- ^ Initialised empty by 'start'; not otherwise used in the visible code.
  }
-- | Environment carried by the 'App' monad of the worker thread.
data Env = Env
  { qssChannel       :: BoundedChan QuoteSourceServerData
    -- ^ Outgoing channel for ticks.
  , tisHandle        :: TickerInfoServerHandle
    -- ^ Destination for ticker info built from incoming securities.
  , requestVar       :: TMVar Request
    -- ^ Pending client request.
  , responseVar      :: TMVar (TMVar Response)
    -- ^ Reply variable of the pending request.
  , requestTimestamp :: TVar UTCTime
    -- ^ Issue time of the latest request.
  , currentCandles   :: TVar [Candle]
    -- ^ Candles accumulated while a history reply is still pending.
  , tickMap          :: TVar (M.Map TickKey Tick)
    -- ^ Latest tick per ticker and data type.
  , transaqQueue     :: TBQueue TransaqResponse
    -- ^ Messages parsed from the TXML callback.
  , logger           :: LogAction IO Message
    -- ^ Underlying 'IO' log action.
  , config           :: TransaqConnectorConfig
    -- ^ Connector configuration.
  , serverConnected  :: TVar ConnectionStage
    -- ^ Current stage of the connection state machine.
  , candleKindMap    :: TVar (M.Map Int Int)
    -- ^ Candle period mapped to candle kind id.
  , brokerState      :: BrokerState
    -- ^ Order bookkeeping.
  , runVar           :: TMVar ()
    -- ^ Filled to request shutdown.
  , timerVar         :: TMVar ()
    -- ^ Filled once a second by the timer thread.
  }
-- | Monad of the worker thread: a reader over 'Env' on top of 'IO'.
newtype App a = App
  { unApp :: ReaderT Env IO a
  }
  deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env)
-- | Logging support for 'App': messages go to the 'IO' log action stored in
-- the environment.
instance HasLog Env Message App where
  getLogAction env = LogAction (liftIO . unLogAction (logger env))
  -- Replacing the log action is not supported: the environment stores an
  -- 'IO' action, so the supplied 'App' action is ignored and the environment
  -- is returned unchanged.
  setLogAction _ env = env
3 years ago
-- | Allocate the connector's shared state and fork its worker thread.
--
-- The returned handle shares the request/response variables, the request
-- timestamp, the notification callback slot and the shutdown flag with the
-- worker's environment. The connection stage starts at 'StageConnection'.
start ::
     LogAction IO Message
  -> TransaqConnectorConfig
  -> BoundedChan QuoteSourceServerData
  -> TickerInfoServerHandle
  -> IO TXMLConnectorHandle
start logger config qssChannel tisH = do
  logWith logger Info "TXMLConnector" "Starting"

  -- Channels between the TXML callback, clients and the worker.
  incoming <- atomically (newTBQueue 50000)
  reqVar <- newEmptyTMVarIO
  respVar <- newEmptyTMVarIO
  stopSignal <- newEmptyTMVarIO
  tickSignal <- newEmptyTMVarIO
  issuedAt <- newTVarIO =<< getCurrentTime

  -- Worker-side bookkeeping.
  ticks <- newTVarIO M.empty
  candlesAcc <- newTVarIO []
  stage <- newTVarIO StageConnection
  candleKinds <- newTVarIO M.empty

  -- Broker-side bookkeeping.
  orders <- newTVarIO M.empty
  callbackVar <- newTVarIO Nothing
  txIds <- newTVarIO BM.empty
  pending <- newTVarIO (fromList [])

  let env =
        Env
          { qssChannel = qssChannel
          , tisHandle = tisH
          , requestVar = reqVar
          , responseVar = respVar
          , requestTimestamp = issuedAt
          , currentCandles = candlesAcc
          , tickMap = ticks
          , transaqQueue = incoming
          , logger = logger
          , config = config
          , serverConnected = stage
          , candleKindMap = candleKinds
          , brokerState =
              BrokerState
                { bsOrderTransactionIdMap = txIds
                , bsNotificationCallback = callbackVar
                , bsOrderMap = orders
                , bsPendingOrders = pending
                }
          , runVar = stopSignal
          , timerVar = tickSignal
          }

  workerId <- forkIO (runReaderT (unApp workThread) env)
  pure
    TXMLConnectorHandle
      { threadId = workerId
      , notificationQueue = incoming
      , hRequestVar = reqVar
      , hResponseVar = respVar
      , hRequestTimestamp = issuedAt
      , hNotificationCallback = callbackVar
      , hRunVar = stopSignal
      }
3 years ago
-- | Ask the worker thread to shut down by filling the handle's run flag.
--
-- The flag is set without blocking. With a plain @putTMVar@ a repeated call
-- would block for as long as the flag stays full, which is forever if the
-- worker never consumes it: the flag is only read while the connection is in
-- its connected stage. Calling 'stop' several times is therefore safe.
stop :: TXMLConnectorHandle -> IO ()
stop h = void . atomically $ tryPutTMVar (hRunVar h) ()
-- | Body of the worker thread.
--
-- Initialises TXML with the configured log path and level, installs the
-- callback that parses incoming XML into the message queue, forks a timer
-- thread that fills the timer variable once a second, and then dispatches on
-- the connection stage until it becomes 'StageShutdown'. The callback is
-- freed once the dispatch loop has ended. Initialisation and callback
-- failures are logged and end the thread.
workThread :: App ()
workThread = do
  cfg <- asks config
  initResult <-
    liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel (transaqLogLevel cfg))
  case initResult of
    Left str ->
      log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
    Right _ -> do
      queue <- asks transaqQueue
      logger' <- asks logger
      maybeCallback <- liftIO $ setCallback (parseAndWrite queue logger')
      case maybeCallback of
        Nothing ->
          log Error "TXMLConnector.WorkThread" "Unable to set callback"
        Just cb -> do
          stageVar <- asks serverConnected
          tickVar <- asks timerVar
          -- Timer thread: one tick per second until shutdown is observed.
          void . liftIO . forkIO . whileM $ do
            threadDelay 1000000
            void . atomically $ tryPutTMVar tickVar ()
            stage <- readTVarIO stageVar
            pure (stage /= StageShutdown)
          -- Main loop: run the handler of the current stage.
          void . whileM $ do
            stage <- liftIO (readTVarIO stageVar)
            case stage of
              StageConnection -> handleUnconnected
              StageGetInfo    -> handleGetInfo
              StageConnected  -> handleConnected
              StageShutdown   -> pure ()
            pure (stage /= StageShutdown)
          liftIO (freeCallback cb)
  where
    -- Map the numeric log level from the configuration to a TXML level;
    -- anything other than 1 or 3 means 'TXML.Info'.
    parseTransaqLogLevel level = case level of
      1 -> TXML.Warning
      3 -> TXML.Debug
      _ -> TXML.Info
-- | Parse a chunk of XML and enqueue every recognised message.
--
-- Content that 'parseContent' does not recognise is dropped. The logger
-- argument is accepted but not used. Always yields 'True'.
parseAndWrite queue _logger xml = do
  forM_ (mapMaybe parseContent (parseXML xml)) (writeToQueue queue)
  pure True
-- | Decode one piece of XML content; only element nodes can yield a message.
parseContent :: Content -> Maybe TransaqResponse
parseContent content = case content of
  Elem el -> parseElement el
  _       -> Nothing
-- | Decode an XML element into a 'TransaqResponse', dispatching on the
-- element's tag name. Unknown tags, and elements whose decoder fails, give
-- 'Nothing'.
parseElement :: Element -> Maybe TransaqResponse
parseElement el =
  case tag of
    "candles"       -> TransaqResponseCandles <$> fromXml el
    "server_status" -> TransaqResponseServerStatus <$> fromXml el
    "markets"       -> TransaqResponseMarkets <$> fromXml el
    "candlekinds"   -> TransaqResponseCandleKinds <$> fromXml el
    "securities"    -> TransaqResponseSecurities <$> fromXml el
    "sec_info"      -> TransaqResponseSecInfo <$> fromXml el
    "quotations"    -> TransaqResponseQuotations <$> fromXml el
    "alltrades"     -> TransaqResponseAllTrades <$> fromXml el
    "quotes"        -> TransaqResponseQuotes <$> fromXml el
    "orders"        -> TransaqResponseOrders <$> fromXml el
    "trades"        -> TransaqResponseTrades <$> fromXml el
    "result"        -> TransaqResponseResult <$> fromXml el
    _               -> Nothing
  where
    tag = qName (elName el)
-- | Append one item to a bounded queue, waiting while the queue is full.
writeToQueue :: TBQueue a -> a -> IO ()
writeToQueue queue = atomically . writeTBQueue queue
-- | One iteration of the connected stage.
--
-- Waits for the first available event, in this priority order: a message
-- from the TXML callback, a client request, a shutdown signal, a timer tick.
-- After handling the event it checks whether the pending request timed out.
--
-- Event handling:
--
-- * shutdown: the stage becomes 'StageShutdown';
-- * timer tick: a server-status command is sent, and a synchronous reply is
--   parsed into the message queue;
-- * all-trades / quotations: ticks are pushed to the quote channel and
--   stored in the tick table;
-- * candles: accumulated while the status is pending; the final chunk
--   completes the pending history request;
-- * orders / trades: forwarded to the notification callback;
-- * history and submit-order requests: translated into Transaq commands.
--
-- Replies are delivered with a non-blocking put, so the worker never stalls
-- on a reply slot that was already filled (for example by a timeout).
handleConnected :: App ()
handleConnected = do
  serverConn <- asks serverConnected
  rqVar <- asks requestVar
  runVar' <- asks runVar
  queue <- asks transaqQueue
  timerVar' <- asks timerVar
  item <- liftIO . atomically $
    (MainQueueTransaqData <$> readTBQueue queue) `orElse`
    (MainQueueRequest <$> takeTMVar rqVar) `orElse`
    (takeTMVar runVar' >> pure MainQueueShutdown) `orElse`
    (takeTMVar timerVar' >> pure MainQueuePingServer)
  case item of
    MainQueueShutdown ->
      liftIO $ atomically $ writeTVar serverConn StageShutdown
    MainQueuePingServer -> do
      logger' <- asks logger
      maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus
      case maybeServerStatus of
        -- A synchronous reply is fed through the same parser as callback data.
        Left serverStatusRaw -> void $ liftIO $ parseAndWrite queue logger' serverStatusRaw
        Right () -> pure ()
    MainQueueTransaqData transaqData -> do
      tm <- asks tickMap
      case transaqData of
        TransaqResponseAllTrades (ResponseAllTrades trades) -> do
          qssChan <- asks qssChannel
          let ticks = fmap allTradeToTick trades
          forM_ ticks (liftIO . writeChan qssChan . QSSTick)
          forM_ ticks (insertToTickMap tm)
        TransaqResponseQuotations (ResponseQuotations quotations) -> do
          qssChan <- asks qssChannel
          now <- liftIO getCurrentTime
          let ticks = concatMap (quotationToTicks now) quotations
          forM_ ticks (liftIO . writeChan qssChan . QSSTick)
          forM_ ticks (insertToTickMap tm)
        TransaqResponseCandles respCandle -> do
          resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
          log Debug "TXMLConnector.WorkThread" $
            "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
          case resp of
            Just tmvar -> do
              cur <- asks currentCandles
              if cStatus respCandle == StatusPending
                -- More chunks will follow: keep accumulating.
                then liftIO . atomically $ modifyTVar' cur (\c -> cCandles respCandle <> c)
                -- Final chunk: answer the pending history request.
                else liftIO . atomically $ do
                  candles <- readTVar cur
                  void . tryPutTMVar tmvar $
                    ResponseHistory HistoryResponse
                      { hrBars = candleToBar (cSecurity respCandle) <$> (cCandles respCandle <> candles)
                      , hrMoreData = False
                      }
            _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
        TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder
        TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade
        _ -> pure ()
    MainQueueRequest (RequestHistory request) -> do
      -- Start from an empty accumulator for the new request.
      cur <- asks currentCandles
      liftIO $ atomically $ writeTVar cur []
      maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$>
        (asks candleKindMap >>= liftIO . readTVarIO)
      case maybeCk of
        Just candleKindId ->
          case parseSecurityId (hrTickerId request) of
            Just secId -> void $ liftIO . sendCommand $
              toXml CommandGetHistoryData
                { security = secId
                , periodId = candleKindId
                , count = hrCount request
                , reset = hrReset request
                }
            _ -> log Warning "TXMLConnector.WorkThread" $
              "Unable to parse security ID: " <> hrTickerId request
        _ -> log Warning "TXMLConnector.WorkThread" $
          "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
    MainQueueRequest (RequestSubmitOrder order) ->
      case mkNewOrderCommand order of
        Just cmd -> do
          v <- liftIO . sendCommand . toXml $ cmd
          case v of
            Left result ->
              case headMay (parseXML result) >>= parseContent of
                Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
                  brState <- asks brokerState
                  respVar <- asks responseVar
                  liftIO $ atomically $ do
                    modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
                    modifyTVar' (bsOrderTransactionIdMap brState)
                      (BM.insert (orderId order) (TransactionId transactionId))
                    resp <- readTMVar respVar
                    putTMVar resp ResponseOrderSubmitted
                  log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
                    (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
                _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
            Right _ ->
              log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
        _ -> pure ()
    _ -> pure ()
  checkRequestTimeout
  where
    -- Seconds a request may stay unanswered before the caller is sent
    -- 'ResponseTimeout'. Kept local because the name is also a field of the
    -- imported connect command.
    requestTimeout = 10

    -- Forward a trade to the notification callback, attaching the order it
    -- belongs to (found through the exchange order number).
    handleTrade transaqTrade = do
      brState <- asks brokerState
      trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
      maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
      orderMap <- liftIO $ readTVarIO (bsOrderMap brState)
      case maybeCb of
        Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of
          Just oid -> case M.lookup oid orderMap of
            Just order -> do
              let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order)
              log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif
              liftIO $ cb notif
            -- The order id is known but the order itself is not stored.
            _ -> log Warning "TXMLConnector.WorkThread" $
              "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
          -- No local order id is mapped to this exchange order number.
          _ -> log Warning "TXMLConnector.WorkThread" $
            "Unable to find order for trade: " <> (T.pack . show) transaqTrade
        Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!"

    -- Build the outgoing trade record from a Transaq trade and its order.
    fromTransaqTrade transaqTrade order =
      Trade
        { tradeOrderId = orderId order
        , tradePrice = fromDouble (tPrice transaqTrade)
        , tradeQuantity = fromIntegral $ tQuantity transaqTrade
        , tradeVolume = fromDouble $ tValue transaqTrade
        , tradeVolumeCurrency = ""
        , tradeOperation = fromDirection (tBuysell transaqTrade)
        , tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade
        , tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade
        , tradeTimestamp = tTimestamp transaqTrade
        , tradeCommission = fromDouble $ tComission transaqTrade
        , tradeSignalId = orderSignalId order
        }

    fromDirection Transaq.Buy  = AT.Buy
    fromDirection Transaq.Sell = AT.Sell

    -- Forward an order update to the notification callback. The order is
    -- found by exchange order number first, then by transaction id; the
    -- exchange order number is recorded the first time it is seen.
    handleOrder orderUpdate = do
      brState <- asks brokerState
      trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
      maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
      case maybeCb of
        Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|>
                        BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of
          Just oid -> do
            let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate)
            log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif
            liftIO $ atomically $ do
              m <- readTVar (bsOrderTransactionIdMap brState)
              when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $
                modifyTVar' (bsOrderTransactionIdMap brState)
                  (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate))
            liftIO $ cb notif
          _ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification"
        Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification"

    -- Translate a Transaq order status into the local order state.
    orderStateFromTransaq orderUpdate =
      case oStatus orderUpdate of
        OrderActive     -> Submitted
        OrderCancelled  -> Cancelled
        OrderDenied     -> Rejected
        OrderDisabled   -> Rejected
        OrderExpired    -> Cancelled
        OrderFailed     -> Rejected
        OrderForwarding -> Unsubmitted
        OrderInactive   -> OrderError
        OrderMatched    -> Executed
        OrderRefused    -> Rejected
        OrderRemoved    -> Rejected
        OrderWait       -> Unsubmitted
        OrderWatching   -> Unsubmitted
        _               -> OrderError

    -- Answer the pending request with 'ResponseTimeout' once it has been
    -- outstanding for 'requestTimeout' seconds. The put is non-blocking and
    -- the warning is logged only when the timeout was actually delivered, so
    -- a reply slot that is already full neither stalls the worker nor
    -- produces a warning on every iteration.
    checkRequestTimeout :: App ()
    checkRequestTimeout = do
      now <- liftIO getCurrentTime
      ts <- asks requestTimestamp >>= liftIO . readTVarIO
      when (now `diffUTCTime` ts >= requestTimeout) $ do
        resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
        case resp of
          Just tmvar -> do
            delivered <- liftIO . atomically $ tryPutTMVar tmvar ResponseTimeout
            when delivered $
              log Warning "TXMLConnector.WorkThread" "Request timeout"
          _ -> pure ()
-- | One iteration of the info-gathering stage that follows a connect command.
--
-- Takes the next message from the queue, waiting until one is available:
--
-- * server status @Disconnected@ or @Error@: back to 'StageConnection';
-- * server status @Connected@: the password-change command is sent and the
--   configured subscriptions are requested. The stage advances to
--   'StageConnected' only if subscribing succeeded. On failure the session
--   is disconnected and the stage returns to 'StageConnection' so that a
--   reconnect is attempted. Advancing first would leave the connector in
--   the connected stage, which ignores server-status messages, with no live
--   session;
-- * candle kinds: stored in the candle-kind map;
-- * securities: converted to ticker info and handed to the ticker info server;
-- * results, candles, markets, security info, client data: logged only.
handleGetInfo :: App ()
handleGetInfo = do
  queue <- asks transaqQueue
  cfg <- asks config
  item <- liftIO . atomically $ readTBQueue queue
  conn <- asks serverConnected
  case item of
    TransaqResponseServerStatus serverStatus -> do
      log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus
      case state serverStatus of
        Transaq.Disconnected -> do
          log Warning "TXMLConnector.WorkThread" "Server disconnected"
          liftIO . atomically $ writeTVar conn StageConnection
        Transaq.Connected -> do
          log Info "TXMLConnector.WorkThread" "Server connected"
          -- FIXME(review): the replacement password is a literal committed
          -- to source and is sent on every successful connection. It should
          -- come from configuration, or the command should be removed.
          void $ liftIO . sendCommand $ toXml $
            CommandChangePass (transaqPassword cfg) "goobaka12"
          v <- makeSubscriptions cfg
          case v of
            Left errmsg -> do
              log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
              void $ liftIO . sendCommand $ toXml CommandDisconnect
              liftIO . atomically $ writeTVar conn StageConnection
            Right _ -> do
              liftIO . atomically $ writeTVar conn StageConnected
              log Info "TXMLConnector.WorkThread" "Subscriptions done"
        Transaq.Error errmsg -> do
          log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg
          liftIO . atomically $ writeTVar conn StageConnection
    TransaqResponseResult result ->
      log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result
    -- TODO: handle order response
    TransaqResponseCandles candles ->
      log Debug "TXMLConnector.WorkThread" $
        "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
    -- TODO: Pass to qhp
    TransaqResponseMarkets (ResponseMarkets markets) -> do
      log Debug "TXMLConnector.WorkThread" "Incoming markets:"
      forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
    -- TODO: Pass to qtis
    TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
      ckMap <- asks candleKindMap
      log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
      forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
    TransaqResponseSecurities (ResponseSecurities securities) -> do
      tisH <- asks tisHandle
      let tickerInfos = securityToTickerInfo <$> securities
      log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
      forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
      forM_ tickerInfos (liftIO . putTickerInfo tisH)
    TransaqResponseSecInfo secInfo ->
      log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo
    -- TODO: Pass to qtis
    TransaqResponseClient (ResponseClient clientData) ->
      log Debug "TXMLConnector.WorkThread" $
        "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
    _ -> pure ()
-- | One iteration of the unconnected stage: send the connect command built
-- from the configuration.
--
-- If the command returns an error text, it is logged and the thread sleeps
-- for ten seconds; the stage is left unchanged, so the next iteration
-- retries. Otherwise the stage moves on to 'StageGetInfo'.
handleUnconnected :: App ()
handleUnconnected = do
  cfg <- asks config
  log Debug "TXMLConnector.WorkThread" "Sending connect command"
  let connectCmd =
        CommandConnect
          { login = transaqLogin cfg
          , password = transaqPassword cfg
          , host = transaqHost cfg
          , port = transaqPort cfg
          , language = LanguageEn
          , autopos = False
          , micexRegisters = True
          , milliseconds = True
          , utcTime = True
          , proxy = ()
          , rqDelay = Nothing
          , sessionTimeout = Nothing
          , requestTimeout = Nothing
          , pushULimits = Nothing
          , pushPosEquity = Nothing
          }
  outcome <- liftIO (sendCommand (toXml connectCmd))
  case outcome of
    Left err -> do
      log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]"
      liftIO $ threadDelay (1000 * 1000 * 10)
    Right _ -> do
      log Warning "TXMLConnector.WorkThread" "Connected"
      stageVar <- asks serverConnected
      liftIO . atomically $ writeTVar stageVar StageGetInfo
3 years ago
-- item <- atomically $ readTBQueue queue
-- case item of
-- TransaqResponseServerStatus status -> do
-- case state status of
-- Transaq.Error errmsg -> do
-- log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> errmsg
-- void $ sendCommand $ toXml CommandDisconnect
-- threadDelay (10 * 1000 * 1000)
-- Transaq.Connected -> do
-- atomically $ writeTVar serverConnected StageGetInfo
-- -- v <- makeSubscriptions config
-- -- case v of
-- -- Left errmsg -> do
-- -- log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
-- -- void $ sendCommand $ toXml CommandDisconnect
-- -- Right _ ->
-- Transaq.Disconnected -> do
-- log Warning "TXMLConnector.WorkThread" "Unable to connect (disconnected)"
-- threadDelay (10 * 1000 * 1000)
-- other -> do
-- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other
-- threadDelay (1000 * 1000)
-- | Send a subscribe command covering the all-trades, quotations and quotes
-- subscriptions listed in the configuration, and return the command's result.
makeSubscriptions cfg = liftIO (sendCommand (toXml subscribeCmd))
  where
    subscribeCmd =
      CommandSubscribe
        { alltrades = subscriptionToSecurityId <$> allTradesSubscriptions cfg
        , quotations = subscriptionToSecurityId <$> quotationsSubscriptions cfg
        , quotes = subscriptionToSecurityId <$> quotesSubscriptions cfg
        }
-- | Turn a configured subscription into the security id it refers to.
subscriptionToSecurityId :: SubscriptionConfig -> SecurityId
subscriptionToSecurityId subscription =
  case subscription of
    SubscriptionConfig brd code -> SecurityId brd code
-- | Store a tick in the latest-tick table, replacing any previous tick with
-- the same security and data type.
insertToTickMap :: MonadIO m => TVar (M.Map TickKey Tick) -> Tick -> m ()
insertToTickMap table tick =
  liftIO . atomically $ modifyTVar' table (M.insert key tick)
  where
    key = TickKey (security tick) (datatype tick)
3 years ago
-- | Convert an all-trades record into a last-trade-price tick. The ticker is
-- @board#seccode@.
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
  Tick
    { security = ticker
    , datatype = LastTradePrice
    , timestamp = attTimestamp att
    , value = fromDouble (attPrice att)
    , volume = fromIntegral (attQuantity att)
    }
  where
    ticker = attBoard att <> "#" <> attSecCode att
-- | Convert a quotation into two ticks stamped with the given time: best bid
-- first, then best offer. Both ticks carry the quotation's quantity as their
-- volume, and the ticker is @board#seccode@.
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks now q =
  [ mkTick BestBid (qBid q)
  , mkTick BestOffer (qOffer q)
  ]
  where
    ticker = qBoard q <> "#" <> qSeccode q
    mkTick kind px =
      Tick
        { security = ticker
        , datatype = kind
        , timestamp = now
        , value = fromDouble px
        , volume = fromIntegral (qQuantity q)
        }
3 years ago
-- | Build ticker info from a security: the ticker is @board#seccode@; lot
-- size and tick size are taken from the security's lot size and minimum step.
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec =
  TickerInfo
    { tiTicker = ticker
    , tiLotSize = sLotSize sec
    , tiTickSize = sMinStep sec
    }
  where
    ticker = sBoard sec <> "#" <> sSeccode sec
-- | Split a ticker id at its first @#@ into a security id. The part before
-- the @#@ becomes the first constructor field, the part after it the second.
-- Returns 'Nothing' when the ticker id contains no @#@.
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId tickerId =
  case T.break (== '#') tickerId of
    (brd, rest)
      | Just (_, code) <- T.uncons rest -> Just (SecurityId brd code)
    _ -> Nothing
-- | Render a security id as a ticker id of the form @board#seccode@.
makeTickerId :: SecurityId -> TickerId
makeTickerId sec = T.concat [board sec, "#", seccode sec]
-- | Split an account id at its first @#@ into the part before and the part
-- after it. Returns 'Nothing' when the account id contains no @#@.
parseAccountId :: T.Text -> Maybe (T.Text, T.Text)
parseAccountId accId =
  case T.break (== '#') accId of
    (before, rest)
      | Just (_, after) <- T.uncons rest -> Just (before, after)
    _ -> Nothing
3 years ago
-- | Hand a request to the worker thread and wait for its reply.
--
-- The first transaction records the issue time, publishes a fresh reply
-- variable and the request; it waits while another request still occupies
-- those slots. The second transaction waits until the reply variable is
-- filled, then clears the published reply slot and returns the reply.
makeRequest :: TXMLConnectorHandle -> Request -> IO Response
makeRequest h request = do
  issuedAt <- getCurrentTime
  replySlot <- atomically $ do
    slot <- newEmptyTMVar
    writeTVar (hRequestTimestamp h) issuedAt
    putTMVar (hResponseVar h) slot
    putTMVar (hRequestVar h) request
    pure slot
  atomically $ takeTMVar (hResponseVar h) >> takeTMVar replySlot
-- | Translate an order into a Transaq new-order command.
--
-- Returns 'Nothing' instead of failing when
--
-- * the order's security is not of the form accepted by 'parseSecurityId',
-- * the order's account id is not of the form accepted by 'parseAccountId', or
-- * the order price is neither 'Market' nor 'Limit'.
--
-- Market orders are sent with price 0 and the by-market flag set; limit
-- orders carry their price with the flag cleared.
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder
mkNewOrderCommand order = do
  secId <- parseSecurityId (orderSecurity order)
  (clientCode, unionName) <- parseAccountId (orderAccountId order)
  -- Only market and limit prices have a Transaq representation here.
  (orderPx, atMarket) <- case orderPrice order of
    Market  -> Just (0, True)
    Limit p -> Just (toDouble p, False)
    _       -> Nothing
  pure Transaq.CommandNewOrder
    { security = secId
    , client = clientCode
    , unionCode = unionName
    , price = orderPx
    , quantity = fromInteger $ orderQuantity order
    , buysell = toDirection $ orderOperation order
    , bymarket = atMarket
    , brokerRef = T.empty
    , unfilled = UnfilledPutInQueue
    , usecredit = False
    , nosplit = False
    }
  where
    toDirection AT.Buy  = Transaq.Buy
    toDirection AT.Sell = Transaq.Sell
3 years ago
-- | Convert a candle of the given security into a bar. The bar's security is
-- the ticker id produced by 'makeTickerId'.
candleToBar :: SecurityId -> Candle -> Bar
candleToBar sec candle =
  Bar
    { barSecurity = makeTickerId sec
    , barTimestamp = cTimestamp candle
    , barOpen = px cOpen
    , barHigh = px cHigh
    , barLow = px cLow
    , barClose = px cClose
    , barVolume = fromIntegral (cVolume candle)
    }
  where
    -- Read one price field of the candle and convert it.
    px field = fromDouble (field candle)
-- | Submit an order through the connector, waiting for the reply and
-- discarding it.
brSubmitOrder :: TXMLConnectorHandle -> Order -> IO ()
brSubmitOrder h = void . makeRequest h . RequestSubmitOrder
-- | Request cancellation of an order through the connector, waiting for the
-- reply and discarding it.
brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO ()
brCancelOrder h = void . makeRequest h . RequestCancelOrder
-- | Install the notification callback, or remove it by passing 'Nothing'.
brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO ()
brSetNotificationCallback h = atomically . writeTVar (hNotificationCallback h)
-- | Wrap a connector handle as a broker backend serving a single account.
-- The backend's callback, submit, cancel and stop actions are this module's
-- functions applied to the handle.
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend
makeBrokerBackend h account =
  BrokerBackend accounts onNotification submit cancel shutdown
  where
    accounts = [account]
    onNotification = brSetNotificationCallback h
    submit = brSubmitOrder h
    cancel = brCancelOrder h
    shutdown = TXMLConnector.stop h