Execution layer for algorithmic trading
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

290 lines
15 KiB

4 years ago
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE DuplicateRecordFields #-}
{-# LANGUAGE FlexibleContexts #-}
4 years ago
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE GeneralizedNewtypeDeriving #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
module ATrade.Driver.Junction
(
junctionMain
) where
4 years ago
import ATrade.Broker.Client (BrokerClientHandle,
startBrokerClient,
4 years ago
stopBrokerClient)
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification),
NotificationSqnum,
getNotificationSqnum)
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (brokerClientCert, brokerEndpoint, brokerNotificationEndpoint, brokerServerCert, instances, qhpEndpoint, qtisEndpoint, redisSocket, robotsConfigsPath),
4 years ago
ProgramOptions (ProgramOptions, configPath))
4 years ago
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription),
QuoteSubscription (QuoteSubscription),
SubscriptionId (SubscriptionId))
4 years ago
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv),
4 years ago
QuoteThreadHandle,
4 years ago
withQThread)
4 years ago
import qualified ATrade.Driver.Junction.QuoteThread as QT
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle,
RobotEnv (..),
4 years ago
RobotM (..),
createRobotDriverThread,
onStrategyInstance,
postNotificationEvent)
4 years ago
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE),
StrategyInstance (strategyInstanceId),
4 years ago
StrategyInstanceDescriptor (..),
confStrategy,
strategyState,
strategyTimers)
import ATrade.Logging (Message,
Severity (Info),
fmtMessage,
logWarning,
logWith)
4 years ago
import ATrade.Quotes.QHP (mkQHPHandle)
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig))
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState))
import ATrade.Types (ClientSecurityParams (ClientSecurityParams),
OrderId,
Trade (tradeOrderId))
import Colog (HasLog (getLogAction, setLogAction),
LogAction,
logTextStdout,
(>$<))
import Control.Concurrent (threadDelay)
4 years ago
import Control.Exception.Safe (MonadThrow,
bracket)
import Control.Monad (forM_, forever)
import Control.Monad.Extra (whenM)
4 years ago
import Control.Monad.IO.Class (MonadIO (liftIO))
import Control.Monad.Reader (MonadReader, ReaderT (runReaderT),
asks)
import Data.Aeson (eitherDecode,
encode)
import qualified Data.ByteString.Lazy as BL
import Data.Default (Default (def))
import Data.Foldable (traverse_)
import Data.IORef (IORef,
atomicModifyIORef',
newIORef,
readIORef)
4 years ago
import qualified Data.Map.Strict as M
import Data.Set (notMember)
import qualified Data.Set as S
4 years ago
import qualified Data.Text as T
import Data.Text.Encoding (encodeUtf8)
import Data.Text.IO (readFile)
import Data.Time.Clock.POSIX (getPOSIXTime)
import Database.Redis (ConnectInfo (..),
Connection,
PortID (UnixSocket),
checkedConnect,
defaultConnectInfo,
get, mset,
runRedis)
import Dhall (auto, input)
import Options.Applicative (Parser,
execParser,
fullDesc, header,
help, helper,
info, long,
metavar, progDesc,
short, strOption,
(<**>))
import Prelude hiding (log,
readFile)
4 years ago
import System.ZMQ4 (withContext)
import System.ZMQ4.ZAP (loadCertificateFromFile)
4 years ago
4 years ago
data JunctionEnv =
JunctionEnv
{
4 years ago
peRedisSocket :: Connection,
4 years ago
peConfigPath :: FilePath,
peQuoteThread :: QuoteThreadHandle,
peBroker :: BrokerClientHandle,
peRobots :: IORef (M.Map T.Text RobotDriverHandle),
peLogAction :: LogAction JunctionM Message
}
4 years ago
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a }
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow)
4 years ago
instance HasLog JunctionEnv Message JunctionM where
getLogAction = peLogAction
setLogAction a e = e { peLogAction = a }
4 years ago
instance ConfigStorage JunctionM where
4 years ago
loadConfig key = do
basePath <- asks peConfigPath
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction
liftIO $ readFile path >>= input auto
4 years ago
instance MonadPersistence JunctionM where
4 years ago
saveState newState key = do
conn <- asks peRedisSocket
now <- liftIO getPOSIXTime
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState),
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)]
case res of
Left _ -> logWarning "Junction " "Unable to save state"
4 years ago
Right _ -> return ()
loadState key = do
conn <- asks peRedisSocket
res <- liftIO $ runRedis conn $ get (encodeUtf8 key)
-- TODO: just chain eithers
case res of
Left _ -> do
logWarning "Junction" "Unable to load state"
4 years ago
return def
Right maybeRawState ->
case maybeRawState of
Just rawState -> case eitherDecode $ BL.fromStrict rawState of
Left _ -> do
logWarning "Junction" "Unable to decode state"
4 years ago
return def
Right decodedState -> return decodedState
Nothing -> do
logWarning "Junction" "Unable to decode state"
4 years ago
return def
4 years ago
instance QuoteStream JunctionM where
addSubscription (QuoteSubscription ticker timeframe) chan = do
qt <- asks peQuoteThread
QT.addSubscription qt ticker timeframe chan
return (SubscriptionId 0) -- TODO subscription Ids
removeSubscription _ = undefined
4 years ago
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do
opts <- parseOptions
let bootstrapLogAction = fmtMessage >$< logTextStdout
let log = logWith bootstrapLogAction
log Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts)
cfg <- readFile (configPath opts) >>= input auto
4 years ago
barsMap <- newIORef M.empty
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) })
withContext $ \ctx -> do
let downloaderLogAction = fmtMessage >$< logTextStdout
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) downloaderLogAction
robotsMap <- newIORef M.empty
ordersMap <- newIORef M.empty
handledNotifications <- newIORef S.empty
withBroker cfg ctx robotsMap ordersMap handledNotifications $ \bro ->
4 years ago
withQThread downloaderEnv barsMap cfg ctx $ \qt -> do
let junctionLogAction = fmtMessage >$< logTextStdout
4 years ago
let env =
JunctionEnv
{
peRedisSocket = redis,
peConfigPath = robotsConfigsPath cfg,
peQuoteThread = qt,
peBroker = bro,
peRobots = robotsMap,
peLogAction = junctionLogAction
4 years ago
}
withJunction env $ do
startRobots cfg bro barsMap
forever $ do
saveRobots
liftIO $ threadDelay 5000000
where
saveRobots :: JunctionM ()
saveRobots = do
robotsMap <- asks peRobots >>= (liftIO . readIORef)
traverse_ saveRobotState robotsMap
saveRobotState :: RobotDriverHandle -> JunctionM ()
saveRobotState handle = onStrategyInstance handle $ \inst -> do
currentState <- liftIO $ readIORef (strategyState inst)
saveState currentState (strategyInstanceId inst)
currentTimers <- liftIO $ readIORef (strategyTimers inst)
saveState currentTimers (strategyInstanceId inst <> ":timers")
startRobots cfg bro barsMap = forM_ (instances cfg) $ \inst ->
case M.lookup (strategyBaseName inst) descriptors of
Just (StrategyDescriptorE desc) -> do
bigConf <- loadConfig (configKey inst)
rConf <- liftIO $ newIORef (confStrategy bigConf)
rState <- loadState (stateKey inst) >>= liftIO . newIORef
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef
let robotLogAction = fmtMessage >$< logTextStdout
let robotEnv = RobotEnv rState rConf rTimers bro barsMap robotLogAction
robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers
robotsMap' <- asks peRobots
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ()))
Nothing -> error "Unknown strategy"
4 years ago
withJunction :: JunctionEnv -> JunctionM () -> IO ()
withJunction env = (`runReaderT` env) . unJunctionM
4 years ago
handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) ->
IORef (M.Map OrderId T.Text) ->
IORef (S.Set NotificationSqnum) ->
Notification ->
IO ()
handleBrokerNotification robotsRef ordersMapRef handled notification =
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do
robotsMap <- readIORef robotsRef
ordersMap <- readIORef ordersMapRef
case getNotificationTarget robotsMap ordersMap notification of
Just robot -> postNotificationEvent robot notification
Nothing -> return () --logWarning "Junction" "Unknown order" -- TODO log
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ()))
getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle
getNotificationTarget robotsMap ordersMap notification = do
robotId <- M.lookup (notificationOrderId notification) ordersMap
M.lookup robotId robotsMap
notificationOrderId (OrderNotification _ oid _) = oid
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade
withBroker cfg ctx robotsMap ordersMap handled f = do
securityParameters <- loadBrokerSecurityParameters cfg
bracket
(startBrokerClient
"broker"
ctx
(brokerEndpoint cfg)
(brokerNotificationEndpoint cfg)
[handleBrokerNotification robotsMap ordersMap handled]
securityParameters)
stopBrokerClient f
loadBrokerSecurityParameters cfg =
case (brokerClientCert cfg, brokerServerCert cfg) of
(Just clientCertPath, Just serverCertPath) -> do
eClientCert <- loadCertificateFromFile clientCertPath
eServerCert <- loadCertificateFromFile serverCertPath
case (eClientCert, eServerCert) of
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert)
(_, _) -> return $ ClientSecurityParams Nothing Nothing
_ -> return $ ClientSecurityParams Nothing Nothing
parseOptions = execParser options
options = info (optionsParser <**> helper)
(fullDesc <>
progDesc "Robocom-zero junction mode driver" <>
header "robocom-zero-junction")
optionsParser :: Parser ProgramOptions
optionsParser = ProgramOptions
<$> strOption
(long "config" <>
short 'c' <>
metavar "FILENAME" <>
help "Configuration file path")