@ -1,125 +1,182 @@
@@ -1,125 +1,182 @@
{- # LANGUAGE DeriveGeneric # -}
{- # LANGUAGE DuplicateRecordFields # -}
{- # LANGUAGE OverloadedStrings # -}
{- # LANGUAGE DeriveGeneric # -}
{- # LANGUAGE DuplicateRecordFields # -}
{- # LANGUAGE FlexibleInstances # -}
{- # LANGUAGE GeneralizedNewtypeDeriving # -}
{- # LANGUAGE MultiParamTypeClasses # -}
{- # LANGUAGE OverloadedStrings # -}
module ATrade.Driver.Junction
(
junctionMain
) where
import ATrade.Driver.Junction.Types ( StrategyDescriptor ( .. ) ,
StrategyInstance ( .. ) ,
StrategyInstanceDescriptor ( .. ) )
import ATrade.RoboCom.Types ( Ticker ( .. ) )
import Control.Concurrent ( forkIO )
import Control.Concurrent.Async ( forConcurrently_ )
import Control.Concurrent.STM ( atomically )
import Control.Concurrent.STM.TQueue ( newTQueueIO )
import Control.Concurrent.STM.TVar ( newTVarIO )
import Data.Aeson ( FromJSON ( .. ) , ToJSON ( .. ) ,
decode , object , withObject ,
( .: ) , ( .= ) )
import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as BL
import Data.IORef
import qualified Data.Map.Strict as M
import Data.Maybe ( fromMaybe )
import Data.Semigroup ( ( <> ) )
import qualified Data.Text as T
import Data.Text.IO ( readFile )
import Dhall ( FromDhall , auto , input )
import GHC.Generics ( Generic )
import Options.Applicative ( Parser , execParser , fullDesc ,
header , help , helper , info ,
long , metavar , progDesc , short ,
strOption , ( <**> ) )
import Prelude hiding ( readFile )
data BigConfig c = BigConfig {
confTickers :: [ Ticker ] ,
confStrategy :: c
}
instance ( FromJSON c ) => FromJSON ( BigConfig c ) where
parseJSON = withObject " object " ( \ obj -> BigConfig <$>
obj .: " tickers " <*>
obj .: " params " )
instance ( ToJSON c ) => ToJSON ( BigConfig c ) where
toJSON conf = object [ " tickers " .= confTickers conf ,
" params " .= confStrategy conf ]
data ProgramOptions =
ProgramOptions
import ATrade.Broker.Client ( startBrokerClient ,
stopBrokerClient )
import ATrade.Driver.Junction.ProgramConfiguration ( ProgramConfiguration ( brokerEndpoint , brokerNotificationEndpoint , instances , qhpEndpoint , qtisEndpoint , redisSocket , robotsConfigsPath ) ,
ProgramOptions ( ProgramOptions , configPath ) )
import ATrade.Driver.Junction.QuoteStream ( QuoteStream ( addSubscription , removeSubscription ) )
import ATrade.Driver.Junction.QuoteThread ( DownloaderEnv ( DownloaderEnv ) ,
withQThread )
import ATrade.Driver.Junction.RobotDriverThread ( createRobotDriverThread )
import ATrade.Driver.Junction.Types ( StrategyDescriptorE ( StrategyDescriptorE ) ,
StrategyInstanceDescriptor ( .. ) ,
confStrategy )
import ATrade.Quotes.QHP ( mkQHPHandle )
import ATrade.RoboCom.ConfigStorage ( ConfigStorage ( loadConfig ) )
import ATrade.RoboCom.Monad ( MonadRobot ( .. ) )
import ATrade.RoboCom.Persistence ( MonadPersistence ( loadState , saveState ) )
import ATrade.Types ( ClientSecurityParams ( ClientSecurityParams ) )
import Control.Exception.Safe ( MonadThrow ,
bracket )
import Control.Monad ( forM_ )
import Control.Monad.IO.Class ( MonadIO ( liftIO ) )
import Control.Monad.Reader ( MonadReader , ReaderT ( runReaderT ) ,
asks )
import Data.Aeson ( eitherDecode ,
encode )
import qualified Data.ByteString.Lazy as BL
import Data.Default ( Default ( def ) )
import Data.IORef ( IORef , newIORef )
import qualified Data.Map.Strict as M
import qualified Data.Text as T
import Data.Text.Encoding ( encodeUtf8 )
import Data.Text.IO ( readFile )
import Data.Time.Clock.POSIX ( getPOSIXTime )
import Database.Redis ( ConnectInfo ( .. ) ,
Connection ,
PortID ( UnixSocket ) ,
checkedConnect ,
defaultConnectInfo ,
get , mset ,
runRedis )
import Dhall ( auto , input )
import Options.Applicative ( Parser ,
execParser ,
fullDesc , header ,
help , helper ,
info , long ,
metavar , progDesc ,
short , strOption ,
( <**> ) )
import Prelude hiding ( readFile )
import System.Log.Logger ( warningM )
import System.ZMQ4 ( withContext )
data PersistenceEnv =
PersistenceEnv
{
configPath :: FilePath
peRedisSocket :: Connection ,
peConfigPath :: FilePath
}
data ProgramConfiguration =
ProgramConfiguration
newtype PersistenceT a = PersistenceT { unPersistenceT :: ReaderT PersistenceEnv IO a }
deriving ( Functor , Applicative , Monad , MonadReader PersistenceEnv , MonadIO , MonadThrow )
instance ConfigStorage PersistenceT where
loadConfig key = do
basePath <- asks peConfigPath
let path = basePath <> " / " <> T . unpack key -- TODO fix path construction
liftIO $ readFile path >>= input auto
instance MonadPersistence PersistenceT where
saveState newState key = do
conn <- asks peRedisSocket
now <- liftIO getPOSIXTime
res <- liftIO $ runRedis conn $ mset [ ( encodeUtf8 key , BL . toStrict $ encode newState ) ,
( encodeUtf8 ( key <> " :last_store " ) , encodeUtf8 . T . pack . show $ now ) ]
case res of
Left _ -> liftIO $ warningM " main " " Unable to save state "
Right _ -> return ()
loadState key = do
conn <- asks peRedisSocket
res <- liftIO $ runRedis conn $ get ( encodeUtf8 key )
-- TODO: just chain eithers
case res of
Left _ -> do
liftIO $ warningM " main " " Unable to load state "
return def
Right maybeRawState ->
case maybeRawState of
Just rawState -> case eitherDecode $ BL . fromStrict rawState of
Left _ -> do
liftIO $ warningM " main " " Unable to decode state "
return def
Right decodedState -> return decodedState
Nothing -> do
liftIO $ warningM " main " " Unable to decode state "
return def
instance QuoteStream PersistenceT where
addSubscription sub chan = undefined
removeSubscription sub = undefined
data RobotEnv c s =
RobotEnv
{
brokerEndpoint :: T . Text ,
brokerServerCert :: Maybe FilePath ,
brokerClientCert :: Maybe FilePath ,
quotesourceEndpoint :: T . Text ,
quotesourceServerCert :: Maybe FilePath ,
quotesourceClientCert :: Maybe FilePath ,
qhpEndpoint :: T . Text ,
qtisEndpoint :: T . Text ,
redisSocket :: T . Text ,
globalLog :: FilePath ,
instances :: [ StrategyInstanceDescriptor ]
} deriving ( Generic , Show )
instance FromDhall ProgramConfiguration
load :: T . Text -> IO ( Maybe B . ByteString )
load = undefined
junctionMain :: M . Map T . Text StrategyDescriptor -> IO ()
stateRef :: IORef s ,
configRef :: IORef c
}
newtype RobotM c s a = RobotM { unRobotM :: ReaderT ( RobotEnv c s ) IO a }
deriving ( Functor , Applicative , Monad , MonadReader ( RobotEnv c s ) , MonadIO , MonadThrow )
instance MonadRobot ( RobotM c s ) c s where
submitOrder = undefined
cancelOrder = undefined
appendToLog = undefined
setupTimer = undefined
enqueueIOAction = undefined
getConfig = undefined
getState = undefined
setState = undefined
getEnvironment = undefined
getTicker = undefined
junctionMain :: M . Map T . Text StrategyDescriptorE -> IO ()
junctionMain descriptors = do
opts <- parseOptions
cfg <- readFile ( configPath opts ) >>= input auto
bars <- newTVarIO M . empty
strategies <- mkStrategies ( instances cfg )
start strategies bars
barsMap <- newIORef M . empty
redis <- checkedConnect ( defaultConnectInfo { connectPort = UnixSocket ( T . unpack $ redisSocket cfg ) } )
withContext $ \ ctx -> do
let env = DownloaderEnv ( mkQHPHandle ctx ( qhpEndpoint cfg ) ) ctx ( qtisEndpoint cfg )
withBroker cfg ctx $ \ bro ->
withQThread env barsMap cfg ctx $ \ qt ->
withPersistence ( PersistenceEnv redis $ robotsConfigsPath cfg ) $
forM_ ( instances cfg ) $ \ inst ->
case M . lookup ( strategyBaseName inst ) descriptors of
Just ( StrategyDescriptorE desc ) -> do
bigConf <- loadConfig ( configKey inst )
rConf <- liftIO $ newIORef ( confStrategy bigConf )
rState <- loadState ( stateKey inst ) >>= liftIO . newIORef
let robotEnv = RobotEnv rState rConf
createRobotDriverThread inst desc ( flip runReaderT robotEnv . unRobotM ) bigConf rConf rState
Nothing -> error " Unknown strategy "
where
withPersistence :: PersistenceEnv -> PersistenceT () -> IO ()
withPersistence env = ( ` runReaderT ` env ) . unPersistenceT
withBroker cfg ctx f = bracket
( startBrokerClient
" broker "
ctx
( brokerEndpoint cfg )
( brokerNotificationEndpoint cfg )
[]
( ClientSecurityParams -- TODO load certificates from file
Nothing
Nothing ) )
stopBrokerClient f
parseOptions = execParser options
options = info ( optionsParser <**> helper )
( fullDesc <>
progDesc " Robocom-zero junction mode driver " <>
header " robocom-zero-junction " )
mkStrategies :: [ StrategyInstanceDescriptor ] -> IO [ StrategyInstance ]
mkStrategies = mapM mkStrategy
mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance
mkStrategy desc = do
sState <- load ( stateKey desc )
sCfg <- load ( configKey desc )
case M . lookup ( strategyId desc ) descriptors of
Just ( StrategyDescriptor _sName sCallback sDefState ) ->
case ( sCfg >>= decode . BL . fromStrict , fromMaybe sDefState ( sState >>= decode . BL . fromStrict ) ) of
( Just bigConfig , pState ) -> do
cfgRef <- newIORef ( confStrategy bigConfig )
stateRef <- newIORef pState
return $ StrategyInstance
{
strategyInstanceId = strategyName desc ,
strategyEventCallback = sCallback ,
strategyState = stateRef ,
strategyConfig = cfgRef
}
_ -> error " Can't read state and config "
_ -> error $ " Can't find strategy: " ++ T . unpack ( strategyId desc )
start strategies bars = undefined
optionsParser :: Parser ProgramOptions
optionsParser = ProgramOptions
<$> strOption