Compare commits
No commits in common. 'master' and 'stable' have entirely different histories.
35 changed files with 1068 additions and 2385 deletions
@ -1,211 +1,58 @@ |
|||||||
{-# LANGUAGE DuplicateRecordFields #-} |
|
||||||
{-# LANGUAGE FlexibleContexts #-} |
|
||||||
{-# LANGUAGE FlexibleInstances #-} |
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
{-# LANGUAGE RankNTypes #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction |
module ATrade.Driver.Junction |
||||||
( |
( |
||||||
junctionMain |
junctionMain |
||||||
) where |
) where |
||||||
|
|
||||||
import ATrade.Broker.Client (startBrokerClient, |
import ATrade.Driver.Junction.Types (StrategyDescriptor (..), |
||||||
stopBrokerClient) |
StrategyInstance (..), |
||||||
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), |
StrategyInstanceDescriptor (..)) |
||||||
NotificationSqnum (unNotificationSqnum), |
import Data.Aeson (decode) |
||||||
getNotificationSqnum) |
import qualified Data.ByteString as B |
||||||
import ATrade.Driver.Junction.BrokerService (getNotifications, |
import qualified Data.ByteString.Lazy as BL |
||||||
mkBrokerService) |
import Data.IORef |
||||||
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (..), |
import qualified Data.Map.Strict as M |
||||||
JunctionM (..), |
import qualified Data.Text as T |
||||||
saveRobots, |
|
||||||
startRobot) |
|
||||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), |
|
||||||
ProgramOptions (ProgramOptions, configPath)) |
|
||||||
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), |
|
||||||
withQThread) |
|
||||||
import ATrade.Driver.Junction.RemoteControl (handleRemoteControl) |
|
||||||
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, postNotificationEvent) |
|
||||||
import ATrade.Driver.Junction.Types (StrategyDescriptorE) |
|
||||||
import ATrade.Logging (Message (..), Severity (Debug, Info, Trace, Warning), |
|
||||||
fmtMessage, |
|
||||||
logWith) |
|
||||||
import ATrade.Quotes.QHP (mkQHPHandle) |
|
||||||
import ATrade.Types (OrderId, Trade (tradeOrderId)) |
|
||||||
import Colog (LogAction (LogAction), |
|
||||||
cfilter, |
|
||||||
hoistLogAction, |
|
||||||
logTextStderr, |
|
||||||
(<&), (>$<)) |
|
||||||
import Colog.Actions (logTextHandle) |
|
||||||
import Control.Concurrent.QSem (newQSem) |
|
||||||
import Control.Monad (forM_, forever) |
|
||||||
import Control.Monad.Extra (whenM) |
|
||||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
|
||||||
import Control.Monad.Reader (ReaderT (runReaderT)) |
|
||||||
import Data.IORef (IORef, |
|
||||||
atomicModifyIORef', |
|
||||||
newIORef, |
|
||||||
readIORef) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import Data.Set (notMember) |
|
||||||
import qualified Data.Set as S |
|
||||||
import qualified Data.Text as T |
|
||||||
import Data.Text.IO (readFile) |
|
||||||
import Database.Redis (ConnectInfo (..), PortID (UnixSocket), |
|
||||||
checkedConnect, |
|
||||||
defaultConnectInfo) |
|
||||||
import Dhall (auto, input) |
|
||||||
import Options.Applicative (Parser, |
|
||||||
execParser, |
|
||||||
fullDesc, header, |
|
||||||
help, helper, |
|
||||||
info, long, |
|
||||||
metavar, progDesc, |
|
||||||
short, strOption, |
|
||||||
(<**>)) |
|
||||||
import Prelude hiding (log, |
|
||||||
readFile) |
|
||||||
import System.IO (BufferMode (LineBuffering), |
|
||||||
Handle, |
|
||||||
IOMode (AppendMode), |
|
||||||
hSetBuffering, |
|
||||||
withFile) |
|
||||||
import System.ZMQ4 (Router (Router), |
|
||||||
bind, withContext, |
|
||||||
withSocket) |
|
||||||
import UnliftIO (MonadUnliftIO) |
|
||||||
import UnliftIO.Exception (bracket) |
|
||||||
import UnliftIO.QSem (QSem, withQSem) |
|
||||||
|
|
||||||
|
load :: T.Text -> IO B.ByteString |
||||||
|
load = undefined |
||||||
|
|
||||||
locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a |
junctionMain :: M.Map T.Text StrategyDescriptor -> IO () |
||||||
locked sem action = LogAction (\m -> withQSem sem (action <& m)) |
|
||||||
|
|
||||||
logger :: (MonadIO m) => M.Map T.Text Severity -> Handle -> LogAction m Message |
|
||||||
logger loglevels h = cfilter checkLoglevel (fmtMessage >$< (logTextStderr <> logTextHandle h)) |
|
||||||
where |
|
||||||
checkLoglevel msg = |
|
||||||
case M.lookup (msgComponent msg) loglevels of |
|
||||||
Just level -> msgSeverity msg >= level |
|
||||||
Nothing -> True |
|
||||||
|
|
||||||
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () |
|
||||||
junctionMain descriptors = do |
junctionMain descriptors = do |
||||||
opts <- parseOptions |
parseOptions |
||||||
|
instanceDescriptors <- undefined |
||||||
let initialLogger = fmtMessage >$< logTextStderr |
strategies <- mkStrategies instanceDescriptors |
||||||
|
|
||||||
logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts) |
|
||||||
|
|
||||||
cfg <- readFile (configPath opts) >>= input auto |
|
||||||
|
|
||||||
withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do |
|
||||||
|
|
||||||
hSetBuffering h LineBuffering |
|
||||||
|
|
||||||
locksem <- newQSem 1 |
start strategies |
||||||
let globalLogger = locked locksem (logger (M.fromList $ logLevels cfg) h) |
|
||||||
let log = logWith globalLogger |
|
||||||
|
|
||||||
barsMap <- newIORef M.empty |
|
||||||
tickerInfoMap <- newIORef M.empty |
|
||||||
|
|
||||||
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg |
|
||||||
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) |
|
||||||
log Info "Junction" "redis: connected" |
|
||||||
withContext $ \ctx -> do |
|
||||||
log Debug "Junction" "0mq context created" |
|
||||||
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger) |
|
||||||
robotsMap <- newIORef M.empty |
|
||||||
ordersMap <- newIORef M.empty |
|
||||||
handledNotifications <- newIORef S.empty |
|
||||||
withBroker cfg robotsMap ordersMap handledNotifications globalLogger $ \bro -> |
|
||||||
withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> |
|
||||||
withSocket ctx Router $ \rcSocket -> do |
|
||||||
liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) |
|
||||||
broService <- mkBrokerService bro ordersMap |
|
||||||
let junctionLogAction = hoistLogAction liftIO globalLogger |
|
||||||
let env = |
|
||||||
JunctionEnv |
|
||||||
{ |
|
||||||
peRedisSocket = redis, |
|
||||||
peConfigPath = robotsConfigsPath cfg, |
|
||||||
peQuoteThread = qt, |
|
||||||
peBroker = bro, |
|
||||||
peRobots = robotsMap, |
|
||||||
peRemoteControlSocket = rcSocket, |
|
||||||
peLogAction = junctionLogAction, |
|
||||||
peIoLogAction = globalLogger, |
|
||||||
peProgramConfiguration = cfg, |
|
||||||
peBarsMap = barsMap, |
|
||||||
peTickerInfoMap = tickerInfoMap, |
|
||||||
peBrokerService = broService, |
|
||||||
peDescriptors = descriptors |
|
||||||
} |
|
||||||
withJunction env $ do |
|
||||||
startRobots cfg |
|
||||||
forever $ do |
|
||||||
notifications <- getNotifications broService |
|
||||||
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger) |
|
||||||
saveRobots |
|
||||||
handleRemoteControl 1000 |
|
||||||
where |
where |
||||||
startRobots :: ProgramConfiguration -> JunctionM () |
parseOptions = undefined |
||||||
startRobots cfg = forM_ (instances cfg) startRobot |
|
||||||
|
mkStrategies :: [StrategyInstanceDescriptor] -> IO [StrategyInstance] |
||||||
withJunction :: JunctionEnv -> JunctionM () -> IO () |
mkStrategies = mapM mkStrategy |
||||||
withJunction env = (`runReaderT` env) . unJunctionM |
|
||||||
|
mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance |
||||||
handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) -> |
mkStrategy desc = do |
||||||
IORef (M.Map OrderId T.Text) -> |
sState <- load (stateKey desc) |
||||||
IORef (S.Set NotificationSqnum) -> |
sCfg <- load (configKey desc) |
||||||
LogAction IO Message -> |
case M.lookup (strategyId desc) descriptors of |
||||||
Notification -> |
Just (StrategyDescriptor _sName sCallback _sDefState) -> |
||||||
IO () |
case (decode $ BL.fromStrict sCfg, decode $ BL.fromStrict sState) of |
||||||
handleBrokerNotification robotsRef ordersMapRef handled logger' notification= do |
(Just pCfg, Just pState) -> do |
||||||
logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification |
cfgRef <- newIORef pCfg |
||||||
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do |
stateRef <- newIORef pState |
||||||
robotsMap <- readIORef robotsRef |
return $ StrategyInstance |
||||||
ordersMap <- readIORef ordersMapRef |
{ |
||||||
|
strategyInstanceId = strategyName desc, |
||||||
case getNotificationTarget robotsMap ordersMap notification of |
strategyEventCallback = sCallback, |
||||||
Just robot -> postNotificationEvent robot notification |
strategyState = stateRef, |
||||||
Nothing -> do |
strategyConfig = cfgRef |
||||||
logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) |
} |
||||||
logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) |
_ -> undefined |
||||||
|
_ -> undefined |
||||||
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) |
|
||||||
|
start = undefined |
||||||
getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle |
|
||||||
getNotificationTarget robotsMap ordersMap notification = do |
|
||||||
robotId <- M.lookup (notificationOrderId notification) ordersMap |
|
||||||
M.lookup robotId robotsMap |
|
||||||
|
|
||||||
notificationOrderId (OrderNotification _ oid _) = oid |
|
||||||
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade |
|
||||||
|
|
||||||
withBroker cfg robotsMap ordersMap handled logger' f = do |
|
||||||
bracket |
|
||||||
(startBrokerClient |
|
||||||
(brokerIdentity cfg) |
|
||||||
(brokerEndpoint cfg) |
|
||||||
[handleBrokerNotification robotsMap ordersMap handled logger'] |
|
||||||
logger') |
|
||||||
stopBrokerClient f |
|
||||||
|
|
||||||
parseOptions = execParser options |
|
||||||
options = info (optionsParser <**> helper) |
|
||||||
(fullDesc <> |
|
||||||
progDesc "Robocom-zero junction mode driver" <> |
|
||||||
header "robocom-zero-junction") |
|
||||||
|
|
||||||
optionsParser :: Parser ProgramOptions |
|
||||||
optionsParser = ProgramOptions |
|
||||||
<$> strOption |
|
||||||
(long "config" <> |
|
||||||
short 'c' <> |
|
||||||
metavar "FILENAME" <> |
|
||||||
help "Configuration file path") |
|
||||||
|
|
||||||
|
|||||||
@ -1,64 +0,0 @@ |
|||||||
{-# LANGUAGE FlexibleContexts #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction.BrokerService |
|
||||||
( |
|
||||||
BrokerService, |
|
||||||
mkBrokerService, |
|
||||||
submitOrder, |
|
||||||
cancelOrder, |
|
||||||
getNotifications |
|
||||||
) where |
|
||||||
|
|
||||||
import qualified ATrade.Broker.Client as Bro |
|
||||||
import ATrade.Broker.Protocol (Notification (..)) |
|
||||||
import ATrade.Logging (Message, logDebug, logWarning) |
|
||||||
import ATrade.Types (Order (..), OrderId) |
|
||||||
import Colog (WithLog) |
|
||||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
|
||||||
import Control.Monad.Reader.Class (MonadReader) |
|
||||||
import Data.IORef (IORef, atomicModifyIORef', |
|
||||||
newIORef) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import qualified Data.Text as T |
|
||||||
|
|
||||||
data BrokerService = |
|
||||||
BrokerService |
|
||||||
{ |
|
||||||
broker :: Bro.BrokerClientHandle, |
|
||||||
orderMap :: IORef (M.Map OrderId T.Text), |
|
||||||
orderIdCounter :: IORef OrderId |
|
||||||
} |
|
||||||
|
|
||||||
mkBrokerService :: Bro.BrokerClientHandle -> IORef (M.Map OrderId T.Text) -> IO BrokerService |
|
||||||
mkBrokerService h om = BrokerService h om <$> newIORef 1 |
|
||||||
|
|
||||||
submitOrder :: (MonadIO m, WithLog env Message m, MonadReader env m) => BrokerService -> T.Text -> Order -> m OrderId |
|
||||||
submitOrder service identity order = do |
|
||||||
oid <- nextOrderId service |
|
||||||
logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid |
|
||||||
liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ())) |
|
||||||
r <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid } |
|
||||||
case r of |
|
||||||
Left err -> logWarning "BrokerService" $ "Submit order error: " <> err |
|
||||||
_ -> return () |
|
||||||
return oid |
|
||||||
where |
|
||||||
nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s)) |
|
||||||
|
|
||||||
cancelOrder :: (MonadIO m, WithLog env Message m) => BrokerService -> OrderId -> m () |
|
||||||
cancelOrder service oid = do |
|
||||||
r <- liftIO $ Bro.cancelOrder (broker service) oid |
|
||||||
case r of |
|
||||||
Left err -> logWarning "BrokerServer" $ "Cancel order error: " <> err |
|
||||||
_ -> return () |
|
||||||
return () |
|
||||||
|
|
||||||
getNotifications :: (MonadIO m, WithLog env Message m) => BrokerService -> m [Notification] |
|
||||||
getNotifications service = do |
|
||||||
v <- liftIO $ Bro.getNotifications (broker service) |
|
||||||
case v of |
|
||||||
Left err -> do |
|
||||||
logWarning "BrokerServer" $ "Get notifications order error: " <> err |
|
||||||
return [] |
|
||||||
Right n -> return n |
|
||||||
@ -1,258 +0,0 @@ |
|||||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
|
|
||||||
|
|
||||||
module ATrade.Driver.Junction.JunctionMonad |
|
||||||
( |
|
||||||
JunctionEnv(..), |
|
||||||
JunctionM(..), |
|
||||||
startRobot, |
|
||||||
saveRobots, |
|
||||||
reloadConfig, |
|
||||||
getState, |
|
||||||
setState |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.Broker.Client (BrokerClientHandle) |
|
||||||
import ATrade.Driver.Junction.BrokerService (BrokerService) |
|
||||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (logBasePath)) |
|
||||||
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
|
||||||
QuoteSubscription (QuoteSubscription)) |
|
||||||
import ATrade.Driver.Junction.QuoteThread (QuoteThreadHandle) |
|
||||||
import qualified ATrade.Driver.Junction.QuoteThread as QT |
|
||||||
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), |
|
||||||
RobotM (unRobotM), |
|
||||||
createRobotDriverThread, |
|
||||||
getInstanceDescriptor, |
|
||||||
onStrategyInstance, |
|
||||||
onStrategyInstanceM) |
|
||||||
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), |
|
||||||
StrategyInstanceDescriptor, |
|
||||||
accountId, |
|
||||||
confStrategy, |
|
||||||
confTickers, |
|
||||||
configKey, |
|
||||||
stateKey, |
|
||||||
strategyBaseName, |
|
||||||
strategyConfig, |
|
||||||
strategyId, |
|
||||||
strategyInstanceId, |
|
||||||
strategyState, |
|
||||||
strategyTimers, |
|
||||||
tickerId, |
|
||||||
timeframe) |
|
||||||
import ATrade.Logging (Message, Severity (Error, Info), |
|
||||||
fmtMessage, |
|
||||||
logWarning, |
|
||||||
logWith) |
|
||||||
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) |
|
||||||
import ATrade.RoboCom.Monad (StrategyEnvironment (..)) |
|
||||||
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) |
|
||||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
|
||||||
Bars, |
|
||||||
TickerInfoMap) |
|
||||||
import Colog (HasLog (getLogAction, setLogAction), |
|
||||||
LogAction, |
|
||||||
hoistLogAction, |
|
||||||
logTextHandle, |
|
||||||
(>$<)) |
|
||||||
import Control.Exception.Safe (finally) |
|
||||||
import Control.Monad.Reader (MonadIO (liftIO), |
|
||||||
MonadReader, |
|
||||||
ReaderT (runReaderT), |
|
||||||
asks) |
|
||||||
import Data.Aeson (decode, |
|
||||||
eitherDecode, |
|
||||||
encode) |
|
||||||
import qualified Data.ByteString as B |
|
||||||
import qualified Data.ByteString.Lazy as BL |
|
||||||
import Data.Default (Default (def)) |
|
||||||
import Data.Foldable (traverse_) |
|
||||||
import Data.IORef (IORef, |
|
||||||
atomicModifyIORef', |
|
||||||
newIORef, |
|
||||||
readIORef, |
|
||||||
writeIORef) |
|
||||||
import Data.List.NonEmpty (NonEmpty ((:|))) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import qualified Data.Text as T |
|
||||||
import Data.Text.Encoding (encodeUtf8) |
|
||||||
import Data.Text.IO (readFile) |
|
||||||
import Data.Time (getCurrentTime) |
|
||||||
import Data.Time.Clock.POSIX (getPOSIXTime) |
|
||||||
import Database.Redis (Connection, get, |
|
||||||
mset, runRedis) |
|
||||||
import Dhall (auto, input) |
|
||||||
import Prelude hiding (log, |
|
||||||
readFile) |
|
||||||
import System.IO (BufferMode (LineBuffering), |
|
||||||
IOMode (AppendMode), |
|
||||||
hClose, |
|
||||||
hSetBuffering, |
|
||||||
openFile) |
|
||||||
import System.ZMQ4 (Router, Socket) |
|
||||||
import UnliftIO (MonadUnliftIO) |
|
||||||
import UnliftIO.Exception (catchAny, |
|
||||||
onException) |
|
||||||
|
|
||||||
data JunctionEnv = |
|
||||||
JunctionEnv |
|
||||||
{ |
|
||||||
peRedisSocket :: Connection, |
|
||||||
peConfigPath :: FilePath, |
|
||||||
peQuoteThread :: QuoteThreadHandle, |
|
||||||
peBroker :: BrokerClientHandle, |
|
||||||
peRobots :: IORef (M.Map T.Text RobotDriverHandle), |
|
||||||
peRemoteControlSocket :: Socket Router, |
|
||||||
peLogAction :: LogAction JunctionM Message, |
|
||||||
peIoLogAction :: LogAction IO Message, |
|
||||||
peProgramConfiguration :: ProgramConfiguration, |
|
||||||
peBarsMap :: IORef Bars, |
|
||||||
peTickerInfoMap :: IORef TickerInfoMap, |
|
||||||
peBrokerService :: BrokerService, |
|
||||||
peDescriptors :: M.Map T.Text StrategyDescriptorE |
|
||||||
} |
|
||||||
|
|
||||||
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } |
|
||||||
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadUnliftIO) |
|
||||||
|
|
||||||
instance HasLog JunctionEnv Message JunctionM where |
|
||||||
getLogAction = peLogAction |
|
||||||
setLogAction a e = e { peLogAction = a } |
|
||||||
|
|
||||||
instance ConfigStorage JunctionM where |
|
||||||
loadConfig key = do |
|
||||||
basePath <- asks peConfigPath |
|
||||||
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction |
|
||||||
liftIO $ readFile path >>= input auto |
|
||||||
|
|
||||||
instance MonadPersistence JunctionM where |
|
||||||
saveState newState key = do |
|
||||||
conn <- asks peRedisSocket |
|
||||||
now <- liftIO getPOSIXTime |
|
||||||
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), |
|
||||||
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] |
|
||||||
case res of |
|
||||||
Left _ -> logWarning "Junction " "Unable to save state" |
|
||||||
Right _ -> return () |
|
||||||
|
|
||||||
loadState key = do |
|
||||||
conn <- asks peRedisSocket |
|
||||||
res <- liftIO $ runRedis conn $ get (encodeUtf8 key) |
|
||||||
-- TODO: just chain eithers |
|
||||||
case res of |
|
||||||
Left _ -> do |
|
||||||
logWarning "Junction" "Unable to load state" |
|
||||||
return def |
|
||||||
Right maybeRawState -> |
|
||||||
case maybeRawState of |
|
||||||
Just rawState -> case eitherDecode $ BL.fromStrict rawState of |
|
||||||
Left _ -> do |
|
||||||
logWarning "Junction" "Unable to decode state" |
|
||||||
return def |
|
||||||
Right decodedState -> return decodedState |
|
||||||
Nothing -> do |
|
||||||
logWarning "Junction" "Unable to decode state" |
|
||||||
return def |
|
||||||
|
|
||||||
instance QuoteStream JunctionM where |
|
||||||
addSubscription (QuoteSubscription ticker tf) chan = do |
|
||||||
qt <- asks peQuoteThread |
|
||||||
QT.addSubscription qt ticker tf chan |
|
||||||
removeSubscription subId = do |
|
||||||
qt <- asks peQuoteThread |
|
||||||
QT.removeSubscription qt subId |
|
||||||
|
|
||||||
startRobot :: StrategyInstanceDescriptor -> JunctionM () |
|
||||||
startRobot inst = do |
|
||||||
ioLogger <- asks peIoLogAction |
|
||||||
descriptors <- asks peDescriptors |
|
||||||
cfg <- asks peProgramConfiguration |
|
||||||
barsMap <- asks peBarsMap |
|
||||||
tickerInfoMap <- asks peTickerInfoMap |
|
||||||
broService <- asks peBrokerService |
|
||||||
now <- liftIO getCurrentTime |
|
||||||
let lLogger = hoistLogAction liftIO ioLogger |
|
||||||
logWith lLogger Info "Junction" $ "Starting strategy: " <> strategyBaseName inst |
|
||||||
case M.lookup (strategyBaseName inst) descriptors of |
|
||||||
Just (StrategyDescriptorE desc) -> flip catchAny (\e -> logWith lLogger Error "Junction" $ "Exception: " <> (T.pack . show $ e)) $ do |
|
||||||
bigConf <- loadConfig (configKey inst) |
|
||||||
case confTickers bigConf of |
|
||||||
(firstTicker:restTickers) -> do |
|
||||||
rConf <- liftIO $ newIORef (confStrategy bigConf) |
|
||||||
rState <- loadState (stateKey inst) >>= liftIO . newIORef |
|
||||||
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef |
|
||||||
localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode |
|
||||||
liftIO $ hSetBuffering localH LineBuffering |
|
||||||
let robotLogAction = hoistLogAction liftIO ioLogger <> (fmtMessage >$< logTextHandle localH) |
|
||||||
stratEnv <- liftIO $ newIORef StrategyEnvironment |
|
||||||
{ |
|
||||||
_seInstanceId = strategyId inst, |
|
||||||
_seAccount = accountId inst, |
|
||||||
_seVolume = 1, |
|
||||||
_seLastTimestamp = now |
|
||||||
} |
|
||||||
let robotEnv = |
|
||||||
RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) |
|
||||||
robot <- createRobotDriverThread inst desc (\a -> (flip runReaderT robotEnv . unRobotM) a `finally` hClose localH) bigConf rConf rState rTimers |
|
||||||
robotsMap' <- asks peRobots |
|
||||||
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) |
|
||||||
_ -> logWith lLogger Error (strategyId inst) "No tickers configured !!!" |
|
||||||
Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst |
|
||||||
|
|
||||||
where |
|
||||||
toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) |
|
||||||
|
|
||||||
saveRobots :: JunctionM () |
|
||||||
saveRobots = do |
|
||||||
robotsMap <- asks peRobots >>= (liftIO . readIORef) |
|
||||||
traverse_ saveRobotState robotsMap |
|
||||||
|
|
||||||
saveRobotState :: RobotDriverHandle -> JunctionM () |
|
||||||
saveRobotState handle = onStrategyInstance handle $ \inst -> do |
|
||||||
currentState <- liftIO $ readIORef (strategyState inst) |
|
||||||
saveState currentState (strategyInstanceId inst) |
|
||||||
currentTimers <- liftIO $ readIORef (strategyTimers inst) |
|
||||||
saveState currentTimers (strategyInstanceId inst <> ":timers") |
|
||||||
|
|
||||||
reloadConfig :: T.Text -> JunctionM (Either T.Text ()) |
|
||||||
reloadConfig instId = flip catchAny (\_ -> return $ Left "Exception") $ do |
|
||||||
robotsMap' <- asks peRobots |
|
||||||
robots <- liftIO $ readIORef robotsMap' |
|
||||||
case M.lookup instId robots of |
|
||||||
Just robot -> do |
|
||||||
onStrategyInstanceM robot |
|
||||||
(\inst -> do |
|
||||||
let instDesc = getInstanceDescriptor robot |
|
||||||
bigConf <- loadConfig (configKey instDesc) |
|
||||||
liftIO $ writeIORef (strategyConfig inst) (confStrategy bigConf)) |
|
||||||
return $ Right () |
|
||||||
Nothing -> return $ Left "Unable to load config" |
|
||||||
|
|
||||||
getState :: T.Text -> JunctionM (Either T.Text B.ByteString) |
|
||||||
getState instId = do |
|
||||||
robotsMap' <- asks peRobots |
|
||||||
robots <- liftIO $ readIORef robotsMap' |
|
||||||
case M.lookup instId robots of |
|
||||||
Just robot -> do |
|
||||||
Right <$> onStrategyInstanceM robot |
|
||||||
(\inst -> do |
|
||||||
v <- liftIO $ readIORef (strategyState inst) |
|
||||||
return $ BL.toStrict $ encode v) |
|
||||||
Nothing -> return $ Left $ "Unknown robot: " <> instId |
|
||||||
|
|
||||||
setState :: T.Text -> B.ByteString -> JunctionM (Either T.Text ()) |
|
||||||
setState instId newState = do |
|
||||||
robotsMap' <- asks peRobots |
|
||||||
robots <- liftIO $ readIORef robotsMap' |
|
||||||
case M.lookup instId robots of |
|
||||||
Just robot -> do |
|
||||||
onStrategyInstanceM robot |
|
||||||
(\inst -> do |
|
||||||
case decode . BL.fromStrict $ newState of |
|
||||||
Just newS -> do |
|
||||||
liftIO $ writeIORef (strategyState inst) newS |
|
||||||
return $ Right () |
|
||||||
Nothing -> return $ Left $ "Unable to decode state for " <> instId) |
|
||||||
Nothing -> return $ Left $ "Unknown robot: " <> instId |
|
||||||
@ -1,72 +0,0 @@ |
|||||||
{-# LANGUAGE DeriveGeneric #-} |
|
||||||
{-# LANGUAGE NamedFieldPuns #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
{-# LANGUAGE RecordWildCards #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction.ProgramConfiguration |
|
||||||
( |
|
||||||
ProgramOptions(..), |
|
||||||
ProgramConfiguration(..) |
|
||||||
) where |
|
||||||
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
|
||||||
import ATrade.Logging (Severity (..)) |
|
||||||
import qualified Data.Text as T |
|
||||||
import Dhall (FromDhall, autoWith) |
|
||||||
import Dhall.Core (Expr (..), FieldSelection (..)) |
|
||||||
import qualified Dhall.Map |
|
||||||
import Dhall.Marshal.Decode (Decoder (..), typeError) |
|
||||||
import GHC.Generics (Generic) |
|
||||||
|
|
||||||
newtype ProgramOptions = |
|
||||||
ProgramOptions |
|
||||||
{ |
|
||||||
configPath :: FilePath |
|
||||||
} |
|
||||||
|
|
||||||
data ProgramConfiguration = |
|
||||||
ProgramConfiguration |
|
||||||
{ |
|
||||||
brokerEndpoint :: T.Text, |
|
||||||
brokerNotificationEndpoint :: T.Text, |
|
||||||
brokerServerCert :: Maybe FilePath, |
|
||||||
brokerClientCert :: Maybe FilePath, |
|
||||||
brokerIdentity :: T.Text, |
|
||||||
quotesourceEndpoint :: T.Text, |
|
||||||
quotesourceServerCert :: Maybe FilePath, |
|
||||||
quotesourceClientCert :: Maybe FilePath, |
|
||||||
qhpEndpoint :: T.Text, |
|
||||||
qtisEndpoint :: T.Text, |
|
||||||
remoteControlEndpoint :: T.Text, |
|
||||||
redisSocket :: T.Text, |
|
||||||
robotsConfigsPath :: FilePath, |
|
||||||
logBasePath :: FilePath, |
|
||||||
logLevels :: [(T.Text, Severity)], |
|
||||||
instances :: [StrategyInstanceDescriptor] |
|
||||||
} deriving (Generic, Show) |
|
||||||
|
|
||||||
instance FromDhall Severity where |
|
||||||
autoWith _ = Decoder {..} |
|
||||||
where |
|
||||||
extract expr@(Field _ FieldSelection{ fieldSelectionLabel }) = |
|
||||||
case fieldSelectionLabel of |
|
||||||
"Trace" -> pure Trace |
|
||||||
"Debug" -> pure Debug |
|
||||||
"Info" -> pure Info |
|
||||||
"Warning" -> pure Warning |
|
||||||
"Error" -> pure Error |
|
||||||
_ -> typeError expected expr |
|
||||||
extract expr = typeError expected expr |
|
||||||
|
|
||||||
expected = pure |
|
||||||
(Union |
|
||||||
(Dhall.Map.fromList |
|
||||||
[ ("Trace", Nothing) |
|
||||||
, ("Debug", Nothing) |
|
||||||
, ("Info", Nothing) |
|
||||||
, ("Warning", Nothing) |
|
||||||
, ("Error", Nothing) |
|
||||||
] |
|
||||||
) |
|
||||||
) |
|
||||||
|
|
||||||
instance FromDhall ProgramConfiguration |
|
||||||
@ -1,30 +0,0 @@ |
|||||||
{-# LANGUAGE DeriveGeneric #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction.QuoteStream |
|
||||||
( |
|
||||||
QuoteSubscription(..), |
|
||||||
QuoteStream(..), |
|
||||||
SubscriptionId(..) |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.QuoteSource.Client (QuoteData) |
|
||||||
import ATrade.Types (BarTimeframe, TickerId) |
|
||||||
import Control.Concurrent.BoundedChan (BoundedChan) |
|
||||||
import Data.Hashable (Hashable) |
|
||||||
import GHC.Generics (Generic) |
|
||||||
|
|
||||||
data QuoteSubscription = |
|
||||||
QuoteSubscription TickerId BarTimeframe |
|
||||||
deriving (Generic, Eq) |
|
||||||
|
|
||||||
instance Hashable BarTimeframe |
|
||||||
instance Hashable QuoteSubscription |
|
||||||
|
|
||||||
newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } |
|
||||||
deriving (Show, Eq, Generic) |
|
||||||
|
|
||||||
instance Hashable SubscriptionId |
|
||||||
|
|
||||||
class (Monad m) => QuoteStream m where |
|
||||||
addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId |
|
||||||
removeSubscription :: SubscriptionId -> m () |
|
||||||
@ -1,304 +0,0 @@ |
|||||||
{-# LANGUAGE DeriveGeneric #-} |
|
||||||
{-# LANGUAGE FlexibleContexts #-} |
|
||||||
{-# LANGUAGE FlexibleInstances #-} |
|
||||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
||||||
{-# LANGUAGE LambdaCase #-} |
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
{-# LANGUAGE ScopedTypeVariables #-} |
|
||||||
{-# LANGUAGE TypeSynonymInstances #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction.QuoteThread |
|
||||||
( |
|
||||||
QuoteThreadHandle, |
|
||||||
startQuoteThread, |
|
||||||
stopQuoteThread, |
|
||||||
addSubscription, |
|
||||||
removeSubscription, |
|
||||||
DownloaderM, |
|
||||||
DownloaderEnv(..), |
|
||||||
runDownloaderM, |
|
||||||
withQThread |
|
||||||
) where |
|
||||||
|
|
||||||
import qualified ATrade.BarAggregator as BA |
|
||||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) |
|
||||||
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), |
|
||||||
SubscriptionId (SubscriptionId)) |
|
||||||
import ATrade.Logging (Message, logDebug, |
|
||||||
logInfo, |
|
||||||
logWarning) |
|
||||||
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
|
||||||
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) |
|
||||||
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), |
|
||||||
qtisGetTickersInfo) |
|
||||||
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
|
||||||
import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), |
|
||||||
QuoteSourceClientHandle, |
|
||||||
quoteSourceClientSubscribe, |
|
||||||
startQuoteSourceClient, |
|
||||||
stopQuoteSourceClient) |
|
||||||
import ATrade.RoboCom.Types (Bar (barSecurity), |
|
||||||
BarSeries (..), |
|
||||||
BarSeriesId (BarSeriesId), |
|
||||||
Bars, |
|
||||||
InstrumentParameters (InstrumentParameters), |
|
||||||
TickerInfoMap) |
|
||||||
import ATrade.Types (BarTimeframe (BarTimeframe), |
|
||||||
ClientSecurityParams (ClientSecurityParams), |
|
||||||
Tick (security), |
|
||||||
TickerId) |
|
||||||
import Colog (HasLog (getLogAction, setLogAction), |
|
||||||
LogAction, |
|
||||||
WithLog) |
|
||||||
import Control.Concurrent (ThreadId, forkIO, |
|
||||||
killThread) |
|
||||||
import Control.Concurrent.BoundedChan (BoundedChan, |
|
||||||
newBoundedChan, |
|
||||||
readChan, |
|
||||||
tryWriteChan, |
|
||||||
writeChan) |
|
||||||
import Control.Exception.Safe (MonadMask, |
|
||||||
MonadThrow, |
|
||||||
bracket) |
|
||||||
import Control.Monad (forM, forM_, |
|
||||||
forever) |
|
||||||
import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), |
|
||||||
lift) |
|
||||||
import Control.Monad.Reader.Class (MonadReader, asks) |
|
||||||
import qualified Data.HashMap.Strict as HM |
|
||||||
import Data.IORef (IORef, |
|
||||||
atomicModifyIORef', |
|
||||||
newIORef, |
|
||||||
readIORef) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import qualified Data.Text as T |
|
||||||
import Data.Time (addUTCTime, |
|
||||||
getCurrentTime) |
|
||||||
import System.ZMQ4 (Context) |
|
||||||
import System.ZMQ4.ZAP (loadCertificateFromFile) |
|
||||||
|
|
||||||
|
|
||||||
data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv |
|
||||||
|
|
||||||
data QuoteThreadEnv = |
|
||||||
QuoteThreadEnv |
|
||||||
{ |
|
||||||
bars :: IORef Bars, |
|
||||||
endpoints :: IORef (HM.HashMap QuoteSubscription [(SubscriptionId, BoundedChan QuoteData)]), |
|
||||||
qsclient :: QuoteSourceClientHandle, |
|
||||||
paramsCache :: IORef TickerInfoMap, |
|
||||||
downloaderChan :: BoundedChan QuoteSubscription, |
|
||||||
subscriptionIdCounter :: IORef Int, |
|
||||||
subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription), |
|
||||||
aggregators :: IORef (HM.HashMap (TickerId, BarTimeframe) BA.BarAggregator) |
|
||||||
} |
|
||||||
|
|
||||||
startQuoteThread :: (MonadIO m, |
|
||||||
MonadIO m1, |
|
||||||
WithLog env Message m1, |
|
||||||
HistoryProvider m1, |
|
||||||
TickerInfoProvider m1) => |
|
||||||
IORef Bars -> |
|
||||||
IORef TickerInfoMap -> |
|
||||||
Context -> |
|
||||||
T.Text -> |
|
||||||
ClientSecurityParams -> |
|
||||||
(m1 () -> IO ()) -> |
|
||||||
LogAction IO Message -> |
|
||||||
m QuoteThreadHandle |
|
||||||
startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do |
|
||||||
chan <- liftIO $ newBoundedChan 2000 |
|
||||||
dChan <- liftIO $ newBoundedChan 2000 |
|
||||||
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger |
|
||||||
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty <*> newIORef HM.empty |
|
||||||
tid <- liftIO . forkIO $ quoteThread env chan |
|
||||||
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) |
|
||||||
return $ QuoteThreadHandle tid downloaderTid env |
|
||||||
where |
|
||||||
downloaderThread env chan = do |
|
||||||
logInfo "QuoteThread" "Started" |
|
||||||
forever $ do |
|
||||||
QuoteSubscription tickerid tf <- liftIO $ readChan chan |
|
||||||
logInfo "QuoteThread" $ "Subscription: " <> tickerid |
|
||||||
paramsMap <- liftIO $ readIORef $ paramsCache env |
|
||||||
mbParams <- case M.lookup tickerid paramsMap of |
|
||||||
Nothing -> do |
|
||||||
paramsList <- getInstrumentParameters [tickerid] |
|
||||||
case paramsList of |
|
||||||
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) |
|
||||||
_ -> return Nothing |
|
||||||
Just params -> return $ Just params |
|
||||||
logDebug "QuoteThread" $ "Got info params: " <> (T.pack . show $ mbParams) |
|
||||||
barsMap <- liftIO $ readIORef (bars env) |
|
||||||
case M.lookup (BarSeriesId tickerid tf) barsMap of |
|
||||||
Just _ -> return () -- already downloaded |
|
||||||
Nothing -> case mbParams of |
|
||||||
Just params -> do |
|
||||||
now <- liftIO getCurrentTime |
|
||||||
-- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. |
|
||||||
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. |
|
||||||
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) |
|
||||||
let barSeries = BarSeries tickerid tf barsData params |
|
||||||
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) |
|
||||||
_ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) |
|
||||||
|
|
||||||
pushToBarAggregators tick = forM_ (BarTimeframe <$> [60, 300, 900, 3600]) (pushTickToAggregator tick) |
|
||||||
|
|
||||||
pushTickToAggregator tick tf = do |
|
||||||
aggsRef <- asks aggregators |
|
||||||
aggs <- liftIO . readIORef $ aggsRef |
|
||||||
let key = (security tick, tf) |
|
||||||
case HM.lookup key aggs of |
|
||||||
Just agg -> do |
|
||||||
let (mbar, agg') = BA.handleTick tick agg |
|
||||||
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg' m, ())) |
|
||||||
barsRef' <- asks bars |
|
||||||
case mbar of |
|
||||||
Just bar -> do |
|
||||||
liftIO $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
|
||||||
writeBarData bar tf (QDBar (tf, bar)) |
|
||||||
_ -> do |
|
||||||
pure () |
|
||||||
_ -> do |
|
||||||
let agg = BA.mkAggregatorFromBars (M.singleton (security tick) (BarSeries (security tick) tf [] (InstrumentParameters (security tick) 1 1))) [(0, 86400)] |
|
||||||
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg m, ())) |
|
||||||
|
|
||||||
quoteThread env chan = flip runReaderT env $ forever $ do |
|
||||||
qssData <- lift $ readChan chan |
|
||||||
case qssData of |
|
||||||
QDBar (tf, bar) -> do |
|
||||||
barsRef' <- asks bars |
|
||||||
lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
|
||||||
writeBarData bar tf qssData |
|
||||||
QDTick tick -> do |
|
||||||
pushToBarAggregators tick |
|
||||||
writeTickData tick qssData |
|
||||||
|
|
||||||
writeTickData tick qssData = do |
|
||||||
let key = QuoteSubscription (security tick) (BarTimeframe 0) |
|
||||||
subs <- asks endpoints >>= (lift . readIORef) |
|
||||||
case HM.lookup key subs of |
|
||||||
Just clientChannels -> do |
|
||||||
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels |
|
||||||
Nothing -> return () |
|
||||||
|
|
||||||
writeBarData bar tf qssData = do |
|
||||||
let key = QuoteSubscription (barSecurity bar) tf |
|
||||||
subs <- asks endpoints >>= (lift . readIORef) |
|
||||||
case HM.lookup key subs of |
|
||||||
Just clientChannels -> do |
|
||||||
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels |
|
||||||
Nothing -> return () |
|
||||||
|
|
||||||
stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () |
|
||||||
stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do |
|
||||||
killThread tid |
|
||||||
killThread dtid |
|
||||||
stopQuoteSourceClient (qsclient env) |
|
||||||
|
|
||||||
addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m SubscriptionId |
|
||||||
addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do |
|
||||||
cnt <- atomicModifyIORef' (subscriptionIdCounter env) (\c -> (c + 1, c)) |
|
||||||
let subscription = QuoteSubscription tid tf |
|
||||||
let subid = SubscriptionId cnt |
|
||||||
writeChan (downloaderChan env) subscription |
|
||||||
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m subid tid, ())) |
|
||||||
atomicModifyIORef' (subscriptions env) (\m -> (HM.insert subid subscription m, ())) |
|
||||||
quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] |
|
||||||
return subid |
|
||||||
where |
|
||||||
doAddSubscription m subid tickerid = |
|
||||||
let m1 = HM.alter (\case |
|
||||||
Just chans -> Just ((subid, chan) : chans) |
|
||||||
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid tf) m in |
|
||||||
HM.alter (\case |
|
||||||
Just chans -> Just ((subid, chan) : chans) |
|
||||||
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 |
|
||||||
|
|
||||||
removeSubscription :: (MonadIO m) => QuoteThreadHandle -> SubscriptionId -> m () |
|
||||||
removeSubscription (QuoteThreadHandle _ _ env) subId = liftIO $ do |
|
||||||
subs <- readIORef (subscriptions env) |
|
||||||
case HM.lookup subId subs of |
|
||||||
Just sub -> atomicModifyIORef' (endpoints env) (\m -> (doRemoveSubscription m sub, ())) |
|
||||||
Nothing -> return () |
|
||||||
where |
|
||||||
doRemoveSubscription m sub = |
|
||||||
let m1 = HM.adjust (filter (\(subId', _) -> subId' == subId)) sub m in |
|
||||||
HM.adjust (filter (\(subId', _) -> subId' == subId)) (sub0 sub) m1 |
|
||||||
sub0 sub = let QuoteSubscription tid _ = sub in QuoteSubscription tid (BarTimeframe 0) |
|
||||||
|
|
||||||
updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars |
|
||||||
updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap |
|
||||||
|
|
||||||
addToSeries :: Bar -> BarSeries -> BarSeries |
|
||||||
addToSeries bar series = series { bsBars = bar : bsBars series } |
|
||||||
|
|
||||||
data DownloaderEnv = |
|
||||||
DownloaderEnv |
|
||||||
{ |
|
||||||
qhp :: QHPHandle, |
|
||||||
downloaderContext :: Context, |
|
||||||
downloaderQtisEndpoint :: T.Text, |
|
||||||
logAction :: LogAction DownloaderM Message |
|
||||||
} |
|
||||||
|
|
||||||
newtype DownloaderM a = DownloaderM { unDownloaderM :: ReaderT DownloaderEnv IO a } |
|
||||||
deriving (Functor, Applicative, Monad, MonadReader DownloaderEnv, MonadIO, MonadThrow) |
|
||||||
|
|
||||||
instance HasLog DownloaderEnv Message DownloaderM where |
|
||||||
getLogAction = logAction |
|
||||||
setLogAction a e = e { logAction = a } |
|
||||||
|
|
||||||
instance HistoryProvider DownloaderM where |
|
||||||
getHistory tid tf from to = do |
|
||||||
q <- asks qhp |
|
||||||
requestHistoryFromQHP q tid tf from to |
|
||||||
|
|
||||||
instance TickerInfoProvider DownloaderM where |
|
||||||
getInstrumentParameters tickers = do |
|
||||||
ctx <- asks downloaderContext |
|
||||||
ep <- asks downloaderQtisEndpoint |
|
||||||
tis <- forM tickers (qtisGetTickersInfo ctx ep) |
|
||||||
pure $ convert `fmap` tis |
|
||||||
where |
|
||||||
convert ti = InstrumentParameters |
|
||||||
(tiTicker ti) |
|
||||||
(fromInteger $ tiLotSize ti) |
|
||||||
(tiTickSize ti) |
|
||||||
|
|
||||||
withQThread :: |
|
||||||
DownloaderEnv |
|
||||||
-> IORef Bars |
|
||||||
-> IORef TickerInfoMap |
|
||||||
-> ProgramConfiguration |
|
||||||
-> Context |
|
||||||
-> LogAction IO Message |
|
||||||
-> (QuoteThreadHandle -> IO ()) |
|
||||||
-> IO () |
|
||||||
withQThread env barsMap tiMap cfg ctx logger f = do |
|
||||||
securityParameters <- loadSecurityParameters |
|
||||||
bracket |
|
||||||
(startQuoteThread |
|
||||||
barsMap |
|
||||||
tiMap |
|
||||||
ctx |
|
||||||
(quotesourceEndpoint cfg) |
|
||||||
securityParameters |
|
||||||
(runDownloaderM env) |
|
||||||
logger) |
|
||||||
stopQuoteThread f |
|
||||||
where |
|
||||||
loadSecurityParameters = |
|
||||||
case (quotesourceClientCert cfg, quotesourceServerCert cfg) of |
|
||||||
(Just clientCertPath, Just serverCertPath) -> do |
|
||||||
eClientCert <- loadCertificateFromFile clientCertPath |
|
||||||
eServerCert <- loadCertificateFromFile serverCertPath |
|
||||||
case (eClientCert, eServerCert) of |
|
||||||
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) |
|
||||||
(_, _) -> return $ ClientSecurityParams Nothing Nothing |
|
||||||
|
|
||||||
_ -> return $ ClientSecurityParams Nothing Nothing |
|
||||||
|
|
||||||
runDownloaderM :: DownloaderEnv -> DownloaderM () -> IO () |
|
||||||
runDownloaderM env = (`runReaderT` env) . unDownloaderM |
|
||||||
@ -1,151 +0,0 @@ |
|||||||
{-# LANGUAGE FlexibleContexts #-} |
|
||||||
{-# LANGUAGE MultiWayIf #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction.RemoteControl |
|
||||||
( |
|
||||||
handleRemoteControl |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), |
|
||||||
JunctionM, getState, |
|
||||||
reloadConfig, |
|
||||||
setState, startRobot) |
|
||||||
import ATrade.Driver.Junction.RobotDriverThread (stopRobot) |
|
||||||
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
|
||||||
import ATrade.Logging (Severity (Info), |
|
||||||
logErrorWith, |
|
||||||
logWith) |
|
||||||
import Control.Monad (unless) |
|
||||||
import Control.Monad.Reader (asks) |
|
||||||
import Data.Aeson (decode) |
|
||||||
import qualified Data.ByteString as B |
|
||||||
import qualified Data.ByteString.Lazy as BL |
|
||||||
import Data.List.NonEmpty (NonEmpty ((:|))) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import qualified Data.Text as T |
|
||||||
import Data.Text.Encoding (decodeUtf8', |
|
||||||
encodeUtf8) |
|
||||||
import System.ZMQ4 (Event (In), |
|
||||||
Poll (Sock), poll, |
|
||||||
receiveMulti, |
|
||||||
sendMulti) |
|
||||||
import UnliftIO (MonadIO (liftIO), |
|
||||||
atomicModifyIORef', |
|
||||||
readIORef) |
|
||||||
|
|
||||||
data RemoteControlResponse = |
|
||||||
ResponseOk |
|
||||||
| ResponseError T.Text |
|
||||||
| ResponseData B.ByteString |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
data RemoteControlRequest = |
|
||||||
StartRobot StrategyInstanceDescriptor |
|
||||||
| StopRobot T.Text |
|
||||||
| ReloadConfig T.Text |
|
||||||
| GetState T.Text |
|
||||||
| SetState T.Text B.ByteString |
|
||||||
| Ping |
|
||||||
deriving (Show) |
|
||||||
|
|
||||||
data ParseError = |
|
||||||
UnknownCmd |
|
||||||
| UtfDecodeError |
|
||||||
| JsonDecodeError |
|
||||||
deriving (Show, Eq) |
|
||||||
|
|
||||||
parseRemoteControlRequest :: B.ByteString -> Either ParseError RemoteControlRequest |
|
||||||
parseRemoteControlRequest bs = |
|
||||||
if |
|
||||||
| cmd == "START" -> parseStart |
|
||||||
| cmd == "STOP" -> parseStop |
|
||||||
| cmd == "RELOAD_CONFIG" -> parseReloadConfig |
|
||||||
| cmd == "GET_STATE" -> parseGetState |
|
||||||
| cmd == "SET_STATE" -> parseSetState |
|
||||||
| cmd == "PING" -> Right Ping |
|
||||||
| otherwise -> Left UnknownCmd |
|
||||||
where |
|
||||||
cmd = B.takeWhile (/= 0x20) bs |
|
||||||
rest = B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ bs |
|
||||||
|
|
||||||
parseStart = case decode . BL.fromStrict $ rest of |
|
||||||
Just inst -> Right (StartRobot inst) |
|
||||||
Nothing -> Left JsonDecodeError |
|
||||||
|
|
||||||
parseStop = case decodeUtf8' rest of |
|
||||||
Left _ -> Left UtfDecodeError |
|
||||||
Right r -> Right (StopRobot (T.strip r)) |
|
||||||
|
|
||||||
parseReloadConfig = case decodeUtf8' rest of |
|
||||||
Left _ -> Left UtfDecodeError |
|
||||||
Right r -> Right (ReloadConfig (T.strip r)) |
|
||||||
|
|
||||||
parseGetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of |
|
||||||
Left _ -> Left UtfDecodeError |
|
||||||
Right r -> Right (GetState r) |
|
||||||
|
|
||||||
parseSetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of |
|
||||||
Left _ -> Left UtfDecodeError |
|
||||||
Right r -> Right (SetState r (B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ rest)) |
|
||||||
|
|
||||||
|
|
||||||
makeRemoteControlResponse :: RemoteControlResponse -> B.ByteString |
|
||||||
makeRemoteControlResponse ResponseOk = "OK" |
|
||||||
makeRemoteControlResponse (ResponseError msg) = "ERROR " <> encodeUtf8 msg |
|
||||||
makeRemoteControlResponse (ResponseData d) = "DATA\n" <> d |
|
||||||
|
|
||||||
handleRemoteControl :: Int -> JunctionM () |
|
||||||
handleRemoteControl timeout = do |
|
||||||
sock <- asks peRemoteControlSocket |
|
||||||
logger <- asks peLogAction |
|
||||||
evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing] |
|
||||||
case evs of |
|
||||||
(x:_) -> unless (null x) $ do |
|
||||||
frames <- liftIO $ receiveMulti sock |
|
||||||
case frames of |
|
||||||
[peerId, _, rawRequest] -> do |
|
||||||
case parseRemoteControlRequest rawRequest of |
|
||||||
Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) |
|
||||||
Right request -> do |
|
||||||
response <- handleRequest request |
|
||||||
liftIO $ sendMulti sock $ peerId :| [B.empty, makeRemoteControlResponse response] |
|
||||||
_ -> logErrorWith logger "RemoteControl" "Invalid incoming request" |
|
||||||
_ -> return () |
|
||||||
where |
|
||||||
handleRequest (StartRobot inst) = do |
|
||||||
startRobot inst |
|
||||||
return ResponseOk |
|
||||||
handleRequest (StopRobot instId) = do |
|
||||||
robotsRef <- asks peRobots |
|
||||||
robots <- readIORef robotsRef |
|
||||||
case M.lookup instId robots of |
|
||||||
Just robot -> do |
|
||||||
logger <- asks peLogAction |
|
||||||
logWith logger Info "RemoteControl" $ "Stopping robot: " <> instId |
|
||||||
stopRobot robot |
|
||||||
liftIO $ atomicModifyIORef' robotsRef (\r -> (M.delete instId r, ())) |
|
||||||
return ResponseOk |
|
||||||
Nothing -> return $ ResponseError $ "Not started: " <> instId |
|
||||||
|
|
||||||
handleRequest (ReloadConfig instId) = do |
|
||||||
res <- reloadConfig instId |
|
||||||
case res of |
|
||||||
Left errmsg -> return $ ResponseError errmsg |
|
||||||
Right () -> return ResponseOk |
|
||||||
|
|
||||||
handleRequest (GetState instId) = do |
|
||||||
res <- getState instId |
|
||||||
case res of |
|
||||||
Left errmsg -> return $ ResponseError errmsg |
|
||||||
Right d -> return $ ResponseData d |
|
||||||
|
|
||||||
handleRequest (SetState instId rawState) = do |
|
||||||
res <- setState instId rawState |
|
||||||
case res of |
|
||||||
Left errmsg -> return $ ResponseError errmsg |
|
||||||
Right () -> return ResponseOk |
|
||||||
|
|
||||||
handleRequest Ping = return ResponseOk |
|
||||||
|
|
||||||
|
|
||||||
@ -1,216 +0,0 @@ |
|||||||
{-# LANGUAGE ExistentialQuantification #-} |
|
||||||
{-# LANGUAGE FlexibleContexts #-} |
|
||||||
{-# LANGUAGE FlexibleInstances #-} |
|
||||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
{-# LANGUAGE RankNTypes #-} |
|
||||||
|
|
||||||
module ATrade.Driver.Junction.RobotDriverThread |
|
||||||
( |
|
||||||
createRobotDriverThread, |
|
||||||
RobotEnv(..), |
|
||||||
RobotM(..), |
|
||||||
RobotDriverHandle, |
|
||||||
onStrategyInstance, |
|
||||||
onStrategyInstanceM, |
|
||||||
postNotificationEvent, |
|
||||||
stopRobot, |
|
||||||
getInstanceDescriptor |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) |
|
||||||
import qualified ATrade.Driver.Junction.BrokerService as Bro |
|
||||||
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
|
||||||
QuoteSubscription (QuoteSubscription), |
|
||||||
SubscriptionId) |
|
||||||
import ATrade.Driver.Junction.Types (BigConfig, |
|
||||||
StrategyDescriptor, |
|
||||||
StrategyInstance (StrategyInstance, strategyEventCallback), |
|
||||||
StrategyInstanceDescriptor (configKey), |
|
||||||
confStrategy, |
|
||||||
confTickers, |
|
||||||
eventCallback, stateKey, |
|
||||||
strategyId, tickerId, |
|
||||||
timeframe) |
|
||||||
import ATrade.Logging (Message, log) |
|
||||||
import ATrade.QuoteSource.Client (QuoteData (..)) |
|
||||||
import ATrade.RoboCom.ConfigStorage (ConfigStorage) |
|
||||||
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), |
|
||||||
MonadRobot (..), |
|
||||||
StrategyEnvironment (..)) |
|
||||||
import ATrade.RoboCom.Persistence (MonadPersistence) |
|
||||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
|
||||||
Bars, TickerInfoMap) |
|
||||||
import ATrade.Types (OrderId, OrderState, |
|
||||||
Tick (value), Trade) |
|
||||||
import Colog (HasLog (getLogAction, setLogAction), |
|
||||||
LogAction) |
|
||||||
import Control.Concurrent (ThreadId, forkIO, |
|
||||||
killThread) |
|
||||||
import Control.Concurrent.BoundedChan (BoundedChan, |
|
||||||
newBoundedChan, readChan, |
|
||||||
writeChan) |
|
||||||
import Control.Exception.Safe (MonadThrow) |
|
||||||
import Control.Monad (forM, forM_, forever, |
|
||||||
void, when) |
|
||||||
import Control.Monad.IO.Class (MonadIO, liftIO) |
|
||||||
import Control.Monad.Reader (MonadReader (local), |
|
||||||
ReaderT, asks) |
|
||||||
import Data.Aeson (FromJSON, ToJSON) |
|
||||||
import Data.Default (Default) |
|
||||||
import Data.IORef (IORef, |
|
||||||
atomicModifyIORef', |
|
||||||
readIORef, writeIORef) |
|
||||||
import Data.List.NonEmpty (NonEmpty) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import qualified Data.Text.Lazy as TL |
|
||||||
import Data.Time (UTCTime, getCurrentTime) |
|
||||||
import Dhall (FromDhall) |
|
||||||
import Prelude hiding (log) |
|
||||||
|
|
||||||
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => |
|
||||||
RobotDriverHandle StrategyInstanceDescriptor (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId] |
|
||||||
|
|
||||||
data RobotDriverRequest |
|
||||||
|
|
||||||
data RobotDriverEvent = |
|
||||||
EventRequest RobotDriverRequest |
|
||||||
| QuoteEvent QuoteData |
|
||||||
| NewTradeEvent Trade |
|
||||||
| OrderEvent OrderId OrderState |
|
||||||
|
|
||||||
|
|
||||||
robotDriverThread :: (MonadIO m, |
|
||||||
MonadRobot m c s) => |
|
||||||
StrategyInstance c s -> |
|
||||||
BoundedChan RobotDriverEvent -> |
|
||||||
m () |
|
||||||
|
|
||||||
robotDriverThread inst eventQueue = |
|
||||||
forever $ liftIO (readChan eventQueue) >>= handleEvent |
|
||||||
where |
|
||||||
handleEvent (EventRequest _) = return () |
|
||||||
handleEvent (QuoteEvent d) = |
|
||||||
case d of |
|
||||||
QDTick tick -> when (value tick /= 0) $ strategyEventCallback inst (NewTick tick) |
|
||||||
QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar)) |
|
||||||
handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade) |
|
||||||
handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState) |
|
||||||
|
|
||||||
createRobotDriverThread :: (MonadIO m1, |
|
||||||
ConfigStorage m1, |
|
||||||
MonadPersistence m1, |
|
||||||
QuoteStream m1, |
|
||||||
Default s, |
|
||||||
FromJSON s, |
|
||||||
ToJSON s, |
|
||||||
FromDhall c, |
|
||||||
MonadIO m, |
|
||||||
MonadReader (RobotEnv c s) m, |
|
||||||
MonadRobot m c s) => |
|
||||||
StrategyInstanceDescriptor |
|
||||||
-> StrategyDescriptor c s |
|
||||||
-> (m () -> IO ()) |
|
||||||
-> BigConfig c |
|
||||||
-> IORef c |
|
||||||
-> IORef s |
|
||||||
-> IORef [UTCTime] |
|
||||||
-> m1 RobotDriverHandle |
|
||||||
|
|
||||||
createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = do |
|
||||||
eventQueue <- liftIO $ newBoundedChan 2000 |
|
||||||
|
|
||||||
let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers |
|
||||||
|
|
||||||
quoteQueue <- liftIO $ newBoundedChan 2000 |
|
||||||
subIds <- forM (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) |
|
||||||
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue |
|
||||||
|
|
||||||
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue |
|
||||||
return $ RobotDriverHandle instDesc inst driver qthread eventQueue subIds |
|
||||||
|
|
||||||
where |
|
||||||
passQuoteEvents eventQueue quoteQueue = do |
|
||||||
v <- readChan quoteQueue |
|
||||||
writeChan eventQueue (QuoteEvent v) |
|
||||||
|
|
||||||
stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m () |
|
||||||
stopRobot (RobotDriverHandle _ _ driver qthread _ subIds) = do |
|
||||||
forM_ subIds removeSubscription |
|
||||||
liftIO $ killThread driver |
|
||||||
liftIO $ killThread qthread |
|
||||||
|
|
||||||
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r |
|
||||||
onStrategyInstance (RobotDriverHandle _ inst _ _ _ _) f = f inst |
|
||||||
|
|
||||||
onStrategyInstanceM :: (MonadIO m) => RobotDriverHandle -> |
|
||||||
(forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> m r) -> m r |
|
||||||
onStrategyInstanceM (RobotDriverHandle _ inst _ _ _ _) f = f inst |
|
||||||
|
|
||||||
data RobotEnv c s = |
|
||||||
RobotEnv |
|
||||||
{ |
|
||||||
stateRef :: IORef s, |
|
||||||
configRef :: IORef c, |
|
||||||
timersRef :: IORef [UTCTime], |
|
||||||
bars :: IORef Bars, |
|
||||||
tickerInfoMap :: IORef TickerInfoMap, |
|
||||||
env :: IORef StrategyEnvironment, |
|
||||||
logAction :: LogAction (RobotM c s) Message, |
|
||||||
brokerService :: Bro.BrokerService, |
|
||||||
tickers :: NonEmpty BarSeriesId |
|
||||||
} |
|
||||||
|
|
||||||
newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } |
|
||||||
deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) |
|
||||||
|
|
||||||
instance HasLog (RobotEnv c s) Message (RobotM c s) where |
|
||||||
getLogAction = logAction |
|
||||||
setLogAction a e = e { logAction = a } |
|
||||||
|
|
||||||
instance MonadRobot (RobotM c s) c s where |
|
||||||
submitOrder order = do |
|
||||||
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
|
||||||
bro <- asks brokerService |
|
||||||
Bro.submitOrder bro instId order |
|
||||||
|
|
||||||
cancelOrder oid = do |
|
||||||
bro <- asks brokerService |
|
||||||
Bro.cancelOrder bro oid |
|
||||||
|
|
||||||
appendToLog s t = do |
|
||||||
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
|
||||||
log s instId $ TL.toStrict t |
|
||||||
|
|
||||||
setupTimer t = do |
|
||||||
ref <- asks timersRef |
|
||||||
liftIO $ atomicModifyIORef' ref (\s -> (t : s, ())) |
|
||||||
|
|
||||||
enqueueIOAction = undefined |
|
||||||
getConfig = asks configRef >>= liftIO . readIORef |
|
||||||
getState = asks stateRef >>= liftIO . readIORef |
|
||||||
setState newState = asks stateRef >>= liftIO . flip writeIORef newState |
|
||||||
getEnvironment = do |
|
||||||
ref <- asks env |
|
||||||
now <- liftIO getCurrentTime |
|
||||||
liftIO $ atomicModifyIORef' ref (\e -> (e { _seLastTimestamp = now }, e { _seLastTimestamp = now})) |
|
||||||
|
|
||||||
getTicker tid tf = do |
|
||||||
b <- asks bars >>= liftIO . readIORef |
|
||||||
return $ M.lookup (BarSeriesId tid tf) b |
|
||||||
|
|
||||||
getTickerInfo tid = do |
|
||||||
b <- asks tickerInfoMap >>= liftIO . readIORef |
|
||||||
return $ M.lookup tid b |
|
||||||
|
|
||||||
getAvailableTickers = asks tickers |
|
||||||
|
|
||||||
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () |
|
||||||
postNotificationEvent (RobotDriverHandle _ _ _ _ eventQueue _) notification = liftIO $ |
|
||||||
case notification of |
|
||||||
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) |
|
||||||
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) |
|
||||||
|
|
||||||
getInstanceDescriptor :: RobotDriverHandle -> StrategyInstanceDescriptor |
|
||||||
getInstanceDescriptor (RobotDriverHandle instDesc _ _ _ _ _) = instDesc |
|
||||||
@ -0,0 +1,361 @@ |
|||||||
|
{-# LANGUAGE FlexibleInstances #-} |
||||||
|
{-# LANGUAGE OverloadedStrings #-} |
||||||
|
{-# LANGUAGE TypeSynonymInstances #-} |
||||||
|
|
||||||
|
module ATrade.Quotes.Finam ( |
||||||
|
downloadFinamSymbols, |
||||||
|
Symbol(..), |
||||||
|
Period(..), |
||||||
|
DateFormat(..), |
||||||
|
TimeFormat(..), |
||||||
|
FieldSeparator(..), |
||||||
|
RequestParams(..), |
||||||
|
defaultParams, |
||||||
|
downloadQuotes, |
||||||
|
parseQuotes, |
||||||
|
downloadAndParseQuotes, |
||||||
|
Row(..) |
||||||
|
) where |
||||||
|
|
||||||
|
import ATrade.Types |
||||||
|
import Control.Error.Util |
||||||
|
import Control.Exception |
||||||
|
import Control.Lens |
||||||
|
import Control.Monad |
||||||
|
import qualified Data.ByteString as B |
||||||
|
import qualified Data.ByteString.Char8 as B8 |
||||||
|
import qualified Data.ByteString.Lazy as BL |
||||||
|
import Data.Csv hiding (Options) |
||||||
|
import Data.List |
||||||
|
import qualified Data.Map as M |
||||||
|
import Data.Maybe |
||||||
|
import qualified Data.Text as T |
||||||
|
import qualified Data.Text.ICU.Convert as TC |
||||||
|
import Data.Time.Calendar |
||||||
|
import Data.Time.Clock |
||||||
|
import Data.Time.Format |
||||||
|
import qualified Data.Vector as V |
||||||
|
import Network.Wreq |
||||||
|
import Safe |
||||||
|
import System.Log.Logger |
||||||
|
import Text.Parsec |
||||||
|
import Text.ParserCombinators.Parsec.Number |
||||||
|
|
||||||
|
data Period = |
||||||
|
PeriodTick | |
||||||
|
Period1Min | |
||||||
|
Period5Min | |
||||||
|
Period10Min | |
||||||
|
Period15Min | |
||||||
|
Period30Min | |
||||||
|
PeriodHour | |
||||||
|
PeriodDay | |
||||||
|
PeriodWeek | |
||||||
|
PeriodMonth |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
instance Enum Period where |
||||||
|
fromEnum PeriodTick = 1 |
||||||
|
fromEnum Period1Min = 2 |
||||||
|
fromEnum Period5Min = 3 |
||||||
|
fromEnum Period10Min = 4 |
||||||
|
fromEnum Period15Min = 5 |
||||||
|
fromEnum Period30Min = 6 |
||||||
|
fromEnum PeriodHour = 7 |
||||||
|
fromEnum PeriodDay = 8 |
||||||
|
fromEnum PeriodWeek = 9 |
||||||
|
fromEnum PeriodMonth = 10 |
||||||
|
|
||||||
|
toEnum 1 = PeriodTick |
||||||
|
toEnum 2 = Period1Min |
||||||
|
toEnum 3 = Period5Min |
||||||
|
toEnum 4 = Period10Min |
||||||
|
toEnum 5 = Period15Min |
||||||
|
toEnum 6 = Period30Min |
||||||
|
toEnum 7 = PeriodHour |
||||||
|
toEnum 8 = PeriodDay |
||||||
|
toEnum 9 = PeriodWeek |
||||||
|
toEnum 10 = PeriodMonth |
||||||
|
toEnum _ = PeriodDay |
||||||
|
|
||||||
|
data DateFormat = |
||||||
|
FormatYYYYMMDD | |
||||||
|
FormatYYMMDD | |
||||||
|
FormatDDMMYY | |
||||||
|
FormatDD_MM_YY | |
||||||
|
FormatMM_DD_YY |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
instance Enum DateFormat where |
||||||
|
fromEnum FormatYYYYMMDD = 1 |
||||||
|
fromEnum FormatYYMMDD = 2 |
||||||
|
fromEnum FormatDDMMYY = 3 |
||||||
|
fromEnum FormatDD_MM_YY = 4 |
||||||
|
fromEnum FormatMM_DD_YY = 5 |
||||||
|
|
||||||
|
toEnum 1 = FormatYYYYMMDD |
||||||
|
toEnum 2 = FormatYYMMDD |
||||||
|
toEnum 3 = FormatDDMMYY |
||||||
|
toEnum 4 = FormatDD_MM_YY |
||||||
|
toEnum 5 = FormatMM_DD_YY |
||||||
|
toEnum _ = FormatYYYYMMDD |
||||||
|
|
||||||
|
|
||||||
|
data TimeFormat = |
||||||
|
FormatHHMMSS | |
||||||
|
FormatHHMM | |
||||||
|
FormatHH_MM_SS | |
||||||
|
FormatHH_MM |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
instance Enum TimeFormat where |
||||||
|
fromEnum FormatHHMMSS = 1 |
||||||
|
fromEnum FormatHHMM = 2 |
||||||
|
fromEnum FormatHH_MM_SS = 3 |
||||||
|
fromEnum FormatHH_MM = 4 |
||||||
|
|
||||||
|
toEnum 1 = FormatHHMMSS |
||||||
|
toEnum 2 = FormatHHMM |
||||||
|
toEnum 3 = FormatHH_MM_SS |
||||||
|
toEnum 4 = FormatHH_MM |
||||||
|
toEnum _ = FormatHHMMSS |
||||||
|
|
||||||
|
data FieldSeparator = |
||||||
|
SeparatorComma | |
||||||
|
SeparatorPeriod | |
||||||
|
SeparatorSemicolon | |
||||||
|
SeparatorTab | |
||||||
|
SeparatorSpace |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
instance Enum FieldSeparator where |
||||||
|
fromEnum SeparatorComma = 1 |
||||||
|
fromEnum SeparatorPeriod = 2 |
||||||
|
fromEnum SeparatorSemicolon = 3 |
||||||
|
fromEnum SeparatorTab = 4 |
||||||
|
fromEnum SeparatorSpace = 5 |
||||||
|
|
||||||
|
toEnum 1 = SeparatorComma |
||||||
|
toEnum 2 = SeparatorPeriod |
||||||
|
toEnum 3 = SeparatorSemicolon |
||||||
|
toEnum 4 = SeparatorTab |
||||||
|
toEnum 5 = SeparatorSpace |
||||||
|
toEnum _ = SeparatorComma |
||||||
|
|
||||||
|
data RequestParams = RequestParams { |
||||||
|
ticker :: T.Text, |
||||||
|
startDate :: Day, |
||||||
|
endDate :: Day, |
||||||
|
period :: Period, |
||||||
|
dateFormat :: DateFormat, |
||||||
|
timeFormat :: TimeFormat, |
||||||
|
fieldSeparator :: FieldSeparator, |
||||||
|
includeHeader :: Bool, |
||||||
|
fillEmpty :: Bool |
||||||
|
} |
||||||
|
|
||||||
|
defaultParams :: RequestParams |
||||||
|
defaultParams = RequestParams { |
||||||
|
ticker = "", |
||||||
|
startDate = fromGregorian 1970 1 1, |
||||||
|
endDate = fromGregorian 1970 1 1, |
||||||
|
period = PeriodDay, |
||||||
|
dateFormat = FormatYYYYMMDD, |
||||||
|
timeFormat = FormatHHMMSS, |
||||||
|
fieldSeparator = SeparatorComma, |
||||||
|
includeHeader = True, |
||||||
|
fillEmpty = False |
||||||
|
} |
||||||
|
|
||||||
|
data Symbol = Symbol { |
||||||
|
symCode :: T.Text, |
||||||
|
symName :: T.Text, |
||||||
|
symId :: Integer, |
||||||
|
symMarketCode :: Integer, |
||||||
|
symMarketName :: T.Text |
||||||
|
} |
||||||
|
deriving (Show, Eq) |
||||||
|
|
||||||
|
data Row = Row { |
||||||
|
rowTicker :: T.Text, |
||||||
|
rowTime :: UTCTime, |
||||||
|
rowOpen :: Price, |
||||||
|
rowHigh :: Price, |
||||||
|
rowLow :: Price, |
||||||
|
rowClose :: Price, |
||||||
|
rowVolume :: Integer |
||||||
|
} deriving (Show, Eq) |
||||||
|
|
||||||
|
instance FromField Price where |
||||||
|
parseField s = fromDouble <$> (parseField s :: Parser Double) |
||||||
|
|
||||||
|
instance FromRecord Row where |
||||||
|
parseRecord v |
||||||
|
| length v == 9 = do |
||||||
|
tkr <- v .! 0 |
||||||
|
date <- v .! 2 |
||||||
|
time <- v .! 3 |
||||||
|
dt <- addUTCTime (-3 * 3600) <$> (parseDt date time) |
||||||
|
open <- v .! 4 |
||||||
|
high <- v .! 5 |
||||||
|
low <- v .! 6 |
||||||
|
close <- v .! 7 |
||||||
|
vol <- v .! 8 |
||||||
|
return $ Row tkr dt open high low close vol |
||||||
|
| otherwise = mzero |
||||||
|
where |
||||||
|
parseDt :: B.ByteString -> B.ByteString -> Parser UTCTime |
||||||
|
parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of |
||||||
|
Just dt -> return dt |
||||||
|
Nothing -> fail "Unable to parse date/time" |
||||||
|
|
||||||
|
downloadAndParseQuotes :: RequestParams -> IO (Maybe [Row]) |
||||||
|
downloadAndParseQuotes requestParams = downloadAndParseQuotes' 3 |
||||||
|
where |
||||||
|
downloadAndParseQuotes' iter = do |
||||||
|
raw <- downloadQuotes requestParams `catch` (\e -> do |
||||||
|
debugM "History" $ "exception: " ++ show (e :: SomeException) |
||||||
|
return Nothing) |
||||||
|
case raw of |
||||||
|
Just r -> return $ parseQuotes r |
||||||
|
Nothing -> if iter <= 0 then return Nothing else downloadAndParseQuotes' (iter - 1) |
||||||
|
|
||||||
|
parseQuotes :: B.ByteString -> Maybe [Row] |
||||||
|
parseQuotes csvData = case decode HasHeader $ BL.fromStrict csvData of |
||||||
|
Left _ -> Nothing |
||||||
|
Right d -> Just $ V.toList d |
||||||
|
|
||||||
|
downloadQuotes :: RequestParams -> IO (Maybe B.ByteString) |
||||||
|
downloadQuotes requestParams = do |
||||||
|
symbols <- downloadFinamSymbols |
||||||
|
case requestUrl symbols requestParams of |
||||||
|
Just (url, options') -> do |
||||||
|
resp <- getWith options' url |
||||||
|
return $ Just $ BL.toStrict $ resp ^. responseBody |
||||||
|
Nothing -> return Nothing |
||||||
|
|
||||||
|
requestUrl :: [Symbol] -> RequestParams -> Maybe (String, Options) |
||||||
|
requestUrl symbols requestParams = case getFinamCode symbols (ticker requestParams) of |
||||||
|
Just (sym, market) -> Just ("http://export.finam.ru/export9.out", getOptions sym market) |
||||||
|
Nothing -> Nothing |
||||||
|
where |
||||||
|
getOptions sym market = defaults & |
||||||
|
param "market" .~ [T.pack . show $ market] & |
||||||
|
param "f" .~ [ticker requestParams] & |
||||||
|
param "e" .~ [".csv"] & |
||||||
|
param "dtf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
||||||
|
param "tmf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
||||||
|
param "MSOR" .~ ["0"] & |
||||||
|
param "mstime" .~ ["on"] & |
||||||
|
param "mstimever" .~ ["1"] & |
||||||
|
param "sep" .~ [T.pack . show . fromEnum . fieldSeparator $ requestParams] & |
||||||
|
param "sep2" .~ ["1"] & |
||||||
|
param "at" .~ [if includeHeader requestParams then "1" else "0"] & |
||||||
|
param "fsp" .~ [if fillEmpty requestParams then "1" else "0"] & |
||||||
|
param "p" .~ [T.pack . show . fromEnum $ period requestParams] & |
||||||
|
param "em" .~ [T.pack . show $ sym ] & |
||||||
|
param "df" .~ [T.pack . show $ dayFrom] & |
||||||
|
param "mf" .~ [T.pack . show $ (monthFrom - 1)] & |
||||||
|
param "yf" .~ [T.pack . show $ yearFrom] & |
||||||
|
param "dt" .~ [T.pack . show $ dayTo] & |
||||||
|
param "mt" .~ [T.pack . show $ (monthTo - 1)] & |
||||||
|
param "yt" .~ [T.pack . show $ yearTo] & |
||||||
|
param "code" .~ [ticker requestParams] & |
||||||
|
param "datf" .~ if period requestParams == PeriodTick then ["11"] else ["1"] |
||||||
|
(yearFrom, monthFrom, dayFrom) = toGregorian $ startDate requestParams |
||||||
|
(yearTo, monthTo, dayTo) = toGregorian $ endDate requestParams |
||||||
|
|
||||||
|
getFinamCode :: [Symbol] -> T.Text -> Maybe (Integer, Integer) |
||||||
|
getFinamCode symbols tickerCode = case find (\x -> symCode x == tickerCode && symMarketCode x `notElem` archives) symbols of |
||||||
|
Just sym -> Just (symId sym, symMarketCode sym) |
||||||
|
Nothing -> Nothing |
||||||
|
|
||||||
|
downloadFinamSymbols :: IO [Symbol] |
||||||
|
downloadFinamSymbols = do |
||||||
|
conv <- TC.open "cp1251" Nothing |
||||||
|
result <- get "http://www.finam.ru/cache/icharts/icharts.js" |
||||||
|
if result ^. responseStatus . statusCode == 200 |
||||||
|
then return $ parseSymbols . T.lines $ TC.toUnicode conv $ BL.toStrict $ result ^. responseBody |
||||||
|
else return [] |
||||||
|
where |
||||||
|
parseSymbols :: [T.Text] -> [Symbol] |
||||||
|
parseSymbols strs = zipWith5 Symbol codes names ids marketCodes marketNames |
||||||
|
where |
||||||
|
getWithParser parser pos = fromMaybe [] $ do |
||||||
|
s <- T.unpack <$> strs `atMay` pos |
||||||
|
hush $ parse parser "" s |
||||||
|
|
||||||
|
ids :: [Integer] |
||||||
|
ids = getWithParser intlist 0 |
||||||
|
|
||||||
|
names :: [T.Text] |
||||||
|
names = T.pack <$> getWithParser strlist 1 |
||||||
|
|
||||||
|
codes :: [T.Text] |
||||||
|
codes = T.pack <$> getWithParser strlist 2 |
||||||
|
|
||||||
|
marketCodes :: [Integer] |
||||||
|
marketCodes = getWithParser intlist 3 |
||||||
|
|
||||||
|
marketNames :: [T.Text] |
||||||
|
marketNames = fmap (\code -> fromMaybe "" $ M.lookup code codeToName) marketCodes |
||||||
|
|
||||||
|
intlist = do |
||||||
|
_ <- string "var" |
||||||
|
spaces |
||||||
|
skipMany1 alphaNum |
||||||
|
spaces |
||||||
|
_ <- char '=' |
||||||
|
spaces |
||||||
|
_ <- char '[' |
||||||
|
manyTill (do |
||||||
|
i <- int |
||||||
|
_ <- char ',' <|> char ']' |
||||||
|
return i) (char '\'' <|> char ';') |
||||||
|
|
||||||
|
strlist = do |
||||||
|
_ <- string "var" |
||||||
|
spaces |
||||||
|
skipMany1 alphaNum |
||||||
|
spaces |
||||||
|
_ <- char '=' |
||||||
|
spaces |
||||||
|
_ <- char '[' |
||||||
|
(char '\'' >> manyTill ((char '\\' >> char '\'') <|> anyChar) (char '\'')) `sepBy` char ',' |
||||||
|
|
||||||
|
codeToName :: M.Map Integer T.Text |
||||||
|
codeToName = M.fromList [ |
||||||
|
(200, "МосБиржа топ"), |
||||||
|
(1 , "МосБиржа акции"), |
||||||
|
(14 , "МосБиржа фьючерсы"), |
||||||
|
(41, "Курс рубля"), |
||||||
|
(45, "МосБиржа валютный рынок"), |
||||||
|
(2, "МосБиржа облигации"), |
||||||
|
(12, "МосБиржа внесписочные облигации"), |
||||||
|
(29, "МосБиржа пифы"), |
||||||
|
(8, "Расписки"), |
||||||
|
(6, "Мировые Индексы"), |
||||||
|
(24, "Товары"), |
||||||
|
(5, "Мировые валюты"), |
||||||
|
(25, "Акции США(BATS)"), |
||||||
|
(7, "Фьючерсы США"), |
||||||
|
(27, "Отрасли экономики США"), |
||||||
|
(26, "Гособлигации США"), |
||||||
|
(28, "ETF"), |
||||||
|
(30, "Индексы мировой экономики"), |
||||||
|
(3, "РТС"), |
||||||
|
(20, "RTS Board"), |
||||||
|
(10, "РТС-GAZ"), |
||||||
|
(17, "ФОРТС Архив"), |
||||||
|
(31, "Сырье Архив"), |
||||||
|
(38, "RTS Standard Архив"), |
||||||
|
(16, "ММВБ Архив"), |
||||||
|
(18, "РТС Архив"), |
||||||
|
(9, "СПФБ Архив"), |
||||||
|
(32, "РТС-BOARD Архив"), |
||||||
|
(39, "Расписки Архив"), |
||||||
|
(-1, "Отрасли") ] |
||||||
|
|
||||||
|
|
||||||
|
archives :: [Integer] |
||||||
|
archives = [3, 8, 16, 17, 18, 31, 32, 38, 39, 517] |
||||||
@ -1,12 +0,0 @@ |
|||||||
|
|
||||||
module ATrade.Quotes.HistoryProvider |
|
||||||
( |
|
||||||
HistoryProvider(..) |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.RoboCom.Types (Bar) |
|
||||||
import ATrade.Types (BarTimeframe, TickerId) |
|
||||||
import Data.Time (UTCTime) |
|
||||||
|
|
||||||
class (Monad m) => HistoryProvider m where |
|
||||||
getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
|
||||||
@ -1,12 +0,0 @@ |
|||||||
|
|
||||||
module ATrade.Quotes.TickerInfoProvider |
|
||||||
( |
|
||||||
TickerInfoProvider(..) |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.RoboCom.Types (InstrumentParameters) |
|
||||||
import ATrade.Types (TickerId) |
|
||||||
|
|
||||||
class (Monad m) => TickerInfoProvider m where |
|
||||||
getInstrumentParameters :: [TickerId] -> m [InstrumentParameters] |
|
||||||
|
|
||||||
@ -1,14 +0,0 @@ |
|||||||
{-# LANGUAGE RankNTypes #-} |
|
||||||
|
|
||||||
module ATrade.RoboCom.ConfigStorage |
|
||||||
( |
|
||||||
ConfigStorage(..) |
|
||||||
) where |
|
||||||
|
|
||||||
import qualified Data.Text as T |
|
||||||
import Dhall (FromDhall) |
|
||||||
|
|
||||||
class (Monad m) => ConfigStorage m where |
|
||||||
loadConfig :: forall c. (FromDhall c) => T.Text -> m c |
|
||||||
|
|
||||||
|
|
||||||
@ -1,16 +0,0 @@ |
|||||||
{-# LANGUAGE RankNTypes #-} |
|
||||||
|
|
||||||
module ATrade.RoboCom.Persistence |
|
||||||
( |
|
||||||
MonadPersistence(..) |
|
||||||
) where |
|
||||||
|
|
||||||
import Data.Aeson |
|
||||||
import Data.Default (Default) |
|
||||||
import qualified Data.Text as T |
|
||||||
|
|
||||||
class (Monad m) => MonadPersistence m where |
|
||||||
saveState :: forall s. (ToJSON s) => s -> T.Text -> m () |
|
||||||
loadState :: forall s. (Default s, FromJSON s) => T.Text -> m s |
|
||||||
|
|
||||||
|
|
||||||
@ -1,117 +0,0 @@ |
|||||||
{-# LANGUAGE FlexibleInstances #-} |
|
||||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
|
||||||
{-# LANGUAGE MultiParamTypeClasses #-} |
|
||||||
{-# LANGUAGE OverloadedStrings #-} |
|
||||||
{-# LANGUAGE TypeSynonymInstances #-} |
|
||||||
|
|
||||||
module Test.Driver.Junction.QuoteThread |
|
||||||
( |
|
||||||
unitTests |
|
||||||
) where |
|
||||||
|
|
||||||
import Test.Tasty |
|
||||||
import Test.Tasty.HUnit |
|
||||||
import Test.Tasty.QuickCheck as QC |
|
||||||
import Test.Tasty.SmallCheck as SC |
|
||||||
|
|
||||||
import ATrade.Driver.Junction.QuoteThread (addSubscription, |
|
||||||
startQuoteThread, |
|
||||||
stopQuoteThread) |
|
||||||
import ATrade.Logging (Message) |
|
||||||
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
|
||||||
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
|
||||||
import ATrade.QuoteSource.Client (QuoteData (QDBar)) |
|
||||||
import ATrade.QuoteSource.Server (QuoteSourceServerData (..), |
|
||||||
startQuoteSourceServer, |
|
||||||
stopQuoteSourceServer) |
|
||||||
import ATrade.RoboCom.Types (BarSeries (bsBars), |
|
||||||
BarSeriesId (BarSeriesId), |
|
||||||
InstrumentParameters (InstrumentParameters)) |
|
||||||
import ATrade.Types |
|
||||||
import Colog.Core (LogAction (..)) |
|
||||||
import Colog.Core.Class (HasLog (..)) |
|
||||||
import Control.Concurrent (forkIO, threadDelay) |
|
||||||
import Control.Concurrent.BoundedChan (newBoundedChan, readChan, |
|
||||||
writeChan) |
|
||||||
import Control.Exception (bracket) |
|
||||||
import Control.Monad (forever) |
|
||||||
import Control.Monad.Reader |
|
||||||
import Data.IORef (IORef, newIORef, readIORef) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import qualified Data.Text as T |
|
||||||
import Data.Time (UTCTime (UTCTime), |
|
||||||
fromGregorian) |
|
||||||
import System.IO (BufferMode (LineBuffering), |
|
||||||
hSetBuffering, stderr) |
|
||||||
import System.ZMQ4 (withContext) |
|
||||||
import Test.Mock.HistoryProvider (MockHistoryProvider, |
|
||||||
mkMockHistoryProvider, |
|
||||||
mockGetHistory) |
|
||||||
import Test.Mock.TickerInfoProvider (MockTickerInfoProvider, |
|
||||||
mkMockTickerInfoProvider, |
|
||||||
mockGetInstrumentParameters) |
|
||||||
|
|
||||||
data TestEnv = |
|
||||||
TestEnv |
|
||||||
{ |
|
||||||
historyProvider :: MockHistoryProvider, |
|
||||||
tickerInfoProvider :: MockTickerInfoProvider |
|
||||||
} |
|
||||||
|
|
||||||
type TestM = ReaderT TestEnv IO |
|
||||||
|
|
||||||
instance HistoryProvider TestM where |
|
||||||
getHistory tid tf from to = do |
|
||||||
hp <- asks historyProvider |
|
||||||
liftIO $ mockGetHistory hp tid tf from to |
|
||||||
|
|
||||||
instance TickerInfoProvider TestM where |
|
||||||
getInstrumentParameters tickers = do |
|
||||||
tip <- asks tickerInfoProvider |
|
||||||
liftIO $ mockGetInstrumentParameters tip tickers |
|
||||||
|
|
||||||
instance HasLog TestEnv Message TestM where |
|
||||||
getLogAction env = LogAction $ \msg -> return () |
|
||||||
|
|
||||||
qsEndpoint = "inproc://qs" |
|
||||||
|
|
||||||
mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)] |
|
||||||
where |
|
||||||
bars = [] |
|
||||||
|
|
||||||
mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters "FOO" 10 0.1)] |
|
||||||
|
|
||||||
unitTests = testGroup "Driver.Junction.QuoteThread" [ |
|
||||||
testSubscription |
|
||||||
] |
|
||||||
|
|
||||||
testSubscription :: TestTree |
|
||||||
testSubscription = testCase "Subscription" $ withContext $ \ctx -> do |
|
||||||
barsRef <- newIORef M.empty |
|
||||||
tiRef <- newIORef M.empty |
|
||||||
serverChan <- newBoundedChan 2000 |
|
||||||
let clientSecurityParams = defaultClientSecurityParams |
|
||||||
bracket |
|
||||||
(startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) |
|
||||||
stopQuoteSourceServer $ \_ -> |
|
||||||
bracket |
|
||||||
(startQuoteThread barsRef tiRef ctx qsEndpoint clientSecurityParams (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider)) (LogAction $ \_ -> return ())) |
|
||||||
|
|
||||||
stopQuoteThread $ \qt -> do |
|
||||||
chan <- newBoundedChan 2000 |
|
||||||
addSubscription qt "FOO" (BarTimeframe 3600) chan |
|
||||||
|
|
||||||
forkIO $ forever $ threadDelay 50000 >> writeChan serverChan (QSSBar (BarTimeframe 3600, bar)) |
|
||||||
|
|
||||||
clientData <- readChan chan |
|
||||||
assertEqual "Invalid client data" clientData (QDBar (BarTimeframe 3600, bar)) |
|
||||||
|
|
||||||
bars <- readIORef barsRef |
|
||||||
case M.lookup (BarSeriesId "FOO" (BarTimeframe 3600)) bars of |
|
||||||
Just series -> assertBool "Length should be >= 1" $ (not . null . bsBars) series |
|
||||||
Nothing -> assertFailure "Bar Series should be present" |
|
||||||
where |
|
||||||
bar = |
|
||||||
Bar { |
|
||||||
barSecurity="FOO", barTimestamp=UTCTime (fromGregorian 2021 11 20) 7200, barOpen=10, barHigh=12, barLow=9, barClose=11, barVolume=100 |
|
||||||
} |
|
||||||
@ -1,27 +0,0 @@ |
|||||||
|
|
||||||
module Test.Mock.HistoryProvider |
|
||||||
( |
|
||||||
MockHistoryProvider, |
|
||||||
mkMockHistoryProvider, |
|
||||||
mockGetHistory |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.Quotes.HistoryProvider |
|
||||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) |
|
||||||
import ATrade.Types (Bar (Bar, barTimestamp), |
|
||||||
BarTimeframe (BarTimeframe), |
|
||||||
TickerId) |
|
||||||
import Control.Monad.IO.Class (MonadIO) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import Data.Time (UTCTime) |
|
||||||
|
|
||||||
data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar]) |
|
||||||
|
|
||||||
mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider |
|
||||||
mkMockHistoryProvider = MockHistoryProvider |
|
||||||
|
|
||||||
mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
|
||||||
mockGetHistory (MockHistoryProvider bars) tid tf from to = |
|
||||||
case M.lookup (BarSeriesId tid tf) bars of |
|
||||||
Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series |
|
||||||
Nothing -> return [] |
|
||||||
@ -1,22 +0,0 @@ |
|||||||
|
|
||||||
module Test.Mock.TickerInfoProvider |
|
||||||
( |
|
||||||
MockTickerInfoProvider, |
|
||||||
mkMockTickerInfoProvider, |
|
||||||
mockGetInstrumentParameters |
|
||||||
) where |
|
||||||
|
|
||||||
import ATrade.Quotes.TickerInfoProvider |
|
||||||
import ATrade.RoboCom.Types (InstrumentParameters) |
|
||||||
import ATrade.Types (TickerId) |
|
||||||
import Control.Monad.IO.Class (MonadIO) |
|
||||||
import qualified Data.Map.Strict as M |
|
||||||
import Data.Maybe (catMaybes, mapMaybe) |
|
||||||
|
|
||||||
data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters) |
|
||||||
|
|
||||||
mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider |
|
||||||
mkMockTickerInfoProvider = MockTickerInfoProvider |
|
||||||
|
|
||||||
mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters] |
|
||||||
mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params) |
|
||||||
Loading…
Reference in new issue