Browse Source

Formatting

master
Denis Tereshkin 2 years ago
parent
commit
b107546afa
  1. 102
      src/HistoryProviderServer.hs
  2. 173
      src/TXMLConnector.hs
  3. 92
      src/TXMLConnector/Internal.hs

102
src/HistoryProviderServer.hs

@ -12,51 +12,48 @@ module HistoryProviderServer
, withHistoryProviderServer , withHistoryProviderServer
) where ) where
import ATrade.Logging (Message, import ATrade.Logging (Message,
Severity (Debug, Info, Warning), Severity (Debug, Info, Warning),
log) log)
import ATrade.Types (Bar (..), BarTimeframe (..), import ATrade.Types (Bar (..), BarTimeframe (..),
TickerId, toDouble) TickerId, toDouble)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction)) LogAction (LogAction, unLogAction))
import Control.Concurrent (ThreadId, forkIO) import Control.Concurrent (ThreadId, forkIO)
import Control.Concurrent.STM (TVar, atomically, newTVarIO, import Control.Concurrent.STM (TVar, atomically, newTVarIO,
putTMVar, readTVarIO, takeTMVar, readTVarIO, writeTVar)
writeTVar) import Control.Exception (bracket)
import Control.Concurrent.STM.TMVar (TMVar) import Control.Monad (forM_, void)
import Control.Exception (bracket) import Control.Monad.Extra (whileM)
import Control.Monad (forM_, void) import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Extra (whileM) import Control.Monad.Reader (MonadReader, asks)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.Trans.Reader (ReaderT (runReaderT))
import Control.Monad.Reader (MonadReader, asks) import Data.Aeson (eitherDecode)
import Control.Monad.Trans.Reader (ReaderT (runReaderT)) import qualified Data.Aeson.KeyMap as KM
import Data.Aeson (FromJSON (..), eitherDecode, import Data.Aeson.Types as Aeson
withObject, (.:)) import Data.Attoparsec.Text as Attoparsec hiding (count)
import qualified Data.Aeson.KeyMap as KM import Data.Binary.Put (putDoublele, putWord64le, runPut)
import Data.Aeson.Types as Aeson import qualified Data.ByteString as B
import Data.Attoparsec.Text as Attoparsec import qualified Data.ByteString.Lazy as BL
import Data.Binary.Put (putDoublele, putWord64le, runPut) import qualified Data.List as L
import qualified Data.ByteString as B import Data.List.NonEmpty (NonEmpty ((:|)))
import qualified Data.ByteString.Lazy as BL import qualified Data.Text as T
import qualified Data.List as L import Data.Text.Encoding (encodeUtf8)
import Data.List.NonEmpty (NonEmpty ((:|))) import Data.Time (UTCTime (UTCTime),
import qualified Data.Text as T fromGregorianValid)
import Data.Text.Encoding (encodeUtf8) import Data.Time.Clock (diffUTCTime, getCurrentTime,
import Data.Time (Day, UTCTime (UTCTime), secondsToDiffTime)
fromGregorianValid) import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds)
import Data.Time.Clock (diffUTCTime, getCurrentTime, import Prelude hiding (log)
secondsToDiffTime) import System.ZMQ4 (Context, Router (Router), bind,
import Data.Time.Clock.POSIX (utcTimeToPOSIXSeconds) close, receiveMulti, sendMulti,
import Prelude hiding (log) socket)
import System.ZMQ4 (Context, Router (Router), bind, import TickerInfoServer (TickerInfoServerHandle,
close, receive, receiveMulti, getAllTickers)
sendMulti, socket, withSocket) import TXMLConnector (TXMLConnectorHandle, makeRequest)
import TickerInfoServer (TickerInfoServerHandle, import TXMLConnector.Internal (HistoryRequest (..),
getAllTickers) HistoryResponse (..), Request (..),
import TXMLConnector (TXMLConnectorHandle, makeRequest) Response (..))
import TXMLConnector.Internal (HistoryRequest (..),
HistoryResponse (..),
Request (..), Response (..))
data HistoryProviderServerHandle = data HistoryProviderServerHandle =
HistoryProviderServerHandle HistoryProviderServerHandle
@ -76,17 +73,6 @@ data Period =
PeriodMonth PeriodMonth
deriving (Eq, Show) deriving (Eq, Show)
parsePeriod :: T.Text -> Maybe Period
parsePeriod "M1" = Just Period1Min
parsePeriod "M5" = Just Period5Min
parsePeriod "M15" = Just Period15Min
parsePeriod "M30" = Just Period30Min
parsePeriod "H1" = Just PeriodHour
parsePeriod "D" = Just PeriodDay
parsePeriod "W" = Just PeriodWeek
parsePeriod "M" = Just PeriodMonth
parsePeriod _ = Nothing
periodToSeconds :: Period -> Int periodToSeconds :: Period -> Int
periodToSeconds Period1Min = 60 periodToSeconds Period1Min = 60
periodToSeconds Period5Min = 60 * 5 periodToSeconds Period5Min = 60 * 5
@ -237,13 +223,13 @@ workThread = do
Right request -> do Right request -> do
response <- handleRequest sender request response <- handleRequest sender request
sendResponseWithDelimiter sock sender response sendResponseWithDelimiter sock sender response
Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request" Left _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request"
(sender:rawRq:_) -> (sender:rawRq:_) ->
case eitherDecode $ BL.fromStrict rawRq of case eitherDecode $ BL.fromStrict rawRq of
Right request -> do Right request -> do
response <- handleRequest sender request response <- handleRequest sender request
sendResponse sock sender response sendResponse sock sender response
Left err -> log Warning "HistoryProviderServer.WorkThread" $ "Unable to parse request" Left _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request"
_ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request" _ -> log Warning "HistoryProviderServer.WorkThread" "Unable to parse request"
liftIO $ readTVarIO runVar liftIO $ readTVarIO runVar
liftIO $ close sock liftIO $ close sock

173
src/TXMLConnector.hs

@ -13,8 +13,12 @@ module TXMLConnector
, makeBrokerBackend , makeBrokerBackend
) where ) where
import ATrade.Broker.Backend (BrokerBackend (..),
BrokerBackendNotification (..))
import ATrade.Logging (Message, Severity (..), log, import ATrade.Logging (Message, Severity (..), log,
logWith) logWith)
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types (Order, OrderId)
import Colog (HasLog (getLogAction, setLogAction), import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction)) LogAction (LogAction, unLogAction))
import Config (SubscriptionConfig (SubscriptionConfig), import Config (SubscriptionConfig (SubscriptionConfig),
@ -23,6 +27,7 @@ import Config (SubscriptionConfig (Subscriptio
transaqLogPath, transaqLogin, transaqLogPath, transaqLogin,
transaqPassword, transaqPort) transaqPassword, transaqPort)
import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.BoundedChan (BoundedChan)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM (TVar, atomically, modifyTVar',
newEmptyTMVar, newEmptyTMVarIO, newEmptyTMVar, newEmptyTMVarIO,
newTVarIO, orElse, putTMVar, newTVarIO, orElse, putTMVar,
@ -33,89 +38,30 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar',
import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
newTBQueue, readTBQueue, newTBQueue, readTBQueue,
writeTBQueue) writeTBQueue)
import Control.Concurrent.STM.TMVar (TMVar)
import Control.Monad (forM_, forever, void, when) import Control.Monad (forM_, forever, void, when)
import Control.Monad.Extra (whileM) import Control.Monad.Extra (whileM)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader)
import qualified Data.Bimap as BM import qualified Data.Bimap as BM
import qualified Data.Map.Strict as M
import Data.Maybe (mapMaybe) import Data.Maybe (mapMaybe)
import qualified Data.Text as T import qualified Data.Text as T
import Data.Time.Clock (UTCTime, getCurrentTime)
import qualified Deque.Strict as D import qualified Deque.Strict as D
import GHC.Exts (IsList (..))
import Prelude hiding (log)
import Text.XML.Light.Input (parseXML) import Text.XML.Light.Input (parseXML)
import Text.XML.Light.Types (Content (Elem), import Text.XML.Light.Types (Content (Elem),
Element (elName), Element (elName),
QName (qName)) QName (qName))
import TickTable (TickTable, insertTick, import TickerInfoServer (TickerInfoServerHandle)
lookupTick, newTickTable) import TickTable (newTickTable)
import Transaq (AllTradesTrade (..), import Transaq (TransaqResponse)
Candle (..), ClientData (..),
CommandChangePass (..),
CommandConnect (..),
CommandDisconnect (CommandDisconnect),
CommandGetHistoryData (CommandGetHistoryData),
CommandServerStatus (..),
CommandSubscribe (..),
ConnectionState (Disconnected),
Language (LanguageEn),
MarketInfo (..),
OrderNotification (..),
OrderStatus (..),
Quotation (..),
ResponseAllTrades (ResponseAllTrades),
ResponseCandleKinds (ResponseCandleKinds),
ResponseCandles (..),
ResponseCandlesStatus (StatusPending),
ResponseClient (ResponseClient),
ResponseMarkets (ResponseMarkets),
ResponseOrders (ResponseOrders),
ResponseQuotations (ResponseQuotations),
ResponseQuotes (ResponseQuotes),
ResponseResult (..),
ResponseSecurities (ResponseSecurities),
ResponseTrades (ResponseTrades),
Security (..), SecurityId (..),
TradeNotification (..),
TransaqCommand (toXml),
TransaqResponse (..),
TransaqResponse (..),
TransaqResponseC (fromXml),
UnfilledAction (..),
kCandleKindId, kPeriod, state)
import TXML (LogLevel, MonadTXML, import TXML (LogLevel, MonadTXML,
freeCallback, initialize, initialize, sendCommand,
sendCommand, setCallback) setCallback)
import ATrade.Broker.Backend (BrokerBackend (..),
BrokerBackendNotification (..))
import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types (Bar (..),
BarTimeframe (unBarTimeframe),
DataType (BestBid, BestOffer, LastTradePrice),
Order (..), OrderId,
OrderPrice (..),
OrderState (..), Tick (..),
TickerId, Trade (..),
fromDouble, toDouble)
import qualified ATrade.Types as AT
import Colog.Monad (WithLog)
import Control.Applicative ((<|>))
import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar)
import Control.Error (headMay)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks)
import Data.Int (Int64)
import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, diffUTCTime,
getCurrentTime)
import FSM (FSMCallback (..),
FSMState (isTerminalState),
makeFsm, runFsm)
import GHC.Exts (IsList (..))
import Prelude hiding (log)
import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle,
putTickerInfo)
import qualified Transaq
import qualified TXML import qualified TXML
import TXMLConnector.Internal (BrokerState (..), import TXMLConnector.Internal (BrokerState (..),
ConnectionStage (..), Env (..), ConnectionStage (..), Env (..),
@ -128,10 +74,10 @@ import qualified Win32.TXML as TXMLImpl
import qualified Linux.TXML as TXMLImpl import qualified Linux.TXML as TXMLImpl
#endif #endif
data ConnectionParams = data ConnectionParams =
ConnectionParams ConnectionParams
{ { cpLogin :: T.Text
cpLogin :: T.Text
, cpPassword :: T.Text , cpPassword :: T.Text
, cpHost :: T.Text , cpHost :: T.Text
, cpPort :: Int , cpPort :: Int
@ -142,8 +88,7 @@ data ConnectionParams =
data TXMLConnectorHandle = data TXMLConnectorHandle =
TXMLConnectorHandle TXMLConnectorHandle
{ { threadId :: ThreadId
threadId :: ThreadId
, notificationQueue :: TBQueue TransaqResponse , notificationQueue :: TBQueue TransaqResponse
, hRequestVar :: TMVar Request , hRequestVar :: TMVar Request
, hResponseVar :: TMVar (TMVar Response) , hResponseVar :: TMVar (TMVar Response)
@ -165,71 +110,69 @@ instance HasLog Env Message App where
getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) } getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) }
setLogAction _ env = env -- fuck it setLogAction _ env = env -- fuck it
start :: start ::
LogAction IO Message LogAction IO Message
-> TransaqConnectorConfig -> TransaqConnectorConfig
-> BoundedChan QuoteSourceServerData -> BoundedChan QuoteSourceServerData
-> TickerInfoServerHandle -> TickerInfoServerHandle
-> IO TXMLConnectorHandle -> IO TXMLConnectorHandle
start logger config qssChannel tisH = do start logger' config' qssChannel' tisH = do
logWith logger Info "TXMLConnector" "Starting" logWith logger' Info "TXMLConnector" "Starting"
notificationQueue <- atomically $ newTBQueue 50000 notificationQueue' <- atomically $ newTBQueue 50000
tickTable <- newTickTable tickTable <- newTickTable
requestVar <- newEmptyTMVarIO requestVar' <- newEmptyTMVarIO
responseVar <- newEmptyTMVarIO responseVar' <- newEmptyTMVarIO
currentCandles <- newTVarIO [] currentCandles' <- newTVarIO []
serverConnected <- liftIO $ newTVarIO StageConnection serverConnected' <- liftIO $ newTVarIO StageConnection
candleKindMap <- newTVarIO M.empty candleKindMap' <- newTVarIO M.empty
requestTimestamp <- getCurrentTime >>= newTVarIO requestTimestamp' <- getCurrentTime >>= newTVarIO
orderMap <- newTVarIO M.empty orderMap <- newTVarIO M.empty
notificationCallback <- newTVarIO Nothing notificationCallback <- newTVarIO Nothing
orderTransactionIdMap <- newTVarIO BM.empty orderTransactionIdMap <- newTVarIO BM.empty
pendingOrders <- newTVarIO (fromList []) pendingOrders <- newTVarIO (fromList [])
runVar <- newEmptyTMVarIO runVar' <- newEmptyTMVarIO
timerVar <- newEmptyTMVarIO timerVar' <- newEmptyTMVarIO
let brokerState = let brokerState' =
BrokerState BrokerState
{ { bsOrderTransactionIdMap = orderTransactionIdMap
bsOrderTransactionIdMap = orderTransactionIdMap
, bsNotificationCallback = notificationCallback , bsNotificationCallback = notificationCallback
, bsOrderMap = orderMap , bsOrderMap = orderMap
, bsPendingOrders = pendingOrders , bsPendingOrders = pendingOrders
} }
let env = let env =
Env Env
{ { qssChannel = qssChannel'
qssChannel = qssChannel
, tisHandle = tisH , tisHandle = tisH
, requestVar = requestVar , requestVar = requestVar'
, responseVar = responseVar , responseVar = responseVar'
, requestTimestamp = requestTimestamp , requestTimestamp = requestTimestamp'
, currentCandles = currentCandles , currentCandles = currentCandles'
, tickMap = tickTable , tickMap = tickTable
, transaqQueue = notificationQueue , transaqQueue = notificationQueue'
, logger = logger , logger = logger'
, config = config , config = config'
, serverConnected = serverConnected , serverConnected = serverConnected'
, candleKindMap = candleKindMap , candleKindMap = candleKindMap'
, brokerState = brokerState , brokerState = brokerState'
, runVar = runVar , runVar = runVar'
, timerVar = timerVar , timerVar = timerVar'
} }
threadId <- forkIO $ (runReaderT . unApp) workThread env workThreadId <- forkIO $ (runReaderT . unApp) workThread env
return $ TXMLConnectorHandle return $ TXMLConnectorHandle
{ { threadId = workThreadId
threadId = threadId , notificationQueue = notificationQueue'
, notificationQueue = notificationQueue , hRequestVar = requestVar'
, hRequestVar = requestVar , hResponseVar = responseVar'
, hResponseVar = responseVar , hRequestTimestamp = requestTimestamp'
, hRequestTimestamp = requestTimestamp
, hNotificationCallback = notificationCallback , hNotificationCallback = notificationCallback
, hRunVar = runVar , hRunVar = runVar'
} }
stop :: TXMLConnectorHandle -> IO () stop :: TXMLConnectorHandle -> IO ()
stop h = atomically $ putTMVar (hRunVar h) () stop h = atomically $ putTMVar (hRunVar h) ()
brSubmitOrder :: TXMLConnectorHandle -> Order -> IO () brSubmitOrder :: TXMLConnectorHandle -> Order -> IO ()
brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order) brSubmitOrder h order = void $ makeRequest h (RequestSubmitOrder order)
@ -240,8 +183,8 @@ brSetNotificationCallback :: TXMLConnectorHandle -> Maybe (BrokerBackendNotifica
brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb brSetNotificationCallback h cb = atomically $ writeTVar (hNotificationCallback h) cb
makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend makeBrokerBackend :: TXMLConnectorHandle -> T.Text -> BrokerBackend
makeBrokerBackend h account = makeBrokerBackend h accountId =
BrokerBackend [account] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h) BrokerBackend [accountId] (brSetNotificationCallback h) (brSubmitOrder h) (brCancelOrder h) (TXMLConnector.stop h)
makeRequest :: TXMLConnectorHandle -> Request -> IO Response makeRequest :: TXMLConnectorHandle -> Request -> IO Response
makeRequest h request = do makeRequest h request = do

92
src/TXMLConnector/Internal.hs

@ -1,8 +1,7 @@
{-# LANGUAGE BangPatterns #-} {-# LANGUAGE BangPatterns #-}
{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-} {-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-} {-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE MultiParamTypeClasses #-}
module TXMLConnector.Internal module TXMLConnector.Internal
( (
@ -16,10 +15,8 @@ module TXMLConnector.Internal
, HistoryResponse(..) , HistoryResponse(..)
) where ) where
import ATrade.Logging (Message, Severity (..), log, import ATrade.Logging (Message, Severity (..), log)
logWith) import Colog (HasLog, LogAction (LogAction))
import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction))
import Config (SubscriptionConfig (SubscriptionConfig), import Config (SubscriptionConfig (SubscriptionConfig),
TransaqConnectorConfig (..), TransaqConnectorConfig (..),
transaqHost, transaqLogLevel, transaqHost, transaqLogLevel,
@ -36,7 +33,7 @@ import Control.Concurrent.STM (TVar, atomically, modifyTVar',
import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue, import Control.Concurrent.STM.TBQueue (TBQueue, flushTBQueue,
newTBQueue, readTBQueue, newTBQueue, readTBQueue,
writeTBQueue) writeTBQueue)
import Control.Monad (forever, void, when) import Control.Monad (forM_, forever, void, when)
import Control.Monad.Extra (whileM) import Control.Monad.Extra (whileM)
import qualified Data.Bimap as BM import qualified Data.Bimap as BM
import Data.Maybe (mapMaybe) import Data.Maybe (mapMaybe)
@ -103,7 +100,6 @@ import Control.Applicative ((<|>))
import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar) import Control.Concurrent.STM.TMVar (TMVar)
import Control.Error (headMay) import Control.Error (headMay)
import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO)) import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT)) import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks) import Control.Monad.Reader.Class (MonadReader, asks)
@ -199,9 +195,6 @@ data HistoryResponse =
} }
deriving (Show, Eq) deriving (Show, Eq)
newtype RespCallback =
RespCallback { unCallback :: T.Text -> IO Bool }
workThread :: (MonadIO m, workThread :: (MonadIO m,
MonadReader Env m, MonadReader Env m,
MonadTXML m, MonadTXML m,
@ -213,9 +206,8 @@ workThread = do
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
Right _ -> do Right _ -> do
queue <- asks transaqQueue queue <- asks transaqQueue
logger' <- asks logger rc' <- setCallback (parseAndWrite queue)
rc <- setCallback (parseAndWrite queue) case rc' of
case rc of
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
Just cb -> do Just cb -> do
serverConnectionState <- asks serverConnected serverConnectionState <- asks serverConnected
@ -273,10 +265,10 @@ handleTransaqData transaqData = do
liftIO $ forM_ ticks (writeChan qssChan . QSSTick) liftIO $ forM_ ticks (writeChan qssChan . QSSTick)
liftIO $ forM_ ticks (insertTick tm) liftIO $ forM_ ticks (insertTick tm)
pure Nothing pure Nothing
TransaqResponseQuotations (ResponseQuotations quotations) -> do TransaqResponseQuotations (ResponseQuotations quotations') -> do
qssChan <- asks qssChannel qssChan <- asks qssChannel
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
let ticks = concatMap (quotationToTicks now) quotations let ticks = concatMap (quotationToTicks now) quotations'
liftIO $ forM_ ticks (writeChan qssChan . QSSTick) liftIO $ forM_ ticks (writeChan qssChan . QSSTick)
liftIO $ forM_ ticks (insertTick tm) liftIO $ forM_ ticks (insertTick tm)
pure Nothing pure Nothing
@ -293,8 +285,7 @@ handleTransaqData transaqData = do
liftIO $ atomically $ do liftIO $ atomically $ do
candles <- readTVar cur candles <- readTVar cur
putTMVar tmvar $ ResponseHistory $ HistoryResponse putTMVar tmvar $ ResponseHistory $ HistoryResponse
{ { hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles)
hrBars = (candleToBar $ cSecurity respCandle) <$> (cCandles respCandle <> candles)
, hrMoreData = False , hrMoreData = False
} }
_ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var" _ -> log Warning "TXMLConnector.WorkThread" "Incoming candles without response var"
@ -308,7 +299,7 @@ handleTransaqData transaqData = do
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker) forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH) forM_ tickerInfos (liftIO . putTickerInfo tisH)
makeSubscriptions cfg _ <- makeSubscriptions cfg
pure Nothing pure Nothing
_ -> pure Nothing _ -> pure Nothing
where where
@ -470,14 +461,13 @@ handleConnected = do
_ -> pure Nothing _ -> pure Nothing
where where
requestTimeout = 10 requestTimeoutValue = 10
checkRequestTimeout = do checkRequestTimeout = do
now <- liftIO getCurrentTime now <- liftIO getCurrentTime
tsVar <- asks requestTimestamp tsVar <- asks requestTimestamp
ts <- liftIO $ readTVarIO tsVar ts <- liftIO $ readTVarIO tsVar
when (now `diffUTCTime` ts >= requestTimeout) $ do when (now `diffUTCTime` ts >= requestTimeoutValue) $ do
resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar resp <- asks responseVar >>= liftIO . atomically . tryReadTMVar
case resp of case resp of
Just tmvar -> do Just tmvar -> do
@ -551,17 +541,15 @@ handleGetInfo = do
pure Nothing pure Nothing
_ -> pure Nothing _ -> pure Nothing
makeSubscriptions :: (MonadIO m, makeSubscriptions :: (MonadTXML m) => TransaqConnectorConfig -> m (Either T.Text ())
MonadTXML m, makeSubscriptions = sendCommand . toXml . cmdSubscription
HasLog Env Message m) => TransaqConnectorConfig -> m (Either T.Text ())
makeSubscriptions config = sendCommand . toXml $ cmdSubscription config
where where
cmdSubscription config = cmdSubscription config' =
CommandSubscribe CommandSubscribe
{ {
alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config'),
quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config), quotations = fmap subscriptionToSecurityId (quotationsSubscriptions config'),
quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) quotes = fmap subscriptionToSecurityId (quotesSubscriptions config')
} }
subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code
@ -612,33 +600,30 @@ handleUnconnected = do
allTradeToTick :: AllTradesTrade -> Tick allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att = allTradeToTick att =
Tick Tick
{ { security = attBoard att <> "#" <> attSecCode att
security = attBoard att <> "#" <> attSecCode att, , datatype = LastTradePrice
datatype = LastTradePrice, , timestamp = attTimestamp att
timestamp = attTimestamp att, , value = fromDouble $ attPrice att
value = fromDouble $ attPrice att, , volume = fromIntegral $ attQuantity att
volume = fromIntegral $ attQuantity att
} }
quotationToTicks :: UTCTime -> Quotation -> [Tick] quotationToTicks :: UTCTime -> Quotation -> [Tick]
quotationToTicks timestamp q = quotationToTicks timestamp' q =
let security = qBoard q <> "#" <> qSeccode q in let security' = qBoard q <> "#" <> qSeccode q in
[ [
Tick Tick
{ { security = security'
security = security, , datatype = BestBid
datatype = BestBid, , timestamp = timestamp'
timestamp = timestamp, , value = fromDouble $ qBid q
value = fromDouble $ qBid q, , volume = fromIntegral $ qQuantity q
volume = fromIntegral $ qQuantity q
}, },
Tick Tick
{ { security = security'
security = security, , datatype = BestOffer
datatype = BestOffer, , timestamp = timestamp'
timestamp = timestamp, , value = fromDouble $ qOffer q
value = fromDouble $ qOffer q, , volume = fromIntegral $ qQuantity q
volume = fromIntegral $ qQuantity q
}] }]
securityToTickerInfo :: Security -> TickerInfo securityToTickerInfo :: Security -> TickerInfo
@ -663,7 +648,6 @@ parseAccountId accId = case T.findIndex (== '#') accId of
Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId) Just ix -> Just (T.take ix accId, T.drop (ix + 1) accId)
Nothing -> Nothing Nothing -> Nothing
mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder mkNewOrderCommand :: Order -> Maybe Transaq.CommandNewOrder
mkNewOrderCommand order = mkNewOrderCommand order =
case parseSecurityId (orderSecurity order) of case parseSecurityId (orderSecurity order) of

Loading…
Cancel
Save