Browse Source

TXML monad refactoring

master
Denis Tereshkin 3 years ago
parent
commit
83e11b2b18
  1. 7
      src/HistoryProviderServer.hs
  2. 21
      src/Linux/TXML.hs
  3. 38
      src/TXML.hs
  4. 612
      src/TXMLConnector.hs
  5. 702
      src/TXMLConnector/Internal.hs
  6. 21
      src/Win32/TXML.hs
  7. 5
      test/Test/TXMLConnector.hs
  8. 7
      transaq-connector.cabal

7
src/HistoryProviderServer.hs

@ -53,11 +53,10 @@ import System.ZMQ4 (Context, Router (Router), bind,
sendMulti, socket, withSocket) sendMulti, socket, withSocket)
import TickerInfoServer (TickerInfoServerHandle, import TickerInfoServer (TickerInfoServerHandle,
getAllTickers) getAllTickers)
import TXMLConnector (HistoryRequest (..), import TXMLConnector (TXMLConnectorHandle, makeRequest)
import TXMLConnector.Internal (HistoryRequest (..),
HistoryResponse (..), HistoryResponse (..),
Request (..), Response (..), Request (..), Response (..))
TXMLConnectorHandle, hrBars,
makeRequest)
data HistoryProviderServerHandle = data HistoryProviderServerHandle =
HistoryProviderServerHandle HistoryProviderServerHandle

21
src/Linux/TXML.hs

@ -1,3 +1,5 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Linux.TXML module Linux.TXML
( (
@ -6,19 +8,15 @@ module Linux.TXML
, sendCommand , sendCommand
, setCallback , setCallback
, freeCallback , freeCallback
, Callback
, LogLevel(..)
) where ) where
import qualified Data.Text as T import qualified Data.Text as T
import TXML (Callback (..), FreeableCallback (..), LogLevel (..))
data LogLevel = data StubCallback = StubCallback
Debug
| Info
| Warning
deriving (Show, Eq, Ord)
data Callback = Callback instance (Monad m) => FreeableCallback m StubCallback where
freeCallback _ = pure ()
initialize :: FilePath -> LogLevel -> IO (Either T.Text ()) initialize :: FilePath -> LogLevel -> IO (Either T.Text ())
initialize _ _ = return (Right ()) initialize _ _ = return (Right ())
@ -29,9 +27,6 @@ uninitialize = return (Right ())
sendCommand :: T.Text -> IO (Either T.Text ()) sendCommand :: T.Text -> IO (Either T.Text ())
sendCommand _ = return (Right ()) sendCommand _ = return (Right ())
setCallback :: (T.Text -> IO Bool) -> IO (Maybe Callback) setCallback :: (Monad m) => (T.Text -> IO Bool) -> IO (Maybe (Callback m))
setCallback _ = return . Just $ Callback setCallback _ = return . Just $ MkCallback StubCallback
freeCallback :: Callback -> IO ()
freeCallback _ = return ()

38
src/TXML.hs

@ -1,18 +1,34 @@
{-# LANGUAGE CPP #-} {-# LANGUAGE CPP #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module TXML module TXML
( (
initialize MonadTXML(..)
, uninitialize , TXML.Callback(..)
, sendCommand
, setCallback
, freeCallback
, Callback
, LogLevel(..) , LogLevel(..)
, FreeableCallback(..)
) where ) where
#if defined(mingw32_HOST_OS) import qualified Data.Text as T
import Win32.TXML
#else data LogLevel =
import Linux.TXML Debug
#endif | Info
| Warning
deriving (Show, Eq, Ord)
class (Monad m) => FreeableCallback m a where
freeCallback :: a -> m()
data Callback m = forall callback. FreeableCallback m callback => MkCallback callback
instance (Monad m) => FreeableCallback m (Callback m) where
freeCallback = freeCallback
class (Monad m) => MonadTXML m where
initialize :: FilePath -> LogLevel -> m (Either T.Text ())
uninitialize :: m (Either T.Text ())
sendCommand :: T.Text -> m (Either T.Text ())
setCallback :: (T.Text -> IO Bool) -> m (Maybe (Callback IO))

612
src/TXMLConnector.hs

@ -1,4 +1,4 @@
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE CPP #-}
{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE GeneralizedNewtypeDeriving #-}
@ -8,12 +8,8 @@ module TXMLConnector
( (
start start
, TXMLConnector.stop , TXMLConnector.stop
, Request(..)
, HistoryRequest(..)
, Response(..)
, HistoryResponse(..)
, makeRequest
, TXMLConnectorHandle , TXMLConnectorHandle
, makeRequest
, makeBrokerBackend , makeBrokerBackend
) where ) where
@ -37,7 +33,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar',
import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
newTBQueue, readTBQueue, newTBQueue, readTBQueue,
writeTBQueue) writeTBQueue)
import Control.Monad (forever, void, when) import Control.Monad (forM_, forever, void, when)
import Control.Monad.Extra (whileM) import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM import qualified Data.Bimap as BM
import Data.Maybe (mapMaybe) import Data.Maybe (mapMaybe)
@ -83,9 +79,9 @@ import Transaq (AllTradesTrade (..),
TransaqResponseC (fromXml), TransaqResponseC (fromXml),
UnfilledAction (..), UnfilledAction (..),
kCandleKindId, kPeriod, state) kCandleKindId, kPeriod, state)
import TXML (LogLevel, freeCallback, import TXML (LogLevel, MonadTXML,
initialize, sendCommand, freeCallback, initialize,
setCallback) sendCommand, setCallback)
import ATrade.Broker.Backend (BrokerBackend (..), import ATrade.Broker.Backend (BrokerBackend (..),
BrokerBackendNotification (..)) BrokerBackendNotification (..))
@ -104,7 +100,6 @@ import Control.Applicative ((<|>))
import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar) import Control.Concurrent.STM.TMVar (TMVar)
import Control.Error (headMay) import Control.Error (headMay)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks) import Control.Monad.Reader.Class (MonadReader, asks)
@ -122,6 +117,16 @@ import TickerInfoServer (TickerInfo (..),
putTickerInfo) putTickerInfo)
import qualified Transaq import qualified Transaq
import qualified TXML import qualified TXML
import TXMLConnector.Internal (BrokerState (..),
ConnectionStage (..), Env (..),
Request (..), Response (..),
workThread)
#if defined(mingw32_HOST_OS)
import qualified Win32.TXML as TXMLImpl
#else
import qualified Linux.TXML as TXMLImpl
#endif
data ConnectionParams = data ConnectionParams =
ConnectionParams ConnectionParams
@ -135,35 +140,6 @@ data ConnectionParams =
} }
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
data HistoryRequest =
HistoryRequest
{
hrTickerId :: TickerId
, hrTimeframe :: BarTimeframe
, hrCount :: Int
, hrReset :: Bool
} deriving (Show, Eq, Ord)
data Request =
RequestHistory HistoryRequest
| RequestSubmitOrder Order
| RequestCancelOrder OrderId
deriving (Show, Eq)
data Response =
ResponseHistory HistoryResponse
| ResponseOrderSubmitted
| ResponseOrderCancelled
| ResponseTimeout
data HistoryResponse =
HistoryResponse
{
hrBars :: [Bar]
, hrMoreData :: Bool
}
deriving (Show, Eq)
data TXMLConnectorHandle = data TXMLConnectorHandle =
TXMLConnectorHandle TXMLConnectorHandle
{ {
@ -176,58 +152,15 @@ data TXMLConnectorHandle =
, hRunVar :: TMVar () , hRunVar :: TMVar ()
} }
data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown
deriving (Eq, Show, Ord)
instance FSMState ConnectionStage where
isTerminalState StageShutdown = True
isTerminalState _ = False
data MainQueueData =
MainQueueTransaqData TransaqResponse
| MainQueueRequest Request
| MainQueuePingServer
| MainQueueShutdown
deriving (Eq, Show)
data TransactionId =
TransactionId Int64
| ExchangeOrderId Int64
deriving (Show, Ord, Eq)
data BrokerState =
BrokerState
{
bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId)
, bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ()))
, bsOrderMap :: TVar (M.Map OrderId Order)
, bsPendingOrders :: TVar (D.Deque Order)
}
data Env =
Env
{
qssChannel :: BoundedChan QuoteSourceServerData
, tisHandle :: TickerInfoServerHandle
, requestVar :: TMVar Request
, responseVar :: TMVar (TMVar Response)
, requestTimestamp :: TVar UTCTime
, currentCandles :: TVar [Candle]
, tickMap :: TickTable
, transaqQueue :: TBQueue TransaqResponse
, logger :: LogAction IO Message
, config :: TransaqConnectorConfig
, serverConnected :: TVar ConnectionStage
, candleKindMap :: TVar (M.Map Int Int)
, brokerState :: BrokerState
, runVar :: TMVar ()
, timerVar :: TMVar ()
}
newtype App a = App { unApp :: ReaderT Env IO a } newtype App a = App { unApp :: ReaderT Env IO a }
deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env) deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env)
instance MonadTXML App where
initialize path loglevel = liftIO $ TXMLImpl.initialize path loglevel
uninitialize = liftIO TXMLImpl.uninitialize
sendCommand = liftIO . TXMLImpl.sendCommand
setCallback = liftIO . TXMLImpl.setCallback
instance HasLog Env Message App where instance HasLog Env Message App where
getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) }
setLogAction _ env = env -- fuck it setLogAction _ env = env -- fuck it
@ -296,432 +229,19 @@ start logger config qssChannel tisH = do
stop :: TXMLConnectorHandle -> IO () stop :: TXMLConnectorHandle -> IO ()
stop h = atomically $ putTMVar (hRunVar h) () stop h = atomically $ putTMVar (hRunVar h) ()
workThread :: App ()
workThread = do
cfg <- asks config
rc <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg)
case rc of
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
Right _ -> do
queue <- asks transaqQueue
logger' <- asks logger
rc <- liftIO $ setCallback (parseAndWrite queue logger')
case rc of
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
Just cb -> do
serverConnectionState <- asks serverConnected
timerVar' <- asks timerVar
void $ liftIO $ forkIO $ whileM $ do
threadDelay 1000000
void . liftIO . atomically $ tryPutTMVar timerVar' ()
connStatus <- liftIO . readTVarIO $ serverConnectionState
pure $ connStatus /= StageShutdown
fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected)
, (StageGetInfo, FSMCallback handleGetInfo)
, (StageConnected, FSMCallback handleConnected)]
runFsm fsm
liftIO $ freeCallback cb
where
parseTransaqLogLevel 1 = TXML.Warning
parseTransaqLogLevel 3 = TXML.Debug
parseTransaqLogLevel _ = TXML.Info
parseAndWrite queue logger xml = do
let parsed = mapMaybe parseContent $ parseXML xml
mapM_ (writeToQueue queue) parsed
pure True
parseContent (Elem el) = parseElement el
parseContent _ = Nothing
parseElement el = case qName $ elName el of
"candles" -> TransaqResponseCandles <$> fromXml el
"server_status" -> TransaqResponseServerStatus <$> fromXml el
"markets" -> TransaqResponseMarkets <$> fromXml el
"candlekinds" -> TransaqResponseCandleKinds <$> fromXml el
"securities" -> TransaqResponseSecurities <$> fromXml el
"sec_info" -> TransaqResponseSecInfo <$> fromXml el
"quotations" -> TransaqResponseQuotations <$> fromXml el
"alltrades" -> TransaqResponseAllTrades <$> fromXml el
"quotes" -> TransaqResponseQuotes <$> fromXml el
"orders" -> TransaqResponseOrders <$> fromXml el
"trades" -> TransaqResponseTrades <$> fromXml el
"result" -> TransaqResponseResult <$> fromXml el
_ -> Nothing
writeToQueue queue resp = atomically $ writeTBQueue queue resp
handleConnected :: App (Maybe ConnectionStage)
handleConnected = do
checkRequestTimeout
serverConn <- asks serverConnected
rqVar <- asks requestVar
runVar' <- asks runVar
queue <- asks transaqQueue
timerVar' <- asks timerVar
item <- liftIO . atomically $
(readTMVar runVar' >> pure MainQueueShutdown) `orElse`
(MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer)
case item of
MainQueueShutdown -> pure $ Just StageShutdown
MainQueuePingServer -> do
maybeServerStatus <- liftIO $ sendCommand $ toXml CommandServerStatus
case maybeServerStatus of
Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> do
pure $ Just StageConnection
_ -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw
pure Nothing
Right () -> pure Nothing
MainQueueTransaqData transaqData -> do
tm <- asks tickMap
case transaqData of
TransaqResponseAllTrades (ResponseAllTrades trades) -> do
qssChan <- asks qssChannel
let ticks = fmap allTradeToTick trades
forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm)
pure Nothing
TransaqResponseQuotations (ResponseQuotations quotations) -> do
qssChan <- asks qssChannel
now <- liftIO getCurrentTime
let ticks = concatMap (quotationToTicks now) quotations
forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm)
pure Nothing
TransaqResponseCandles respCandle -> do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
case resp of
Just !tmvar -> if cStatus respCandle == StatusPending
then do
cur <- asks currentCandles
liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c)
else do
cur <- asks currentCandles
liftIO $ atomically $ do
candles <- readTVar cur
putTMVar tmvar $ ResponseHistory $ HistoryResponse
{
hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles)
, hrMoreData = False
}
_ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
pure Nothing
TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing
TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing
_ -> pure Nothing
MainQueueRequest (RequestHistory request) -> do
cur <- asks currentCandles
liftIO $ atomically $ writeTVar cur []
maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
case maybeCk of
Just candleKindId -> do
case parseSecurityId (hrTickerId request) of
Just secId -> void $ liftIO . sendCommand $
toXml CommandGetHistoryData
{
security = secId
, periodId = candleKindId
, count = hrCount request
, reset = hrReset request
}
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
pure Nothing
MainQueueRequest (RequestSubmitOrder order) -> do
case mkNewOrderCommand order of
Just cmd -> do
v <- liftIO . sendCommand . toXml $ cmd
case v of
Left result -> do
case headMay (parseXML result) >>= parseContent of
Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
brState <- asks brokerState
respVar <- asks responseVar
liftIO $ atomically $ do
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId))
resp <- readTMVar respVar
putTMVar resp ResponseOrderSubmitted
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Submitted
liftIO $ cb notif
_ -> pure ()
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
Just (TransaqResponseResult (ResponseFailure err)) -> do
brState <- asks brokerState
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Rejected
liftIO $ cb notif
_ -> pure ()
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
pure Nothing
Right _ -> do
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
pure Nothing
_ -> pure Nothing
_ -> pure Nothing
requestTimeout = 10
handleTrade transaqTrade = do
brState <- asks brokerState
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
orderMap <- liftIO $ readTVarIO (bsOrderMap brState)
case maybeCb of
Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of
Just oid -> case M.lookup oid orderMap of
Just order -> do
let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order)
log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif
liftIO $ cb notif
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!"
fromTransaqTrade transaqTrade order =
Trade
{
tradeOrderId = orderId order
, tradePrice = fromDouble (tPrice transaqTrade)
, tradeQuantity = fromIntegral $ tQuantity transaqTrade
, tradeVolume = fromDouble $ tValue transaqTrade
, tradeVolumeCurrency = ""
, tradeOperation = fromDirection (tBuysell transaqTrade)
, tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade
, tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade
, tradeTimestamp = tTimestamp transaqTrade
, tradeCommission = fromDouble $ tComission transaqTrade
, tradeSignalId = orderSignalId order
}
fromDirection Transaq.Buy = AT.Buy brSubmitOrder :: TXMLConnectorHandle -> Order -> IO ()
fromDirection Transaq.Sell = AT.Sell brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order)
handleOrder orderUpdate = do
brState <- asks brokerState
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|>
BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of
Just oid -> do
let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate)
log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif
liftIO $ atomically $ do
m <- readTVar (bsOrderTransactionIdMap brState)
when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate))
liftIO $ cb notif
_ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification"
Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification"
orderStateFromTransaq orderUpdate =
case oStatus orderUpdate of
OrderActive -> Submitted
OrderCancelled -> Cancelled
OrderDenied -> Rejected
OrderDisabled -> Rejected
OrderExpired -> Cancelled
OrderFailed -> Rejected
OrderForwarding -> Unsubmitted
OrderInactive -> OrderError
OrderMatched -> Executed
OrderRefused -> Rejected
OrderRemoved -> Rejected
OrderWait -> Unsubmitted
OrderWatching -> Unsubmitted
_ -> OrderError
checkRequestTimeout = do
now <- liftIO getCurrentTime
tsVar <- asks requestTimestamp
ts <- liftIO $ readTVarIO tsVar
when (now `diffUTCTime` ts >= requestTimeout) $ do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
case resp of
Just tmvar -> do
log Warning "TXMLConnector.WorkThread" "Request timeout"
liftIO . atomically . putTMVar tmvar $ ResponseTimeout
_ -> pure ()
handleGetInfo :: App (Maybe ConnectionStage)
handleGetInfo = do
queue <- asks transaqQueue
cfg <- asks config
item <- liftIO . atomically $ readTBQueue queue
conn <- asks serverConnected
case item of
TransaqResponseServerStatus serverStatus -> do
log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus
case state serverStatus of
Transaq.Disconnected -> do
log Warning "TXMLConnector.WorkThread" "Server disconnected"
pure $ Just StageConnection
Transaq.Connected -> do
log Info "TXMLConnector.WorkThread" "Server connected"
void $ liftIO . sendCommand $ toXml $
CommandChangePass (transaqPassword cfg) "goobaka12"
v <- makeSubscriptions cfg
case v of
Left errmsg -> do
log Warning "TXMLConnector.WorkThread" "Unable to subscribe"
void $ liftIO . sendCommand $ toXml CommandDisconnect
pure $ Just StageConnection
Right _ -> do
log Info "TXMLConnector.WorkThread" "Subscriptions done"
pure $ Just StageConnected
Transaq.Error errmsg -> do
log Warning "TXMLConnector.WorkThread" "Connection error"
liftIO . atomically $ writeTVar conn StageConnection
void $ liftIO $ sendCommand $ toXml $ CommandDisconnect
pure $ Just StageConnection
TransaqResponseResult result -> do
log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result
pure Nothing
TransaqResponseCandles candles -> do
log Debug "TXMLConnector.WorkThread" $
"Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
pure Nothing
TransaqResponseMarkets (ResponseMarkets markets) -> do
log Debug "TXMLConnector.WorkThread" "Incoming markets:"
forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
pure Nothing
TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
ckMap <- asks candleKindMap
log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
pure Nothing
TransaqResponseSecurities (ResponseSecurities securities) -> do
tisH <- asks tisHandle
let tickerInfos = securityToTickerInfo <$> securities
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH)
pure Nothing
TransaqResponseSecInfo secInfo -> do
log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo
pure Nothing
TransaqResponseClient (ResponseClient clientData) -> do
log Debug "TXMLConnector.WorkThread" $
"Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
pure Nothing
_ -> pure Nothing
handleUnconnected :: App (Maybe ConnectionStage)
handleUnconnected = do
cfg <- asks config
log Debug "TXMLConnector.WorkThread" "Sending connect command"
v <- liftIO . sendCommand . toXml $ cmdConnect cfg
case v of
Left _ -> do
log Warning "TXMLConnector.WorkThread" "Unable to connect"
liftIO $ do
void $ sendCommand $ toXml CommandDisconnect
threadDelay reconnectionDelay
queue <- asks transaqQueue
void $ liftIO $ atomically $ flushTBQueue queue
pure Nothing
Right _ -> do
log Info "TXMLConnector.WorkThread" "Connected"
pure $ Just StageGetInfo
makeSubscriptions config = liftIO . sendCommand . toXml $ cmdSubscription config
subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code
insertToTickMap tickMap tick = insertTick tickMap tick
reconnectionDelay = 1000 * 1000 * 10
cmdSubscription config =
CommandSubscribe
{
alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config),
quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config),
quotes = fmap subscriptionToSecurityId (quotesSubscriptions config)
}
cmdConnect cfg = CommandConnect
{
login = transaqLogin cfg,
password = transaqPassword cfg,
host = transaqHost cfg,
port = transaqPort cfg,
language = LanguageEn,
autopos = False,
micexRegisters = True,
milliseconds = True,
utcTime = True,
proxy = (),
rqDelay = Nothing,
sessionTimeout = Nothing,
requestTimeout = Nothing,
pushULimits = Nothing,
pushPosEquity = Nothing
}
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
Tick
{
security = attBoard att <> "#" <> attSecCode att,
datatype = LastTradePrice,
timestamp = attTimestamp att,
value = fromDouble $ attPrice att,
volume = fromIntegral $ attQuantity att
}
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks timestamp q =
let security = qBoard q <> "#" <> qSeccode q in
[
Tick
{
security = security,
datatype = BestBid,
timestamp = timestamp,
value = fromDouble $ qBid q,
volume = fromIntegral $ qQuantity q
},
Tick
{
security = security,
datatype = BestOffer,
timestamp = timestamp,
value = fromDouble $ qOffer q,
volume = fromIntegral $ qQuantity q
}]
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec =
TickerInfo
{
tiTicker = sBoard sec <> "#" <> sSeccode sec
, tiLotSize = sLotSize sec
, tiTickSize = sMinStep sec
}
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId tickerId = case T.findIndex (== '#') tickerId of
Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId)
Nothing -> Nothing
makeTickerId :: SecurityId -> TickerId brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO ()
makeTickerId sec = board sec <> "#" <> seccode sec brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid)
parseAccountId :: T.Text -> Maybe (T.Text, T.Text) brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO ()
parseAccountId accId = case T.findIndex (== '#') accId of brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb
Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId)
Nothing -> Nothing
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend
makeBrokerBackend h account =
BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h)
makeRequest :: TXMLConnectorHandle -> Request -> IO Response makeRequest :: TXMLConnectorHandle -> Request -> IO Response
makeRequest h request = do makeRequest h request = do
@ -735,73 +255,3 @@ makeRequest h request = do
atomically $ do atomically $ do
void $ takeTMVar (hResponseVar h) void $ takeTMVar (hResponseVar h)
takeTMVar resp takeTMVar resp
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder
mkNewOrderCommand order =
case parseSecurityId (orderSecurity order) of
Just secId ->
case parseAccountId (orderAccountId order) of
Just (client, union) -> do
case orderPrice order of
Market -> Just $ Transaq.CommandNewOrder
{
security = secId
, client = client
, unionCode = union
, price = 0
, quantity = fromInteger $ orderQuantity order
, buysell = toDirection $ orderOperation order
, bymarket = True
, brokerRef = T.empty
, unfilled = UnfilledPutInQueue
, usecredit = False
, nosplit = False
}
Limit price -> Just $ Transaq.CommandNewOrder
{
security = secId
, client = client
, unionCode = union
, price = toDouble price
, quantity = fromInteger $ orderQuantity order
, buysell = toDirection $ orderOperation order
, bymarket = False
, brokerRef = T.empty
, unfilled = UnfilledPutInQueue
, usecredit = False
, nosplit = False
}
_ -> Nothing
_ -> Nothing
_ -> Nothing
where
toDirection AT.Buy = Transaq.Buy
toDirection AT.Sell = Transaq.Sell
candleToBar :: SecurityId -> Candle -> Bar
candleToBar sec candle =
Bar
{
barSecurity = makeTickerId sec
, barTimestamp = cTimestamp candle
, barOpen = fromDouble (cOpen candle)
, barHigh = fromDouble (cHigh candle)
, barLow = fromDouble (cLow candle)
, barClose = fromDouble (cClose candle)
, barVolume = fromIntegral $ cVolume candle
}
brSubmitOrder :: TXMLConnectorHandle -> Order -> IO ()
brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order)
brCancelOrder :: TXMLConnectorHandle -> OrderId -> IO ()
brCancelOrder h oid = void $ makeRequest h (RequestCancelOrder oid)
brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotification -> IO ()) -> IO ()
brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend
makeBrokerBackend h account =
BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h)

702
src/TXMLConnector/Internal.hs

@ -0,0 +1,702 @@
{-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module TXMLConnector.Internal
(
workThread
, Env(..)
, Request(..)
, Response(..)
, BrokerState(..)
, ConnectionStage(..)
, HistoryRequest(..)
, HistoryResponse(..)
) where
import ATrade.Logging (Message, Severity (..), log,
logWith)
import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction))
import Config (SubscriptionConfig (SubscriptionConfig),
TransaqConnectorConfig (..),
transaqHost, transaqLogLevel,
transaqLogPath, transaqLogin,
transaqPassword, transaqPort)
import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar',
newEmptyTMVar, newEmptyTMVarIO,
newTVarIO, orElse, putTMVar,
readTMVar, readTVar,
readTVarIO, takeTMVar,
tryPutTMVar, tryReadTMVar,
writeTVar)
import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
newTBQueue, readTBQueue,
writeTBQueue)
import Control.Monad (forever, void, when)
import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM
import Data.Maybe (mapMaybe)
import qualified Data.Text as T
import qualified Deque.Strict as D
import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem),
Element (elName),
QName (qName))
import TickTable (TickTable, insertTick,
lookupTick, newTickTable)
import Transaq (AllTradesTrade (..),
Candle (..), ClientData (..),
CommandChangePass (..),
CommandConnect (..),
CommandDisconnect (CommandDisconnect),
CommandGetHistoryData (CommandGetHistoryData),
CommandServerStatus (..),
CommandSubscribe (..),
ConnectionState (Disconnected),
Language (LanguageEn),
MarketInfo (..),
OrderNotification (..),
OrderStatus (..),
Quotation (..),
ResponseAllTrades (ResponseAllTrades),
ResponseCandleKinds (ResponseCandleKinds),
ResponseCandles (..),
ResponseCandlesStatus (StatusPending),
ResponseClient (ResponseClient),
ResponseMarkets (ResponseMarkets),
ResponseOrders (ResponseOrders),
ResponseQuotations (ResponseQuotations),
ResponseQuotes (ResponseQuotes),
ResponseResult (..),
ResponseSecurities (ResponseSecurities),
ResponseTrades (ResponseTrades),
Security (..), SecurityId (..),
TradeNotification (..),
TransaqCommand (toXml),
TransaqResponse (..),
TransaqResponse (..),
TransaqResponseC (fromXml),
UnfilledAction (..),
kCandleKindId, kPeriod, state)
import TXML (LogLevel, MonadTXML,
freeCallback, initialize,
sendCommand, setCallback)
import ATrade.Broker.Backend (BrokerBackend (..),
BrokerBackendNotification (..))
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types (Bar (..),
BarTimeframe (unBarTimeframe),
DataType (BestBid, BestOffer, LastTradePrice),
Order (..), OrderId,
OrderPrice (..),
OrderState (..), Tick (..),
TickerId, Trade (..),
fromDouble, toDouble)
import qualified ATrade.Types as AT
import Colog.Monad (WithLog)
import Control.Applicative ((<|>))
import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar)
import Control.Error (headMay)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks)
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, diffUTCTime,
getCurrentTime)
import FSM (FSMCallback (..),
FSMState (isTerminalState),
makeFsm, runFsm)
import GHC.Exts (IsList (..))
import Prelude hiding (log)
import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle,
putTickerInfo)
import qualified Transaq
import qualified TXML
data ConnectionStage = StageConnection | StageGetInfo | StageConnected | StageShutdown
deriving (Eq, Show, Ord)
instance FSMState ConnectionStage where
isTerminalState StageShutdown = True
isTerminalState _ = False
data Env =
Env
{
qssChannel :: BoundedChan QuoteSourceServerData
, tisHandle :: TickerInfoServerHandle
, requestVar :: TMVar Request
, responseVar :: TMVar (TMVar Response)
, requestTimestamp :: TVar UTCTime
, currentCandles :: TVar [Candle]
, tickMap :: TickTable
, transaqQueue :: TBQueue TransaqResponse
, logger :: LogAction IO Message
, config :: TransaqConnectorConfig
, serverConnected :: TVar ConnectionStage
, candleKindMap :: TVar (M.Map Int Int)
, brokerState :: BrokerState
, runVar :: TMVar ()
, timerVar :: TMVar ()
}
data MainQueueData =
MainQueueTransaqData TransaqResponse
| MainQueueRequest Request
| MainQueuePingServer
| MainQueueShutdown
deriving (Eq, Show)
data TransactionId =
TransactionId Int64
| ExchangeOrderId Int64
deriving (Show, Ord, Eq)
data BrokerState =
BrokerState
{
bsOrderTransactionIdMap :: TVar (BM.Bimap OrderId TransactionId)
, bsNotificationCallback :: TVar (Maybe (BrokerBackendNotification -> IO ()))
, bsOrderMap :: TVar (M.Map OrderId Order)
, bsPendingOrders :: TVar (D.Deque Order)
}
data HistoryRequest =
HistoryRequest
{
hrTickerId :: TickerId
, hrTimeframe :: BarTimeframe
, hrCount :: Int
, hrReset :: Bool
} deriving (Show, Eq, Ord)
data Request =
RequestHistory HistoryRequest
| RequestSubmitOrder Order
| RequestCancelOrder OrderId
deriving (Show, Eq)
data Response =
ResponseHistory HistoryResponse
| ResponseOrderSubmitted
| ResponseOrderCancelled
| ResponseTimeout
data HistoryResponse =
HistoryResponse
{
hrBars :: [Bar]
, hrMoreData :: Bool
}
deriving (Show, Eq)
newtype RespCallback =
RespCallback { unCallback :: T.Text -> IO Bool }
workThread :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
HasLog Env Message m) => m ()
workThread = do
cfg <- asks config
rc <- initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg)
case rc of
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
Right _ -> do
queue <- asks transaqQueue
logger' <- asks logger
rc <- setCallback (parseAndWrite queue)
case rc of
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
Just cb -> do
serverConnectionState <- asks serverConnected
timerVar' <- asks timerVar
void $ liftIO $ forkIO $ whileM $ do
threadDelay 1000000
void . liftIO . atomically $ tryPutTMVar timerVar' ()
connStatus <- liftIO . readTVarIO $ serverConnectionState
pure $ connStatus /= StageShutdown
fsm <- makeFsm StageConnection [ (StageConnection, FSMCallback handleUnconnected)
, (StageGetInfo, FSMCallback handleGetInfo)
, (StageConnected, FSMCallback handleConnected)]
runFsm fsm
liftIO $ freeCallback cb
where
parseTransaqLogLevel 1 = TXML.Warning
parseTransaqLogLevel 3 = TXML.Debug
parseTransaqLogLevel _ = TXML.Info
parseAndWrite :: TBQueue TransaqResponse -> T.Text -> IO Bool
parseAndWrite queue xml = do
let parsed = mapMaybe parseContent $ parseXML xml
atomically $ mapM_ (writeTBQueue queue) parsed
pure True
parseContent :: Content -> Maybe TransaqResponse
parseContent (Elem el) = parseElement
where
parseElement = case qName $ elName el of
"candles" -> TransaqResponseCandles <$> fromXml el
"server_status" -> TransaqResponseServerStatus <$> fromXml el
"markets" -> TransaqResponseMarkets <$> fromXml el
"candlekinds" -> TransaqResponseCandleKinds <$> fromXml el
"securities" -> TransaqResponseSecurities <$> fromXml el
"sec_info" -> TransaqResponseSecInfo <$> fromXml el
"quotations" -> TransaqResponseQuotations <$> fromXml el
"alltrades" -> TransaqResponseAllTrades <$> fromXml el
"quotes" -> TransaqResponseQuotes <$> fromXml el
"orders" -> TransaqResponseOrders <$> fromXml el
"trades" -> TransaqResponseTrades <$> fromXml el
"result" -> TransaqResponseResult <$> fromXml el
_ -> Nothing
parseContent _ = Nothing
handleConnected :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
HasLog Env Message m) => m (Maybe ConnectionStage)
handleConnected = do
checkRequestTimeout
rqVar <- asks requestVar
runVar' <- asks runVar
queue <- asks transaqQueue
timerVar' <- asks timerVar
item <- liftIO . atomically $
(readTMVar runVar' >> pure MainQueueShutdown) `orElse`
(MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> takeTMVar rqVar) `orElse`
(takeTMVar timerVar' >> pure MainQueuePingServer)
case item of
MainQueueShutdown -> pure $ Just StageShutdown
MainQueuePingServer -> do
maybeServerStatus <- sendCommand $ toXml CommandServerStatus
case maybeServerStatus of
Left serverStatusRaw -> case mapMaybe parseContent $ parseXML serverStatusRaw of
((TransaqResponseResult (ResponseFailure _)):_) -> do
pure $ Just StageConnection
_ -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to parser server status response: " <> (T.pack . show ) serverStatusRaw
pure Nothing
Right () -> pure Nothing
MainQueueTransaqData transaqData -> do
tm <- asks tickMap
case transaqData of
TransaqResponseAllTrades (ResponseAllTrades trades) -> do
qssChan <- asks qssChannel
let ticks = fmap allTradeToTick trades
forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm)
pure Nothing
TransaqResponseQuotations (ResponseQuotations quotations) -> do
qssChan <- asks qssChannel
now <- liftIO getCurrentTime
let ticks = concatMap (quotationToTicks now) quotations
forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks (insertToTickMap tm)
pure Nothing
TransaqResponseCandles respCandle -> do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
log Debug "TXMLConnector.WorkThread" $ "Incoming candles: " <> (T.pack . show . length) (cCandles respCandle)
case resp of
Just !tmvar -> if cStatus respCandle == StatusPending
then do
cur <- asks currentCandles
liftIO $ atomically . modifyTVar' cur $ (\c -> cCandles respCandle <> c)
else do
cur <- asks currentCandles
liftIO $ atomically $ do
candles <- readTVar cur
putTMVar tmvar $ ResponseHistory $ HistoryResponse
{
hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles)
, hrMoreData = False
}
_ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
pure Nothing
TransaqResponseOrders (ResponseOrders orders) -> forM_ orders handleOrder >> pure Nothing
TransaqResponseTrades (ResponseTrades trades) -> forM_ trades handleTrade >> pure Nothing
_ -> pure Nothing
MainQueueRequest (RequestHistory request) -> do
cur <- asks currentCandles
liftIO $ atomically $ writeTVar cur []
maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
case maybeCk of
Just candleKindId -> do
case parseSecurityId (hrTickerId request) of
Just secId -> void $ sendCommand $ toXml CommandGetHistoryData
{
security = secId
, periodId = candleKindId
, count = hrCount request
, reset = hrReset request
}
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
pure Nothing
MainQueueRequest (RequestSubmitOrder order) -> do
case mkNewOrderCommand order of
Just cmd -> do
v <- sendCommand . toXml $ cmd
case v of
Left result -> do
case headMay (parseXML result) >>= parseContent of
Just (TransaqResponseResult (ResponseSuccess (Just transactionId))) -> do
brState <- asks brokerState
respVar <- asks responseVar
liftIO $ atomically $ do
modifyTVar' (bsOrderMap brState) (M.insert (orderId order) order)
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert (orderId order) (TransactionId transactionId))
resp <- readTMVar respVar
putTMVar resp ResponseOrderSubmitted
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Submitted
liftIO $ cb notif
_ -> pure ()
log Debug "TXMLConnector.WorkThread" $ "Inserting orderid: " <>
(T.pack . show) (orderId order) <> " <-> " <> (T.pack . show) transactionId
Just (TransaqResponseResult (ResponseFailure err)) -> do
brState <- asks brokerState
log Debug "TXMLConnector.WorkThread" $ "Order submission failure: " <> err
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> do
let notif = BackendOrderNotification (orderId order) Rejected
liftIO $ cb notif
_ -> pure ()
_ -> log Warning "TXMLConnector.WorkThread" "Unable to parse result"
pure Nothing
Right _ -> do
log Warning "TXMLConnector.WorkThread" "Expected result, got nothing"
pure Nothing
_ -> pure Nothing
_ -> pure Nothing
where
requestTimeout = 10
handleTrade transaqTrade = do
brState <- asks brokerState
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
orderMap <- liftIO $ readTVarIO (bsOrderMap brState)
case maybeCb of
Just cb -> case BM.lookupR (ExchangeOrderId (tOrderNo transaqTrade)) trIdMap of
Just oid -> case M.lookup oid orderMap of
Just order -> do
let notif = BackendTradeNotification (fromTransaqTrade transaqTrade order)
log Debug "TXMLConnector.WorkThread" $ "Sending trade notification: " <> (T.pack . show) notif
liftIO $ cb notif
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order for trade: " <> (T.pack . show) transaqTrade
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to find order in ordermap: " <> (T.pack . show) transaqTrade
Nothing -> log Warning "TXMLConnector.WorkThread" $ "No callback for trade notification!"
fromTransaqTrade transaqTrade order =
Trade
{
tradeOrderId = orderId order
, tradePrice = fromDouble (tPrice transaqTrade)
, tradeQuantity = fromIntegral $ tQuantity transaqTrade
, tradeVolume = fromDouble $ tValue transaqTrade
, tradeVolumeCurrency = ""
, tradeOperation = fromDirection (tBuysell transaqTrade)
, tradeAccount = tClient transaqTrade <> "#" <> tUnion transaqTrade
, tradeSecurity = tBoard transaqTrade <> "#" <> tSecCode transaqTrade
, tradeTimestamp = tTimestamp transaqTrade
, tradeCommission = fromDouble $ tComission transaqTrade
, tradeSignalId = orderSignalId order
}
fromDirection Transaq.Buy = AT.Buy
fromDirection Transaq.Sell = AT.Sell
handleOrder orderUpdate = do
brState <- asks brokerState
trIdMap <- liftIO $ readTVarIO (bsOrderTransactionIdMap brState)
maybeCb <- liftIO $ readTVarIO (bsNotificationCallback brState)
case maybeCb of
Just cb -> case BM.lookupR (ExchangeOrderId (oOrderNo orderUpdate)) trIdMap <|>
BM.lookupR (TransactionId (fromIntegral $ oTransactionId orderUpdate)) trIdMap of
Just oid -> do
let notif = BackendOrderNotification oid (orderStateFromTransaq orderUpdate)
log Debug "TXMLConnector.WorkThread" $ "Sending order notification: " <> (T.pack . show) notif
liftIO $ atomically $ do
m <- readTVar (bsOrderTransactionIdMap brState)
when (BM.notMemberR (ExchangeOrderId (oOrderNo orderUpdate)) m) $ do
modifyTVar' (bsOrderTransactionIdMap brState) (BM.insert oid (ExchangeOrderId $ oOrderNo orderUpdate))
liftIO $ cb notif
_ -> log Warning "TXMLConnector.WorkThread" "Unable to find order for order notification"
Nothing -> log Warning "TXMLConnector.WorkThread" "No callback for order notification"
orderStateFromTransaq orderUpdate =
case oStatus orderUpdate of
OrderActive -> Submitted
OrderCancelled -> Cancelled
OrderDenied -> Rejected
OrderDisabled -> Rejected
OrderExpired -> Cancelled
OrderFailed -> Rejected
OrderForwarding -> Unsubmitted
OrderInactive -> OrderError
OrderMatched -> Executed
OrderRefused -> Rejected
OrderRemoved -> Rejected
OrderWait -> Unsubmitted
OrderWatching -> Unsubmitted
_ -> OrderError
checkRequestTimeout = do
now <- liftIO getCurrentTime
tsVar <- asks requestTimestamp
ts <- liftIO $ readTVarIO tsVar
when (now `diffUTCTime` ts >= requestTimeout) $ do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
case resp of
Just tmvar -> do
log Warning "TXMLConnector.WorkThread" "Request timeout"
liftIO . atomically . putTMVar tmvar $ ResponseTimeout
_ -> pure ()
insertToTickMap = insertTick
handleGetInfo :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
HasLog Env Message m) => m (Maybe ConnectionStage)
handleGetInfo = do
queue <- asks transaqQueue
cfg <- asks config
item <- liftIO . atomically $ readTBQueue queue
conn <- asks serverConnected
case item of
TransaqResponseServerStatus serverStatus -> do
log Warning "TXMLConnector.WorkThread" $ "Incoming server status: " <> (T.pack . show) serverStatus
case state serverStatus of
Transaq.Disconnected -> do
log Warning "TXMLConnector.WorkThread" "Server disconnected"
pure $ Just StageConnection
Transaq.Connected -> do
log Info "TXMLConnector.WorkThread" "Server connected"
void $ sendCommand $ toXml $
CommandChangePass (transaqPassword cfg) "goobaka12"
v <- makeSubscriptions cfg
case v of
Left _ -> do
log Warning "TXMLConnector.WorkThread" "Unable to subscribe"
void $ sendCommand $ toXml CommandDisconnect
pure $ Just StageConnection
Right _ -> do
log Info "TXMLConnector.WorkThread" "Subscriptions done"
pure $ Just StageConnected
Transaq.Error _ -> do
log Warning "TXMLConnector.WorkThread" "Connection error"
liftIO . atomically $ writeTVar conn StageConnection
void $ sendCommand $ toXml $ CommandDisconnect
pure $ Just StageConnection
TransaqResponseResult result -> do
log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result
pure Nothing
TransaqResponseCandles candles -> do
log Debug "TXMLConnector.WorkThread" $
"Incoming candles message: " <> (T.pack . show . length . Transaq.cCandles $ candles)
pure Nothing
TransaqResponseMarkets (ResponseMarkets markets) -> do
log Debug "TXMLConnector.WorkThread" "Incoming markets:"
forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
pure Nothing
TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
ckMap <- asks candleKindMap
log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
pure Nothing
TransaqResponseSecurities (ResponseSecurities securities) -> do
tisH <- asks tisHandle
let tickerInfos = securityToTickerInfo <$> securities
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH)
pure Nothing
TransaqResponseSecInfo secInfo -> do
log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo
pure Nothing
TransaqResponseClient (ResponseClient clientData) -> do
log Debug "TXMLConnector.WorkThread" $
"Incoming client data: " <> (T.pack . show) (cClientId clientData) <> "#" <> (T.pack . show) (cUnion clientData)
pure Nothing
_ -> pure Nothing
where
makeSubscriptions config = sendCommand . toXml $ cmdSubscription config
cmdSubscription config =
CommandSubscribe
{
alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config),
quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config),
quotes = fmap subscriptionToSecurityId (quotesSubscriptions config)
}
subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code
handleUnconnected :: (MonadIO m,
MonadReader Env m,
MonadTXML m,
HasLog Env Message m) => m (Maybe ConnectionStage)
handleUnconnected = do
cfg <- asks config
log Debug "TXMLConnector.WorkThread" "Sending connect command"
v <- sendCommand . toXml $ cmdConnect cfg
case v of
Left _ -> do
log Warning "TXMLConnector.WorkThread" "Unable to connect"
void $ sendCommand $ toXml CommandDisconnect
liftIO $ threadDelay reconnectionDelay
queue <- asks transaqQueue
void $ liftIO $ atomically $ flushTBQueue queue
pure Nothing
Right _ -> do
log Info "TXMLConnector.WorkThread" "Connected"
pure $ Just StageGetInfo
where
reconnectionDelay = 1000 * 1000 * 10
cmdConnect cfg = CommandConnect
{
login = transaqLogin cfg,
password = transaqPassword cfg,
host = transaqHost cfg,
port = transaqPort cfg,
language = LanguageEn,
autopos = False,
micexRegisters = True,
milliseconds = True,
utcTime = True,
proxy = (),
rqDelay = Nothing,
sessionTimeout = Nothing,
requestTimeout = Nothing,
pushULimits = Nothing,
pushPosEquity = Nothing
}
allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att =
Tick
{
security = attBoard att <> "#" <> attSecCode att,
datatype = LastTradePrice,
timestamp = attTimestamp att,
value = fromDouble $ attPrice att,
volume = fromIntegral $ attQuantity att
}
quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks timestamp q =
let security = qBoard q <> "#" <> qSeccode q in
[
Tick
{
security = security,
datatype = BestBid,
timestamp = timestamp,
value = fromDouble $ qBid q,
volume = fromIntegral $ qQuantity q
},
Tick
{
security = security,
datatype = BestOffer,
timestamp = timestamp,
value = fromDouble $ qOffer q,
volume = fromIntegral $ qQuantity q
}]
securityToTickerInfo :: Security -> TickerInfo
securityToTickerInfo sec =
TickerInfo
{
tiTicker = sBoard sec <> "#" <> sSeccode sec
, tiLotSize = sLotSize sec
, tiTickSize = sMinStep sec
}
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId tickerId = case T.findIndex (== '#') tickerId of
Just ix -> Just $ SecurityId (T.take ix tickerId) (T.drop (ix + 1) tickerId)
Nothing -> Nothing
makeTickerId :: SecurityId -> TickerId
makeTickerId sec = board sec <> "#" <> seccode sec
parseAccountId :: T.Text -> Maybe (T.Text, T.Text)
parseAccountId accId = case T.findIndex (== '#') accId of
Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId)
Nothing -> Nothing
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder
mkNewOrderCommand order =
case parseSecurityId (orderSecurity order) of
Just secId ->
case parseAccountId (orderAccountId order) of
Just (client, union) -> do
case orderPrice order of
Market -> Just $ Transaq.CommandNewOrder
{
security = secId
, client = client
, unionCode = union
, price = 0
, quantity = fromInteger $ orderQuantity order
, buysell = toDirection $ orderOperation order
, bymarket = True
, brokerRef = T.empty
, unfilled = UnfilledPutInQueue
, usecredit = False
, nosplit = False
}
Limit price -> Just $ Transaq.CommandNewOrder
{
security = secId
, client = client
, unionCode = union
, price = toDouble price
, quantity = fromInteger $ orderQuantity order
, buysell = toDirection $ orderOperation order
, bymarket = False
, brokerRef = T.empty
, unfilled = UnfilledPutInQueue
, usecredit = False
, nosplit = False
}
_ -> Nothing
_ -> Nothing
_ -> Nothing
where
toDirection AT.Buy = Transaq.Buy
toDirection AT.Sell = Transaq.Sell
candleToBar :: SecurityId -> Candle -> Bar
candleToBar sec candle =
Bar
{
barSecurity = makeTickerId sec
, barTimestamp = cTimestamp candle
, barOpen = fromDouble (cOpen candle)
, barHigh = fromDouble (cHigh candle)
, barLow = fromDouble (cLow candle)
, barClose = fromDouble (cClose candle)
, barVolume = fromIntegral $ cVolume candle
}

21
src/Win32/TXML.hs

@ -1,3 +1,5 @@
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module Win32.TXML module Win32.TXML
( (
@ -10,6 +12,7 @@ module Win32.TXML
, LogLevel(..) , LogLevel(..)
) where ) where
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString.Char8 as BS import qualified Data.ByteString.Char8 as BS
import qualified Data.Text as T import qualified Data.Text as T
import Data.Text.Encoding import Data.Text.Encoding
@ -17,6 +20,8 @@ import Data.Text.Encoding.Error
import Foreign.C.String import Foreign.C.String
import Foreign.C.Types import Foreign.C.Types
import Foreign.Ptr import Foreign.Ptr
import TXML (Callback (..), FreeableCallback (..),
LogLevel (..))
foreign import ccall "Initialize" c_Initialize :: CString -> CInt -> IO CString foreign import ccall "Initialize" c_Initialize :: CString -> CInt -> IO CString
foreign import ccall "UnInitialize" c_UnInitialize :: IO CString foreign import ccall "UnInitialize" c_UnInitialize :: IO CString
@ -28,13 +33,10 @@ foreign import ccall "FreeMemory" c_FreeMemory :: CString -> IO CBool
foreign import ccall "wrapper" createCallbackPtr :: foreign import ccall "wrapper" createCallbackPtr ::
(CString -> IO CBool) -> IO (FunPtr (CString -> IO CBool)) (CString -> IO CBool) -> IO (FunPtr (CString -> IO CBool))
data LogLevel = newtype WrappedCallback = WrappedCallback { unCallback :: FunPtr (CString -> IO CBool)}
Debug
| Info
| Warning
deriving (Show, Eq, Ord)
newtype Callback = Callback { unCallback :: FunPtr (CString -> IO CBool)} instance (MonadIO m) => FreeableCallback m WrappedCallback where
freeCallback = liftIO . freeHaskellFunPtr . unCallback
logLevelToInt :: LogLevel -> CInt logLevelToInt :: LogLevel -> CInt
logLevelToInt Debug = 3 logLevelToInt Debug = 3
@ -78,7 +80,7 @@ sendCommand cmdData = do
BS.useAsCString (encodeUtf8 cmdData) $ \fpcstr -> BS.useAsCString (encodeUtf8 cmdData) $ \fpcstr ->
c_SendCommand fpcstr >>= rawStringToResult c_SendCommand fpcstr >>= rawStringToResult
setCallback :: (T.Text -> IO Bool) -> IO (Maybe Callback) setCallback :: (T.Text -> IO Bool) -> IO (Maybe (Callback IO))
setCallback callback = do setCallback callback = do
wrappedCallback <- createCallbackPtr (\x -> do wrappedCallback <- createCallbackPtr (\x -> do
packed <- BS.packCString x packed <- BS.packCString x
@ -87,7 +89,7 @@ setCallback callback = do
packed)) packed))
ret <- c_SetCallback wrappedCallback ret <- c_SetCallback wrappedCallback
if ret /= 0 if ret /= 0
then return . Just . Callback $ wrappedCallback then return . Just . MkCallback $ WrappedCallback wrappedCallback
else do else do
freeHaskellFunPtr wrappedCallback freeHaskellFunPtr wrappedCallback
return Nothing return Nothing
@ -95,6 +97,3 @@ setCallback callback = do
boolToCBool False = 0 boolToCBool False = 0
boolToCBool True = 1 boolToCBool True = 1
freeCallback :: Callback -> IO ()
freeCallback = freeHaskellFunPtr . unCallback

5
test/Test/TXMLConnector.hs

@ -0,0 +1,5 @@
module Test.TXMLConnector
(
) where

7
transaq-connector.cabal

@ -25,10 +25,12 @@ executable transaq-connector
, Version , Version
, TXML , TXML
, TXMLConnector , TXMLConnector
, TXMLConnector.Internal
, TickTable , TickTable
, FSM , FSM
default-extensions: OverloadedStrings default-extensions: OverloadedStrings
, MultiWayIf , MultiWayIf
, MultiParamTypeClasses
default-language: Haskell2010 default-language: Haskell2010
build-depends: base >= 4.7 && < 5 build-depends: base >= 4.7 && < 5
, dhall , dhall
@ -78,7 +80,8 @@ test-suite transaq-connector-test
main-is: Spec.hs main-is: Spec.hs
other-modules: Test.TickTable other-modules: Test.TickTable
, Test.FSM , Test.FSM
, TXMLConnector
, TXMLConnector.Internal
, FSM , FSM
, TickTable , TickTable
@ -114,4 +117,4 @@ test-suite transaq-connector-test
, network-uri , network-uri
default-extensions: OverloadedStrings default-extensions: OverloadedStrings
, MultiWayIf , MultiWayIf
, MultiParamTypeClasses

Loading…
Cancel
Save