Browse Source

txmlconnector: refactor workthread in ReaderT

master
Denis Tereshkin 3 years ago
parent
commit
61cdf6cc28
  1. 238
      src/TXMLConnector.hs
  2. 1
      transaq-connector.cabal

238
src/TXMLConnector.hs

@ -1,4 +1,7 @@
{-# LANGUAGE DuplicateRecordFields #-} {-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedLabels #-} {-# LANGUAGE OverloadedLabels #-}
{-# LANGUAGE RecordWildCards #-} {-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE TypeApplications #-} {-# LANGUAGE TypeApplications #-}
@ -8,9 +11,10 @@ module TXMLConnector
start start
) where ) where
import ATrade.Logging (Message, Severity (..), import ATrade.Logging (Message, Severity (..), log,
logWith) logWith)
import Colog (LogAction) import Colog (HasLog (getLogAction, setLogAction),
LogAction (LogAction, unLogAction))
import Config (SubscriptionConfig (SubscriptionConfig), import Config (SubscriptionConfig (SubscriptionConfig),
TransaqConnectorConfig (..), TransaqConnectorConfig (..),
transaqHost, transaqLogLevel, transaqHost, transaqLogLevel,
@ -18,7 +22,8 @@ import Config (SubscriptionConfig (Subscriptio
transaqPassword, transaqPort) transaqPassword, transaqPort)
import Control.Concurrent (ThreadId, forkIO, threadDelay) import Control.Concurrent (ThreadId, forkIO, threadDelay)
import Control.Concurrent.STM (TVar, atomically, modifyTVar', import Control.Concurrent.STM (TVar, atomically, modifyTVar',
newTVarIO, readTVarIO, newEmptyTMVarIO, newTVarIO,
orElse, readTMVar, readTVarIO,
writeTVar) writeTVar)
import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue, import Control.Concurrent.STM.TBQueue (TBQueue, newTBQueue,
readTBQueue, writeTBQueue) readTBQueue, writeTBQueue)
@ -32,6 +37,7 @@ import Text.XML.Light.Types (Content (Elem),
import Transaq (AllTradesTrade (..), import Transaq (AllTradesTrade (..),
CommandConnect (..), CommandConnect (..),
CommandDisconnect (CommandDisconnect), CommandDisconnect (CommandDisconnect),
CommandGetHistoryData (CommandGetHistoryData),
CommandSubscribe (..), CommandSubscribe (..),
ConnectionState (Disconnected), ConnectionState (Disconnected),
Language (LanguageEn), Language (LanguageEn),
@ -48,20 +54,27 @@ import Transaq (AllTradesTrade (..),
TransaqResponse (..), TransaqResponse (..),
TransaqResponse (..), TransaqResponse (..),
TransaqResponseC (fromXml), TransaqResponseC (fromXml),
state) kCandleKindId, kPeriod, state)
import TXML (LogLevel, freeCallback, import TXML (LogLevel, freeCallback,
initialize, sendCommand, initialize, sendCommand,
setCallback) setCallback)
import ATrade.QuoteSource.Server (QuoteSourceServerData (..)) import ATrade.QuoteSource.Server (QuoteSourceServerData (..))
import ATrade.Types (BarTimeframe, DataType (BestBid, BestOffer, LastTradePrice), import ATrade.Types (Bar,
BarTimeframe (unBarTimeframe),
DataType (BestBid, BestOffer, LastTradePrice),
Tick (..), TickerId, Tick (..), TickerId,
fromDouble) fromDouble)
import Colog.Monad (WithLog)
import Control.Concurrent.BoundedChan (BoundedChan, writeChan) import Control.Concurrent.BoundedChan (BoundedChan, writeChan)
import Control.Concurrent.STM.TMVar (TMVar) import Control.Concurrent.STM.TMVar (TMVar)
import Control.Monad (forM_) import Control.Monad (forM_)
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (ReaderT (runReaderT))
import Control.Monad.Reader.Class (MonadReader, asks)
import qualified Data.Map.Strict as M import qualified Data.Map.Strict as M
import Data.Time.Clock (UTCTime, getCurrentTime) import Data.Time.Clock (UTCTime, getCurrentTime)
import Prelude hiding (log)
import TickerInfoServer (TickerInfo (..), import TickerInfoServer (TickerInfo (..),
TickerInfoServerHandle, TickerInfoServerHandle,
putTickerInfo) putTickerInfo)
@ -83,31 +96,70 @@ data ConnectionParams =
data HistoryRequest = data HistoryRequest =
HistoryRequest HistoryRequest
{ {
hrTIckerId :: TickerId hrTickerId :: TickerId
, hrTimeframe :: BarTimeframe , hrTimeframe :: BarTimeframe
, hrStartTime :: UTCTime , hrCount :: Int
, hrEndTime :: UTCTime , hrReset :: Bool
} deriving (Show, Eq, Ord) } deriving (Show, Eq, Ord)
data Request = newtype Request =
Request HistoryRequest Request HistoryRequest
deriving (Show, Eq, Ord) deriving (Show, Eq, Ord)
data HistoryResponse =
HistoryResponse
{
hrBars :: [Bar]
, hrMoreData :: Bool
}
deriving (Show, Eq)
newtype Response =
Response HistoryResponse
deriving (Show, Eq)
data TXMLConnectorHandle = data TXMLConnectorHandle =
TXMLConnectorHandle TXMLConnectorHandle
{ {
threadId :: ThreadId, threadId :: ThreadId,
notificationQueue :: TBQueue TransaqResponse, notificationQueue :: TBQueue TransaqResponse,
requestVar :: TMVar Request, hRequestVar :: TMVar Request,
responseVar :: TMVar Response hResponseVar :: TMVar Response
} }
data ConnectionStage = StageConnection | StageGetInfo | StageConnected data ConnectionStage = StageConnection | StageGetInfo | StageConnected
deriving (Eq, Show, Ord) deriving (Eq, Show, Ord)
data MainQueueData =
MainQueueTransaqData TransaqResponse
| MainQueueRequest Request
deriving (Eq, Show, Ord)
data TickKey = TickKey TickerId DataType data TickKey = TickKey TickerId DataType
deriving (Show, Ord, Eq) deriving (Show, Ord, Eq)
data Env =
Env
{
qssChannel :: BoundedChan QuoteSourceServerData
, tisHandle :: TickerInfoServerHandle
, requestVar :: TMVar Request
, responseVar :: TMVar Response
, tickMap :: TVar (M.Map TickKey Tick)
, transaqQueue :: TBQueue TransaqResponse
, logger :: LogAction IO Message
, config :: TransaqConnectorConfig
, serverConnected :: TVar ConnectionStage
, candleKindMap :: TVar (M.Map Int Int)
}
newtype App a = App { unApp :: ReaderT Env IO a }
deriving (Monad, Applicative, Functor, MonadIO, MonadReader Env)
instance HasLog Env Message App where
getLogAction env = LogAction { unLogAction = liftIO . (unLogAction . logger $ env) }
setLogAction _ env = env -- fuck it
start :: start ::
LogAction IO Message LogAction IO Message
-> TransaqConnectorConfig -> TransaqConnectorConfig
@ -118,45 +170,61 @@ start logger config qssChannel tisH = do
logWith logger Info "TXMLConnector" "Starting" logWith logger Info "TXMLConnector" "Starting"
notificationQueue <- atomically $ newTBQueue 50000 notificationQueue <- atomically $ newTBQueue 50000
tickTable <- newTVarIO M.empty tickTable <- newTVarIO M.empty
threadId <- forkIO (workThread logger config notificationQueue qssChannel tickTable tisH) requestVar <- newEmptyTMVarIO
return $ TXMLConnectorHandle {..} responseVar <- newEmptyTMVarIO
serverConnected <- liftIO $ newTVarIO StageConnection
candleKindMap <- newTVarIO M.empty
let env =
Env
{
qssChannel = qssChannel
, tisHandle = tisH
, requestVar = requestVar
, responseVar = responseVar
, tickMap = tickTable
, transaqQueue = notificationQueue
, logger = logger
, config = config
, serverConnected = serverConnected
, candleKindMap = candleKindMap
}
threadId <- forkIO $ (runReaderT . unApp) workThread env
return $ TXMLConnectorHandle
{
threadId = threadId
, notificationQueue = notificationQueue
, hRequestVar = requestVar
, hResponseVar = responseVar
}
workThread :: workThread :: App ()
LogAction IO Message workThread = do
-> TransaqConnectorConfig cfg <- asks config
-> TBQueue TransaqResponse rc <- liftIO $ initialize (transaqLogPath cfg) (parseTransaqLogLevel $ transaqLogLevel cfg)
-> BoundedChan QuoteSourceServerData
-> TVar (M.Map TickKey Tick)
-> TickerInfoServerHandle
-> IO ()
workThread logger config queue qssChannel tickMap tisH = do
rc <- initialize (transaqLogPath config) (parseTransaqLogLevel $ transaqLogLevel config)
case rc of case rc of
Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str Left str -> log Error "TXMLConnector.WorkThread" $ "Unable to initialize TXML" <> str
Right _ -> do Right _ -> do
rc <- setCallback parseAndWrite queue <- asks transaqQueue
logger' <- asks logger
rc <- liftIO $ setCallback (parseAndWrite queue logger')
case rc of case rc of
Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback" Nothing -> log Error "TXMLConnector.WorkThread" "Unable to set callback"
Just cb -> do Just cb -> do
serverConnected <- newTVarIO StageConnection
void $ forever $ do void $ forever $ do
connStatus <- readTVarIO serverConnected connStatus <- asks serverConnected >>= (liftIO . readTVarIO)
case connStatus of case connStatus of
StageConnection -> handleUnconnected serverConnected StageConnection -> handleUnconnected
StageGetInfo -> handleGetInfo serverConnected StageGetInfo -> handleGetInfo
StageConnected -> handleConnected serverConnected StageConnected -> handleConnected
freeCallback cb liftIO $ freeCallback cb
where where
log = logWith logger
parseTransaqLogLevel 1 = TXML.Warning parseTransaqLogLevel 1 = TXML.Warning
parseTransaqLogLevel 3 = TXML.Debug parseTransaqLogLevel 3 = TXML.Debug
parseTransaqLogLevel _ = TXML.Info parseTransaqLogLevel _ = TXML.Info
parseAndWrite xml = do parseAndWrite queue logger xml = do
let parsed = mapMaybe parseContent $ parseXML xml let parsed = mapMaybe parseContent $ parseXML xml
log Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed logWith logger Debug "TXML.Callback" $ "Parsed entities: " <> (T.pack . show . length) parsed
log Debug "TXML.Callback" $ "parsed xml: " <> (T.pack . show) (parseXML xml) mapM_ (writeToQueue queue) parsed
log Debug "TXML.Callback" $ "parsed: " <> (T.pack . show) xml
mapM_ writeToQueue parsed
pure True pure True
parseContent (Elem el) = parseElement el parseContent (Elem el) = parseElement el
parseContent _ = Nothing parseContent _ = Nothing
@ -171,40 +239,70 @@ workThread logger config queue qssChannel tickMap tisH = do
"alltrades" -> TransaqResponseAllTrades <$> fromXml el "alltrades" -> TransaqResponseAllTrades <$> fromXml el
"quotes" -> TransaqResponseQuotes <$> fromXml el "quotes" -> TransaqResponseQuotes <$> fromXml el
_ -> Nothing _ -> Nothing
writeToQueue resp = atomically $ writeTBQueue queue resp writeToQueue queue resp = atomically $ writeTBQueue queue resp
handleConnected serverConnected = do handleConnected :: App ()
item <- atomically $ readTBQueue queue handleConnected = do
rqVar <- asks requestVar
queue <- asks transaqQueue
item <- liftIO . atomically $ (MainQueueTransaqData <$> readTBQueue queue) `orElse`
(MainQueueRequest <$> readTMVar rqVar)
case item of case item of
MainQueueTransaqData transaqData -> do
tm <- asks tickMap
case transaqData of
TransaqResponseAllTrades (ResponseAllTrades trades) -> do TransaqResponseAllTrades (ResponseAllTrades trades) -> do
qssChan <- asks qssChannel
let ticks = fmap allTradeToTick trades let ticks = fmap allTradeToTick trades
forM_ ticks (writeChan qssChannel . QSSTick) forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks insertToTickMap forM_ ticks (insertToTickMap tm)
TransaqResponseQuotations (ResponseQuotations quotations) -> do TransaqResponseQuotations (ResponseQuotations quotations) -> do
now <- getCurrentTime qssChan <- asks qssChannel
now <- liftIO getCurrentTime
let ticks = concatMap (quotationToTicks now) quotations let ticks = concatMap (quotationToTicks now) quotations
forM_ ticks (writeChan qssChannel . QSSTick) forM_ ticks (liftIO . writeChan qssChan . QSSTick)
forM_ ticks insertToTickMap forM_ ticks (insertToTickMap tm)
TransaqResponseCandles respCandle -> undefined
_ -> pure () _ -> pure ()
handleGetInfo serverConnected = do MainQueueRequest (Request request) -> do
item <- atomically $ readTBQueue queue maybeCk <- M.lookup (unBarTimeframe . hrTimeframe $ request) <$> (asks candleKindMap >>= liftIO . readTVarIO)
case maybeCk of
Just candleKindId -> do
case parseSecurityId (hrTickerId request) of
Just secId -> void $ liftIO . sendCommand $
toXml CommandGetHistoryData
{
security = secId
, periodId = candleKindId
, count = hrCount request
, reset = hrReset request
}
_ -> log Warning "TXMLConnector.WorkThread" $ "Unable to parse security ID: " <> hrTickerId request
_ -> log Warning "TXMLConnector.WorkThread" $ "Invalid candlekind requested" <> (T.pack . show . unBarTimeframe . hrTimeframe $ request)
handleGetInfo :: App ()
handleGetInfo = do
queue <- asks transaqQueue
cfg <- asks config
item <- liftIO . atomically $ readTBQueue queue
conn <- asks serverConnected
case item of case item of
TransaqResponseServerStatus serverStatus -> TransaqResponseServerStatus serverStatus ->
case state serverStatus of case state serverStatus of
Transaq.Disconnected -> do Transaq.Disconnected -> do
log Warning "TXMLConnector.WorkThread" "Server disconnected" log Warning "TXMLConnector.WorkThread" "Server disconnected"
atomically $ writeTVar serverConnected StageConnection liftIO . atomically $ writeTVar conn StageConnection
Transaq.Connected -> do Transaq.Connected -> do
log Info "TXMLConnector.WorkThread" "Server connected" log Info "TXMLConnector.WorkThread" "Server connected"
atomically $ writeTVar serverConnected StageConnected liftIO . atomically $ writeTVar conn StageConnected
v <- makeSubscriptions config v <- makeSubscriptions cfg
case v of case v of
Left errmsg -> do Left errmsg -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg log Warning "TXMLConnector.WorkThread" $ "Unable to subscribe: " <> errmsg
void $ sendCommand $ toXml CommandDisconnect void $ liftIO . sendCommand $ toXml CommandDisconnect
Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done" Right _ -> log Info "TXMLConnector.WorkThread" "Subscriptions done"
Transaq.Error errmsg -> do Transaq.Error errmsg -> do
log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg log Warning "TXMLConnector.WorkThread" $ "Connection error: " <> errmsg
atomically $ writeTVar serverConnected StageConnection liftIO . atomically $ writeTVar conn StageConnection
TransaqResponseResult result -> TransaqResponseResult result ->
log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result log Info "TXMLConnector.WorkThread" $ "Incoming result" <> (T.pack . show) result
-- TODO: handle order response -- TODO: handle order response
@ -217,25 +315,30 @@ workThread logger config queue qssChannel tickMap tisH = do
forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m) forM_ markets (\m -> log Debug "TXMLConnector.WorkThread" $ (T.pack . show) (marketId m) <> "/" <> marketName m)
-- TODO: Pass to qtis -- TODO: Pass to qtis
TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do TransaqResponseCandleKinds (ResponseCandleKinds kinds) -> do
log Debug "TXMLConnector.WorkThread" "Incoming candle kinds:" ckMap <- asks candleKindMap
forM_ kinds (log Debug "TXMLConnector.WorkThread" . (T.pack . show)) log Debug "TXMLConnector.WorkThread" $ "Incoming candle kinds: " <> (T.pack . show . length) kinds
-- TODO: Pass to qtis, maybe something else? forM_ kinds (\k -> liftIO . atomically $ modifyTVar' ckMap (M.insert (kPeriod k) (kCandleKindId k)))
TransaqResponseSecurities (ResponseSecurities securities) -> do TransaqResponseSecurities (ResponseSecurities securities) -> do
log Debug "TXMLConnector.WorkThread" "Incoming securities:" tisH <- asks tisHandle
forM_ securities (putTickerInfo tisH . securityToTickerInfo) let tickerInfos = securityToTickerInfo <$> securities
log Info "TXMLConnector.WorkThread" $ "Incoming securities: " <> (T.pack . show . length) securities
forM_ tickerInfos (log Debug "TXMLConnector.WorkThread" . T.pack . show . tiTicker)
forM_ tickerInfos (liftIO . putTickerInfo tisH)
TransaqResponseSecInfo secInfo -> TransaqResponseSecInfo secInfo ->
log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo log Debug "TXMLConnector.WorkThread" $ "Incoming secinfo:" <> (T.pack . show) secInfo
-- TODO: Pass to qtis -- TODO: Pass to qtis
_ -> pure () _ -> pure ()
handleUnconnected serverConnected = do handleUnconnected :: App ()
handleUnconnected = do
cfg <- asks config
log Debug "TXMLConnector.WorkThread" "Sending connect command" log Debug "TXMLConnector.WorkThread" "Sending connect command"
v <- sendCommand $ v <- liftIO . sendCommand .
toXml $ CommandConnect toXml $ CommandConnect
{ {
login = transaqLogin config, login = transaqLogin cfg,
password = transaqPassword config, password = transaqPassword cfg,
host = transaqHost config, host = transaqHost cfg,
port = transaqPort config, port = transaqPort cfg,
language = LanguageEn, language = LanguageEn,
autopos = False, autopos = False,
micexRegisters = True, micexRegisters = True,
@ -251,9 +354,10 @@ workThread logger config queue qssChannel tickMap tisH = do
case v of case v of
Left err -> do Left err -> do
log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]" log Warning "TXMLConnector.WorkThread" $ "Unable to connect: [" <> err <> "]"
threadDelay (1000 * 1000 * 10) liftIO $ threadDelay (1000 * 1000 * 10)
Right _ -> do Right _ -> do
atomically $ writeTVar serverConnected StageGetInfo conn <- asks serverConnected
liftIO . atomically $ writeTVar conn StageGetInfo
-- item <- atomically $ readTBQueue queue -- item <- atomically $ readTBQueue queue
-- case item of -- case item of
-- TransaqResponseServerStatus status -> do -- TransaqResponseServerStatus status -> do
@ -277,7 +381,7 @@ workThread logger config queue qssChannel tickMap tisH = do
-- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other -- log Warning "TXMLConnector.WorkThread" $ "Stray message: " <> (T.pack . show) other
-- threadDelay (1000 * 1000) -- threadDelay (1000 * 1000)
makeSubscriptions config = makeSubscriptions config =
sendCommand $ toXml $ liftIO . sendCommand . toXml $
CommandSubscribe CommandSubscribe
{ {
alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config), alltrades = fmap subscriptionToSecurityId (allTradesSubscriptions config),
@ -285,7 +389,7 @@ workThread logger config queue qssChannel tickMap tisH = do
quotes = fmap subscriptionToSecurityId (quotesSubscriptions config) quotes = fmap subscriptionToSecurityId (quotesSubscriptions config)
} }
subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code subscriptionToSecurityId (SubscriptionConfig brd code) = SecurityId brd code
insertToTickMap tick = atomically $ modifyTVar' tickMap (M.insert (TickKey (security tick) (datatype tick)) tick) insertToTickMap tickMap tick = liftIO . atomically $ modifyTVar' tickMap (M.insert (TickKey (security tick) (datatype tick)) tick)
allTradeToTick :: AllTradesTrade -> Tick allTradeToTick :: AllTradesTrade -> Tick
allTradeToTick att = allTradeToTick att =
@ -328,3 +432,5 @@ securityToTickerInfo sec =
, tiTickSize = sMinStep sec , tiTickSize = sMinStep sec
} }
parseSecurityId :: TickerId -> Maybe SecurityId
parseSecurityId = undefined

1
transaq-connector.cabal

@ -46,6 +46,7 @@ executable transaq-connector
, stm , stm
, extra , extra
, errors , errors
, mtl
extra-lib-dirs: lib extra-lib-dirs: lib
ghc-options: -Wall ghc-options: -Wall
-Wcompat -Wcompat

Loading…
Cancel
Save