Compare commits
71 Commits
36 changed files with 2440 additions and 1112 deletions
@ -1,58 +1,211 @@ |
|||||||
|
{-# LANGUAGE DuplicateRecordFields #-} |
||||||
|
{-# LANGUAGE FlexibleContexts #-} |
||||||
|
{-# LANGUAGE FlexibleInstances #-} |
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE RankNTypes #-} |
||||||
|
|
||||||
module ATrade.Driver.Junction |
module ATrade.Driver.Junction |
||||||
( |
( |
||||||
junctionMain |
junctionMain |
||||||
) where |
) where |
||||||
|
|
||||||
import ATrade.Driver.Junction.Types (StrategyDescriptor (..), |
import ATrade.Broker.Client (startBrokerClient, |
||||||
StrategyInstance (..), |
stopBrokerClient) |
||||||
StrategyInstanceDescriptor (..)) |
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), |
||||||
import Data.Aeson (decode) |
NotificationSqnum (unNotificationSqnum), |
||||||
import qualified Data.ByteString as B |
getNotificationSqnum) |
||||||
import qualified Data.ByteString.Lazy as BL |
import ATrade.Driver.Junction.BrokerService (getNotifications, |
||||||
import Data.IORef |
mkBrokerService) |
||||||
import qualified Data.Map.Strict as M |
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (..), |
||||||
import qualified Data.Text as T |
JunctionM (..), |
||||||
|
saveRobots, |
||||||
|
startRobot) |
||||||
|
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), |
||||||
|
ProgramOptions (ProgramOptions, configPath)) |
||||||
|
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), |
||||||
|
withQThread) |
||||||
|
import ATrade.Driver.Junction.RemoteControl (handleRemoteControl) |
||||||
|
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, postNotificationEvent) |
||||||
|
import ATrade.Driver.Junction.Types (StrategyDescriptorE) |
||||||
|
import ATrade.Logging (Message (..), Severity (Debug, Info, Trace, Warning), |
||||||
|
fmtMessage, |
||||||
|
logWith) |
||||||
|
import ATrade.Quotes.QHP (mkQHPHandle) |
||||||
|
import ATrade.Types (OrderId, Trade (tradeOrderId)) |
||||||
|
import Colog (LogAction (LogAction), |
||||||
|
cfilter, |
||||||
|
hoistLogAction, |
||||||
|
logTextStderr, |
||||||
|
(<&), (>$<)) |
||||||
|
import Colog.Actions (logTextHandle) |
||||||
|
import Control.Concurrent.QSem (newQSem) |
||||||
|
import Control.Monad (forM_, forever) |
||||||
|
import Control.Monad.Extra (whenM) |
||||||
|
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||||
|
import Control.Monad.Reader (ReaderT (runReaderT)) |
||||||
|
import Data.IORef (IORef, |
||||||
|
atomicModifyIORef', |
||||||
|
newIORef, |
||||||
|
readIORef) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import Data.Set (notMember) |
||||||
|
import qualified Data.Set as S |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Text.IO (readFile) |
||||||
|
import Database.Redis (ConnectInfo (..), PortID (UnixSocket), |
||||||
|
checkedConnect, |
||||||
|
defaultConnectInfo) |
||||||
|
import Dhall (auto, input) |
||||||
|
import Options.Applicative (Parser, |
||||||
|
execParser, |
||||||
|
fullDesc, header, |
||||||
|
help, helper, |
||||||
|
info, long, |
||||||
|
metavar, progDesc, |
||||||
|
short, strOption, |
||||||
|
(<**>)) |
||||||
|
import Prelude hiding (log, |
||||||
|
readFile) |
||||||
|
import System.IO (BufferMode (LineBuffering), |
||||||
|
Handle, |
||||||
|
IOMode (AppendMode), |
||||||
|
hSetBuffering, |
||||||
|
withFile) |
||||||
|
import System.ZMQ4 (Router (Router), |
||||||
|
bind, withContext, |
||||||
|
withSocket) |
||||||
|
import UnliftIO (MonadUnliftIO) |
||||||
|
import UnliftIO.Exception (bracket) |
||||||
|
import UnliftIO.QSem (QSem, withQSem) |
||||||
|
|
||||||
load :: T.Text -> IO B.ByteString |
|
||||||
load = undefined |
|
||||||
|
|
||||||
junctionMain :: M.Map T.Text StrategyDescriptor -> IO () |
locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a |
||||||
|
locked sem action = LogAction (\m -> withQSem sem (action <& m)) |
||||||
|
|
||||||
|
logger :: (MonadIO m) => M.Map T.Text Severity -> Handle -> LogAction m Message |
||||||
|
logger loglevels h = cfilter checkLoglevel (fmtMessage >$< (logTextStderr <> logTextHandle h)) |
||||||
|
where |
||||||
|
checkLoglevel msg = |
||||||
|
case M.lookup (msgComponent msg) loglevels of |
||||||
|
Just level -> msgSeverity msg >= level |
||||||
|
Nothing -> True |
||||||
|
|
||||||
|
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () |
||||||
junctionMain descriptors = do |
junctionMain descriptors = do |
||||||
parseOptions |
opts <- parseOptions |
||||||
instanceDescriptors <- undefined |
|
||||||
strategies <- mkStrategies instanceDescriptors |
let initialLogger = fmtMessage >$< logTextStderr |
||||||
|
|
||||||
|
logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts) |
||||||
|
|
||||||
|
cfg <- readFile (configPath opts) >>= input auto |
||||||
|
|
||||||
|
withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do |
||||||
|
|
||||||
|
hSetBuffering h LineBuffering |
||||||
|
|
||||||
start strategies |
locksem <- newQSem 1 |
||||||
|
let globalLogger = locked locksem (logger (M.fromList $ logLevels cfg) h) |
||||||
|
let log = logWith globalLogger |
||||||
|
|
||||||
|
barsMap <- newIORef M.empty |
||||||
|
tickerInfoMap <- newIORef M.empty |
||||||
|
|
||||||
|
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg |
||||||
|
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) |
||||||
|
log Info "Junction" "redis: connected" |
||||||
|
withContext $ \ctx -> do |
||||||
|
log Debug "Junction" "0mq context created" |
||||||
|
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger) |
||||||
|
robotsMap <- newIORef M.empty |
||||||
|
ordersMap <- newIORef M.empty |
||||||
|
handledNotifications <- newIORef S.empty |
||||||
|
withBroker cfg robotsMap ordersMap handledNotifications globalLogger $ \bro -> |
||||||
|
withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> |
||||||
|
withSocket ctx Router $ \rcSocket -> do |
||||||
|
liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) |
||||||
|
broService <- mkBrokerService bro ordersMap |
||||||
|
let junctionLogAction = hoistLogAction liftIO globalLogger |
||||||
|
let env = |
||||||
|
JunctionEnv |
||||||
|
{ |
||||||
|
peRedisSocket = redis, |
||||||
|
peConfigPath = robotsConfigsPath cfg, |
||||||
|
peQuoteThread = qt, |
||||||
|
peBroker = bro, |
||||||
|
peRobots = robotsMap, |
||||||
|
peRemoteControlSocket = rcSocket, |
||||||
|
peLogAction = junctionLogAction, |
||||||
|
peIoLogAction = globalLogger, |
||||||
|
peProgramConfiguration = cfg, |
||||||
|
peBarsMap = barsMap, |
||||||
|
peTickerInfoMap = tickerInfoMap, |
||||||
|
peBrokerService = broService, |
||||||
|
peDescriptors = descriptors |
||||||
|
} |
||||||
|
withJunction env $ do |
||||||
|
startRobots cfg |
||||||
|
forever $ do |
||||||
|
notifications <- getNotifications broService |
||||||
|
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger) |
||||||
|
saveRobots |
||||||
|
handleRemoteControl 1000 |
||||||
where |
where |
||||||
parseOptions = undefined |
startRobots :: ProgramConfiguration -> JunctionM () |
||||||
|
startRobots cfg = forM_ (instances cfg) startRobot |
||||||
mkStrategies :: [StrategyInstanceDescriptor] -> IO [StrategyInstance] |
|
||||||
mkStrategies = mapM mkStrategy |
withJunction :: JunctionEnv -> JunctionM () -> IO () |
||||||
|
withJunction env = (`runReaderT` env) . unJunctionM |
||||||
mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance |
|
||||||
mkStrategy desc = do |
handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) -> |
||||||
sState <- load (stateKey desc) |
IORef (M.Map OrderId T.Text) -> |
||||||
sCfg <- load (configKey desc) |
IORef (S.Set NotificationSqnum) -> |
||||||
case M.lookup (strategyId desc) descriptors of |
LogAction IO Message -> |
||||||
Just (StrategyDescriptor _sName sCallback _sDefState) -> |
Notification -> |
||||||
case (decode $ BL.fromStrict sCfg, decode $ BL.fromStrict sState) of |
IO () |
||||||
(Just pCfg, Just pState) -> do |
handleBrokerNotification robotsRef ordersMapRef handled logger' notification= do |
||||||
cfgRef <- newIORef pCfg |
logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification |
||||||
stateRef <- newIORef pState |
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do |
||||||
return $ StrategyInstance |
robotsMap <- readIORef robotsRef |
||||||
{ |
ordersMap <- readIORef ordersMapRef |
||||||
strategyInstanceId = strategyName desc, |
|
||||||
strategyEventCallback = sCallback, |
case getNotificationTarget robotsMap ordersMap notification of |
||||||
strategyState = stateRef, |
Just robot -> postNotificationEvent robot notification |
||||||
strategyConfig = cfgRef |
Nothing -> do |
||||||
} |
logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) |
||||||
_ -> undefined |
logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) |
||||||
_ -> undefined |
|
||||||
|
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) |
||||||
start = undefined |
|
||||||
|
getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle |
||||||
|
getNotificationTarget robotsMap ordersMap notification = do |
||||||
|
robotId <- M.lookup (notificationOrderId notification) ordersMap |
||||||
|
M.lookup robotId robotsMap |
||||||
|
|
||||||
|
notificationOrderId (OrderNotification _ oid _) = oid |
||||||
|
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade |
||||||
|
|
||||||
|
withBroker cfg robotsMap ordersMap handled logger' f = do |
||||||
|
bracket |
||||||
|
(startBrokerClient |
||||||
|
(brokerIdentity cfg) |
||||||
|
(brokerEndpoint cfg) |
||||||
|
[handleBrokerNotification robotsMap ordersMap handled logger'] |
||||||
|
logger') |
||||||
|
stopBrokerClient f |
||||||
|
|
||||||
|
parseOptions = execParser options |
||||||
|
options = info (optionsParser <**> helper) |
||||||
|
(fullDesc <> |
||||||
|
progDesc "Robocom-zero junction mode driver" <> |
||||||
|
header "robocom-zero-junction") |
||||||
|
|
||||||
|
optionsParser :: Parser ProgramOptions |
||||||
|
optionsParser = ProgramOptions |
||||||
|
<$> strOption |
||||||
|
(long "config" <> |
||||||
|
short 'c' <> |
||||||
|
metavar "FILENAME" <> |
||||||
|
help "Configuration file path") |
||||||
|
|
||||||
|
|||||||
@ -0,0 +1,64 @@ |
|||||||
|
{-# LANGUAGE FlexibleContexts #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
|
||||||
|
module ATrade.Driver.Junction.BrokerService |
||||||
|
( |
||||||
|
BrokerService, |
||||||
|
mkBrokerService, |
||||||
|
submitOrder, |
||||||
|
cancelOrder, |
||||||
|
getNotifications |
||||||
|
) where |
||||||
|
|
||||||
|
import qualified ATrade.Broker.Client as Bro |
||||||
|
import ATrade.Broker.Protocol (Notification (..)) |
||||||
|
import ATrade.Logging (Message, logDebug, logWarning) |
||||||
|
import ATrade.Types (Order (..), OrderId) |
||||||
|
import Colog (WithLog) |
||||||
|
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||||
|
import Control.Monad.Reader.Class (MonadReader) |
||||||
|
import Data.IORef (IORef, atomicModifyIORef', |
||||||
|
newIORef) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import qualified Data.Text as T |
||||||
|
|
||||||
|
data BrokerService = |
||||||
|
BrokerService |
||||||
|
{ |
||||||
|
broker :: Bro.BrokerClientHandle, |
||||||
|
orderMap :: IORef (M.Map OrderId T.Text), |
||||||
|
orderIdCounter :: IORef OrderId |
||||||
|
} |
||||||
|
|
||||||
|
mkBrokerService :: Bro.BrokerClientHandle -> IORef (M.Map OrderId T.Text) -> IO BrokerService |
||||||
|
mkBrokerService h om = BrokerService h om <$> newIORef 1 |
||||||
|
|
||||||
|
submitOrder :: (MonadIO m, WithLog env Message m, MonadReader env m) => BrokerService -> T.Text -> Order -> m OrderId |
||||||
|
submitOrder service identity order = do |
||||||
|
oid <- nextOrderId service |
||||||
|
logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid |
||||||
|
liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ())) |
||||||
|
r <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid } |
||||||
|
case r of |
||||||
|
Left err -> logWarning "BrokerService" $ "Submit order error: " <> err |
||||||
|
_ -> return () |
||||||
|
return oid |
||||||
|
where |
||||||
|
nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s)) |
||||||
|
|
||||||
|
cancelOrder :: (MonadIO m, WithLog env Message m) => BrokerService -> OrderId -> m () |
||||||
|
cancelOrder service oid = do |
||||||
|
r <- liftIO $ Bro.cancelOrder (broker service) oid |
||||||
|
case r of |
||||||
|
Left err -> logWarning "BrokerServer" $ "Cancel order error: " <> err |
||||||
|
_ -> return () |
||||||
|
return () |
||||||
|
|
||||||
|
getNotifications :: (MonadIO m, WithLog env Message m) => BrokerService -> m [Notification] |
||||||
|
getNotifications service = do |
||||||
|
v <- liftIO $ Bro.getNotifications (broker service) |
||||||
|
case v of |
||||||
|
Left err -> do |
||||||
|
logWarning "BrokerServer" $ "Get notifications order error: " <> err |
||||||
|
return [] |
||||||
|
Right n -> return n |
||||||
@ -0,0 +1,258 @@ |
|||||||
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
|
||||||
|
|
||||||
|
module ATrade.Driver.Junction.JunctionMonad |
||||||
|
( |
||||||
|
JunctionEnv(..), |
||||||
|
JunctionM(..), |
||||||
|
startRobot, |
||||||
|
saveRobots, |
||||||
|
reloadConfig, |
||||||
|
getState, |
||||||
|
setState |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Broker.Client (BrokerClientHandle) |
||||||
|
import ATrade.Driver.Junction.BrokerService (BrokerService) |
||||||
|
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (logBasePath)) |
||||||
|
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
||||||
|
QuoteSubscription (QuoteSubscription)) |
||||||
|
import ATrade.Driver.Junction.QuoteThread (QuoteThreadHandle) |
||||||
|
import qualified ATrade.Driver.Junction.QuoteThread as QT |
||||||
|
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), |
||||||
|
RobotM (unRobotM), |
||||||
|
createRobotDriverThread, |
||||||
|
getInstanceDescriptor, |
||||||
|
onStrategyInstance, |
||||||
|
onStrategyInstanceM) |
||||||
|
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), |
||||||
|
StrategyInstanceDescriptor, |
||||||
|
accountId, |
||||||
|
confStrategy, |
||||||
|
confTickers, |
||||||
|
configKey, |
||||||
|
stateKey, |
||||||
|
strategyBaseName, |
||||||
|
strategyConfig, |
||||||
|
strategyId, |
||||||
|
strategyInstanceId, |
||||||
|
strategyState, |
||||||
|
strategyTimers, |
||||||
|
tickerId, |
||||||
|
timeframe) |
||||||
|
import ATrade.Logging (Message, Severity (Error, Info), |
||||||
|
fmtMessage, |
||||||
|
logWarning, |
||||||
|
logWith) |
||||||
|
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) |
||||||
|
import ATrade.RoboCom.Monad (StrategyEnvironment (..)) |
||||||
|
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) |
||||||
|
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
||||||
|
Bars, |
||||||
|
TickerInfoMap) |
||||||
|
import Colog (HasLog (getLogAction, setLogAction), |
||||||
|
LogAction, |
||||||
|
hoistLogAction, |
||||||
|
logTextHandle, |
||||||
|
(>$<)) |
||||||
|
import Control.Exception.Safe (finally) |
||||||
|
import Control.Monad.Reader (MonadIO (liftIO), |
||||||
|
MonadReader, |
||||||
|
ReaderT (runReaderT), |
||||||
|
asks) |
||||||
|
import Data.Aeson (decode, |
||||||
|
eitherDecode, |
||||||
|
encode) |
||||||
|
import qualified Data.ByteString as B |
||||||
|
import qualified Data.ByteString.Lazy as BL |
||||||
|
import Data.Default (Default (def)) |
||||||
|
import Data.Foldable (traverse_) |
||||||
|
import Data.IORef (IORef, |
||||||
|
atomicModifyIORef', |
||||||
|
newIORef, |
||||||
|
readIORef, |
||||||
|
writeIORef) |
||||||
|
import Data.List.NonEmpty (NonEmpty ((:|))) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Text.Encoding (encodeUtf8) |
||||||
|
import Data.Text.IO (readFile) |
||||||
|
import Data.Time (getCurrentTime) |
||||||
|
import Data.Time.Clock.POSIX (getPOSIXTime) |
||||||
|
import Database.Redis (Connection, get, |
||||||
|
mset, runRedis) |
||||||
|
import Dhall (auto, input) |
||||||
|
import Prelude hiding (log, |
||||||
|
readFile) |
||||||
|
import System.IO (BufferMode (LineBuffering), |
||||||
|
IOMode (AppendMode), |
||||||
|
hClose, |
||||||
|
hSetBuffering, |
||||||
|
openFile) |
||||||
|
import System.ZMQ4 (Router, Socket) |
||||||
|
import UnliftIO (MonadUnliftIO) |
||||||
|
import UnliftIO.Exception (catchAny, |
||||||
|
onException) |
||||||
|
|
||||||
|
data JunctionEnv = |
||||||
|
JunctionEnv |
||||||
|
{ |
||||||
|
peRedisSocket :: Connection, |
||||||
|
peConfigPath :: FilePath, |
||||||
|
peQuoteThread :: QuoteThreadHandle, |
||||||
|
peBroker :: BrokerClientHandle, |
||||||
|
peRobots :: IORef (M.Map T.Text RobotDriverHandle), |
||||||
|
peRemoteControlSocket :: Socket Router, |
||||||
|
peLogAction :: LogAction JunctionM Message, |
||||||
|
peIoLogAction :: LogAction IO Message, |
||||||
|
peProgramConfiguration :: ProgramConfiguration, |
||||||
|
peBarsMap :: IORef Bars, |
||||||
|
peTickerInfoMap :: IORef TickerInfoMap, |
||||||
|
peBrokerService :: BrokerService, |
||||||
|
peDescriptors :: M.Map T.Text StrategyDescriptorE |
||||||
|
} |
||||||
|
|
||||||
|
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } |
||||||
|
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadUnliftIO) |
||||||
|
|
||||||
|
instance HasLog JunctionEnv Message JunctionM where |
||||||
|
getLogAction = peLogAction |
||||||
|
setLogAction a e = e { peLogAction = a } |
||||||
|
|
||||||
|
instance ConfigStorage JunctionM where |
||||||
|
loadConfig key = do |
||||||
|
basePath <- asks peConfigPath |
||||||
|
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction |
||||||
|
liftIO $ readFile path >>= input auto |
||||||
|
|
||||||
|
instance MonadPersistence JunctionM where |
||||||
|
saveState newState key = do |
||||||
|
conn <- asks peRedisSocket |
||||||
|
now <- liftIO getPOSIXTime |
||||||
|
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), |
||||||
|
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] |
||||||
|
case res of |
||||||
|
Left _ -> logWarning "Junction " "Unable to save state" |
||||||
|
Right _ -> return () |
||||||
|
|
||||||
|
loadState key = do |
||||||
|
conn <- asks peRedisSocket |
||||||
|
res <- liftIO $ runRedis conn $ get (encodeUtf8 key) |
||||||
|
-- TODO: just chain eithers |
||||||
|
case res of |
||||||
|
Left _ -> do |
||||||
|
logWarning "Junction" "Unable to load state" |
||||||
|
return def |
||||||
|
Right maybeRawState -> |
||||||
|
case maybeRawState of |
||||||
|
Just rawState -> case eitherDecode $ BL.fromStrict rawState of |
||||||
|
Left _ -> do |
||||||
|
logWarning "Junction" "Unable to decode state" |
||||||
|
return def |
||||||
|
Right decodedState -> return decodedState |
||||||
|
Nothing -> do |
||||||
|
logWarning "Junction" "Unable to decode state" |
||||||
|
return def |
||||||
|
|
||||||
|
instance QuoteStream JunctionM where |
||||||
|
addSubscription (QuoteSubscription ticker tf) chan = do |
||||||
|
qt <- asks peQuoteThread |
||||||
|
QT.addSubscription qt ticker tf chan |
||||||
|
removeSubscription subId = do |
||||||
|
qt <- asks peQuoteThread |
||||||
|
QT.removeSubscription qt subId |
||||||
|
|
||||||
|
startRobot :: StrategyInstanceDescriptor -> JunctionM () |
||||||
|
startRobot inst = do |
||||||
|
ioLogger <- asks peIoLogAction |
||||||
|
descriptors <- asks peDescriptors |
||||||
|
cfg <- asks peProgramConfiguration |
||||||
|
barsMap <- asks peBarsMap |
||||||
|
tickerInfoMap <- asks peTickerInfoMap |
||||||
|
broService <- asks peBrokerService |
||||||
|
now <- liftIO getCurrentTime |
||||||
|
let lLogger = hoistLogAction liftIO ioLogger |
||||||
|
logWith lLogger Info "Junction" $ "Starting strategy: " <> strategyBaseName inst |
||||||
|
case M.lookup (strategyBaseName inst) descriptors of |
||||||
|
Just (StrategyDescriptorE desc) -> flip catchAny (\e -> logWith lLogger Error "Junction" $ "Exception: " <> (T.pack . show $ e)) $ do |
||||||
|
bigConf <- loadConfig (configKey inst) |
||||||
|
case confTickers bigConf of |
||||||
|
(firstTicker:restTickers) -> do |
||||||
|
rConf <- liftIO $ newIORef (confStrategy bigConf) |
||||||
|
rState <- loadState (stateKey inst) >>= liftIO . newIORef |
||||||
|
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef |
||||||
|
localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode |
||||||
|
liftIO $ hSetBuffering localH LineBuffering |
||||||
|
let robotLogAction = hoistLogAction liftIO ioLogger <> (fmtMessage >$< logTextHandle localH) |
||||||
|
stratEnv <- liftIO $ newIORef StrategyEnvironment |
||||||
|
{ |
||||||
|
_seInstanceId = strategyId inst, |
||||||
|
_seAccount = accountId inst, |
||||||
|
_seVolume = 1, |
||||||
|
_seLastTimestamp = now |
||||||
|
} |
||||||
|
let robotEnv = |
||||||
|
RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) |
||||||
|
robot <- createRobotDriverThread inst desc (\a -> (flip runReaderT robotEnv . unRobotM) a `finally` hClose localH) bigConf rConf rState rTimers |
||||||
|
robotsMap' <- asks peRobots |
||||||
|
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) |
||||||
|
_ -> logWith lLogger Error (strategyId inst) "No tickers configured !!!" |
||||||
|
Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst |
||||||
|
|
||||||
|
where |
||||||
|
toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) |
||||||
|
|
||||||
|
saveRobots :: JunctionM () |
||||||
|
saveRobots = do |
||||||
|
robotsMap <- asks peRobots >>= (liftIO . readIORef) |
||||||
|
traverse_ saveRobotState robotsMap |
||||||
|
|
||||||
|
saveRobotState :: RobotDriverHandle -> JunctionM () |
||||||
|
saveRobotState handle = onStrategyInstance handle $ \inst -> do |
||||||
|
currentState <- liftIO $ readIORef (strategyState inst) |
||||||
|
saveState currentState (strategyInstanceId inst) |
||||||
|
currentTimers <- liftIO $ readIORef (strategyTimers inst) |
||||||
|
saveState currentTimers (strategyInstanceId inst <> ":timers") |
||||||
|
|
||||||
|
reloadConfig :: T.Text -> JunctionM (Either T.Text ()) |
||||||
|
reloadConfig instId = flip catchAny (\_ -> return $ Left "Exception") $ do |
||||||
|
robotsMap' <- asks peRobots |
||||||
|
robots <- liftIO $ readIORef robotsMap' |
||||||
|
case M.lookup instId robots of |
||||||
|
Just robot -> do |
||||||
|
onStrategyInstanceM robot |
||||||
|
(\inst -> do |
||||||
|
let instDesc = getInstanceDescriptor robot |
||||||
|
bigConf <- loadConfig (configKey instDesc) |
||||||
|
liftIO $ writeIORef (strategyConfig inst) (confStrategy bigConf)) |
||||||
|
return $ Right () |
||||||
|
Nothing -> return $ Left "Unable to load config" |
||||||
|
|
||||||
|
getState :: T.Text -> JunctionM (Either T.Text B.ByteString) |
||||||
|
getState instId = do |
||||||
|
robotsMap' <- asks peRobots |
||||||
|
robots <- liftIO $ readIORef robotsMap' |
||||||
|
case M.lookup instId robots of |
||||||
|
Just robot -> do |
||||||
|
Right <$> onStrategyInstanceM robot |
||||||
|
(\inst -> do |
||||||
|
v <- liftIO $ readIORef (strategyState inst) |
||||||
|
return $ BL.toStrict $ encode v) |
||||||
|
Nothing -> return $ Left $ "Unknown robot: " <> instId |
||||||
|
|
||||||
|
setState :: T.Text -> B.ByteString -> JunctionM (Either T.Text ()) |
||||||
|
setState instId newState = do |
||||||
|
robotsMap' <- asks peRobots |
||||||
|
robots <- liftIO $ readIORef robotsMap' |
||||||
|
case M.lookup instId robots of |
||||||
|
Just robot -> do |
||||||
|
onStrategyInstanceM robot |
||||||
|
(\inst -> do |
||||||
|
case decode . BL.fromStrict $ newState of |
||||||
|
Just newS -> do |
||||||
|
liftIO $ writeIORef (strategyState inst) newS |
||||||
|
return $ Right () |
||||||
|
Nothing -> return $ Left $ "Unable to decode state for " <> instId) |
||||||
|
Nothing -> return $ Left $ "Unknown robot: " <> instId |
||||||
@ -0,0 +1,72 @@ |
|||||||
|
{-# LANGUAGE DeriveGeneric #-} |
||||||
|
{-# LANGUAGE NamedFieldPuns #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE RecordWildCards #-} |
||||||
|
|
||||||
|
module ATrade.Driver.Junction.ProgramConfiguration |
||||||
|
( |
||||||
|
ProgramOptions(..), |
||||||
|
ProgramConfiguration(..) |
||||||
|
) where |
||||||
|
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
||||||
|
import ATrade.Logging (Severity (..)) |
||||||
|
import qualified Data.Text as T |
||||||
|
import Dhall (FromDhall, autoWith) |
||||||
|
import Dhall.Core (Expr (..), FieldSelection (..)) |
||||||
|
import qualified Dhall.Map |
||||||
|
import Dhall.Marshal.Decode (Decoder (..), typeError) |
||||||
|
import GHC.Generics (Generic) |
||||||
|
|
||||||
|
newtype ProgramOptions = |
||||||
|
ProgramOptions |
||||||
|
{ |
||||||
|
configPath :: FilePath |
||||||
|
} |
||||||
|
|
||||||
|
data ProgramConfiguration = |
||||||
|
ProgramConfiguration |
||||||
|
{ |
||||||
|
brokerEndpoint :: T.Text, |
||||||
|
brokerNotificationEndpoint :: T.Text, |
||||||
|
brokerServerCert :: Maybe FilePath, |
||||||
|
brokerClientCert :: Maybe FilePath, |
||||||
|
brokerIdentity :: T.Text, |
||||||
|
quotesourceEndpoint :: T.Text, |
||||||
|
quotesourceServerCert :: Maybe FilePath, |
||||||
|
quotesourceClientCert :: Maybe FilePath, |
||||||
|
qhpEndpoint :: T.Text, |
||||||
|
qtisEndpoint :: T.Text, |
||||||
|
remoteControlEndpoint :: T.Text, |
||||||
|
redisSocket :: T.Text, |
||||||
|
robotsConfigsPath :: FilePath, |
||||||
|
logBasePath :: FilePath, |
||||||
|
logLevels :: [(T.Text, Severity)], |
||||||
|
instances :: [StrategyInstanceDescriptor] |
||||||
|
} deriving (Generic, Show) |
||||||
|
|
||||||
|
instance FromDhall Severity where |
||||||
|
autoWith _ = Decoder {..} |
||||||
|
where |
||||||
|
extract expr@(Field _ FieldSelection{ fieldSelectionLabel }) = |
||||||
|
case fieldSelectionLabel of |
||||||
|
"Trace" -> pure Trace |
||||||
|
"Debug" -> pure Debug |
||||||
|
"Info" -> pure Info |
||||||
|
"Warning" -> pure Warning |
||||||
|
"Error" -> pure Error |
||||||
|
_ -> typeError expected expr |
||||||
|
extract expr = typeError expected expr |
||||||
|
|
||||||
|
expected = pure |
||||||
|
(Union |
||||||
|
(Dhall.Map.fromList |
||||||
|
[ ("Trace", Nothing) |
||||||
|
, ("Debug", Nothing) |
||||||
|
, ("Info", Nothing) |
||||||
|
, ("Warning", Nothing) |
||||||
|
, ("Error", Nothing) |
||||||
|
] |
||||||
|
) |
||||||
|
) |
||||||
|
|
||||||
|
instance FromDhall ProgramConfiguration |
||||||
@ -0,0 +1,30 @@ |
|||||||
|
{-# LANGUAGE DeriveGeneric #-} |
||||||
|
|
||||||
|
module ATrade.Driver.Junction.QuoteStream |
||||||
|
( |
||||||
|
QuoteSubscription(..), |
||||||
|
QuoteStream(..), |
||||||
|
SubscriptionId(..) |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.QuoteSource.Client (QuoteData) |
||||||
|
import ATrade.Types (BarTimeframe, TickerId) |
||||||
|
import Control.Concurrent.BoundedChan (BoundedChan) |
||||||
|
import Data.Hashable (Hashable) |
||||||
|
import GHC.Generics (Generic) |
||||||
|
|
||||||
|
data QuoteSubscription = |
||||||
|
QuoteSubscription TickerId BarTimeframe |
||||||
|
deriving (Generic, Eq) |
||||||
|
|
||||||
|
instance Hashable BarTimeframe |
||||||
|
instance Hashable QuoteSubscription |
||||||
|
|
||||||
|
newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } |
||||||
|
deriving (Show, Eq, Generic) |
||||||
|
|
||||||
|
instance Hashable SubscriptionId |
||||||
|
|
||||||
|
class (Monad m) => QuoteStream m where |
||||||
|
addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId |
||||||
|
removeSubscription :: SubscriptionId -> m () |
||||||
@ -0,0 +1,304 @@ |
|||||||
|
{-# LANGUAGE DeriveGeneric #-} |
||||||
|
{-# LANGUAGE FlexibleContexts #-} |
||||||
|
{-# LANGUAGE FlexibleInstances #-} |
||||||
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||||
|
{-# LANGUAGE LambdaCase #-} |
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE ScopedTypeVariables #-} |
||||||
|
{-# LANGUAGE TypeSynonymInstances #-} |
||||||
|
|
||||||
|
module ATrade.Driver.Junction.QuoteThread |
||||||
|
( |
||||||
|
QuoteThreadHandle, |
||||||
|
startQuoteThread, |
||||||
|
stopQuoteThread, |
||||||
|
addSubscription, |
||||||
|
removeSubscription, |
||||||
|
DownloaderM, |
||||||
|
DownloaderEnv(..), |
||||||
|
runDownloaderM, |
||||||
|
withQThread |
||||||
|
) where |
||||||
|
|
||||||
|
import qualified ATrade.BarAggregator as BA |
||||||
|
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) |
||||||
|
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), |
||||||
|
SubscriptionId (SubscriptionId)) |
||||||
|
import ATrade.Logging (Message, logDebug, |
||||||
|
logInfo, |
||||||
|
logWarning) |
||||||
|
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
||||||
|
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) |
||||||
|
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), |
||||||
|
qtisGetTickersInfo) |
||||||
|
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
||||||
|
import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), |
||||||
|
QuoteSourceClientHandle, |
||||||
|
quoteSourceClientSubscribe, |
||||||
|
startQuoteSourceClient, |
||||||
|
stopQuoteSourceClient) |
||||||
|
import ATrade.RoboCom.Types (Bar (barSecurity), |
||||||
|
BarSeries (..), |
||||||
|
BarSeriesId (BarSeriesId), |
||||||
|
Bars, |
||||||
|
InstrumentParameters (InstrumentParameters), |
||||||
|
TickerInfoMap) |
||||||
|
import ATrade.Types (BarTimeframe (BarTimeframe), |
||||||
|
ClientSecurityParams (ClientSecurityParams), |
||||||
|
Tick (security), |
||||||
|
TickerId) |
||||||
|
import Colog (HasLog (getLogAction, setLogAction), |
||||||
|
LogAction, |
||||||
|
WithLog) |
||||||
|
import Control.Concurrent (ThreadId, forkIO, |
||||||
|
killThread) |
||||||
|
import Control.Concurrent.BoundedChan (BoundedChan, |
||||||
|
newBoundedChan, |
||||||
|
readChan, |
||||||
|
tryWriteChan, |
||||||
|
writeChan) |
||||||
|
import Control.Exception.Safe (MonadMask, |
||||||
|
MonadThrow, |
||||||
|
bracket) |
||||||
|
import Control.Monad (forM, forM_, |
||||||
|
forever) |
||||||
|
import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), |
||||||
|
lift) |
||||||
|
import Control.Monad.Reader.Class (MonadReader, asks) |
||||||
|
import qualified Data.HashMap.Strict as HM |
||||||
|
import Data.IORef (IORef, |
||||||
|
atomicModifyIORef', |
||||||
|
newIORef, |
||||||
|
readIORef) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Time (addUTCTime, |
||||||
|
getCurrentTime) |
||||||
|
import System.ZMQ4 (Context) |
||||||
|
import System.ZMQ4.ZAP (loadCertificateFromFile) |
||||||
|
|
||||||
|
|
||||||
|
data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv |
||||||
|
|
||||||
|
data QuoteThreadEnv = |
||||||
|
QuoteThreadEnv |
||||||
|
{ |
||||||
|
bars :: IORef Bars, |
||||||
|
endpoints :: IORef (HM.HashMap QuoteSubscription [(SubscriptionId, BoundedChan QuoteData)]), |
||||||
|
qsclient :: QuoteSourceClientHandle, |
||||||
|
paramsCache :: IORef TickerInfoMap, |
||||||
|
downloaderChan :: BoundedChan QuoteSubscription, |
||||||
|
subscriptionIdCounter :: IORef Int, |
||||||
|
subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription), |
||||||
|
aggregators :: IORef (HM.HashMap (TickerId, BarTimeframe) BA.BarAggregator) |
||||||
|
} |
||||||
|
|
||||||
|
startQuoteThread :: (MonadIO m, |
||||||
|
MonadIO m1, |
||||||
|
WithLog env Message m1, |
||||||
|
HistoryProvider m1, |
||||||
|
TickerInfoProvider m1) => |
||||||
|
IORef Bars -> |
||||||
|
IORef TickerInfoMap -> |
||||||
|
Context -> |
||||||
|
T.Text -> |
||||||
|
ClientSecurityParams -> |
||||||
|
(m1 () -> IO ()) -> |
||||||
|
LogAction IO Message -> |
||||||
|
m QuoteThreadHandle |
||||||
|
startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do |
||||||
|
chan <- liftIO $ newBoundedChan 2000 |
||||||
|
dChan <- liftIO $ newBoundedChan 2000 |
||||||
|
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger |
||||||
|
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty <*> newIORef HM.empty |
||||||
|
tid <- liftIO . forkIO $ quoteThread env chan |
||||||
|
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) |
||||||
|
return $ QuoteThreadHandle tid downloaderTid env |
||||||
|
where |
||||||
|
downloaderThread env chan = do |
||||||
|
logInfo "QuoteThread" "Started" |
||||||
|
forever $ do |
||||||
|
QuoteSubscription tickerid tf <- liftIO $ readChan chan |
||||||
|
logInfo "QuoteThread" $ "Subscription: " <> tickerid |
||||||
|
paramsMap <- liftIO $ readIORef $ paramsCache env |
||||||
|
mbParams <- case M.lookup tickerid paramsMap of |
||||||
|
Nothing -> do |
||||||
|
paramsList <- getInstrumentParameters [tickerid] |
||||||
|
case paramsList of |
||||||
|
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) |
||||||
|
_ -> return Nothing |
||||||
|
Just params -> return $ Just params |
||||||
|
logDebug "QuoteThread" $ "Got info params: " <> (T.pack . show $ mbParams) |
||||||
|
barsMap <- liftIO $ readIORef (bars env) |
||||||
|
case M.lookup (BarSeriesId tickerid tf) barsMap of |
||||||
|
Just _ -> return () -- already downloaded |
||||||
|
Nothing -> case mbParams of |
||||||
|
Just params -> do |
||||||
|
now <- liftIO getCurrentTime |
||||||
|
-- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. |
||||||
|
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. |
||||||
|
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) |
||||||
|
let barSeries = BarSeries tickerid tf barsData params |
||||||
|
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) |
||||||
|
_ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) |
||||||
|
|
||||||
|
pushToBarAggregators tick = forM_ (BarTimeframe <$> [60, 300, 900, 3600]) (pushTickToAggregator tick) |
||||||
|
|
||||||
|
pushTickToAggregator tick tf = do |
||||||
|
aggsRef <- asks aggregators |
||||||
|
aggs <- liftIO . readIORef $ aggsRef |
||||||
|
let key = (security tick, tf) |
||||||
|
case HM.lookup key aggs of |
||||||
|
Just agg -> do |
||||||
|
let (mbar, agg') = BA.handleTick tick agg |
||||||
|
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg' m, ())) |
||||||
|
barsRef' <- asks bars |
||||||
|
case mbar of |
||||||
|
Just bar -> do |
||||||
|
liftIO $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
||||||
|
writeBarData bar tf (QDBar (tf, bar)) |
||||||
|
_ -> do |
||||||
|
pure () |
||||||
|
_ -> do |
||||||
|
let agg = BA.mkAggregatorFromBars (M.singleton (security tick) (BarSeries (security tick) tf [] (InstrumentParameters (security tick) 1 1))) [(0, 86400)] |
||||||
|
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg m, ())) |
||||||
|
|
||||||
|
quoteThread env chan = flip runReaderT env $ forever $ do |
||||||
|
qssData <- lift $ readChan chan |
||||||
|
case qssData of |
||||||
|
QDBar (tf, bar) -> do |
||||||
|
barsRef' <- asks bars |
||||||
|
lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
||||||
|
writeBarData bar tf qssData |
||||||
|
QDTick tick -> do |
||||||
|
pushToBarAggregators tick |
||||||
|
writeTickData tick qssData |
||||||
|
|
||||||
|
writeTickData tick qssData = do |
||||||
|
let key = QuoteSubscription (security tick) (BarTimeframe 0) |
||||||
|
subs <- asks endpoints >>= (lift . readIORef) |
||||||
|
case HM.lookup key subs of |
||||||
|
Just clientChannels -> do |
||||||
|
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels |
||||||
|
Nothing -> return () |
||||||
|
|
||||||
|
writeBarData bar tf qssData = do |
||||||
|
let key = QuoteSubscription (barSecurity bar) tf |
||||||
|
subs <- asks endpoints >>= (lift . readIORef) |
||||||
|
case HM.lookup key subs of |
||||||
|
Just clientChannels -> do |
||||||
|
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels |
||||||
|
Nothing -> return () |
||||||
|
|
||||||
|
stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () |
||||||
|
stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do |
||||||
|
killThread tid |
||||||
|
killThread dtid |
||||||
|
stopQuoteSourceClient (qsclient env) |
||||||
|
|
||||||
|
addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m SubscriptionId |
||||||
|
addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do |
||||||
|
cnt <- atomicModifyIORef' (subscriptionIdCounter env) (\c -> (c + 1, c)) |
||||||
|
let subscription = QuoteSubscription tid tf |
||||||
|
let subid = SubscriptionId cnt |
||||||
|
writeChan (downloaderChan env) subscription |
||||||
|
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m subid tid, ())) |
||||||
|
atomicModifyIORef' (subscriptions env) (\m -> (HM.insert subid subscription m, ())) |
||||||
|
quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] |
||||||
|
return subid |
||||||
|
where |
||||||
|
doAddSubscription m subid tickerid = |
||||||
|
let m1 = HM.alter (\case |
||||||
|
Just chans -> Just ((subid, chan) : chans) |
||||||
|
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid tf) m in |
||||||
|
HM.alter (\case |
||||||
|
Just chans -> Just ((subid, chan) : chans) |
||||||
|
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 |
||||||
|
|
||||||
|
removeSubscription :: (MonadIO m) => QuoteThreadHandle -> SubscriptionId -> m () |
||||||
|
removeSubscription (QuoteThreadHandle _ _ env) subId = liftIO $ do |
||||||
|
subs <- readIORef (subscriptions env) |
||||||
|
case HM.lookup subId subs of |
||||||
|
Just sub -> atomicModifyIORef' (endpoints env) (\m -> (doRemoveSubscription m sub, ())) |
||||||
|
Nothing -> return () |
||||||
|
where |
||||||
|
doRemoveSubscription m sub = |
||||||
|
let m1 = HM.adjust (filter (\(subId', _) -> subId' == subId)) sub m in |
||||||
|
HM.adjust (filter (\(subId', _) -> subId' == subId)) (sub0 sub) m1 |
||||||
|
sub0 sub = let QuoteSubscription tid _ = sub in QuoteSubscription tid (BarTimeframe 0) |
||||||
|
|
||||||
|
updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars |
||||||
|
updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap |
||||||
|
|
||||||
|
addToSeries :: Bar -> BarSeries -> BarSeries |
||||||
|
addToSeries bar series = series { bsBars = bar : bsBars series } |
||||||
|
|
||||||
|
data DownloaderEnv = |
||||||
|
DownloaderEnv |
||||||
|
{ |
||||||
|
qhp :: QHPHandle, |
||||||
|
downloaderContext :: Context, |
||||||
|
downloaderQtisEndpoint :: T.Text, |
||||||
|
logAction :: LogAction DownloaderM Message |
||||||
|
} |
||||||
|
|
||||||
|
newtype DownloaderM a = DownloaderM { unDownloaderM :: ReaderT DownloaderEnv IO a } |
||||||
|
deriving (Functor, Applicative, Monad, MonadReader DownloaderEnv, MonadIO, MonadThrow) |
||||||
|
|
||||||
|
instance HasLog DownloaderEnv Message DownloaderM where |
||||||
|
getLogAction = logAction |
||||||
|
setLogAction a e = e { logAction = a } |
||||||
|
|
||||||
|
instance HistoryProvider DownloaderM where |
||||||
|
getHistory tid tf from to = do |
||||||
|
q <- asks qhp |
||||||
|
requestHistoryFromQHP q tid tf from to |
||||||
|
|
||||||
|
instance TickerInfoProvider DownloaderM where |
||||||
|
getInstrumentParameters tickers = do |
||||||
|
ctx <- asks downloaderContext |
||||||
|
ep <- asks downloaderQtisEndpoint |
||||||
|
tis <- forM tickers (qtisGetTickersInfo ctx ep) |
||||||
|
pure $ convert `fmap` tis |
||||||
|
where |
||||||
|
convert ti = InstrumentParameters |
||||||
|
(tiTicker ti) |
||||||
|
(fromInteger $ tiLotSize ti) |
||||||
|
(tiTickSize ti) |
||||||
|
|
||||||
|
withQThread :: |
||||||
|
DownloaderEnv |
||||||
|
-> IORef Bars |
||||||
|
-> IORef TickerInfoMap |
||||||
|
-> ProgramConfiguration |
||||||
|
-> Context |
||||||
|
-> LogAction IO Message |
||||||
|
-> (QuoteThreadHandle -> IO ()) |
||||||
|
-> IO () |
||||||
|
withQThread env barsMap tiMap cfg ctx logger f = do |
||||||
|
securityParameters <- loadSecurityParameters |
||||||
|
bracket |
||||||
|
(startQuoteThread |
||||||
|
barsMap |
||||||
|
tiMap |
||||||
|
ctx |
||||||
|
(quotesourceEndpoint cfg) |
||||||
|
securityParameters |
||||||
|
(runDownloaderM env) |
||||||
|
logger) |
||||||
|
stopQuoteThread f |
||||||
|
where |
||||||
|
loadSecurityParameters = |
||||||
|
case (quotesourceClientCert cfg, quotesourceServerCert cfg) of |
||||||
|
(Just clientCertPath, Just serverCertPath) -> do |
||||||
|
eClientCert <- loadCertificateFromFile clientCertPath |
||||||
|
eServerCert <- loadCertificateFromFile serverCertPath |
||||||
|
case (eClientCert, eServerCert) of |
||||||
|
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) |
||||||
|
(_, _) -> return $ ClientSecurityParams Nothing Nothing |
||||||
|
|
||||||
|
_ -> return $ ClientSecurityParams Nothing Nothing |
||||||
|
|
||||||
|
runDownloaderM :: DownloaderEnv -> DownloaderM () -> IO () |
||||||
|
runDownloaderM env = (`runReaderT` env) . unDownloaderM |
||||||
@ -0,0 +1,151 @@ |
|||||||
|
{-# LANGUAGE FlexibleContexts #-} |
||||||
|
{-# LANGUAGE MultiWayIf #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
|
||||||
|
module ATrade.Driver.Junction.RemoteControl |
||||||
|
( |
||||||
|
handleRemoteControl |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), |
||||||
|
JunctionM, getState, |
||||||
|
reloadConfig, |
||||||
|
setState, startRobot) |
||||||
|
import ATrade.Driver.Junction.RobotDriverThread (stopRobot) |
||||||
|
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
||||||
|
import ATrade.Logging (Severity (Info), |
||||||
|
logErrorWith, |
||||||
|
logWith) |
||||||
|
import Control.Monad (unless) |
||||||
|
import Control.Monad.Reader (asks) |
||||||
|
import Data.Aeson (decode) |
||||||
|
import qualified Data.ByteString as B |
||||||
|
import qualified Data.ByteString.Lazy as BL |
||||||
|
import Data.List.NonEmpty (NonEmpty ((:|))) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Text.Encoding (decodeUtf8', |
||||||
|
encodeUtf8) |
||||||
|
import System.ZMQ4 (Event (In), |
||||||
|
Poll (Sock), poll, |
||||||
|
receiveMulti, |
||||||
|
sendMulti) |
||||||
|
import UnliftIO (MonadIO (liftIO), |
||||||
|
atomicModifyIORef', |
||||||
|
readIORef) |
||||||
|
|
||||||
|
data RemoteControlResponse = |
||||||
|
ResponseOk |
||||||
|
| ResponseError T.Text |
||||||
|
| ResponseData B.ByteString |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
data RemoteControlRequest = |
||||||
|
StartRobot StrategyInstanceDescriptor |
||||||
|
| StopRobot T.Text |
||||||
|
| ReloadConfig T.Text |
||||||
|
| GetState T.Text |
||||||
|
| SetState T.Text B.ByteString |
||||||
|
| Ping |
||||||
|
deriving (Show) |
||||||
|
|
||||||
|
data ParseError = |
||||||
|
UnknownCmd |
||||||
|
| UtfDecodeError |
||||||
|
| JsonDecodeError |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
parseRemoteControlRequest :: B.ByteString -> Either ParseError RemoteControlRequest |
||||||
|
parseRemoteControlRequest bs = |
||||||
|
if |
||||||
|
| cmd == "START" -> parseStart |
||||||
|
| cmd == "STOP" -> parseStop |
||||||
|
| cmd == "RELOAD_CONFIG" -> parseReloadConfig |
||||||
|
| cmd == "GET_STATE" -> parseGetState |
||||||
|
| cmd == "SET_STATE" -> parseSetState |
||||||
|
| cmd == "PING" -> Right Ping |
||||||
|
| otherwise -> Left UnknownCmd |
||||||
|
where |
||||||
|
cmd = B.takeWhile (/= 0x20) bs |
||||||
|
rest = B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ bs |
||||||
|
|
||||||
|
parseStart = case decode . BL.fromStrict $ rest of |
||||||
|
Just inst -> Right (StartRobot inst) |
||||||
|
Nothing -> Left JsonDecodeError |
||||||
|
|
||||||
|
parseStop = case decodeUtf8' rest of |
||||||
|
Left _ -> Left UtfDecodeError |
||||||
|
Right r -> Right (StopRobot (T.strip r)) |
||||||
|
|
||||||
|
parseReloadConfig = case decodeUtf8' rest of |
||||||
|
Left _ -> Left UtfDecodeError |
||||||
|
Right r -> Right (ReloadConfig (T.strip r)) |
||||||
|
|
||||||
|
parseGetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of |
||||||
|
Left _ -> Left UtfDecodeError |
||||||
|
Right r -> Right (GetState r) |
||||||
|
|
||||||
|
parseSetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of |
||||||
|
Left _ -> Left UtfDecodeError |
||||||
|
Right r -> Right (SetState r (B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ rest)) |
||||||
|
|
||||||
|
|
||||||
|
makeRemoteControlResponse :: RemoteControlResponse -> B.ByteString |
||||||
|
makeRemoteControlResponse ResponseOk = "OK" |
||||||
|
makeRemoteControlResponse (ResponseError msg) = "ERROR " <> encodeUtf8 msg |
||||||
|
makeRemoteControlResponse (ResponseData d) = "DATA\n" <> d |
||||||
|
|
||||||
|
handleRemoteControl :: Int -> JunctionM () |
||||||
|
handleRemoteControl timeout = do |
||||||
|
sock <- asks peRemoteControlSocket |
||||||
|
logger <- asks peLogAction |
||||||
|
evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing] |
||||||
|
case evs of |
||||||
|
(x:_) -> unless (null x) $ do |
||||||
|
frames <- liftIO $ receiveMulti sock |
||||||
|
case frames of |
||||||
|
[peerId, _, rawRequest] -> do |
||||||
|
case parseRemoteControlRequest rawRequest of |
||||||
|
Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) |
||||||
|
Right request -> do |
||||||
|
response <- handleRequest request |
||||||
|
liftIO $ sendMulti sock $ peerId :| [B.empty, makeRemoteControlResponse response] |
||||||
|
_ -> logErrorWith logger "RemoteControl" "Invalid incoming request" |
||||||
|
_ -> return () |
||||||
|
where |
||||||
|
handleRequest (StartRobot inst) = do |
||||||
|
startRobot inst |
||||||
|
return ResponseOk |
||||||
|
handleRequest (StopRobot instId) = do |
||||||
|
robotsRef <- asks peRobots |
||||||
|
robots <- readIORef robotsRef |
||||||
|
case M.lookup instId robots of |
||||||
|
Just robot -> do |
||||||
|
logger <- asks peLogAction |
||||||
|
logWith logger Info "RemoteControl" $ "Stopping robot: " <> instId |
||||||
|
stopRobot robot |
||||||
|
liftIO $ atomicModifyIORef' robotsRef (\r -> (M.delete instId r, ())) |
||||||
|
return ResponseOk |
||||||
|
Nothing -> return $ ResponseError $ "Not started: " <> instId |
||||||
|
|
||||||
|
handleRequest (ReloadConfig instId) = do |
||||||
|
res <- reloadConfig instId |
||||||
|
case res of |
||||||
|
Left errmsg -> return $ ResponseError errmsg |
||||||
|
Right () -> return ResponseOk |
||||||
|
|
||||||
|
handleRequest (GetState instId) = do |
||||||
|
res <- getState instId |
||||||
|
case res of |
||||||
|
Left errmsg -> return $ ResponseError errmsg |
||||||
|
Right d -> return $ ResponseData d |
||||||
|
|
||||||
|
handleRequest (SetState instId rawState) = do |
||||||
|
res <- setState instId rawState |
||||||
|
case res of |
||||||
|
Left errmsg -> return $ ResponseError errmsg |
||||||
|
Right () -> return ResponseOk |
||||||
|
|
||||||
|
handleRequest Ping = return ResponseOk |
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,216 @@ |
|||||||
|
{-# LANGUAGE ExistentialQuantification #-} |
||||||
|
{-# LANGUAGE FlexibleContexts #-} |
||||||
|
{-# LANGUAGE FlexibleInstances #-} |
||||||
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE RankNTypes #-} |
||||||
|
|
||||||
|
module ATrade.Driver.Junction.RobotDriverThread |
||||||
|
( |
||||||
|
createRobotDriverThread, |
||||||
|
RobotEnv(..), |
||||||
|
RobotM(..), |
||||||
|
RobotDriverHandle, |
||||||
|
onStrategyInstance, |
||||||
|
onStrategyInstanceM, |
||||||
|
postNotificationEvent, |
||||||
|
stopRobot, |
||||||
|
getInstanceDescriptor |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) |
||||||
|
import qualified ATrade.Driver.Junction.BrokerService as Bro |
||||||
|
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
||||||
|
QuoteSubscription (QuoteSubscription), |
||||||
|
SubscriptionId) |
||||||
|
import ATrade.Driver.Junction.Types (BigConfig, |
||||||
|
StrategyDescriptor, |
||||||
|
StrategyInstance (StrategyInstance, strategyEventCallback), |
||||||
|
StrategyInstanceDescriptor (configKey), |
||||||
|
confStrategy, |
||||||
|
confTickers, |
||||||
|
eventCallback, stateKey, |
||||||
|
strategyId, tickerId, |
||||||
|
timeframe) |
||||||
|
import ATrade.Logging (Message, log) |
||||||
|
import ATrade.QuoteSource.Client (QuoteData (..)) |
||||||
|
import ATrade.RoboCom.ConfigStorage (ConfigStorage) |
||||||
|
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), |
||||||
|
MonadRobot (..), |
||||||
|
StrategyEnvironment (..)) |
||||||
|
import ATrade.RoboCom.Persistence (MonadPersistence) |
||||||
|
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
||||||
|
Bars, TickerInfoMap) |
||||||
|
import ATrade.Types (OrderId, OrderState, |
||||||
|
Tick (value), Trade) |
||||||
|
import Colog (HasLog (getLogAction, setLogAction), |
||||||
|
LogAction) |
||||||
|
import Control.Concurrent (ThreadId, forkIO, |
||||||
|
killThread) |
||||||
|
import Control.Concurrent.BoundedChan (BoundedChan, |
||||||
|
newBoundedChan, readChan, |
||||||
|
writeChan) |
||||||
|
import Control.Exception.Safe (MonadThrow) |
||||||
|
import Control.Monad (forM, forM_, forever, |
||||||
|
void, when) |
||||||
|
import Control.Monad.IO.Class (MonadIO, liftIO) |
||||||
|
import Control.Monad.Reader (MonadReader (local), |
||||||
|
ReaderT, asks) |
||||||
|
import Data.Aeson (FromJSON, ToJSON) |
||||||
|
import Data.Default (Default) |
||||||
|
import Data.IORef (IORef, |
||||||
|
atomicModifyIORef', |
||||||
|
readIORef, writeIORef) |
||||||
|
import Data.List.NonEmpty (NonEmpty) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import qualified Data.Text.Lazy as TL |
||||||
|
import Data.Time (UTCTime, getCurrentTime) |
||||||
|
import Dhall (FromDhall) |
||||||
|
import Prelude hiding (log) |
||||||
|
|
||||||
|
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => |
||||||
|
RobotDriverHandle StrategyInstanceDescriptor (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId] |
||||||
|
|
||||||
|
data RobotDriverRequest |
||||||
|
|
||||||
|
data RobotDriverEvent = |
||||||
|
EventRequest RobotDriverRequest |
||||||
|
| QuoteEvent QuoteData |
||||||
|
| NewTradeEvent Trade |
||||||
|
| OrderEvent OrderId OrderState |
||||||
|
|
||||||
|
|
||||||
|
robotDriverThread :: (MonadIO m, |
||||||
|
MonadRobot m c s) => |
||||||
|
StrategyInstance c s -> |
||||||
|
BoundedChan RobotDriverEvent -> |
||||||
|
m () |
||||||
|
|
||||||
|
robotDriverThread inst eventQueue = |
||||||
|
forever $ liftIO (readChan eventQueue) >>= handleEvent |
||||||
|
where |
||||||
|
handleEvent (EventRequest _) = return () |
||||||
|
handleEvent (QuoteEvent d) = |
||||||
|
case d of |
||||||
|
QDTick tick -> when (value tick /= 0) $ strategyEventCallback inst (NewTick tick) |
||||||
|
QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar)) |
||||||
|
handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade) |
||||||
|
handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState) |
||||||
|
|
||||||
|
createRobotDriverThread :: (MonadIO m1, |
||||||
|
ConfigStorage m1, |
||||||
|
MonadPersistence m1, |
||||||
|
QuoteStream m1, |
||||||
|
Default s, |
||||||
|
FromJSON s, |
||||||
|
ToJSON s, |
||||||
|
FromDhall c, |
||||||
|
MonadIO m, |
||||||
|
MonadReader (RobotEnv c s) m, |
||||||
|
MonadRobot m c s) => |
||||||
|
StrategyInstanceDescriptor |
||||||
|
-> StrategyDescriptor c s |
||||||
|
-> (m () -> IO ()) |
||||||
|
-> BigConfig c |
||||||
|
-> IORef c |
||||||
|
-> IORef s |
||||||
|
-> IORef [UTCTime] |
||||||
|
-> m1 RobotDriverHandle |
||||||
|
|
||||||
|
createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = do |
||||||
|
eventQueue <- liftIO $ newBoundedChan 2000 |
||||||
|
|
||||||
|
let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers |
||||||
|
|
||||||
|
quoteQueue <- liftIO $ newBoundedChan 2000 |
||||||
|
subIds <- forM (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) |
||||||
|
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue |
||||||
|
|
||||||
|
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue |
||||||
|
return $ RobotDriverHandle instDesc inst driver qthread eventQueue subIds |
||||||
|
|
||||||
|
where |
||||||
|
passQuoteEvents eventQueue quoteQueue = do |
||||||
|
v <- readChan quoteQueue |
||||||
|
writeChan eventQueue (QuoteEvent v) |
||||||
|
|
||||||
|
stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m () |
||||||
|
stopRobot (RobotDriverHandle _ _ driver qthread _ subIds) = do |
||||||
|
forM_ subIds removeSubscription |
||||||
|
liftIO $ killThread driver |
||||||
|
liftIO $ killThread qthread |
||||||
|
|
||||||
|
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r |
||||||
|
onStrategyInstance (RobotDriverHandle _ inst _ _ _ _) f = f inst |
||||||
|
|
||||||
|
onStrategyInstanceM :: (MonadIO m) => RobotDriverHandle -> |
||||||
|
(forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> m r) -> m r |
||||||
|
onStrategyInstanceM (RobotDriverHandle _ inst _ _ _ _) f = f inst |
||||||
|
|
||||||
|
data RobotEnv c s = |
||||||
|
RobotEnv |
||||||
|
{ |
||||||
|
stateRef :: IORef s, |
||||||
|
configRef :: IORef c, |
||||||
|
timersRef :: IORef [UTCTime], |
||||||
|
bars :: IORef Bars, |
||||||
|
tickerInfoMap :: IORef TickerInfoMap, |
||||||
|
env :: IORef StrategyEnvironment, |
||||||
|
logAction :: LogAction (RobotM c s) Message, |
||||||
|
brokerService :: Bro.BrokerService, |
||||||
|
tickers :: NonEmpty BarSeriesId |
||||||
|
} |
||||||
|
|
||||||
|
newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } |
||||||
|
deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) |
||||||
|
|
||||||
|
instance HasLog (RobotEnv c s) Message (RobotM c s) where |
||||||
|
getLogAction = logAction |
||||||
|
setLogAction a e = e { logAction = a } |
||||||
|
|
||||||
|
instance MonadRobot (RobotM c s) c s where |
||||||
|
submitOrder order = do |
||||||
|
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
||||||
|
bro <- asks brokerService |
||||||
|
Bro.submitOrder bro instId order |
||||||
|
|
||||||
|
cancelOrder oid = do |
||||||
|
bro <- asks brokerService |
||||||
|
Bro.cancelOrder bro oid |
||||||
|
|
||||||
|
appendToLog s t = do |
||||||
|
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
||||||
|
log s instId $ TL.toStrict t |
||||||
|
|
||||||
|
setupTimer t = do |
||||||
|
ref <- asks timersRef |
||||||
|
liftIO $ atomicModifyIORef' ref (\s -> (t : s, ())) |
||||||
|
|
||||||
|
enqueueIOAction = undefined |
||||||
|
getConfig = asks configRef >>= liftIO . readIORef |
||||||
|
getState = asks stateRef >>= liftIO . readIORef |
||||||
|
setState newState = asks stateRef >>= liftIO . flip writeIORef newState |
||||||
|
getEnvironment = do |
||||||
|
ref <- asks env |
||||||
|
now <- liftIO getCurrentTime |
||||||
|
liftIO $ atomicModifyIORef' ref (\e -> (e { _seLastTimestamp = now }, e { _seLastTimestamp = now})) |
||||||
|
|
||||||
|
getTicker tid tf = do |
||||||
|
b <- asks bars >>= liftIO . readIORef |
||||||
|
return $ M.lookup (BarSeriesId tid tf) b |
||||||
|
|
||||||
|
getTickerInfo tid = do |
||||||
|
b <- asks tickerInfoMap >>= liftIO . readIORef |
||||||
|
return $ M.lookup tid b |
||||||
|
|
||||||
|
getAvailableTickers = asks tickers |
||||||
|
|
||||||
|
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () |
||||||
|
postNotificationEvent (RobotDriverHandle _ _ _ _ eventQueue _) notification = liftIO $ |
||||||
|
case notification of |
||||||
|
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) |
||||||
|
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) |
||||||
|
|
||||||
|
getInstanceDescriptor :: RobotDriverHandle -> StrategyInstanceDescriptor |
||||||
|
getInstanceDescriptor (RobotDriverHandle instDesc _ _ _ _ _) = instDesc |
||||||
@ -1,361 +0,0 @@ |
|||||||
{-# LANGUAGE FlexibleInstances #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
{-# LANGUAGE TypeSynonymInstances #-} |
|
||||||
|
|
||||||
module ATrade.Quotes.Finam ( |
|
||||||
downloadFinamSymbols, |
|
||||||
Symbol(..), |
|
||||||
Period(..), |
|
||||||
DateFormat(..), |
|
||||||
TimeFormat(..), |
|
||||||
FieldSeparator(..), |
|
||||||
RequestParams(..), |
|
||||||
defaultParams, |
|
||||||
downloadQuotes, |
|
||||||
parseQuotes, |
|
||||||
downloadAndParseQuotes, |
|
||||||
Row(..) |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.Types |
|
||||||
import Control.Error.Util |
|
||||||
import Control.Exception |
|
||||||
import Control.Lens |
|
||||||
import Control.Monad |
|
||||||
import qualified Data.ByteString as B |
|
||||||
import qualified Data.ByteString.Char8 as B8 |
|
||||||
import qualified Data.ByteString.Lazy as BL |
|
||||||
import Data.Csv hiding (Options) |
|
||||||
import Data.List |
|
||||||
import qualified Data.Map as M |
|
||||||
import Data.Maybe |
|
||||||
import qualified Data.Text as T |
|
||||||
import qualified Data.Text.ICU.Convert as TC |
|
||||||
import Data.Time.Calendar |
|
||||||
import Data.Time.Clock |
|
||||||
import Data.Time.Format |
|
||||||
import qualified Data.Vector as V |
|
||||||
import Network.Wreq |
|
||||||
import Safe |
|
||||||
import System.Log.Logger |
|
||||||
import Text.Parsec |
|
||||||
import Text.ParserCombinators.Parsec.Number |
|
||||||
|
|
||||||
data Period = |
|
||||||
PeriodTick | |
|
||||||
Period1Min | |
|
||||||
Period5Min | |
|
||||||
Period10Min | |
|
||||||
Period15Min | |
|
||||||
Period30Min | |
|
||||||
PeriodHour | |
|
||||||
PeriodDay | |
|
||||||
PeriodWeek | |
|
||||||
PeriodMonth |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
instance Enum Period where |
|
||||||
fromEnum PeriodTick = 1 |
|
||||||
fromEnum Period1Min = 2 |
|
||||||
fromEnum Period5Min = 3 |
|
||||||
fromEnum Period10Min = 4 |
|
||||||
fromEnum Period15Min = 5 |
|
||||||
fromEnum Period30Min = 6 |
|
||||||
fromEnum PeriodHour = 7 |
|
||||||
fromEnum PeriodDay = 8 |
|
||||||
fromEnum PeriodWeek = 9 |
|
||||||
fromEnum PeriodMonth = 10 |
|
||||||
|
|
||||||
toEnum 1 = PeriodTick |
|
||||||
toEnum 2 = Period1Min |
|
||||||
toEnum 3 = Period5Min |
|
||||||
toEnum 4 = Period10Min |
|
||||||
toEnum 5 = Period15Min |
|
||||||
toEnum 6 = Period30Min |
|
||||||
toEnum 7 = PeriodHour |
|
||||||
toEnum 8 = PeriodDay |
|
||||||
toEnum 9 = PeriodWeek |
|
||||||
toEnum 10 = PeriodMonth |
|
||||||
toEnum _ = PeriodDay |
|
||||||
|
|
||||||
data DateFormat = |
|
||||||
FormatYYYYMMDD | |
|
||||||
FormatYYMMDD | |
|
||||||
FormatDDMMYY | |
|
||||||
FormatDD_MM_YY | |
|
||||||
FormatMM_DD_YY |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
instance Enum DateFormat where |
|
||||||
fromEnum FormatYYYYMMDD = 1 |
|
||||||
fromEnum FormatYYMMDD = 2 |
|
||||||
fromEnum FormatDDMMYY = 3 |
|
||||||
fromEnum FormatDD_MM_YY = 4 |
|
||||||
fromEnum FormatMM_DD_YY = 5 |
|
||||||
|
|
||||||
toEnum 1 = FormatYYYYMMDD |
|
||||||
toEnum 2 = FormatYYMMDD |
|
||||||
toEnum 3 = FormatDDMMYY |
|
||||||
toEnum 4 = FormatDD_MM_YY |
|
||||||
toEnum 5 = FormatMM_DD_YY |
|
||||||
toEnum _ = FormatYYYYMMDD |
|
||||||
|
|
||||||
|
|
||||||
data TimeFormat = |
|
||||||
FormatHHMMSS | |
|
||||||
FormatHHMM | |
|
||||||
FormatHH_MM_SS | |
|
||||||
FormatHH_MM |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
instance Enum TimeFormat where |
|
||||||
fromEnum FormatHHMMSS = 1 |
|
||||||
fromEnum FormatHHMM = 2 |
|
||||||
fromEnum FormatHH_MM_SS = 3 |
|
||||||
fromEnum FormatHH_MM = 4 |
|
||||||
|
|
||||||
toEnum 1 = FormatHHMMSS |
|
||||||
toEnum 2 = FormatHHMM |
|
||||||
toEnum 3 = FormatHH_MM_SS |
|
||||||
toEnum 4 = FormatHH_MM |
|
||||||
toEnum _ = FormatHHMMSS |
|
||||||
|
|
||||||
data FieldSeparator = |
|
||||||
SeparatorComma | |
|
||||||
SeparatorPeriod | |
|
||||||
SeparatorSemicolon | |
|
||||||
SeparatorTab | |
|
||||||
SeparatorSpace |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
instance Enum FieldSeparator where |
|
||||||
fromEnum SeparatorComma = 1 |
|
||||||
fromEnum SeparatorPeriod = 2 |
|
||||||
fromEnum SeparatorSemicolon = 3 |
|
||||||
fromEnum SeparatorTab = 4 |
|
||||||
fromEnum SeparatorSpace = 5 |
|
||||||
|
|
||||||
toEnum 1 = SeparatorComma |
|
||||||
toEnum 2 = SeparatorPeriod |
|
||||||
toEnum 3 = SeparatorSemicolon |
|
||||||
toEnum 4 = SeparatorTab |
|
||||||
toEnum 5 = SeparatorSpace |
|
||||||
toEnum _ = SeparatorComma |
|
||||||
|
|
||||||
data RequestParams = RequestParams { |
|
||||||
ticker :: T.Text, |
|
||||||
startDate :: Day, |
|
||||||
endDate :: Day, |
|
||||||
period :: Period, |
|
||||||
dateFormat :: DateFormat, |
|
||||||
timeFormat :: TimeFormat, |
|
||||||
fieldSeparator :: FieldSeparator, |
|
||||||
includeHeader :: Bool, |
|
||||||
fillEmpty :: Bool |
|
||||||
} |
|
||||||
|
|
||||||
defaultParams :: RequestParams |
|
||||||
defaultParams = RequestParams { |
|
||||||
ticker = "", |
|
||||||
startDate = fromGregorian 1970 1 1, |
|
||||||
endDate = fromGregorian 1970 1 1, |
|
||||||
period = PeriodDay, |
|
||||||
dateFormat = FormatYYYYMMDD, |
|
||||||
timeFormat = FormatHHMMSS, |
|
||||||
fieldSeparator = SeparatorComma, |
|
||||||
includeHeader = True, |
|
||||||
fillEmpty = False |
|
||||||
} |
|
||||||
|
|
||||||
data Symbol = Symbol { |
|
||||||
symCode :: T.Text, |
|
||||||
symName :: T.Text, |
|
||||||
symId :: Integer, |
|
||||||
symMarketCode :: Integer, |
|
||||||
symMarketName :: T.Text |
|
||||||
} |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
data Row = Row { |
|
||||||
rowTicker :: T.Text, |
|
||||||
rowTime :: UTCTime, |
|
||||||
rowOpen :: Price, |
|
||||||
rowHigh :: Price, |
|
||||||
rowLow :: Price, |
|
||||||
rowClose :: Price, |
|
||||||
rowVolume :: Integer |
|
||||||
} deriving (Show, Eq) |
|
||||||
|
|
||||||
instance FromField Price where |
|
||||||
parseField s = fromDouble <$> (parseField s :: Parser Double) |
|
||||||
|
|
||||||
instance FromRecord Row where |
|
||||||
parseRecord v |
|
||||||
| length v == 9 = do |
|
||||||
tkr <- v .! 0 |
|
||||||
date <- v .! 2 |
|
||||||
time <- v .! 3 |
|
||||||
dt <- addUTCTime (-3 * 3600) <$> (parseDt date time) |
|
||||||
open <- v .! 4 |
|
||||||
high <- v .! 5 |
|
||||||
low <- v .! 6 |
|
||||||
close <- v .! 7 |
|
||||||
vol <- v .! 8 |
|
||||||
return $ Row tkr dt open high low close vol |
|
||||||
| otherwise = mzero |
|
||||||
where |
|
||||||
parseDt :: B.ByteString -> B.ByteString -> Parser UTCTime |
|
||||||
parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of |
|
||||||
Just dt -> return dt |
|
||||||
Nothing -> fail "Unable to parse date/time" |
|
||||||
|
|
||||||
downloadAndParseQuotes :: RequestParams -> IO (Maybe [Row]) |
|
||||||
downloadAndParseQuotes requestParams = downloadAndParseQuotes' 3 |
|
||||||
where |
|
||||||
downloadAndParseQuotes' iter = do |
|
||||||
raw <- downloadQuotes requestParams `catch` (\e -> do |
|
||||||
debugM "History" $ "exception: " ++ show (e :: SomeException) |
|
||||||
return Nothing) |
|
||||||
case raw of |
|
||||||
Just r -> return $ parseQuotes r |
|
||||||
Nothing -> if iter <= 0 then return Nothing else downloadAndParseQuotes' (iter - 1) |
|
||||||
|
|
||||||
parseQuotes :: B.ByteString -> Maybe [Row] |
|
||||||
parseQuotes csvData = case decode HasHeader $ BL.fromStrict csvData of |
|
||||||
Left _ -> Nothing |
|
||||||
Right d -> Just $ V.toList d |
|
||||||
|
|
||||||
downloadQuotes :: RequestParams -> IO (Maybe B.ByteString) |
|
||||||
downloadQuotes requestParams = do |
|
||||||
symbols <- downloadFinamSymbols |
|
||||||
case requestUrl symbols requestParams of |
|
||||||
Just (url, options') -> do |
|
||||||
resp <- getWith options' url |
|
||||||
return $ Just $ BL.toStrict $ resp ^. responseBody |
|
||||||
Nothing -> return Nothing |
|
||||||
|
|
||||||
requestUrl :: [Symbol] -> RequestParams -> Maybe (String, Options) |
|
||||||
requestUrl symbols requestParams = case getFinamCode symbols (ticker requestParams) of |
|
||||||
Just (sym, market) -> Just ("http://export.finam.ru/export9.out", getOptions sym market) |
|
||||||
Nothing -> Nothing |
|
||||||
where |
|
||||||
getOptions sym market = defaults & |
|
||||||
param "market" .~ [T.pack . show $ market] & |
|
||||||
param "f" .~ [ticker requestParams] & |
|
||||||
param "e" .~ [".csv"] & |
|
||||||
param "dtf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
|
||||||
param "tmf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
|
||||||
param "MSOR" .~ ["0"] & |
|
||||||
param "mstime" .~ ["on"] & |
|
||||||
param "mstimever" .~ ["1"] & |
|
||||||
param "sep" .~ [T.pack . show . fromEnum . fieldSeparator $ requestParams] & |
|
||||||
param "sep2" .~ ["1"] & |
|
||||||
param "at" .~ [if includeHeader requestParams then "1" else "0"] & |
|
||||||
param "fsp" .~ [if fillEmpty requestParams then "1" else "0"] & |
|
||||||
param "p" .~ [T.pack . show . fromEnum $ period requestParams] & |
|
||||||
param "em" .~ [T.pack . show $ sym ] & |
|
||||||
param "df" .~ [T.pack . show $ dayFrom] & |
|
||||||
param "mf" .~ [T.pack . show $ (monthFrom - 1)] & |
|
||||||
param "yf" .~ [T.pack . show $ yearFrom] & |
|
||||||
param "dt" .~ [T.pack . show $ dayTo] & |
|
||||||
param "mt" .~ [T.pack . show $ (monthTo - 1)] & |
|
||||||
param "yt" .~ [T.pack . show $ yearTo] & |
|
||||||
param "code" .~ [ticker requestParams] & |
|
||||||
param "datf" .~ if period requestParams == PeriodTick then ["11"] else ["1"] |
|
||||||
(yearFrom, monthFrom, dayFrom) = toGregorian $ startDate requestParams |
|
||||||
(yearTo, monthTo, dayTo) = toGregorian $ endDate requestParams |
|
||||||
|
|
||||||
getFinamCode :: [Symbol] -> T.Text -> Maybe (Integer, Integer) |
|
||||||
getFinamCode symbols tickerCode = case find (\x -> symCode x == tickerCode && symMarketCode x `notElem` archives) symbols of |
|
||||||
Just sym -> Just (symId sym, symMarketCode sym) |
|
||||||
Nothing -> Nothing |
|
||||||
|
|
||||||
downloadFinamSymbols :: IO [Symbol] |
|
||||||
downloadFinamSymbols = do |
|
||||||
conv <- TC.open "cp1251" Nothing |
|
||||||
result <- get "http://www.finam.ru/cache/icharts/icharts.js" |
|
||||||
if result ^. responseStatus . statusCode == 200 |
|
||||||
then return $ parseSymbols . T.lines $ TC.toUnicode conv $ BL.toStrict $ result ^. responseBody |
|
||||||
else return [] |
|
||||||
where |
|
||||||
parseSymbols :: [T.Text] -> [Symbol] |
|
||||||
parseSymbols strs = zipWith5 Symbol codes names ids marketCodes marketNames |
|
||||||
where |
|
||||||
getWithParser parser pos = fromMaybe [] $ do |
|
||||||
s <- T.unpack <$> strs `atMay` pos |
|
||||||
hush $ parse parser "" s |
|
||||||
|
|
||||||
ids :: [Integer] |
|
||||||
ids = getWithParser intlist 0 |
|
||||||
|
|
||||||
names :: [T.Text] |
|
||||||
names = T.pack <$> getWithParser strlist 1 |
|
||||||
|
|
||||||
codes :: [T.Text] |
|
||||||
codes = T.pack <$> getWithParser strlist 2 |
|
||||||
|
|
||||||
marketCodes :: [Integer] |
|
||||||
marketCodes = getWithParser intlist 3 |
|
||||||
|
|
||||||
marketNames :: [T.Text] |
|
||||||
marketNames = fmap (\code -> fromMaybe "" $ M.lookup code codeToName) marketCodes |
|
||||||
|
|
||||||
intlist = do |
|
||||||
_ <- string "var" |
|
||||||
spaces |
|
||||||
skipMany1 alphaNum |
|
||||||
spaces |
|
||||||
_ <- char '=' |
|
||||||
spaces |
|
||||||
_ <- char '[' |
|
||||||
manyTill (do |
|
||||||
i <- int |
|
||||||
_ <- char ',' <|> char ']' |
|
||||||
return i) (char '\'' <|> char ';') |
|
||||||
|
|
||||||
strlist = do |
|
||||||
_ <- string "var" |
|
||||||
spaces |
|
||||||
skipMany1 alphaNum |
|
||||||
spaces |
|
||||||
_ <- char '=' |
|
||||||
spaces |
|
||||||
_ <- char '[' |
|
||||||
(char '\'' >> manyTill ((char '\\' >> char '\'') <|> anyChar) (char '\'')) `sepBy` char ',' |
|
||||||
|
|
||||||
codeToName :: M.Map Integer T.Text |
|
||||||
codeToName = M.fromList [ |
|
||||||
(200, "МосБиржа топ"), |
|
||||||
(1 , "МосБиржа акции"), |
|
||||||
(14 , "МосБиржа фьючерсы"), |
|
||||||
(41, "Курс рубля"), |
|
||||||
(45, "МосБиржа валютный рынок"), |
|
||||||
(2, "МосБиржа облигации"), |
|
||||||
(12, "МосБиржа внесписочные облигации"), |
|
||||||
(29, "МосБиржа пифы"), |
|
||||||
(8, "Расписки"), |
|
||||||
(6, "Мировые Индексы"), |
|
||||||
(24, "Товары"), |
|
||||||
(5, "Мировые валюты"), |
|
||||||
(25, "Акции США(BATS)"), |
|
||||||
(7, "Фьючерсы США"), |
|
||||||
(27, "Отрасли экономики США"), |
|
||||||
(26, "Гособлигации США"), |
|
||||||
(28, "ETF"), |
|
||||||
(30, "Индексы мировой экономики"), |
|
||||||
(3, "РТС"), |
|
||||||
(20, "RTS Board"), |
|
||||||
(10, "РТС-GAZ"), |
|
||||||
(17, "ФОРТС Архив"), |
|
||||||
(31, "Сырье Архив"), |
|
||||||
(38, "RTS Standard Архив"), |
|
||||||
(16, "ММВБ Архив"), |
|
||||||
(18, "РТС Архив"), |
|
||||||
(9, "СПФБ Архив"), |
|
||||||
(32, "РТС-BOARD Архив"), |
|
||||||
(39, "Расписки Архив"), |
|
||||||
(-1, "Отрасли") ] |
|
||||||
|
|
||||||
|
|
||||||
archives :: [Integer] |
|
||||||
archives = [3, 8, 16, 17, 18, 31, 32, 38, 39, 517] |
|
||||||
@ -0,0 +1,12 @@ |
|||||||
|
|
||||||
|
module ATrade.Quotes.HistoryProvider |
||||||
|
( |
||||||
|
HistoryProvider(..) |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.RoboCom.Types (Bar) |
||||||
|
import ATrade.Types (BarTimeframe, TickerId) |
||||||
|
import Data.Time (UTCTime) |
||||||
|
|
||||||
|
class (Monad m) => HistoryProvider m where |
||||||
|
getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
||||||
@ -0,0 +1,12 @@ |
|||||||
|
|
||||||
|
module ATrade.Quotes.TickerInfoProvider |
||||||
|
( |
||||||
|
TickerInfoProvider(..) |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.RoboCom.Types (InstrumentParameters) |
||||||
|
import ATrade.Types (TickerId) |
||||||
|
|
||||||
|
class (Monad m) => TickerInfoProvider m where |
||||||
|
getInstrumentParameters :: [TickerId] -> m [InstrumentParameters] |
||||||
|
|
||||||
@ -0,0 +1,14 @@ |
|||||||
|
{-# LANGUAGE RankNTypes #-} |
||||||
|
|
||||||
|
module ATrade.RoboCom.ConfigStorage |
||||||
|
( |
||||||
|
ConfigStorage(..) |
||||||
|
) where |
||||||
|
|
||||||
|
import qualified Data.Text as T |
||||||
|
import Dhall (FromDhall) |
||||||
|
|
||||||
|
class (Monad m) => ConfigStorage m where |
||||||
|
loadConfig :: forall c. (FromDhall c) => T.Text -> m c |
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,16 @@ |
|||||||
|
{-# LANGUAGE RankNTypes #-} |
||||||
|
|
||||||
|
module ATrade.RoboCom.Persistence |
||||||
|
( |
||||||
|
MonadPersistence(..) |
||||||
|
) where |
||||||
|
|
||||||
|
import Data.Aeson |
||||||
|
import Data.Default (Default) |
||||||
|
import qualified Data.Text as T |
||||||
|
|
||||||
|
class (Monad m) => MonadPersistence m where |
||||||
|
saveState :: forall s. (ToJSON s) => s -> T.Text -> m () |
||||||
|
loadState :: forall s. (Default s, FromJSON s) => T.Text -> m s |
||||||
|
|
||||||
|
|
||||||
@ -0,0 +1,117 @@ |
|||||||
|
{-# LANGUAGE FlexibleInstances #-} |
||||||
|
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE TypeSynonymInstances #-} |
||||||
|
|
||||||
|
module Test.Driver.Junction.QuoteThread |
||||||
|
( |
||||||
|
unitTests |
||||||
|
) where |
||||||
|
|
||||||
|
import Test.Tasty |
||||||
|
import Test.Tasty.HUnit |
||||||
|
import Test.Tasty.QuickCheck as QC |
||||||
|
import Test.Tasty.SmallCheck as SC |
||||||
|
|
||||||
|
import ATrade.Driver.Junction.QuoteThread (addSubscription, |
||||||
|
startQuoteThread, |
||||||
|
stopQuoteThread) |
||||||
|
import ATrade.Logging (Message) |
||||||
|
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
||||||
|
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
||||||
|
import ATrade.QuoteSource.Client (QuoteData (QDBar)) |
||||||
|
import ATrade.QuoteSource.Server (QuoteSourceServerData (..), |
||||||
|
startQuoteSourceServer, |
||||||
|
stopQuoteSourceServer) |
||||||
|
import ATrade.RoboCom.Types (BarSeries (bsBars), |
||||||
|
BarSeriesId (BarSeriesId), |
||||||
|
InstrumentParameters (InstrumentParameters)) |
||||||
|
import ATrade.Types |
||||||
|
import Colog.Core (LogAction (..)) |
||||||
|
import Colog.Core.Class (HasLog (..)) |
||||||
|
import Control.Concurrent (forkIO, threadDelay) |
||||||
|
import Control.Concurrent.BoundedChan (newBoundedChan, readChan, |
||||||
|
writeChan) |
||||||
|
import Control.Exception (bracket) |
||||||
|
import Control.Monad (forever) |
||||||
|
import Control.Monad.Reader |
||||||
|
import Data.IORef (IORef, newIORef, readIORef) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import qualified Data.Text as T |
||||||
|
import Data.Time (UTCTime (UTCTime), |
||||||
|
fromGregorian) |
||||||
|
import System.IO (BufferMode (LineBuffering), |
||||||
|
hSetBuffering, stderr) |
||||||
|
import System.ZMQ4 (withContext) |
||||||
|
import Test.Mock.HistoryProvider (MockHistoryProvider, |
||||||
|
mkMockHistoryProvider, |
||||||
|
mockGetHistory) |
||||||
|
import Test.Mock.TickerInfoProvider (MockTickerInfoProvider, |
||||||
|
mkMockTickerInfoProvider, |
||||||
|
mockGetInstrumentParameters) |
||||||
|
|
||||||
|
data TestEnv = |
||||||
|
TestEnv |
||||||
|
{ |
||||||
|
historyProvider :: MockHistoryProvider, |
||||||
|
tickerInfoProvider :: MockTickerInfoProvider |
||||||
|
} |
||||||
|
|
||||||
|
type TestM = ReaderT TestEnv IO |
||||||
|
|
||||||
|
instance HistoryProvider TestM where |
||||||
|
getHistory tid tf from to = do |
||||||
|
hp <- asks historyProvider |
||||||
|
liftIO $ mockGetHistory hp tid tf from to |
||||||
|
|
||||||
|
instance TickerInfoProvider TestM where |
||||||
|
getInstrumentParameters tickers = do |
||||||
|
tip <- asks tickerInfoProvider |
||||||
|
liftIO $ mockGetInstrumentParameters tip tickers |
||||||
|
|
||||||
|
instance HasLog TestEnv Message TestM where |
||||||
|
getLogAction env = LogAction $ \msg -> return () |
||||||
|
|
||||||
|
qsEndpoint = "inproc://qs" |
||||||
|
|
||||||
|
mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)] |
||||||
|
where |
||||||
|
bars = [] |
||||||
|
|
||||||
|
mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters "FOO" 10 0.1)] |
||||||
|
|
||||||
|
unitTests = testGroup "Driver.Junction.QuoteThread" [ |
||||||
|
testSubscription |
||||||
|
] |
||||||
|
|
||||||
|
testSubscription :: TestTree |
||||||
|
testSubscription = testCase "Subscription" $ withContext $ \ctx -> do |
||||||
|
barsRef <- newIORef M.empty |
||||||
|
tiRef <- newIORef M.empty |
||||||
|
serverChan <- newBoundedChan 2000 |
||||||
|
let clientSecurityParams = defaultClientSecurityParams |
||||||
|
bracket |
||||||
|
(startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) |
||||||
|
stopQuoteSourceServer $ \_ -> |
||||||
|
bracket |
||||||
|
(startQuoteThread barsRef tiRef ctx qsEndpoint clientSecurityParams (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider)) (LogAction $ \_ -> return ())) |
||||||
|
|
||||||
|
stopQuoteThread $ \qt -> do |
||||||
|
chan <- newBoundedChan 2000 |
||||||
|
addSubscription qt "FOO" (BarTimeframe 3600) chan |
||||||
|
|
||||||
|
forkIO $ forever $ threadDelay 50000 >> writeChan serverChan (QSSBar (BarTimeframe 3600, bar)) |
||||||
|
|
||||||
|
clientData <- readChan chan |
||||||
|
assertEqual "Invalid client data" clientData (QDBar (BarTimeframe 3600, bar)) |
||||||
|
|
||||||
|
bars <- readIORef barsRef |
||||||
|
case M.lookup (BarSeriesId "FOO" (BarTimeframe 3600)) bars of |
||||||
|
Just series -> assertBool "Length should be >= 1" $ (not . null . bsBars) series |
||||||
|
Nothing -> assertFailure "Bar Series should be present" |
||||||
|
where |
||||||
|
bar = |
||||||
|
Bar { |
||||||
|
barSecurity="FOO", barTimestamp=UTCTime (fromGregorian 2021 11 20) 7200, barOpen=10, barHigh=12, barLow=9, barClose=11, barVolume=100 |
||||||
|
} |
||||||
@ -0,0 +1,27 @@ |
|||||||
|
|
||||||
|
module Test.Mock.HistoryProvider |
||||||
|
( |
||||||
|
MockHistoryProvider, |
||||||
|
mkMockHistoryProvider, |
||||||
|
mockGetHistory |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Quotes.HistoryProvider |
||||||
|
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) |
||||||
|
import ATrade.Types (Bar (Bar, barTimestamp), |
||||||
|
BarTimeframe (BarTimeframe), |
||||||
|
TickerId) |
||||||
|
import Control.Monad.IO.Class (MonadIO) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import Data.Time (UTCTime) |
||||||
|
|
||||||
|
data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar]) |
||||||
|
|
||||||
|
mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider |
||||||
|
mkMockHistoryProvider = MockHistoryProvider |
||||||
|
|
||||||
|
mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
||||||
|
mockGetHistory (MockHistoryProvider bars) tid tf from to = |
||||||
|
case M.lookup (BarSeriesId tid tf) bars of |
||||||
|
Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series |
||||||
|
Nothing -> return [] |
||||||
@ -0,0 +1,22 @@ |
|||||||
|
|
||||||
|
module Test.Mock.TickerInfoProvider |
||||||
|
( |
||||||
|
MockTickerInfoProvider, |
||||||
|
mkMockTickerInfoProvider, |
||||||
|
mockGetInstrumentParameters |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Quotes.TickerInfoProvider |
||||||
|
import ATrade.RoboCom.Types (InstrumentParameters) |
||||||
|
import ATrade.Types (TickerId) |
||||||
|
import Control.Monad.IO.Class (MonadIO) |
||||||
|
import qualified Data.Map.Strict as M |
||||||
|
import Data.Maybe (catMaybes, mapMaybe) |
||||||
|
|
||||||
|
data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters) |
||||||
|
|
||||||
|
mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider |
||||||
|
mkMockTickerInfoProvider = MockTickerInfoProvider |
||||||
|
|
||||||
|
mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters] |
||||||
|
mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params) |
||||||
Loading…
Reference in new issue