You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

808 lines
35 KiB

{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
3 years ago
-- Public interface of the connector: lifecycle ('start' / 'stop'),
-- the request/response API ('makeRequest') and the broker backend adapter
-- ('makeBrokerBackend').
module TXMLConnector
  ( start
  , TXMLConnector.stop
  , Request(..)
  , HistoryRequest(..)
  , Response(..)
  , HistoryResponse(..)
  , makeRequest
  , TXMLConnectorHandle
  , makeBrokerBackend
  ) where
import ATrade.Logging (Message, Severity (..), log,
3 years ago
logWith)
import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction))
3 years ago
import Config (SubscriptionConfig (SubscriptionConfig),
TransaqConnectorConfig (..),
transaqHost, transaqLogLevel,
transaqLogPath, transaqLogin,
transaqPassword, transaqPort)
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar',
3 years ago
newEmptyTMVar, newEmptyTMVarIO,
newTVarIO, orElse, putTMVar,
readTMVar, readTVar,
readTVarIO, takeTMVar,
tryPutTMVar, tryReadTMVar,
writeTVar)
3 years ago
import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
newTBQueue, readTBQueue,
writeTBQueue)
import Control.Monad (forever, void, when)
import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM
3 years ago
import Data.Maybe (mapMaybe)
import qualified Data.Text as T
import qualified Deque.Strict as D
3 years ago
import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem),
Element (elName),
QName (qName))
3 years ago
import TickTable (TickTable, insertTick,
lookupTick, newTickTable)
3 years ago
import Transaq (AllTradesTrade (..),
Candle (..), ClientData (..),
3 years ago
CommandChangePass (..),
3 years ago
CommandConnect (..),
CommandDisconnect (CommandDisconnect),
CommandGetHistoryData (CommandGetHistoryData),
CommandServerStatus (..),
3 years ago
CommandSubscribe (..),
ConnectionState (Disconnected),
Language (LanguageEn),
MarketInfo (..),
OrderNotification (..),
OrderStatus (..),
3 years ago
Quotation (..),
ResponseAllTrades (ResponseAllTrades),
ResponseCandleKinds (ResponseCandleKinds),
3 years ago
ResponseCandles (..),
ResponseCandlesStatus (StatusPending),
ResponseClient (ResponseClient),
3 years ago
ResponseMarkets (ResponseMarkets),
ResponseOrders (ResponseOrders),
3 years ago
ResponseQuotations (ResponseQuotations),
ResponseQuotes (ResponseQuotes),
ResponseResult (..),
3 years ago
ResponseSecurities (ResponseSecurities),
ResponseTrades (ResponseTrades),
3 years ago
Security (..), SecurityId (..),
TradeNotification (..),
3 years ago
TransaqCommand (toXml),
TransaqResponse (..),
TransaqResponse (..),
TransaqResponseC (fromXml),
UnfilledAction (..),
kCandleKindId, kPeriod, state)
3 years ago
import TXML (LogLevel, freeCallback,
initialize, sendCommand,
setCallback)
import ATrade.Broker.Backend (BrokerBackend (..),
BrokerBackendNotification (..))
3 years ago
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
3 years ago
import ATrade.Types (Bar (..),
BarTimeframe (unBarTimeframe),
DataType (BestBid, BestOffer, LastTradePrice),
Order (..), OrderId,
OrderPrice (..),
OrderState (..), Tick (..),
TickerId, Trade (..),
fromDouble, toDouble)
import qualified ATrade.Types as AT
import Colog.Monad (WithLog)
3 years ago
import Control.Applicative ((<|>))
3 years ago
import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
3 years ago
import Control.Concurrent.STM.TMVar (TMVar)
3 years ago
import Control.Error (headMay)
3 years ago
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks)
import Data.Int (Int64)
3 years ago
import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, diffUTCTime,
getCurrentTime)
import FSM (FSMCallback (..),
FSMState (isTerminalState),
makeFsm, runFsm)
import GHC.Exts (IsList (..))
import Prelude hiding (log)
3 years ago
import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle,
putTickerInfo)
3 years ago
import qualified Transaq
import qualified TXML
-- | Bundle of connection settings (credentials, endpoint and logging options).
--
-- NOTE(review): not referenced by any other definition visible in this
-- module; the worker reads the same values from 'TransaqConnectorConfig'.
data ConnectionParams = ConnectionParams
  { cpLogin    :: T.Text
  , cpPassword :: T.Text
  , cpHost     :: T.Text
  , cpPort     :: Int
  , cpLogPath  :: T.Text
  , cpLogLevel :: LogLevel
  } deriving (Show, Eq, Ord)
3 years ago
-- | Parameters of a historical bars request (payload of 'RequestHistory').
data HistoryRequest = HistoryRequest
  { hrTickerId  :: TickerId
    -- ^ Ticker to query; split into a 'SecurityId' with 'parseSecurityId'.
  , hrTimeframe :: BarTimeframe
    -- ^ Bar timeframe; its 'unBarTimeframe' value is the key looked up in
    -- the candle-kind map to select the Transaq period id.
  , hrCount     :: Int
    -- ^ Forwarded as the @count@ field of 'CommandGetHistoryData'.
  , hrReset     :: Bool
    -- ^ Forwarded as the @reset@ field of 'CommandGetHistoryData'.
  } deriving (Show, Eq, Ord)
-- | Requests a client can hand to the worker thread through 'makeRequest'.
data Request
  = RequestHistory HistoryRequest  -- ^ Fetch historical bars.
  | RequestSubmitOrder Order       -- ^ Submit a new order.
  | RequestCancelOrder OrderId     -- ^ Cancel a previously submitted order.
  deriving (Show, Eq)
3 years ago
-- | Outcome delivered to the caller of 'makeRequest'.
--
-- 'Show' and 'Eq' are derived so that responses can be logged and compared,
-- matching the instances already available for 'Request' and
-- 'HistoryResponse'.
data Response
  = ResponseHistory HistoryResponse  -- ^ Bars collected for a 'RequestHistory'.
  | ResponseOrderSubmitted           -- ^ The order was accepted by the server.
  | ResponseOrderCancelled
  | ResponseTimeout                  -- ^ No answer arrived within the request timeout.
  deriving (Show, Eq)
3 years ago
-- | Payload of 'ResponseHistory'.
data HistoryResponse = HistoryResponse
  { hrBars     :: [Bar]  -- ^ Bars converted from the received candles.
  , hrMoreData :: Bool   -- ^ Always set to 'False' by the worker in this module.
  } deriving (Show, Eq)
3 years ago
-- | Handle returned by 'start'. It shares its synchronisation variables with
-- the worker thread's 'Env'.
data TXMLConnectorHandle = TXMLConnectorHandle
  { threadId              :: ThreadId
    -- ^ Id of the forked worker thread.
  , notificationQueue     :: TBQueue TransaqResponse
    -- ^ Queue that the TXML callback fills with parsed responses.
  , hRequestVar           :: TMVar Request
    -- ^ Slot holding the request currently handed to the worker.
  , hResponseVar          :: TMVar (TMVar Response)
    -- ^ Slot holding the reply variable of the pending request.
  , hRequestTimestamp     :: TVar UTCTime
    -- ^ Time at which the pending request was issued (used for timeouts).
  , hNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ()))
    -- ^ Callback invoked on order and trade notifications, if set.
  , hRunVar               :: TMVar ()
    -- ^ Filled by 'stop' to ask the worker to shut down.
  }
-- | States of the connection state machine driven by 'workThread'.
data ConnectionStage
  = StageConnection  -- ^ Not connected; a connect command is (re)sent.
  | StageGetInfo     -- ^ Connect command accepted; processing initial server data.
  | StageConnected   -- ^ Subscriptions done; serving data and requests.
  | StageShutdown    -- ^ Terminal state.
  deriving (Eq, Show, Ord)
-- | 'StageShutdown' is the only terminal state of the machine.
instance FSMState ConnectionStage where
  isTerminalState = (== StageShutdown)
-- | Events multiplexed by the worker while in 'StageConnected'.
data MainQueueData
  = MainQueueTransaqData TransaqResponse  -- ^ Parsed message from the TXML callback.
  | MainQueueRequest Request              -- ^ Client request taken from the request slot.
  | MainQueuePingServer                   -- ^ Timer tick: poll the server status.
  | MainQueueShutdown                     -- ^ Stop was requested.
  deriving (Eq, Show)
-- | Server-side identifiers an order can be tracked by.
data TransactionId
  = TransactionId Int64    -- ^ Id returned in the result of the new-order command.
  | ExchangeOrderId Int64  -- ^ Order number carried by order and trade notifications.
  deriving (Show, Ord, Eq)
-- | Mutable broker-side bookkeeping shared between request handling and
-- notification handling.
data BrokerState = BrokerState
  { bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId)
    -- ^ Two-way mapping between local order ids and server identifiers.
  , bsNotificationCallback  :: TVar (Maybe (BrokerBackendNotification -> IO ()))
    -- ^ Callback to invoke for order/trade notifications, if any.
  , bsOrderMap              :: TVar (M.Map OrderId Order)
    -- ^ Orders that were successfully submitted, by local id.
  , bsPendingOrders         :: TVar (D.Deque Order)
    -- ^ NOTE(review): created in 'start' but not read or written elsewhere
    -- in the visible code.
  }
-- | Read-only environment of the worker thread ('App').
data Env = Env
  { qssChannel       :: BoundedChan QuoteSourceServerData
    -- ^ Output channel for ticks.
  , tisHandle        :: TickerInfoServerHandle
    -- ^ Receives ticker infos built from incoming securities.
  , requestVar       :: TMVar Request
    -- ^ Slot with the client request to process.
  , responseVar      :: TMVar (TMVar Response)
    -- ^ Slot with the reply variable of the pending request.
  , requestTimestamp :: TVar UTCTime
    -- ^ Issue time of the pending request.
  , currentCandles   :: TVar [Candle]
    -- ^ Candles accumulated while a history response is still pending.
  , tickMap          :: TickTable
  , transaqQueue     :: TBQueue TransaqResponse
    -- ^ Parsed messages coming from the TXML callback.
  , logger           :: LogAction IO Message
  , config           :: TransaqConnectorConfig
  , serverConnected  :: TVar ConnectionStage
    -- ^ Stage flag polled by the timer thread to decide when to exit.
  , candleKindMap    :: TVar (M.Map Int Int)
    -- ^ Candle period -> Transaq candle kind id.
  , brokerState      :: BrokerState
  , runVar           :: TMVar ()
    -- ^ Filled when shutdown is requested.
  , timerVar         :: TMVar ()
    -- ^ Filled once per timer tick.
  }
-- | Monad of the worker thread: 'IO' with read access to 'Env'.
newtype App a = App
  { unApp :: ReaderT Env IO a
  } deriving (Functor, Applicative, Monad, MonadIO, MonadReader Env)
-- | Logging for 'App' is backed by the 'IO' log action stored in 'Env'.
instance HasLog Env Message App where
  -- Lift the stored IO action into 'App'.
  getLogAction env = LogAction { unLogAction = liftIO . unLogAction (logger env) }
  -- Store the replacement action instead of silently discarding it. The new
  -- action lives in 'App', so it is run against the environment it was
  -- installed into; that environment still carries the previous logger, so
  -- actions derived from 'getLogAction' do not loop.
  setLogAction act env =
    env { logger = LogAction $ \msg -> runReaderT (unApp (unLogAction act msg)) env }
3 years ago
-- | Create all shared state, fork the worker thread and return a handle to it.
--
-- The arguments are, in order: the log action used by the worker, the
-- connector configuration, the channel that receives ticks, and the ticker
-- info server handle that receives security descriptions.
start ::
     LogAction IO Message
  -> TransaqConnectorConfig
  -> BoundedChan QuoteSourceServerData
  -> TickerInfoServerHandle
  -> IO TXMLConnectorHandle
start logAction cfg quoteChan tisH = do
  logWith logAction Info "TXMLConnector" "Starting"
  -- Bounded queue between the TXML callback and the worker.
  notifQueue <- atomically $ newTBQueue 50000
  tickTable <- newTickTable
  rqVar <- newEmptyTMVarIO
  respVar <- newEmptyTMVarIO
  candlesVar <- newTVarIO []
  stageVar <- newTVarIO StageConnection
  candleKindsVar <- newTVarIO M.empty
  rqTimestampVar <- getCurrentTime >>= newTVarIO
  ordersVar <- newTVarIO M.empty
  callbackVar <- newTVarIO Nothing
  transactionIdsVar <- newTVarIO BM.empty
  pendingVar <- newTVarIO (fromList [])
  stopVar <- newEmptyTMVarIO
  tickVar <- newEmptyTMVarIO
  let brState =
        BrokerState
          { bsOrderTransactionIdMap = transactionIdsVar
          , bsNotificationCallback = callbackVar
          , bsOrderMap = ordersVar
          , bsPendingOrders = pendingVar
          }
      env =
        Env
          { qssChannel = quoteChan
          , tisHandle = tisH
          , requestVar = rqVar
          , responseVar = respVar
          , requestTimestamp = rqTimestampVar
          , currentCandles = candlesVar
          , tickMap = tickTable
          , transaqQueue = notifQueue
          , logger = logAction
          , config = cfg
          , serverConnected = stageVar
          , candleKindMap = candleKindsVar
          , brokerState = brState
          , runVar = stopVar
          , timerVar = tickVar
          }
  workerId <- forkIO $ runReaderT (unApp workThread) env
  -- The handle exposes exactly the variables the worker also holds.
  pure TXMLConnectorHandle
    { threadId = workerId
    , notificationQueue = notifQueue
    , hRequestVar = rqVar
    , hResponseVar = respVar
    , hRequestTimestamp = rqTimestampVar
    , hNotificationCallback = callbackVar
    , hRunVar = stopVar
    }
3 years ago
-- | Ask the worker thread to shut down.
--
-- The worker only peeks at the run variable ('readTMVar'), so it stays full
-- after the first call; 'tryPutTMVar' makes repeated calls a no-op instead
-- of blocking the caller forever.
stop :: TXMLConnectorHandle -> IO ()
stop h = void . atomically $ tryPutTMVar (hRunVar h) ()
-- | Body of the worker thread: initialise TXML, install the response
-- callback, start the one-second timer and run the connection state machine
-- until it reaches 'StageShutdown'.
workThread :: App ()
workThread = do
  cfg <- asks config
  initResult <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg)
  case initResult of
    Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML: " <> str
    Right _ -> do
      queue <- asks transaqQueue
      logger' <- asks logger
      maybeCallback <- liftIO $ setCallback (parseAndWrite queue logger')
      case maybeCallback of
        Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
        Just cb -> do
          serverConnectionState <- asks serverConnected
          timerVar' <- asks timerVar
          -- Timer thread: once per second fill the timer slot (if empty) and
          -- keep going until the shutdown stage is observed.
          void $ liftIO $ forkIO $ whileM $ do
            threadDelay 1000000
            void . atomically $ tryPutTMVar timerVar' ()
            connStatus <- readTVarIO serverConnectionState
            pure $ connStatus /= StageShutdown
          fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected)
                                         , (StageGetInfo, FSMCallback handleGetInfo)
                                         , (StageConnected, FSMCallback handleConnected)]
          runFsm fsm
          -- Publish the terminal stage; the timer thread polls this variable
          -- and would otherwise keep running after the worker has finished.
          liftIO . atomically $ writeTVar serverConnectionState StageShutdown
          liftIO $ freeCallback cb
  where
-- Map the numeric log level from the configuration onto a TXML log level:
-- 1 is Warning, 3 is Debug, and anything else is Info.
parseTransaqLogLevel level = case level of
  1 -> TXML.Warning
  3 -> TXML.Debug
  _ -> TXML.Info
-- Callback handed to TXML: parse the raw XML, enqueue every recognised
-- response for the worker and always report True to the library. The
-- logger argument is accepted for the call site in 'workThread' but is not
-- used.
parseAndWrite queue _logger xml = do
  mapM_ (writeToQueue queue) . mapMaybe parseContent $ parseXML xml
  pure True
-- Only XML elements can carry a response; any other content is dropped.
parseContent content = case content of
  Elem el -> parseElement el
  _       -> Nothing
-- Dispatch on the element's tag name and decode it into the matching
-- 'TransaqResponse' constructor; unknown tags (and decode failures) yield
-- Nothing.
--
-- NOTE(review): there is no mapping that produces 'TransaqResponseClient',
-- although 'handleGetInfo' has a branch for it -- confirm whether a
-- "client" tag should be decoded here.
parseElement el = case qName (elName el) of
  "candles"       -> TransaqResponseCandles <$> fromXml el
  "server_status" -> TransaqResponseServerStatus <$> fromXml el
  "markets"       -> TransaqResponseMarkets <$> fromXml el
  "candlekinds"   -> TransaqResponseCandleKinds <$> fromXml el
  "securities"    -> TransaqResponseSecurities <$> fromXml el
  "sec_info"      -> TransaqResponseSecInfo <$> fromXml el
  "quotations"    -> TransaqResponseQuotations <$> fromXml el
  "alltrades"     -> TransaqResponseAllTrades <$> fromXml el
  "quotes"        -> TransaqResponseQuotes <$> fromXml el
  "orders"        -> TransaqResponseOrders <$> fromXml el
  "trades"        -> TransaqResponseTrades <$> fromXml el
  "result"        -> TransaqResponseResult <$> fromXml el
  _               -> Nothing
-- Enqueue one parsed response in its own STM transaction.
writeToQueue queue = atomically . writeTBQueue queue
-- Handler for 'StageConnected'. Each invocation first expires an overdue
-- request, then waits for exactly one event -- stop request, incoming
-- Transaq message, client request or timer tick, in that priority order --
-- and processes it. Returns the next stage, or Nothing to stay connected.
handleConnected :: App (Maybe ConnectionStage)
handleConnected = do
  checkRequestTimeout
  rqVar <- asks requestVar
  runVar' <- asks runVar
  queue <- asks transaqQueue
  timerVar' <- asks timerVar
  item <- liftIO . atomically $
    (readTMVar runVar' >> pure MainQueueShutdown) `orElse`
    (MainQueueTransaqData <$> readTBQueue queue) `orElse`
    (MainQueueRequest <$> takeTMVar rqVar) `orElse`
    (takeTMVar timerVar' >> pure MainQueuePingServer)
  case item of
    MainQueueShutdown -> pure $ Just StageShutdown

    -- Timer tick: poll the server status; a failure result means that the
    -- connection is gone and has to be re-established.
    MainQueuePingServer -> do
      maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus
      case maybeServerStatus of
        Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
          (TransaqResponseResult (ResponseFailure _) : _) -> pure $ Just StageConnection
          _ -> do
            log Warning "TXMLConnector.WorkThread" $
              "Unable to parse server status response: " <> (T.pack . show) serverStatusRaw
            pure Nothing
        Right () -> pure Nothing

    MainQueueTransaqData transaqData -> do
      tm <- asks tickMap
      case transaqData of
        TransaqResponseAllTrades (ResponseAllTrades trades) -> do
          qssChan <- asks qssChannel
          let ticks = fmap allTradeToTick trades
          forM_ ticks (liftIO . writeChan qssChan . QSSTick)
          forM_ ticks (insertToTickMap tm)
          pure Nothing
        TransaqResponseQuotations (ResponseQuotations quotations) -> do
          qssChan <- asks qssChannel
          now <- liftIO getCurrentTime
          let ticks = concatMap (quotationToTicks now) quotations
          forM_ ticks (liftIO . writeChan qssChan . QSSTick)
          forM_ ticks (insertToTickMap tm)
          pure Nothing
        TransaqResponseCandles respCandle -> do
          resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
          log Debug "TXMLConnector.WorkThread" $
            "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
          case resp of
            Just !tmvar -> do
              cur <- asks currentCandles
              if cStatus respCandle == StatusPending
                -- More chunks will follow: accumulate this one.
                then liftIO . atomically $ modifyTVar' cur (cCandles respCandle <>)
                -- Last chunk: answer the caller with everything collected.
                else liftIO . atomically $ do
                  candles <- readTVar cur
                  putTMVar tmvar $ ResponseHistory $ HistoryResponse
                    { hrBars = candleToBar (cSecurity respCandle) <$> (cCandles respCandle <> candles)
                    , hrMoreData = False
                    }
            _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
          pure Nothing
        TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing
        TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing
        _ -> pure Nothing

    -- History request: reset the accumulator and send the command. On the
    -- warning paths nothing is put into the reply variable, so the caller is
    -- released only by the request timeout.
    MainQueueRequest (RequestHistory request) -> do
      cur <- asks currentCandles
      liftIO . atomically $ writeTVar cur []
      maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
      case maybeCk of
        Just candleKindId ->
          case parseSecurityId (hrTickerId request) of
            Just secId -> void . liftIO . sendCommand $
              toXml CommandGetHistoryData
                { security = secId
                , periodId = candleKindId
                , count = hrCount request
                , reset = hrReset request
                }
            _ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
        _ -> log Warning "TXMLConnector.WorkThread" $
               "Invalid candlekind requested: " <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
      pure Nothing

    MainQueueRequest (RequestSubmitOrder order) -> do
      brState <- asks brokerState
      -- Report an order state change to the registered callback, if any.
      let notifyOrder newState = do
            maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
            forM_ maybeCb $ \cb -> liftIO . cb $ BackendOrderNotification (orderId order) newState
      case mkNewOrderCommand order of
        Just cmd -> do
          v <- liftIO . sendCommand . toXml $ cmd
          case v of
            -- For this command the interesting answer arrives in 'Left':
            -- it carries the raw result XML.
            Left result -> do
              case headMay (parseXML result) >>= parseContent of
                Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
                  respVar <- asks responseVar
                  liftIO $ atomically $ do
                    modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
                    modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId))
                    resp <- readTMVar respVar
                    putTMVar resp ResponseOrderSubmitted
                  notifyOrder Submitted
                  log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
                    (T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
                Just (TransaqResponseResult (ResponseFailure err)) -> do
                  log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
                  notifyOrder Rejected
                _ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
              pure Nothing
            Right _ -> do
              log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
              pure Nothing
        Nothing -> do
          -- Previously dropped silently; the caller still waits for the timeout.
          log Warning "TXMLConnector.WorkThread" "Unable to build new order command"
          pure Nothing

    -- Cancellation is not implemented by this handler; make that visible in
    -- the log instead of dropping the request silently.
    MainQueueRequest (RequestCancelOrder _) -> do
      log Warning "TXMLConnector.WorkThread" "Order cancellation request is not handled"
      pure Nothing
-- Maximum age of a pending request before 'checkRequestTimeout' answers it
-- with 'ResponseTimeout'. It is compared against a 'diffUTCTime' result, so
-- the literal is a number of seconds.
requestTimeout = 10
-- Turn an incoming Transaq trade into a 'BackendTradeNotification' for the
-- registered callback. The trade is matched to a local order through the
-- exchange order number, then the order itself is fetched from the order
-- map. Each failure is logged with the warning that names the lookup that
-- actually failed.
handleTrade transaqTrade = do
  brState <- asks brokerState
  trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
  maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
  orderMap <- liftIO $ readTVarIO (bsOrderMap brState)
  case maybeCb of
    Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for trade notification!"
    Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of
      -- No local order id is known for this exchange order number.
      Nothing -> log Warning "TXMLConnector.WorkThread" $
        "Unable to find order for trade: " <> (T.pack . show) transaqTrade
      Just oid -> case M.lookup oid orderMap of
        -- The id is known, but the order itself is missing from the map.
        Nothing -> log Warning "TXMLConnector.WorkThread" $
          "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
        Just order -> do
          let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order)
          log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif
          liftIO $ cb notif
-- Build a 'Trade' from a Transaq trade notification and the local order it
-- belongs to. Account and security are rendered as "<left>#<right>" pairs;
-- the volume currency is left empty.
fromTransaqTrade transaqTrade order =
  let accountText = tClient transaqTrade <> "#" <> tUnion transaqTrade
      securityText = tBoard transaqTrade <> "#" <> tSecCode transaqTrade
  in Trade
       { tradeOrderId = orderId order
       , tradeSignalId = orderSignalId order
       , tradeAccount = accountText
       , tradeSecurity = securityText
       , tradeOperation = fromDirection (tBuysell transaqTrade)
       , tradeTimestamp = tTimestamp transaqTrade
       , tradePrice = fromDouble (tPrice transaqTrade)
       , tradeQuantity = fromIntegral (tQuantity transaqTrade)
       , tradeVolume = fromDouble (tValue transaqTrade)
       , tradeVolumeCurrency = ""
       , tradeCommission = fromDouble (tComission transaqTrade)
       }
-- Translate the Transaq trade direction into the ATrade one.
fromDirection direction = case direction of
  Transaq.Buy  -> AT.Buy
  Transaq.Sell -> AT.Sell
-- Turn an incoming Transaq order update into a 'BackendOrderNotification'.
-- The local order id is looked up by exchange order number first and by
-- transaction id second. Before the callback runs, the exchange order
-- number is recorded for that order unless it is already present in the map.
handleOrder orderUpdate = do
  brState <- asks brokerState
  trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
  maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
  let exchangeId = ExchangeOrderId (oOrderNo orderUpdate)
      transactionId = TransactionId (fromIntegral $ oTransactionId orderUpdate)
  case maybeCb of
    Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification"
    Just cb -> case BM.lookupR exchangeId trIdMap <|> BM.lookupR transactionId trIdMap of
      Nothing -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification"
      Just oid -> do
        let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate)
        log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif
        liftIO $ atomically $ do
          m <- readTVar (bsOrderTransactionIdMap brState)
          when (BM.notMemberR exchangeId m) $
            modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid exchangeId)
        liftIO $ cb notif
-- Map the status of a Transaq order update onto the ATrade order state.
-- Statuses that are not listed fall back to 'OrderError'.
orderStateFromTransaq orderUpdate = toOrderState (oStatus orderUpdate)
  where
    toOrderState OrderActive     = Submitted
    toOrderState OrderCancelled  = Cancelled
    toOrderState OrderDenied     = Rejected
    toOrderState OrderDisabled   = Rejected
    toOrderState OrderExpired    = Cancelled
    toOrderState OrderFailed     = Rejected
    toOrderState OrderForwarding = Unsubmitted
    toOrderState OrderInactive   = OrderError
    toOrderState OrderMatched    = Executed
    toOrderState OrderRefused    = Rejected
    toOrderState OrderRemoved    = Rejected
    toOrderState OrderWait       = Unsubmitted
    toOrderState OrderWatching   = Unsubmitted
    toOrderState _               = OrderError
-- If the pending request is older than 'requestTimeout', answer it with
-- 'ResponseTimeout'.
--
-- The reply is delivered with 'tryPutTMVar': when the reply variable
-- already holds an answer the caller has not collected yet, the worker
-- moves on instead of blocking on a full variable.
checkRequestTimeout = do
  now <- liftIO getCurrentTime
  ts <- asks requestTimestamp >>= liftIO . readTVarIO
  when (now `diffUTCTime` ts >= requestTimeout) $ do
    pending <- asks responseVar >>= liftIO . atomically . tryReadTMVar
    forM_ pending $ \tmvar -> do
      delivered <- liftIO . atomically $ tryPutTMVar tmvar ResponseTimeout
      when delivered $ log Warning "TXMLConnector.WorkThread" "Request timeout"
-- Handler for 'StageGetInfo': consume the messages the server sends after
-- the connect command was accepted (candle kinds, securities, ...) until a
-- server status arrives. A "connected" status triggers the subscriptions
-- and moves the machine to 'StageConnected'; a disconnect or error status
-- moves it back to 'StageConnection'. A stop request is honoured here too,
-- so shutdown does not depend on the connection being fully established.
handleGetInfo :: App (Maybe ConnectionStage)
handleGetInfo = do
  queue <- asks transaqQueue
  cfg <- asks config
  runVar' <- asks runVar
  conn <- asks serverConnected
  -- Wait for either a stop request (Nothing) or the next message (Just).
  event <- liftIO . atomically $
    (Nothing <$ readTMVar runVar') `orElse` (Just <$> readTBQueue queue)
  case event of
    Nothing -> pure $ Just StageShutdown
    Just (TransaqResponseServerStatus serverStatus) -> do
      log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus
      case state serverStatus of
        Transaq.Disconnected -> do
          log Warning "TXMLConnector.WorkThread" "Server disconnected"
          pure $ Just StageConnection
        Transaq.Connected -> do
          log Info "TXMLConnector.WorkThread" "Server connected"
          -- FIXME(security): every successful connect sends a change-password
          -- command whose new password is a literal committed to the source.
          -- Behaviour is kept as is; move the value to configuration or drop
          -- the command.
          void . liftIO . sendCommand . toXml $
            CommandChangePass (transaqPassword cfg) "goobaka12"
          v <- makeSubscriptions cfg
          case v of
            Left errmsg -> do
              log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> (T.pack . show) errmsg
              void . liftIO . sendCommand $ toXml CommandDisconnect
              pure $ Just StageConnection
            Right _ -> do
              log Info "TXMLConnector.WorkThread" "Subscriptions done"
              pure $ Just StageConnected
        Transaq.Error _ -> do
          -- The details were already logged with the status above.
          log Warning "TXMLConnector.WorkThread" "Connection error"
          liftIO . atomically $ writeTVar conn StageConnection
          void . liftIO . sendCommand $ toXml CommandDisconnect
          pure $ Just StageConnection
    Just (TransaqResponseResult result) -> do
      log Info "TXMLConnector.WorkThread" $ "Incoming result: " <> (T.pack . show) result
      pure Nothing
    Just (TransaqResponseCandles candles) -> do
      log Debug "TXMLConnector.WorkThread" $
        "Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
      pure Nothing
    Just (TransaqResponseMarkets (ResponseMarkets markets)) -> do
      log Debug "TXMLConnector.WorkThread" "Incoming markets:"
      forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
      pure Nothing
    Just (TransaqResponseCandleKinds (ResponseCandleKinds kinds)) -> do
      -- Remember which candle kind id serves which period.
      ckMap <- asks candleKindMap
      log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
      forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
      pure Nothing
    Just (TransaqResponseSecurities (ResponseSecurities securities)) -> do
      -- Publish every security to the ticker info server.
      tisH <- asks tisHandle
      let tickerInfos = securityToTickerInfo <$> securities
      log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
      forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
      forM_ tickerInfos (liftIO . putTickerInfo tisH)
      pure Nothing
    Just (TransaqResponseSecInfo secInfo) -> do
      log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo: " <> (T.pack . show) secInfo
      pure Nothing
    Just (TransaqResponseClient (ResponseClient clientData)) -> do
      log Debug "TXMLConnector.WorkThread" $
        "Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
      pure Nothing
    Just _ -> pure Nothing
-- Handler for 'StageConnection': send the connect command. On success move
-- to 'StageGetInfo'; on failure send a disconnect, wait 'reconnectionDelay',
-- drop whatever piled up in the queue and stay in this stage to retry.
-- A stop request is checked first so that the reconnect loop can be left
-- while the server is unreachable.
handleUnconnected :: App (Maybe ConnectionStage)
handleUnconnected = do
  stopRequested <- asks runVar >>= liftIO . atomically . tryReadTMVar
  case stopRequested of
    Just () -> pure $ Just StageShutdown
    Nothing -> do
      cfg <- asks config
      log Debug "TXMLConnector.WorkThread" "Sending connect command"
      v <- liftIO . sendCommand . toXml $ cmdConnect cfg
      case v of
        Left err -> do
          log Warning "TXMLConnector.WorkThread" $ "Unable to connect: " <> (T.pack . show) err
          liftIO $ do
            void $ sendCommand $ toXml CommandDisconnect
            threadDelay reconnectionDelay
          queue <- asks transaqQueue
          void . liftIO . atomically $ flushTBQueue queue
          pure Nothing
        Right _ -> do
          log Info "TXMLConnector.WorkThread" "Connected"
          pure $ Just StageGetInfo
-- Send the subscribe command built from the configuration and return the
-- result of 'sendCommand'.
makeSubscriptions cfg = liftIO (sendCommand (toXml (cmdSubscription cfg)))
-- Repackage the two fields of a subscription entry as a 'SecurityId'.
subscriptionToSecurityId sub = case sub of
  SubscriptionConfig brd code -> SecurityId brd code
-- Thin alias for 'insertTick'.
insertToTickMap table = insertTick table
-- Pause before the next connection attempt; passed to 'threadDelay', which
-- takes microseconds, so this is ten seconds.
reconnectionDelay = 10 * 1000 * 1000
-- Build the subscribe command from the three subscription lists of the
-- configuration.
cmdSubscription cfg =
  CommandSubscribe
    { alltrades = subscriptionToSecurityId <$> allTradesSubscriptions cfg
    , quotations = subscriptionToSecurityId <$> quotationsSubscriptions cfg
    , quotes = subscriptionToSecurityId <$> quotesSubscriptions cfg
    }
-- Build the connect command: credentials and endpoint come from the
-- configuration, every other option is fixed here.
cmdConnect cfg =
  CommandConnect
    { login = transaqLogin cfg
    , password = transaqPassword cfg
    , host = transaqHost cfg
    , port = transaqPort cfg
    , language = LanguageEn
    , autopos = False
    , micexRegisters = True
    , milliseconds = True
    , utcTime = True
    , proxy = ()
    , rqDelay = Nothing
    , sessionTimeout = Nothing
    , requestTimeout = Nothing
    , pushULimits = Nothing
    , pushPosEquity = Nothing
    }
-- | Convert an all-trades entry into a 'LastTradePrice' tick whose security
-- is rendered as @board#seccode@.
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
  let ticker = attBoard att <> "#" <> attSecCode att
  in Tick
       { security = ticker
       , datatype = LastTradePrice
       , timestamp = attTimestamp att
       , value = fromDouble (attPrice att)
       , volume = fromIntegral (attQuantity att)
       }
-- | Convert a quotation into two ticks stamped with the given time: the best
-- bid followed by the best offer. Both carry the quotation's quantity as
-- volume.
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks timestamp q =
  [ mkTick BestBid (qBid q)
  , mkTick BestOffer (qOffer q)
  ]
  where
    ticker = qBoard q <> "#" <> qSeccode q
    mkTick kind px =
      Tick
        { security = ticker
        , datatype = kind
        , timestamp = timestamp
        , value = fromDouble px
        , volume = fromIntegral (qQuantity q)
        }
3 years ago
-- | Describe a security as a 'TickerInfo': ticker @board#seccode@, lot size
-- and minimal price step.
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec = TickerInfo
  { tiTicker = ticker
  , tiLotSize = sLotSize sec
  , tiTickSize = sMinStep sec
  }
  where
    ticker = sBoard sec <> "#" <> sSeccode sec
-- | Split a ticker of the form @board#seccode@ at the first @#@.
-- Returns 'Nothing' when the ticker contains no @#@.
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId tickerId =
  case T.findIndex (== '#') tickerId of
    Nothing -> Nothing
    Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId)
-- | Render a security id as @board#seccode@ (inverse of 'parseSecurityId').
makeTickerId :: SecurityId -> TickerId
makeTickerId sec = mconcat [board sec, "#", seccode sec]
-- | Split an account id at its first @#@ into the part before and the part
-- after it. Returns 'Nothing' when there is no @#@.
parseAccountId :: T.Text -> Maybe (T.Text, T.Text)
parseAccountId accId
  | T.null rest = Nothing
  | otherwise   = Just (before, T.drop 1 rest)
  where
    -- 'rest' starts with the first '#', or is empty when there is none.
    (before, rest) = T.break (== '#') accId
3 years ago
-- | Hand a request to the worker thread and block until it is answered.
--
-- First transaction: stamp the request time and publish a fresh reply
-- variable together with the request. Both slots must be free, so
-- concurrent callers are served one at a time. Second transaction: wait
-- until the reply variable is filled, then free the reply slot and return
-- the answer.
--
-- NOTE(review): in the visible code requests are taken, and timeouts are
-- checked, only by 'handleConnected'; outside that stage this call appears
-- to block until the connection is established -- confirm.
makeRequest :: TXMLConnectorHandle -> Request -> IO Response
makeRequest h request = do
  now <- getCurrentTime
  replyVar <- atomically $ do
    slot <- newEmptyTMVar
    writeTVar (hRequestTimestamp h) now
    putTMVar (hResponseVar h) slot
    putTMVar (hRequestVar h) request
    pure slot
  atomically $ do
    void $ takeTMVar (hResponseVar h)
    takeTMVar replyVar
-- | Build the Transaq new-order command for an order.
--
-- Returns 'Nothing' when the order's security or account id cannot be split
-- at @#@, or when the order price is neither 'Market' nor 'Limit'. Market
-- orders are sent with price 0 and the by-market flag set; limit orders
-- carry their price.
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder
mkNewOrderCommand order = do
  secId <- parseSecurityId (orderSecurity order)
  (clientCode, union) <- parseAccountId (orderAccountId order)
  -- The two supported price kinds differ only in these two fields.
  (orderPx, isMarket) <- case orderPrice order of
    Market      -> Just (0, True)
    Limit limit -> Just (toDouble limit, False)
    _           -> Nothing
  pure Transaq.CommandNewOrder
    { security = secId
    , client = clientCode
    , unionCode = union
    , price = orderPx
    , quantity = fromInteger $ orderQuantity order
    , buysell = toDirection $ orderOperation order
    , bymarket = isMarket
    , brokerRef = T.empty
    , unfilled = UnfilledPutInQueue
    , usecredit = False
    , nosplit = False
    }
  where
    toDirection AT.Buy  = Transaq.Buy
    toDirection AT.Sell = Transaq.Sell
3 years ago
-- | Convert a Transaq candle of the given security into a 'Bar'.
candleToBar :: SecurityId -> Candle -> Bar
candleToBar sec candle = Bar
  { barSecurity = makeTickerId sec
  , barTimestamp = cTimestamp candle
  , barOpen = px cOpen
  , barHigh = px cHigh
  , barLow = px cLow
  , barClose = px cClose
  , barVolume = fromIntegral (cVolume candle)
  }
  where
    -- Read one price field of the candle and convert it.
    px field = fromDouble (field candle)
-- | Submit an order through the connector, discarding the response.
brSubmitOrder :: TXMLConnectorHandle -> Order -> IO ()
brSubmitOrder h = void . makeRequest h . RequestSubmitOrder
-- | Request cancellation of an order through the connector, discarding the
-- response.
brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO ()
brCancelOrder h = void . makeRequest h . RequestCancelOrder
-- | Replace (or clear, with 'Nothing') the notification callback stored in
-- the handle.
brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO ()
brSetNotificationCallback h = atomically . writeTVar (hNotificationCallback h)
-- | Wrap a connector handle as a 'BrokerBackend' serving the single given
-- account.
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend
makeBrokerBackend h account =
  BrokerBackend accounts setCallback' submit cancel shutdown
  where
    accounts = [account]
    setCallback' = brSetNotificationCallback h
    submit = brSubmitOrder h
    cancel = brCancelOrder h
    shutdown = TXMLConnector.stop h