Compare commits
71 Commits
36 changed files with 2438 additions and 1109 deletions
@ -1,58 +1,211 @@
@@ -1,58 +1,211 @@
|
||||
{-# LANGUAGE DuplicateRecordFields #-} |
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.Driver.Junction |
||||
( |
||||
junctionMain |
||||
) where |
||||
|
||||
import ATrade.Driver.Junction.Types (StrategyDescriptor (..), |
||||
StrategyInstance (..), |
||||
StrategyInstanceDescriptor (..)) |
||||
import Data.Aeson (decode) |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.IORef |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import ATrade.Broker.Client (startBrokerClient, |
||||
stopBrokerClient) |
||||
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), |
||||
NotificationSqnum (unNotificationSqnum), |
||||
getNotificationSqnum) |
||||
import ATrade.Driver.Junction.BrokerService (getNotifications, |
||||
mkBrokerService) |
||||
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (..), |
||||
JunctionM (..), |
||||
saveRobots, |
||||
startRobot) |
||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), |
||||
ProgramOptions (ProgramOptions, configPath)) |
||||
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), |
||||
withQThread) |
||||
import ATrade.Driver.Junction.RemoteControl (handleRemoteControl) |
||||
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, postNotificationEvent) |
||||
import ATrade.Driver.Junction.Types (StrategyDescriptorE) |
||||
import ATrade.Logging (Message (..), Severity (Debug, Info, Trace, Warning), |
||||
fmtMessage, |
||||
logWith) |
||||
import ATrade.Quotes.QHP (mkQHPHandle) |
||||
import ATrade.Types (OrderId, Trade (tradeOrderId)) |
||||
import Colog (LogAction (LogAction), |
||||
cfilter, |
||||
hoistLogAction, |
||||
logTextStderr, |
||||
(<&), (>$<)) |
||||
import Colog.Actions (logTextHandle) |
||||
import Control.Concurrent.QSem (newQSem) |
||||
import Control.Monad (forM_, forever) |
||||
import Control.Monad.Extra (whenM) |
||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||
import Control.Monad.Reader (ReaderT (runReaderT)) |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
newIORef, |
||||
readIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import Data.Set (notMember) |
||||
import qualified Data.Set as S |
||||
import qualified Data.Text as T |
||||
import Data.Text.IO (readFile) |
||||
import Database.Redis (ConnectInfo (..), PortID (UnixSocket), |
||||
checkedConnect, |
||||
defaultConnectInfo) |
||||
import Dhall (auto, input) |
||||
import Options.Applicative (Parser, |
||||
execParser, |
||||
fullDesc, header, |
||||
help, helper, |
||||
info, long, |
||||
metavar, progDesc, |
||||
short, strOption, |
||||
(<**>)) |
||||
import Prelude hiding (log, |
||||
readFile) |
||||
import System.IO (BufferMode (LineBuffering), |
||||
Handle, |
||||
IOMode (AppendMode), |
||||
hSetBuffering, |
||||
withFile) |
||||
import System.ZMQ4 (Router (Router), |
||||
bind, withContext, |
||||
withSocket) |
||||
import UnliftIO (MonadUnliftIO) |
||||
import UnliftIO.Exception (bracket) |
||||
import UnliftIO.QSem (QSem, withQSem) |
||||
|
||||
load :: T.Text -> IO B.ByteString |
||||
load = undefined |
||||
|
||||
junctionMain :: M.Map T.Text StrategyDescriptor -> IO () |
||||
locked :: (MonadIO m, MonadUnliftIO m) => QSem -> LogAction m a -> LogAction m a |
||||
locked sem action = LogAction (\m -> withQSem sem (action <& m)) |
||||
|
||||
logger :: (MonadIO m) => M.Map T.Text Severity -> Handle -> LogAction m Message |
||||
logger loglevels h = cfilter checkLoglevel (fmtMessage >$< (logTextStderr <> logTextHandle h)) |
||||
where |
||||
checkLoglevel msg = |
||||
case M.lookup (msgComponent msg) loglevels of |
||||
Just level -> msgSeverity msg >= level |
||||
Nothing -> True |
||||
|
||||
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () |
||||
junctionMain descriptors = do |
||||
parseOptions |
||||
instanceDescriptors <- undefined |
||||
strategies <- mkStrategies instanceDescriptors |
||||
opts <- parseOptions |
||||
|
||||
let initialLogger = fmtMessage >$< logTextStderr |
||||
|
||||
logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts) |
||||
|
||||
cfg <- readFile (configPath opts) >>= input auto |
||||
|
||||
withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do |
||||
|
||||
hSetBuffering h LineBuffering |
||||
|
||||
start strategies |
||||
locksem <- newQSem 1 |
||||
let globalLogger = locked locksem (logger (M.fromList $ logLevels cfg) h) |
||||
let log = logWith globalLogger |
||||
|
||||
barsMap <- newIORef M.empty |
||||
tickerInfoMap <- newIORef M.empty |
||||
|
||||
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg |
||||
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) |
||||
log Info "Junction" "redis: connected" |
||||
withContext $ \ctx -> do |
||||
log Debug "Junction" "0mq context created" |
||||
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) (hoistLogAction liftIO globalLogger) |
||||
robotsMap <- newIORef M.empty |
||||
ordersMap <- newIORef M.empty |
||||
handledNotifications <- newIORef S.empty |
||||
withBroker cfg robotsMap ordersMap handledNotifications globalLogger $ \bro -> |
||||
withQThread downloaderEnv barsMap tickerInfoMap cfg ctx globalLogger $ \qt -> |
||||
withSocket ctx Router $ \rcSocket -> do |
||||
liftIO $ bind rcSocket (T.unpack . remoteControlEndpoint $ cfg) |
||||
broService <- mkBrokerService bro ordersMap |
||||
let junctionLogAction = hoistLogAction liftIO globalLogger |
||||
let env = |
||||
JunctionEnv |
||||
{ |
||||
peRedisSocket = redis, |
||||
peConfigPath = robotsConfigsPath cfg, |
||||
peQuoteThread = qt, |
||||
peBroker = bro, |
||||
peRobots = robotsMap, |
||||
peRemoteControlSocket = rcSocket, |
||||
peLogAction = junctionLogAction, |
||||
peIoLogAction = globalLogger, |
||||
peProgramConfiguration = cfg, |
||||
peBarsMap = barsMap, |
||||
peTickerInfoMap = tickerInfoMap, |
||||
peBrokerService = broService, |
||||
peDescriptors = descriptors |
||||
} |
||||
withJunction env $ do |
||||
startRobots cfg |
||||
forever $ do |
||||
notifications <- getNotifications broService |
||||
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications globalLogger) |
||||
saveRobots |
||||
handleRemoteControl 1000 |
||||
where |
||||
parseOptions = undefined |
||||
|
||||
mkStrategies :: [StrategyInstanceDescriptor] -> IO [StrategyInstance] |
||||
mkStrategies = mapM mkStrategy |
||||
|
||||
mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance |
||||
mkStrategy desc = do |
||||
sState <- load (stateKey desc) |
||||
sCfg <- load (configKey desc) |
||||
case M.lookup (strategyId desc) descriptors of |
||||
Just (StrategyDescriptor _sName sCallback _sDefState) -> |
||||
case (decode $ BL.fromStrict sCfg, decode $ BL.fromStrict sState) of |
||||
(Just pCfg, Just pState) -> do |
||||
cfgRef <- newIORef pCfg |
||||
stateRef <- newIORef pState |
||||
return $ StrategyInstance |
||||
{ |
||||
strategyInstanceId = strategyName desc, |
||||
strategyEventCallback = sCallback, |
||||
strategyState = stateRef, |
||||
strategyConfig = cfgRef |
||||
} |
||||
_ -> undefined |
||||
_ -> undefined |
||||
|
||||
start = undefined |
||||
startRobots :: ProgramConfiguration -> JunctionM () |
||||
startRobots cfg = forM_ (instances cfg) startRobot |
||||
|
||||
withJunction :: JunctionEnv -> JunctionM () -> IO () |
||||
withJunction env = (`runReaderT` env) . unJunctionM |
||||
|
||||
handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) -> |
||||
IORef (M.Map OrderId T.Text) -> |
||||
IORef (S.Set NotificationSqnum) -> |
||||
LogAction IO Message -> |
||||
Notification -> |
||||
IO () |
||||
handleBrokerNotification robotsRef ordersMapRef handled logger' notification= do |
||||
logWith logger' Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification |
||||
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do |
||||
robotsMap <- readIORef robotsRef |
||||
ordersMap <- readIORef ordersMapRef |
||||
|
||||
case getNotificationTarget robotsMap ordersMap notification of |
||||
Just robot -> postNotificationEvent robot notification |
||||
Nothing -> do |
||||
logWith logger' Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) |
||||
logWith logger' Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) |
||||
|
||||
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) |
||||
|
||||
getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle |
||||
getNotificationTarget robotsMap ordersMap notification = do |
||||
robotId <- M.lookup (notificationOrderId notification) ordersMap |
||||
M.lookup robotId robotsMap |
||||
|
||||
notificationOrderId (OrderNotification _ oid _) = oid |
||||
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade |
||||
|
||||
withBroker cfg robotsMap ordersMap handled logger' f = do |
||||
bracket |
||||
(startBrokerClient |
||||
(brokerIdentity cfg) |
||||
(brokerEndpoint cfg) |
||||
[handleBrokerNotification robotsMap ordersMap handled logger'] |
||||
logger') |
||||
stopBrokerClient f |
||||
|
||||
parseOptions = execParser options |
||||
options = info (optionsParser <**> helper) |
||||
(fullDesc <> |
||||
progDesc "Robocom-zero junction mode driver" <> |
||||
header "robocom-zero-junction") |
||||
|
||||
optionsParser :: Parser ProgramOptions |
||||
optionsParser = ProgramOptions |
||||
<$> strOption |
||||
(long "config" <> |
||||
short 'c' <> |
||||
metavar "FILENAME" <> |
||||
help "Configuration file path") |
||||
|
||||
|
||||
@ -0,0 +1,64 @@
@@ -0,0 +1,64 @@
|
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
|
||||
module ATrade.Driver.Junction.BrokerService |
||||
( |
||||
BrokerService, |
||||
mkBrokerService, |
||||
submitOrder, |
||||
cancelOrder, |
||||
getNotifications |
||||
) where |
||||
|
||||
import qualified ATrade.Broker.Client as Bro |
||||
import ATrade.Broker.Protocol (Notification (..)) |
||||
import ATrade.Logging (Message, logDebug, logWarning) |
||||
import ATrade.Types (Order (..), OrderId) |
||||
import Colog (WithLog) |
||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||
import Control.Monad.Reader.Class (MonadReader) |
||||
import Data.IORef (IORef, atomicModifyIORef', |
||||
newIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
|
||||
data BrokerService = |
||||
BrokerService |
||||
{ |
||||
broker :: Bro.BrokerClientHandle, |
||||
orderMap :: IORef (M.Map OrderId T.Text), |
||||
orderIdCounter :: IORef OrderId |
||||
} |
||||
|
||||
mkBrokerService :: Bro.BrokerClientHandle -> IORef (M.Map OrderId T.Text) -> IO BrokerService |
||||
mkBrokerService h om = BrokerService h om <$> newIORef 1 |
||||
|
||||
submitOrder :: (MonadIO m, WithLog env Message m, MonadReader env m) => BrokerService -> T.Text -> Order -> m OrderId |
||||
submitOrder service identity order = do |
||||
oid <- nextOrderId service |
||||
logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid |
||||
liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ())) |
||||
r <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid } |
||||
case r of |
||||
Left err -> logWarning "BrokerService" $ "Submit order error: " <> err |
||||
_ -> return () |
||||
return oid |
||||
where |
||||
nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s)) |
||||
|
||||
cancelOrder :: (MonadIO m, WithLog env Message m) => BrokerService -> OrderId -> m () |
||||
cancelOrder service oid = do |
||||
r <- liftIO $ Bro.cancelOrder (broker service) oid |
||||
case r of |
||||
Left err -> logWarning "BrokerServer" $ "Cancel order error: " <> err |
||||
_ -> return () |
||||
return () |
||||
|
||||
getNotifications :: (MonadIO m, WithLog env Message m) => BrokerService -> m [Notification] |
||||
getNotifications service = do |
||||
v <- liftIO $ Bro.getNotifications (broker service) |
||||
case v of |
||||
Left err -> do |
||||
logWarning "BrokerServer" $ "Get notifications order error: " <> err |
||||
return [] |
||||
Right n -> return n |
||||
@ -0,0 +1,258 @@
@@ -0,0 +1,258 @@
|
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
|
||||
|
||||
module ATrade.Driver.Junction.JunctionMonad |
||||
( |
||||
JunctionEnv(..), |
||||
JunctionM(..), |
||||
startRobot, |
||||
saveRobots, |
||||
reloadConfig, |
||||
getState, |
||||
setState |
||||
) where |
||||
|
||||
import ATrade.Broker.Client (BrokerClientHandle) |
||||
import ATrade.Driver.Junction.BrokerService (BrokerService) |
||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (logBasePath)) |
||||
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
||||
QuoteSubscription (QuoteSubscription)) |
||||
import ATrade.Driver.Junction.QuoteThread (QuoteThreadHandle) |
||||
import qualified ATrade.Driver.Junction.QuoteThread as QT |
||||
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), |
||||
RobotM (unRobotM), |
||||
createRobotDriverThread, |
||||
getInstanceDescriptor, |
||||
onStrategyInstance, |
||||
onStrategyInstanceM) |
||||
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), |
||||
StrategyInstanceDescriptor, |
||||
accountId, |
||||
confStrategy, |
||||
confTickers, |
||||
configKey, |
||||
stateKey, |
||||
strategyBaseName, |
||||
strategyConfig, |
||||
strategyId, |
||||
strategyInstanceId, |
||||
strategyState, |
||||
strategyTimers, |
||||
tickerId, |
||||
timeframe) |
||||
import ATrade.Logging (Message, Severity (Error, Info), |
||||
fmtMessage, |
||||
logWarning, |
||||
logWith) |
||||
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) |
||||
import ATrade.RoboCom.Monad (StrategyEnvironment (..)) |
||||
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) |
||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
||||
Bars, |
||||
TickerInfoMap) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction, |
||||
hoistLogAction, |
||||
logTextHandle, |
||||
(>$<)) |
||||
import Control.Exception.Safe (finally) |
||||
import Control.Monad.Reader (MonadIO (liftIO), |
||||
MonadReader, |
||||
ReaderT (runReaderT), |
||||
asks) |
||||
import Data.Aeson (decode, |
||||
eitherDecode, |
||||
encode) |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.Default (Default (def)) |
||||
import Data.Foldable (traverse_) |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
newIORef, |
||||
readIORef, |
||||
writeIORef) |
||||
import Data.List.NonEmpty (NonEmpty ((:|))) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import Data.Text.Encoding (encodeUtf8) |
||||
import Data.Text.IO (readFile) |
||||
import Data.Time (getCurrentTime) |
||||
import Data.Time.Clock.POSIX (getPOSIXTime) |
||||
import Database.Redis (Connection, get, |
||||
mset, runRedis) |
||||
import Dhall (auto, input) |
||||
import Prelude hiding (log, |
||||
readFile) |
||||
import System.IO (BufferMode (LineBuffering), |
||||
IOMode (AppendMode), |
||||
hClose, |
||||
hSetBuffering, |
||||
openFile) |
||||
import System.ZMQ4 (Router, Socket) |
||||
import UnliftIO (MonadUnliftIO) |
||||
import UnliftIO.Exception (catchAny, |
||||
onException) |
||||
|
||||
data JunctionEnv = |
||||
JunctionEnv |
||||
{ |
||||
peRedisSocket :: Connection, |
||||
peConfigPath :: FilePath, |
||||
peQuoteThread :: QuoteThreadHandle, |
||||
peBroker :: BrokerClientHandle, |
||||
peRobots :: IORef (M.Map T.Text RobotDriverHandle), |
||||
peRemoteControlSocket :: Socket Router, |
||||
peLogAction :: LogAction JunctionM Message, |
||||
peIoLogAction :: LogAction IO Message, |
||||
peProgramConfiguration :: ProgramConfiguration, |
||||
peBarsMap :: IORef Bars, |
||||
peTickerInfoMap :: IORef TickerInfoMap, |
||||
peBrokerService :: BrokerService, |
||||
peDescriptors :: M.Map T.Text StrategyDescriptorE |
||||
} |
||||
|
||||
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadUnliftIO) |
||||
|
||||
instance HasLog JunctionEnv Message JunctionM where |
||||
getLogAction = peLogAction |
||||
setLogAction a e = e { peLogAction = a } |
||||
|
||||
instance ConfigStorage JunctionM where |
||||
loadConfig key = do |
||||
basePath <- asks peConfigPath |
||||
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction |
||||
liftIO $ readFile path >>= input auto |
||||
|
||||
instance MonadPersistence JunctionM where |
||||
saveState newState key = do |
||||
conn <- asks peRedisSocket |
||||
now <- liftIO getPOSIXTime |
||||
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), |
||||
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] |
||||
case res of |
||||
Left _ -> logWarning "Junction " "Unable to save state" |
||||
Right _ -> return () |
||||
|
||||
loadState key = do |
||||
conn <- asks peRedisSocket |
||||
res <- liftIO $ runRedis conn $ get (encodeUtf8 key) |
||||
-- TODO: just chain eithers |
||||
case res of |
||||
Left _ -> do |
||||
logWarning "Junction" "Unable to load state" |
||||
return def |
||||
Right maybeRawState -> |
||||
case maybeRawState of |
||||
Just rawState -> case eitherDecode $ BL.fromStrict rawState of |
||||
Left _ -> do |
||||
logWarning "Junction" "Unable to decode state" |
||||
return def |
||||
Right decodedState -> return decodedState |
||||
Nothing -> do |
||||
logWarning "Junction" "Unable to decode state" |
||||
return def |
||||
|
||||
instance QuoteStream JunctionM where |
||||
addSubscription (QuoteSubscription ticker tf) chan = do |
||||
qt <- asks peQuoteThread |
||||
QT.addSubscription qt ticker tf chan |
||||
removeSubscription subId = do |
||||
qt <- asks peQuoteThread |
||||
QT.removeSubscription qt subId |
||||
|
||||
startRobot :: StrategyInstanceDescriptor -> JunctionM () |
||||
startRobot inst = do |
||||
ioLogger <- asks peIoLogAction |
||||
descriptors <- asks peDescriptors |
||||
cfg <- asks peProgramConfiguration |
||||
barsMap <- asks peBarsMap |
||||
tickerInfoMap <- asks peTickerInfoMap |
||||
broService <- asks peBrokerService |
||||
now <- liftIO getCurrentTime |
||||
let lLogger = hoistLogAction liftIO ioLogger |
||||
logWith lLogger Info "Junction" $ "Starting strategy: " <> strategyBaseName inst |
||||
case M.lookup (strategyBaseName inst) descriptors of |
||||
Just (StrategyDescriptorE desc) -> flip catchAny (\e -> logWith lLogger Error "Junction" $ "Exception: " <> (T.pack . show $ e)) $ do |
||||
bigConf <- loadConfig (configKey inst) |
||||
case confTickers bigConf of |
||||
(firstTicker:restTickers) -> do |
||||
rConf <- liftIO $ newIORef (confStrategy bigConf) |
||||
rState <- loadState (stateKey inst) >>= liftIO . newIORef |
||||
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef |
||||
localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode |
||||
liftIO $ hSetBuffering localH LineBuffering |
||||
let robotLogAction = hoistLogAction liftIO ioLogger <> (fmtMessage >$< logTextHandle localH) |
||||
stratEnv <- liftIO $ newIORef StrategyEnvironment |
||||
{ |
||||
_seInstanceId = strategyId inst, |
||||
_seAccount = accountId inst, |
||||
_seVolume = 1, |
||||
_seLastTimestamp = now |
||||
} |
||||
let robotEnv = |
||||
RobotEnv rState rConf rTimers barsMap tickerInfoMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) |
||||
robot <- createRobotDriverThread inst desc (\a -> (flip runReaderT robotEnv . unRobotM) a `finally` hClose localH) bigConf rConf rState rTimers |
||||
robotsMap' <- asks peRobots |
||||
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) |
||||
_ -> logWith lLogger Error (strategyId inst) "No tickers configured !!!" |
||||
Nothing -> logWith lLogger Error "Junction" $ "Unknown strategy: " <> strategyBaseName inst |
||||
|
||||
where |
||||
toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) |
||||
|
||||
saveRobots :: JunctionM () |
||||
saveRobots = do |
||||
robotsMap <- asks peRobots >>= (liftIO . readIORef) |
||||
traverse_ saveRobotState robotsMap |
||||
|
||||
saveRobotState :: RobotDriverHandle -> JunctionM () |
||||
saveRobotState handle = onStrategyInstance handle $ \inst -> do |
||||
currentState <- liftIO $ readIORef (strategyState inst) |
||||
saveState currentState (strategyInstanceId inst) |
||||
currentTimers <- liftIO $ readIORef (strategyTimers inst) |
||||
saveState currentTimers (strategyInstanceId inst <> ":timers") |
||||
|
||||
reloadConfig :: T.Text -> JunctionM (Either T.Text ()) |
||||
reloadConfig instId = flip catchAny (\_ -> return $ Left "Exception") $ do |
||||
robotsMap' <- asks peRobots |
||||
robots <- liftIO $ readIORef robotsMap' |
||||
case M.lookup instId robots of |
||||
Just robot -> do |
||||
onStrategyInstanceM robot |
||||
(\inst -> do |
||||
let instDesc = getInstanceDescriptor robot |
||||
bigConf <- loadConfig (configKey instDesc) |
||||
liftIO $ writeIORef (strategyConfig inst) (confStrategy bigConf)) |
||||
return $ Right () |
||||
Nothing -> return $ Left "Unable to load config" |
||||
|
||||
getState :: T.Text -> JunctionM (Either T.Text B.ByteString) |
||||
getState instId = do |
||||
robotsMap' <- asks peRobots |
||||
robots <- liftIO $ readIORef robotsMap' |
||||
case M.lookup instId robots of |
||||
Just robot -> do |
||||
Right <$> onStrategyInstanceM robot |
||||
(\inst -> do |
||||
v <- liftIO $ readIORef (strategyState inst) |
||||
return $ BL.toStrict $ encode v) |
||||
Nothing -> return $ Left $ "Unknown robot: " <> instId |
||||
|
||||
setState :: T.Text -> B.ByteString -> JunctionM (Either T.Text ()) |
||||
setState instId newState = do |
||||
robotsMap' <- asks peRobots |
||||
robots <- liftIO $ readIORef robotsMap' |
||||
case M.lookup instId robots of |
||||
Just robot -> do |
||||
onStrategyInstanceM robot |
||||
(\inst -> do |
||||
case decode . BL.fromStrict $ newState of |
||||
Just newS -> do |
||||
liftIO $ writeIORef (strategyState inst) newS |
||||
return $ Right () |
||||
Nothing -> return $ Left $ "Unable to decode state for " <> instId) |
||||
Nothing -> return $ Left $ "Unknown robot: " <> instId |
||||
@ -0,0 +1,72 @@
@@ -0,0 +1,72 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
{-# LANGUAGE NamedFieldPuns #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE RecordWildCards #-} |
||||
|
||||
module ATrade.Driver.Junction.ProgramConfiguration |
||||
( |
||||
ProgramOptions(..), |
||||
ProgramConfiguration(..) |
||||
) where |
||||
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
||||
import ATrade.Logging (Severity (..)) |
||||
import qualified Data.Text as T |
||||
import Dhall (FromDhall, autoWith) |
||||
import Dhall.Core (Expr (..), FieldSelection (..)) |
||||
import qualified Dhall.Map |
||||
import Dhall.Marshal.Decode (Decoder (..), typeError) |
||||
import GHC.Generics (Generic) |
||||
|
||||
newtype ProgramOptions = |
||||
ProgramOptions |
||||
{ |
||||
configPath :: FilePath |
||||
} |
||||
|
||||
data ProgramConfiguration = |
||||
ProgramConfiguration |
||||
{ |
||||
brokerEndpoint :: T.Text, |
||||
brokerNotificationEndpoint :: T.Text, |
||||
brokerServerCert :: Maybe FilePath, |
||||
brokerClientCert :: Maybe FilePath, |
||||
brokerIdentity :: T.Text, |
||||
quotesourceEndpoint :: T.Text, |
||||
quotesourceServerCert :: Maybe FilePath, |
||||
quotesourceClientCert :: Maybe FilePath, |
||||
qhpEndpoint :: T.Text, |
||||
qtisEndpoint :: T.Text, |
||||
remoteControlEndpoint :: T.Text, |
||||
redisSocket :: T.Text, |
||||
robotsConfigsPath :: FilePath, |
||||
logBasePath :: FilePath, |
||||
logLevels :: [(T.Text, Severity)], |
||||
instances :: [StrategyInstanceDescriptor] |
||||
} deriving (Generic, Show) |
||||
|
||||
instance FromDhall Severity where |
||||
autoWith _ = Decoder {..} |
||||
where |
||||
extract expr@(Field _ FieldSelection{ fieldSelectionLabel }) = |
||||
case fieldSelectionLabel of |
||||
"Trace" -> pure Trace |
||||
"Debug" -> pure Debug |
||||
"Info" -> pure Info |
||||
"Warning" -> pure Warning |
||||
"Error" -> pure Error |
||||
_ -> typeError expected expr |
||||
extract expr = typeError expected expr |
||||
|
||||
expected = pure |
||||
(Union |
||||
(Dhall.Map.fromList |
||||
[ ("Trace", Nothing) |
||||
, ("Debug", Nothing) |
||||
, ("Info", Nothing) |
||||
, ("Warning", Nothing) |
||||
, ("Error", Nothing) |
||||
] |
||||
) |
||||
) |
||||
|
||||
instance FromDhall ProgramConfiguration |
||||
@ -0,0 +1,30 @@
@@ -0,0 +1,30 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
|
||||
module ATrade.Driver.Junction.QuoteStream |
||||
( |
||||
QuoteSubscription(..), |
||||
QuoteStream(..), |
||||
SubscriptionId(..) |
||||
) where |
||||
|
||||
import ATrade.QuoteSource.Client (QuoteData) |
||||
import ATrade.Types (BarTimeframe, TickerId) |
||||
import Control.Concurrent.BoundedChan (BoundedChan) |
||||
import Data.Hashable (Hashable) |
||||
import GHC.Generics (Generic) |
||||
|
||||
data QuoteSubscription = |
||||
QuoteSubscription TickerId BarTimeframe |
||||
deriving (Generic, Eq) |
||||
|
||||
instance Hashable BarTimeframe |
||||
instance Hashable QuoteSubscription |
||||
|
||||
newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } |
||||
deriving (Show, Eq, Generic) |
||||
|
||||
instance Hashable SubscriptionId |
||||
|
||||
class (Monad m) => QuoteStream m where |
||||
addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId |
||||
removeSubscription :: SubscriptionId -> m () |
||||
@ -0,0 +1,304 @@
@@ -0,0 +1,304 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE LambdaCase #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE ScopedTypeVariables #-} |
||||
{-# LANGUAGE TypeSynonymInstances #-} |
||||
|
||||
module ATrade.Driver.Junction.QuoteThread |
||||
( |
||||
QuoteThreadHandle, |
||||
startQuoteThread, |
||||
stopQuoteThread, |
||||
addSubscription, |
||||
removeSubscription, |
||||
DownloaderM, |
||||
DownloaderEnv(..), |
||||
runDownloaderM, |
||||
withQThread |
||||
) where |
||||
|
||||
import qualified ATrade.BarAggregator as BA |
||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) |
||||
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..), |
||||
SubscriptionId (SubscriptionId)) |
||||
import ATrade.Logging (Message, logDebug, |
||||
logInfo, |
||||
logWarning) |
||||
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
||||
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) |
||||
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), |
||||
qtisGetTickersInfo) |
||||
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
||||
import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), |
||||
QuoteSourceClientHandle, |
||||
quoteSourceClientSubscribe, |
||||
startQuoteSourceClient, |
||||
stopQuoteSourceClient) |
||||
import ATrade.RoboCom.Types (Bar (barSecurity), |
||||
BarSeries (..), |
||||
BarSeriesId (BarSeriesId), |
||||
Bars, |
||||
InstrumentParameters (InstrumentParameters), |
||||
TickerInfoMap) |
||||
import ATrade.Types (BarTimeframe (BarTimeframe), |
||||
ClientSecurityParams (ClientSecurityParams), |
||||
Tick (security), |
||||
TickerId) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction, |
||||
WithLog) |
||||
import Control.Concurrent (ThreadId, forkIO, |
||||
killThread) |
||||
import Control.Concurrent.BoundedChan (BoundedChan, |
||||
newBoundedChan, |
||||
readChan, |
||||
tryWriteChan, |
||||
writeChan) |
||||
import Control.Exception.Safe (MonadMask, |
||||
MonadThrow, |
||||
bracket) |
||||
import Control.Monad (forM, forM_, |
||||
forever) |
||||
import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), |
||||
lift) |
||||
import Control.Monad.Reader.Class (MonadReader, asks) |
||||
import qualified Data.HashMap.Strict as HM |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
newIORef, |
||||
readIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import Data.Time (addUTCTime, |
||||
getCurrentTime) |
||||
import System.ZMQ4 (Context) |
||||
import System.ZMQ4.ZAP (loadCertificateFromFile) |
||||
|
||||
|
||||
data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv |
||||
|
||||
data QuoteThreadEnv = |
||||
QuoteThreadEnv |
||||
{ |
||||
bars :: IORef Bars, |
||||
endpoints :: IORef (HM.HashMap QuoteSubscription [(SubscriptionId, BoundedChan QuoteData)]), |
||||
qsclient :: QuoteSourceClientHandle, |
||||
paramsCache :: IORef TickerInfoMap, |
||||
downloaderChan :: BoundedChan QuoteSubscription, |
||||
subscriptionIdCounter :: IORef Int, |
||||
subscriptions :: IORef (HM.HashMap SubscriptionId QuoteSubscription), |
||||
aggregators :: IORef (HM.HashMap (TickerId, BarTimeframe) BA.BarAggregator) |
||||
} |
||||
|
||||
startQuoteThread :: (MonadIO m, |
||||
MonadIO m1, |
||||
WithLog env Message m1, |
||||
HistoryProvider m1, |
||||
TickerInfoProvider m1) => |
||||
IORef Bars -> |
||||
IORef TickerInfoMap -> |
||||
Context -> |
||||
T.Text -> |
||||
ClientSecurityParams -> |
||||
(m1 () -> IO ()) -> |
||||
LogAction IO Message -> |
||||
m QuoteThreadHandle |
||||
startQuoteThread barsRef tiRef ctx ep secparams downloadThreadRunner logger = do |
||||
chan <- liftIO $ newBoundedChan 2000 |
||||
dChan <- liftIO $ newBoundedChan 2000 |
||||
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger |
||||
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> pure tiRef <*> pure dChan <*> newIORef 0 <*> newIORef HM.empty <*> newIORef HM.empty |
||||
tid <- liftIO . forkIO $ quoteThread env chan |
||||
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) |
||||
return $ QuoteThreadHandle tid downloaderTid env |
||||
where |
||||
downloaderThread env chan = do |
||||
logInfo "QuoteThread" "Started" |
||||
forever $ do |
||||
QuoteSubscription tickerid tf <- liftIO $ readChan chan |
||||
logInfo "QuoteThread" $ "Subscription: " <> tickerid |
||||
paramsMap <- liftIO $ readIORef $ paramsCache env |
||||
mbParams <- case M.lookup tickerid paramsMap of |
||||
Nothing -> do |
||||
paramsList <- getInstrumentParameters [tickerid] |
||||
case paramsList of |
||||
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) |
||||
_ -> return Nothing |
||||
Just params -> return $ Just params |
||||
logDebug "QuoteThread" $ "Got info params: " <> (T.pack . show $ mbParams) |
||||
barsMap <- liftIO $ readIORef (bars env) |
||||
case M.lookup (BarSeriesId tickerid tf) barsMap of |
||||
Just _ -> return () -- already downloaded |
||||
Nothing -> case mbParams of |
||||
Just params -> do |
||||
now <- liftIO getCurrentTime |
||||
-- Load data in interval [today - 60days; today + 1day]. +1 day guarantees that we will download data up until current time. |
||||
-- If we don't make this adjustment it is possible that we will get data only up to beginning of current day. |
||||
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) (86400 `addUTCTime` now) |
||||
let barSeries = BarSeries tickerid tf barsData params |
||||
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) |
||||
_ -> logWarning "QuoteThread" $ "Unable to find parameters for: " <> (T.pack . show $ BarSeriesId tickerid tf) |
||||
|
||||
pushToBarAggregators tick = forM_ (BarTimeframe <$> [60, 300, 900, 3600]) (pushTickToAggregator tick) |
||||
|
||||
pushTickToAggregator tick tf = do |
||||
aggsRef <- asks aggregators |
||||
aggs <- liftIO . readIORef $ aggsRef |
||||
let key = (security tick, tf) |
||||
case HM.lookup key aggs of |
||||
Just agg -> do |
||||
let (mbar, agg') = BA.handleTick tick agg |
||||
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg' m, ())) |
||||
barsRef' <- asks bars |
||||
case mbar of |
||||
Just bar -> do |
||||
liftIO $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
||||
writeBarData bar tf (QDBar (tf, bar)) |
||||
_ -> do |
||||
pure () |
||||
_ -> do |
||||
let agg = BA.mkAggregatorFromBars (M.singleton (security tick) (BarSeries (security tick) tf [] (InstrumentParameters (security tick) 1 1))) [(0, 86400)] |
||||
liftIO $ atomicModifyIORef' aggsRef (\m -> (HM.insert key agg m, ())) |
||||
|
||||
quoteThread env chan = flip runReaderT env $ forever $ do |
||||
qssData <- lift $ readChan chan |
||||
case qssData of |
||||
QDBar (tf, bar) -> do |
||||
barsRef' <- asks bars |
||||
lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
||||
writeBarData bar tf qssData |
||||
QDTick tick -> do |
||||
pushToBarAggregators tick |
||||
writeTickData tick qssData |
||||
|
||||
writeTickData tick qssData = do |
||||
let key = QuoteSubscription (security tick) (BarTimeframe 0) |
||||
subs <- asks endpoints >>= (lift . readIORef) |
||||
case HM.lookup key subs of |
||||
Just clientChannels -> do |
||||
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels |
||||
Nothing -> return () |
||||
|
||||
writeBarData bar tf qssData = do |
||||
let key = QuoteSubscription (barSecurity bar) tf |
||||
subs <- asks endpoints >>= (lift . readIORef) |
||||
case HM.lookup key subs of |
||||
Just clientChannels -> do |
||||
lift $ mapM_ (\(_, chan') -> tryWriteChan chan' qssData) clientChannels |
||||
Nothing -> return () |
||||
|
||||
stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () |
||||
stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do |
||||
killThread tid |
||||
killThread dtid |
||||
stopQuoteSourceClient (qsclient env) |
||||
|
||||
addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m SubscriptionId |
||||
addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do |
||||
cnt <- atomicModifyIORef' (subscriptionIdCounter env) (\c -> (c + 1, c)) |
||||
let subscription = QuoteSubscription tid tf |
||||
let subid = SubscriptionId cnt |
||||
writeChan (downloaderChan env) subscription |
||||
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m subid tid, ())) |
||||
atomicModifyIORef' (subscriptions env) (\m -> (HM.insert subid subscription m, ())) |
||||
quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] |
||||
return subid |
||||
where |
||||
doAddSubscription m subid tickerid = |
||||
let m1 = HM.alter (\case |
||||
Just chans -> Just ((subid, chan) : chans) |
||||
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid tf) m in |
||||
HM.alter (\case |
||||
Just chans -> Just ((subid, chan) : chans) |
||||
_ -> Just [(subid, chan)]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 |
||||
|
||||
removeSubscription :: (MonadIO m) => QuoteThreadHandle -> SubscriptionId -> m () |
||||
removeSubscription (QuoteThreadHandle _ _ env) subId = liftIO $ do |
||||
subs <- readIORef (subscriptions env) |
||||
case HM.lookup subId subs of |
||||
Just sub -> atomicModifyIORef' (endpoints env) (\m -> (doRemoveSubscription m sub, ())) |
||||
Nothing -> return () |
||||
where |
||||
doRemoveSubscription m sub = |
||||
let m1 = HM.adjust (filter (\(subId', _) -> subId' == subId)) sub m in |
||||
HM.adjust (filter (\(subId', _) -> subId' == subId)) (sub0 sub) m1 |
||||
sub0 sub = let QuoteSubscription tid _ = sub in QuoteSubscription tid (BarTimeframe 0) |
||||
|
||||
updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars |
||||
updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap |
||||
|
||||
addToSeries :: Bar -> BarSeries -> BarSeries |
||||
addToSeries bar series = series { bsBars = bar : bsBars series } |
||||
|
||||
data DownloaderEnv = |
||||
DownloaderEnv |
||||
{ |
||||
qhp :: QHPHandle, |
||||
downloaderContext :: Context, |
||||
downloaderQtisEndpoint :: T.Text, |
||||
logAction :: LogAction DownloaderM Message |
||||
} |
||||
|
||||
newtype DownloaderM a = DownloaderM { unDownloaderM :: ReaderT DownloaderEnv IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader DownloaderEnv, MonadIO, MonadThrow) |
||||
|
||||
instance HasLog DownloaderEnv Message DownloaderM where |
||||
getLogAction = logAction |
||||
setLogAction a e = e { logAction = a } |
||||
|
||||
instance HistoryProvider DownloaderM where |
||||
getHistory tid tf from to = do |
||||
q <- asks qhp |
||||
requestHistoryFromQHP q tid tf from to |
||||
|
||||
instance TickerInfoProvider DownloaderM where |
||||
getInstrumentParameters tickers = do |
||||
ctx <- asks downloaderContext |
||||
ep <- asks downloaderQtisEndpoint |
||||
tis <- forM tickers (qtisGetTickersInfo ctx ep) |
||||
pure $ convert `fmap` tis |
||||
where |
||||
convert ti = InstrumentParameters |
||||
(tiTicker ti) |
||||
(fromInteger $ tiLotSize ti) |
||||
(tiTickSize ti) |
||||
|
||||
withQThread :: |
||||
DownloaderEnv |
||||
-> IORef Bars |
||||
-> IORef TickerInfoMap |
||||
-> ProgramConfiguration |
||||
-> Context |
||||
-> LogAction IO Message |
||||
-> (QuoteThreadHandle -> IO ()) |
||||
-> IO () |
||||
withQThread env barsMap tiMap cfg ctx logger f = do |
||||
securityParameters <- loadSecurityParameters |
||||
bracket |
||||
(startQuoteThread |
||||
barsMap |
||||
tiMap |
||||
ctx |
||||
(quotesourceEndpoint cfg) |
||||
securityParameters |
||||
(runDownloaderM env) |
||||
logger) |
||||
stopQuoteThread f |
||||
where |
||||
loadSecurityParameters = |
||||
case (quotesourceClientCert cfg, quotesourceServerCert cfg) of |
||||
(Just clientCertPath, Just serverCertPath) -> do |
||||
eClientCert <- loadCertificateFromFile clientCertPath |
||||
eServerCert <- loadCertificateFromFile serverCertPath |
||||
case (eClientCert, eServerCert) of |
||||
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) |
||||
(_, _) -> return $ ClientSecurityParams Nothing Nothing |
||||
|
||||
_ -> return $ ClientSecurityParams Nothing Nothing |
||||
|
||||
runDownloaderM :: DownloaderEnv -> DownloaderM () -> IO () |
||||
runDownloaderM env = (`runReaderT` env) . unDownloaderM |
||||
@ -0,0 +1,151 @@
@@ -0,0 +1,151 @@
|
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE MultiWayIf #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
|
||||
module ATrade.Driver.Junction.RemoteControl |
||||
( |
||||
handleRemoteControl |
||||
) where |
||||
|
||||
import ATrade.Driver.Junction.JunctionMonad (JunctionEnv (peLogAction, peRemoteControlSocket, peRobots), |
||||
JunctionM, getState, |
||||
reloadConfig, |
||||
setState, startRobot) |
||||
import ATrade.Driver.Junction.RobotDriverThread (stopRobot) |
||||
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
||||
import ATrade.Logging (Severity (Info), |
||||
logErrorWith, |
||||
logWith) |
||||
import Control.Monad (unless) |
||||
import Control.Monad.Reader (asks) |
||||
import Data.Aeson (decode) |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.List.NonEmpty (NonEmpty ((:|))) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import Data.Text.Encoding (decodeUtf8', |
||||
encodeUtf8) |
||||
import System.ZMQ4 (Event (In), |
||||
Poll (Sock), poll, |
||||
receiveMulti, |
||||
sendMulti) |
||||
import UnliftIO (MonadIO (liftIO), |
||||
atomicModifyIORef', |
||||
readIORef) |
||||
|
||||
data RemoteControlResponse = |
||||
ResponseOk |
||||
| ResponseError T.Text |
||||
| ResponseData B.ByteString |
||||
deriving (Show, Eq) |
||||
|
||||
data RemoteControlRequest = |
||||
StartRobot StrategyInstanceDescriptor |
||||
| StopRobot T.Text |
||||
| ReloadConfig T.Text |
||||
| GetState T.Text |
||||
| SetState T.Text B.ByteString |
||||
| Ping |
||||
deriving (Show) |
||||
|
||||
data ParseError = |
||||
UnknownCmd |
||||
| UtfDecodeError |
||||
| JsonDecodeError |
||||
deriving (Show, Eq) |
||||
|
||||
parseRemoteControlRequest :: B.ByteString -> Either ParseError RemoteControlRequest |
||||
parseRemoteControlRequest bs = |
||||
if |
||||
| cmd == "START" -> parseStart |
||||
| cmd == "STOP" -> parseStop |
||||
| cmd == "RELOAD_CONFIG" -> parseReloadConfig |
||||
| cmd == "GET_STATE" -> parseGetState |
||||
| cmd == "SET_STATE" -> parseSetState |
||||
| cmd == "PING" -> Right Ping |
||||
| otherwise -> Left UnknownCmd |
||||
where |
||||
cmd = B.takeWhile (/= 0x20) bs |
||||
rest = B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ bs |
||||
|
||||
parseStart = case decode . BL.fromStrict $ rest of |
||||
Just inst -> Right (StartRobot inst) |
||||
Nothing -> Left JsonDecodeError |
||||
|
||||
parseStop = case decodeUtf8' rest of |
||||
Left _ -> Left UtfDecodeError |
||||
Right r -> Right (StopRobot (T.strip r)) |
||||
|
||||
parseReloadConfig = case decodeUtf8' rest of |
||||
Left _ -> Left UtfDecodeError |
||||
Right r -> Right (ReloadConfig (T.strip r)) |
||||
|
||||
parseGetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of |
||||
Left _ -> Left UtfDecodeError |
||||
Right r -> Right (GetState r) |
||||
|
||||
parseSetState = case decodeUtf8' (B.takeWhile (/= 0x20) rest) of |
||||
Left _ -> Left UtfDecodeError |
||||
Right r -> Right (SetState r (B.dropWhile (== 0x20) . B.dropWhile (/= 0x20) $ rest)) |
||||
|
||||
|
||||
makeRemoteControlResponse :: RemoteControlResponse -> B.ByteString |
||||
makeRemoteControlResponse ResponseOk = "OK" |
||||
makeRemoteControlResponse (ResponseError msg) = "ERROR " <> encodeUtf8 msg |
||||
makeRemoteControlResponse (ResponseData d) = "DATA\n" <> d |
||||
|
||||
handleRemoteControl :: Int -> JunctionM () |
||||
handleRemoteControl timeout = do |
||||
sock <- asks peRemoteControlSocket |
||||
logger <- asks peLogAction |
||||
evs <- poll (fromIntegral timeout) [Sock sock [In] Nothing] |
||||
case evs of |
||||
(x:_) -> unless (null x) $ do |
||||
frames <- liftIO $ receiveMulti sock |
||||
case frames of |
||||
[peerId, _, rawRequest] -> do |
||||
case parseRemoteControlRequest rawRequest of |
||||
Left err -> logErrorWith logger "RemoteControl" ("Unable to parse request: " <> (T.pack . show) err) |
||||
Right request -> do |
||||
response <- handleRequest request |
||||
liftIO $ sendMulti sock $ peerId :| [B.empty, makeRemoteControlResponse response] |
||||
_ -> logErrorWith logger "RemoteControl" "Invalid incoming request" |
||||
_ -> return () |
||||
where |
||||
handleRequest (StartRobot inst) = do |
||||
startRobot inst |
||||
return ResponseOk |
||||
handleRequest (StopRobot instId) = do |
||||
robotsRef <- asks peRobots |
||||
robots <- readIORef robotsRef |
||||
case M.lookup instId robots of |
||||
Just robot -> do |
||||
logger <- asks peLogAction |
||||
logWith logger Info "RemoteControl" $ "Stopping robot: " <> instId |
||||
stopRobot robot |
||||
liftIO $ atomicModifyIORef' robotsRef (\r -> (M.delete instId r, ())) |
||||
return ResponseOk |
||||
Nothing -> return $ ResponseError $ "Not started: " <> instId |
||||
|
||||
handleRequest (ReloadConfig instId) = do |
||||
res <- reloadConfig instId |
||||
case res of |
||||
Left errmsg -> return $ ResponseError errmsg |
||||
Right () -> return ResponseOk |
||||
|
||||
handleRequest (GetState instId) = do |
||||
res <- getState instId |
||||
case res of |
||||
Left errmsg -> return $ ResponseError errmsg |
||||
Right d -> return $ ResponseData d |
||||
|
||||
handleRequest (SetState instId rawState) = do |
||||
res <- setState instId rawState |
||||
case res of |
||||
Left errmsg -> return $ ResponseError errmsg |
||||
Right () -> return ResponseOk |
||||
|
||||
handleRequest Ping = return ResponseOk |
||||
|
||||
|
||||
@ -0,0 +1,216 @@
@@ -0,0 +1,216 @@
|
||||
{-# LANGUAGE ExistentialQuantification #-} |
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.Driver.Junction.RobotDriverThread |
||||
( |
||||
createRobotDriverThread, |
||||
RobotEnv(..), |
||||
RobotM(..), |
||||
RobotDriverHandle, |
||||
onStrategyInstance, |
||||
onStrategyInstanceM, |
||||
postNotificationEvent, |
||||
stopRobot, |
||||
getInstanceDescriptor |
||||
) where |
||||
|
||||
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) |
||||
import qualified ATrade.Driver.Junction.BrokerService as Bro |
||||
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
||||
QuoteSubscription (QuoteSubscription), |
||||
SubscriptionId) |
||||
import ATrade.Driver.Junction.Types (BigConfig, |
||||
StrategyDescriptor, |
||||
StrategyInstance (StrategyInstance, strategyEventCallback), |
||||
StrategyInstanceDescriptor (configKey), |
||||
confStrategy, |
||||
confTickers, |
||||
eventCallback, stateKey, |
||||
strategyId, tickerId, |
||||
timeframe) |
||||
import ATrade.Logging (Message, log) |
||||
import ATrade.QuoteSource.Client (QuoteData (..)) |
||||
import ATrade.RoboCom.ConfigStorage (ConfigStorage) |
||||
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderUpdate), |
||||
MonadRobot (..), |
||||
StrategyEnvironment (..)) |
||||
import ATrade.RoboCom.Persistence (MonadPersistence) |
||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
||||
Bars, TickerInfoMap) |
||||
import ATrade.Types (OrderId, OrderState, |
||||
Tick (value), Trade) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction) |
||||
import Control.Concurrent (ThreadId, forkIO, |
||||
killThread) |
||||
import Control.Concurrent.BoundedChan (BoundedChan, |
||||
newBoundedChan, readChan, |
||||
writeChan) |
||||
import Control.Exception.Safe (MonadThrow) |
||||
import Control.Monad (forM, forM_, forever, |
||||
void, when) |
||||
import Control.Monad.IO.Class (MonadIO, liftIO) |
||||
import Control.Monad.Reader (MonadReader (local), |
||||
ReaderT, asks) |
||||
import Data.Aeson (FromJSON, ToJSON) |
||||
import Data.Default (Default) |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
readIORef, writeIORef) |
||||
import Data.List.NonEmpty (NonEmpty) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text.Lazy as TL |
||||
import Data.Time (UTCTime, getCurrentTime) |
||||
import Dhall (FromDhall) |
||||
import Prelude hiding (log) |
||||
|
||||
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => |
||||
RobotDriverHandle StrategyInstanceDescriptor (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) [SubscriptionId] |
||||
|
||||
data RobotDriverRequest |
||||
|
||||
data RobotDriverEvent = |
||||
EventRequest RobotDriverRequest |
||||
| QuoteEvent QuoteData |
||||
| NewTradeEvent Trade |
||||
| OrderEvent OrderId OrderState |
||||
|
||||
|
||||
robotDriverThread :: (MonadIO m, |
||||
MonadRobot m c s) => |
||||
StrategyInstance c s -> |
||||
BoundedChan RobotDriverEvent -> |
||||
m () |
||||
|
||||
robotDriverThread inst eventQueue = |
||||
forever $ liftIO (readChan eventQueue) >>= handleEvent |
||||
where |
||||
handleEvent (EventRequest _) = return () |
||||
handleEvent (QuoteEvent d) = |
||||
case d of |
||||
QDTick tick -> when (value tick /= 0) $ strategyEventCallback inst (NewTick tick) |
||||
QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar)) |
||||
handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade) |
||||
handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState) |
||||
|
||||
createRobotDriverThread :: (MonadIO m1, |
||||
ConfigStorage m1, |
||||
MonadPersistence m1, |
||||
QuoteStream m1, |
||||
Default s, |
||||
FromJSON s, |
||||
ToJSON s, |
||||
FromDhall c, |
||||
MonadIO m, |
||||
MonadReader (RobotEnv c s) m, |
||||
MonadRobot m c s) => |
||||
StrategyInstanceDescriptor |
||||
-> StrategyDescriptor c s |
||||
-> (m () -> IO ()) |
||||
-> BigConfig c |
||||
-> IORef c |
||||
-> IORef s |
||||
-> IORef [UTCTime] |
||||
-> m1 RobotDriverHandle |
||||
|
||||
createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = do |
||||
eventQueue <- liftIO $ newBoundedChan 2000 |
||||
|
||||
let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers |
||||
|
||||
quoteQueue <- liftIO $ newBoundedChan 2000 |
||||
subIds <- forM (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) |
||||
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue |
||||
|
||||
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue |
||||
return $ RobotDriverHandle instDesc inst driver qthread eventQueue subIds |
||||
|
||||
where |
||||
passQuoteEvents eventQueue quoteQueue = do |
||||
v <- readChan quoteQueue |
||||
writeChan eventQueue (QuoteEvent v) |
||||
|
||||
stopRobot :: (MonadIO m, QuoteStream m) => RobotDriverHandle -> m () |
||||
stopRobot (RobotDriverHandle _ _ driver qthread _ subIds) = do |
||||
forM_ subIds removeSubscription |
||||
liftIO $ killThread driver |
||||
liftIO $ killThread qthread |
||||
|
||||
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r |
||||
onStrategyInstance (RobotDriverHandle _ inst _ _ _ _) f = f inst |
||||
|
||||
onStrategyInstanceM :: (MonadIO m) => RobotDriverHandle -> |
||||
(forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> m r) -> m r |
||||
onStrategyInstanceM (RobotDriverHandle _ inst _ _ _ _) f = f inst |
||||
|
||||
data RobotEnv c s = |
||||
RobotEnv |
||||
{ |
||||
stateRef :: IORef s, |
||||
configRef :: IORef c, |
||||
timersRef :: IORef [UTCTime], |
||||
bars :: IORef Bars, |
||||
tickerInfoMap :: IORef TickerInfoMap, |
||||
env :: IORef StrategyEnvironment, |
||||
logAction :: LogAction (RobotM c s) Message, |
||||
brokerService :: Bro.BrokerService, |
||||
tickers :: NonEmpty BarSeriesId |
||||
} |
||||
|
||||
newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) |
||||
|
||||
instance HasLog (RobotEnv c s) Message (RobotM c s) where |
||||
getLogAction = logAction |
||||
setLogAction a e = e { logAction = a } |
||||
|
||||
instance MonadRobot (RobotM c s) c s where |
||||
submitOrder order = do |
||||
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
||||
bro <- asks brokerService |
||||
Bro.submitOrder bro instId order |
||||
|
||||
cancelOrder oid = do |
||||
bro <- asks brokerService |
||||
Bro.cancelOrder bro oid |
||||
|
||||
appendToLog s t = do |
||||
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
||||
log s instId $ TL.toStrict t |
||||
|
||||
setupTimer t = do |
||||
ref <- asks timersRef |
||||
liftIO $ atomicModifyIORef' ref (\s -> (t : s, ())) |
||||
|
||||
enqueueIOAction = undefined |
||||
getConfig = asks configRef >>= liftIO . readIORef |
||||
getState = asks stateRef >>= liftIO . readIORef |
||||
setState newState = asks stateRef >>= liftIO . flip writeIORef newState |
||||
getEnvironment = do |
||||
ref <- asks env |
||||
now <- liftIO getCurrentTime |
||||
liftIO $ atomicModifyIORef' ref (\e -> (e { _seLastTimestamp = now }, e { _seLastTimestamp = now})) |
||||
|
||||
getTicker tid tf = do |
||||
b <- asks bars >>= liftIO . readIORef |
||||
return $ M.lookup (BarSeriesId tid tf) b |
||||
|
||||
getTickerInfo tid = do |
||||
b <- asks tickerInfoMap >>= liftIO . readIORef |
||||
return $ M.lookup tid b |
||||
|
||||
getAvailableTickers = asks tickers |
||||
|
||||
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () |
||||
postNotificationEvent (RobotDriverHandle _ _ _ _ eventQueue _) notification = liftIO $ |
||||
case notification of |
||||
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) |
||||
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) |
||||
|
||||
getInstanceDescriptor :: RobotDriverHandle -> StrategyInstanceDescriptor |
||||
getInstanceDescriptor (RobotDriverHandle instDesc _ _ _ _ _) = instDesc |
||||
@ -1,361 +0,0 @@
@@ -1,361 +0,0 @@
|
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE TypeSynonymInstances #-} |
||||
|
||||
module ATrade.Quotes.Finam ( |
||||
downloadFinamSymbols, |
||||
Symbol(..), |
||||
Period(..), |
||||
DateFormat(..), |
||||
TimeFormat(..), |
||||
FieldSeparator(..), |
||||
RequestParams(..), |
||||
defaultParams, |
||||
downloadQuotes, |
||||
parseQuotes, |
||||
downloadAndParseQuotes, |
||||
Row(..) |
||||
) where |
||||
|
||||
import ATrade.Types |
||||
import Control.Error.Util |
||||
import Control.Exception |
||||
import Control.Lens |
||||
import Control.Monad |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Char8 as B8 |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.Csv hiding (Options) |
||||
import Data.List |
||||
import qualified Data.Map as M |
||||
import Data.Maybe |
||||
import qualified Data.Text as T |
||||
import qualified Data.Text.ICU.Convert as TC |
||||
import Data.Time.Calendar |
||||
import Data.Time.Clock |
||||
import Data.Time.Format |
||||
import qualified Data.Vector as V |
||||
import Network.Wreq |
||||
import Safe |
||||
import System.Log.Logger |
||||
import Text.Parsec |
||||
import Text.ParserCombinators.Parsec.Number |
||||
|
||||
data Period = |
||||
PeriodTick | |
||||
Period1Min | |
||||
Period5Min | |
||||
Period10Min | |
||||
Period15Min | |
||||
Period30Min | |
||||
PeriodHour | |
||||
PeriodDay | |
||||
PeriodWeek | |
||||
PeriodMonth |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum Period where |
||||
fromEnum PeriodTick = 1 |
||||
fromEnum Period1Min = 2 |
||||
fromEnum Period5Min = 3 |
||||
fromEnum Period10Min = 4 |
||||
fromEnum Period15Min = 5 |
||||
fromEnum Period30Min = 6 |
||||
fromEnum PeriodHour = 7 |
||||
fromEnum PeriodDay = 8 |
||||
fromEnum PeriodWeek = 9 |
||||
fromEnum PeriodMonth = 10 |
||||
|
||||
toEnum 1 = PeriodTick |
||||
toEnum 2 = Period1Min |
||||
toEnum 3 = Period5Min |
||||
toEnum 4 = Period10Min |
||||
toEnum 5 = Period15Min |
||||
toEnum 6 = Period30Min |
||||
toEnum 7 = PeriodHour |
||||
toEnum 8 = PeriodDay |
||||
toEnum 9 = PeriodWeek |
||||
toEnum 10 = PeriodMonth |
||||
toEnum _ = PeriodDay |
||||
|
||||
data DateFormat = |
||||
FormatYYYYMMDD | |
||||
FormatYYMMDD | |
||||
FormatDDMMYY | |
||||
FormatDD_MM_YY | |
||||
FormatMM_DD_YY |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum DateFormat where |
||||
fromEnum FormatYYYYMMDD = 1 |
||||
fromEnum FormatYYMMDD = 2 |
||||
fromEnum FormatDDMMYY = 3 |
||||
fromEnum FormatDD_MM_YY = 4 |
||||
fromEnum FormatMM_DD_YY = 5 |
||||
|
||||
toEnum 1 = FormatYYYYMMDD |
||||
toEnum 2 = FormatYYMMDD |
||||
toEnum 3 = FormatDDMMYY |
||||
toEnum 4 = FormatDD_MM_YY |
||||
toEnum 5 = FormatMM_DD_YY |
||||
toEnum _ = FormatYYYYMMDD |
||||
|
||||
|
||||
data TimeFormat = |
||||
FormatHHMMSS | |
||||
FormatHHMM | |
||||
FormatHH_MM_SS | |
||||
FormatHH_MM |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum TimeFormat where |
||||
fromEnum FormatHHMMSS = 1 |
||||
fromEnum FormatHHMM = 2 |
||||
fromEnum FormatHH_MM_SS = 3 |
||||
fromEnum FormatHH_MM = 4 |
||||
|
||||
toEnum 1 = FormatHHMMSS |
||||
toEnum 2 = FormatHHMM |
||||
toEnum 3 = FormatHH_MM_SS |
||||
toEnum 4 = FormatHH_MM |
||||
toEnum _ = FormatHHMMSS |
||||
|
||||
data FieldSeparator = |
||||
SeparatorComma | |
||||
SeparatorPeriod | |
||||
SeparatorSemicolon | |
||||
SeparatorTab | |
||||
SeparatorSpace |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum FieldSeparator where |
||||
fromEnum SeparatorComma = 1 |
||||
fromEnum SeparatorPeriod = 2 |
||||
fromEnum SeparatorSemicolon = 3 |
||||
fromEnum SeparatorTab = 4 |
||||
fromEnum SeparatorSpace = 5 |
||||
|
||||
toEnum 1 = SeparatorComma |
||||
toEnum 2 = SeparatorPeriod |
||||
toEnum 3 = SeparatorSemicolon |
||||
toEnum 4 = SeparatorTab |
||||
toEnum 5 = SeparatorSpace |
||||
toEnum _ = SeparatorComma |
||||
|
||||
data RequestParams = RequestParams { |
||||
ticker :: T.Text, |
||||
startDate :: Day, |
||||
endDate :: Day, |
||||
period :: Period, |
||||
dateFormat :: DateFormat, |
||||
timeFormat :: TimeFormat, |
||||
fieldSeparator :: FieldSeparator, |
||||
includeHeader :: Bool, |
||||
fillEmpty :: Bool |
||||
} |
||||
|
||||
defaultParams :: RequestParams |
||||
defaultParams = RequestParams { |
||||
ticker = "", |
||||
startDate = fromGregorian 1970 1 1, |
||||
endDate = fromGregorian 1970 1 1, |
||||
period = PeriodDay, |
||||
dateFormat = FormatYYYYMMDD, |
||||
timeFormat = FormatHHMMSS, |
||||
fieldSeparator = SeparatorComma, |
||||
includeHeader = True, |
||||
fillEmpty = False |
||||
} |
||||
|
||||
data Symbol = Symbol { |
||||
symCode :: T.Text, |
||||
symName :: T.Text, |
||||
symId :: Integer, |
||||
symMarketCode :: Integer, |
||||
symMarketName :: T.Text |
||||
} |
||||
deriving (Show, Eq) |
||||
|
||||
data Row = Row { |
||||
rowTicker :: T.Text, |
||||
rowTime :: UTCTime, |
||||
rowOpen :: Price, |
||||
rowHigh :: Price, |
||||
rowLow :: Price, |
||||
rowClose :: Price, |
||||
rowVolume :: Integer |
||||
} deriving (Show, Eq) |
||||
|
||||
instance FromField Price where |
||||
parseField s = fromDouble <$> (parseField s :: Parser Double) |
||||
|
||||
instance FromRecord Row where |
||||
parseRecord v |
||||
| length v == 9 = do |
||||
tkr <- v .! 0 |
||||
date <- v .! 2 |
||||
time <- v .! 3 |
||||
dt <- addUTCTime (-3 * 3600) <$> (parseDt date time) |
||||
open <- v .! 4 |
||||
high <- v .! 5 |
||||
low <- v .! 6 |
||||
close <- v .! 7 |
||||
vol <- v .! 8 |
||||
return $ Row tkr dt open high low close vol |
||||
| otherwise = mzero |
||||
where |
||||
parseDt :: B.ByteString -> B.ByteString -> Parser UTCTime |
||||
parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of |
||||
Just dt -> return dt |
||||
Nothing -> fail "Unable to parse date/time" |
||||
|
||||
downloadAndParseQuotes :: RequestParams -> IO (Maybe [Row]) |
||||
downloadAndParseQuotes requestParams = downloadAndParseQuotes' 3 |
||||
where |
||||
downloadAndParseQuotes' iter = do |
||||
raw <- downloadQuotes requestParams `catch` (\e -> do |
||||
debugM "History" $ "exception: " ++ show (e :: SomeException) |
||||
return Nothing) |
||||
case raw of |
||||
Just r -> return $ parseQuotes r |
||||
Nothing -> if iter <= 0 then return Nothing else downloadAndParseQuotes' (iter - 1) |
||||
|
||||
parseQuotes :: B.ByteString -> Maybe [Row] |
||||
parseQuotes csvData = case decode HasHeader $ BL.fromStrict csvData of |
||||
Left _ -> Nothing |
||||
Right d -> Just $ V.toList d |
||||
|
||||
downloadQuotes :: RequestParams -> IO (Maybe B.ByteString) |
||||
downloadQuotes requestParams = do |
||||
symbols <- downloadFinamSymbols |
||||
case requestUrl symbols requestParams of |
||||
Just (url, options') -> do |
||||
resp <- getWith options' url |
||||
return $ Just $ BL.toStrict $ resp ^. responseBody |
||||
Nothing -> return Nothing |
||||
|
||||
requestUrl :: [Symbol] -> RequestParams -> Maybe (String, Options) |
||||
requestUrl symbols requestParams = case getFinamCode symbols (ticker requestParams) of |
||||
Just (sym, market) -> Just ("http://export.finam.ru/export9.out", getOptions sym market) |
||||
Nothing -> Nothing |
||||
where |
||||
getOptions sym market = defaults & |
||||
param "market" .~ [T.pack . show $ market] & |
||||
param "f" .~ [ticker requestParams] & |
||||
param "e" .~ [".csv"] & |
||||
param "dtf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
||||
param "tmf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
||||
param "MSOR" .~ ["0"] & |
||||
param "mstime" .~ ["on"] & |
||||
param "mstimever" .~ ["1"] & |
||||
param "sep" .~ [T.pack . show . fromEnum . fieldSeparator $ requestParams] & |
||||
param "sep2" .~ ["1"] & |
||||
param "at" .~ [if includeHeader requestParams then "1" else "0"] & |
||||
param "fsp" .~ [if fillEmpty requestParams then "1" else "0"] & |
||||
param "p" .~ [T.pack . show . fromEnum $ period requestParams] & |
||||
param "em" .~ [T.pack . show $ sym ] & |
||||
param "df" .~ [T.pack . show $ dayFrom] & |
||||
param "mf" .~ [T.pack . show $ (monthFrom - 1)] & |
||||
param "yf" .~ [T.pack . show $ yearFrom] & |
||||
param "dt" .~ [T.pack . show $ dayTo] & |
||||
param "mt" .~ [T.pack . show $ (monthTo - 1)] & |
||||
param "yt" .~ [T.pack . show $ yearTo] & |
||||
param "code" .~ [ticker requestParams] & |
||||
param "datf" .~ if period requestParams == PeriodTick then ["11"] else ["1"] |
||||
(yearFrom, monthFrom, dayFrom) = toGregorian $ startDate requestParams |
||||
(yearTo, monthTo, dayTo) = toGregorian $ endDate requestParams |
||||
|
||||
getFinamCode :: [Symbol] -> T.Text -> Maybe (Integer, Integer) |
||||
getFinamCode symbols tickerCode = case find (\x -> symCode x == tickerCode && symMarketCode x `notElem` archives) symbols of |
||||
Just sym -> Just (symId sym, symMarketCode sym) |
||||
Nothing -> Nothing |
||||
|
||||
downloadFinamSymbols :: IO [Symbol] |
||||
downloadFinamSymbols = do |
||||
conv <- TC.open "cp1251" Nothing |
||||
result <- get "http://www.finam.ru/cache/icharts/icharts.js" |
||||
if result ^. responseStatus . statusCode == 200 |
||||
then return $ parseSymbols . T.lines $ TC.toUnicode conv $ BL.toStrict $ result ^. responseBody |
||||
else return [] |
||||
where |
||||
parseSymbols :: [T.Text] -> [Symbol] |
||||
parseSymbols strs = zipWith5 Symbol codes names ids marketCodes marketNames |
||||
where |
||||
getWithParser parser pos = fromMaybe [] $ do |
||||
s <- T.unpack <$> strs `atMay` pos |
||||
hush $ parse parser "" s |
||||
|
||||
ids :: [Integer] |
||||
ids = getWithParser intlist 0 |
||||
|
||||
names :: [T.Text] |
||||
names = T.pack <$> getWithParser strlist 1 |
||||
|
||||
codes :: [T.Text] |
||||
codes = T.pack <$> getWithParser strlist 2 |
||||
|
||||
marketCodes :: [Integer] |
||||
marketCodes = getWithParser intlist 3 |
||||
|
||||
marketNames :: [T.Text] |
||||
marketNames = fmap (\code -> fromMaybe "" $ M.lookup code codeToName) marketCodes |
||||
|
||||
intlist = do |
||||
_ <- string "var" |
||||
spaces |
||||
skipMany1 alphaNum |
||||
spaces |
||||
_ <- char '=' |
||||
spaces |
||||
_ <- char '[' |
||||
manyTill (do |
||||
i <- int |
||||
_ <- char ',' <|> char ']' |
||||
return i) (char '\'' <|> char ';') |
||||
|
||||
strlist = do |
||||
_ <- string "var" |
||||
spaces |
||||
skipMany1 alphaNum |
||||
spaces |
||||
_ <- char '=' |
||||
spaces |
||||
_ <- char '[' |
||||
(char '\'' >> manyTill ((char '\\' >> char '\'') <|> anyChar) (char '\'')) `sepBy` char ',' |
||||
|
||||
codeToName :: M.Map Integer T.Text |
||||
codeToName = M.fromList [ |
||||
(200, "МосБиржа топ"), |
||||
(1 , "МосБиржа акции"), |
||||
(14 , "МосБиржа фьючерсы"), |
||||
(41, "Курс рубля"), |
||||
(45, "МосБиржа валютный рынок"), |
||||
(2, "МосБиржа облигации"), |
||||
(12, "МосБиржа внесписочные облигации"), |
||||
(29, "МосБиржа пифы"), |
||||
(8, "Расписки"), |
||||
(6, "Мировые Индексы"), |
||||
(24, "Товары"), |
||||
(5, "Мировые валюты"), |
||||
(25, "Акции США(BATS)"), |
||||
(7, "Фьючерсы США"), |
||||
(27, "Отрасли экономики США"), |
||||
(26, "Гособлигации США"), |
||||
(28, "ETF"), |
||||
(30, "Индексы мировой экономики"), |
||||
(3, "РТС"), |
||||
(20, "RTS Board"), |
||||
(10, "РТС-GAZ"), |
||||
(17, "ФОРТС Архив"), |
||||
(31, "Сырье Архив"), |
||||
(38, "RTS Standard Архив"), |
||||
(16, "ММВБ Архив"), |
||||
(18, "РТС Архив"), |
||||
(9, "СПФБ Архив"), |
||||
(32, "РТС-BOARD Архив"), |
||||
(39, "Расписки Архив"), |
||||
(-1, "Отрасли") ] |
||||
|
||||
|
||||
archives :: [Integer] |
||||
archives = [3, 8, 16, 17, 18, 31, 32, 38, 39, 517] |
||||
@ -0,0 +1,12 @@
@@ -0,0 +1,12 @@
|
||||
|
||||
module ATrade.Quotes.HistoryProvider |
||||
( |
||||
HistoryProvider(..) |
||||
) where |
||||
|
||||
import ATrade.RoboCom.Types (Bar) |
||||
import ATrade.Types (BarTimeframe, TickerId) |
||||
import Data.Time (UTCTime) |
||||
|
||||
class (Monad m) => HistoryProvider m where |
||||
getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
||||
@ -0,0 +1,12 @@
@@ -0,0 +1,12 @@
|
||||
|
||||
module ATrade.Quotes.TickerInfoProvider |
||||
( |
||||
TickerInfoProvider(..) |
||||
) where |
||||
|
||||
import ATrade.RoboCom.Types (InstrumentParameters) |
||||
import ATrade.Types (TickerId) |
||||
|
||||
class (Monad m) => TickerInfoProvider m where |
||||
getInstrumentParameters :: [TickerId] -> m [InstrumentParameters] |
||||
|
||||
@ -0,0 +1,14 @@
@@ -0,0 +1,14 @@
|
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.RoboCom.ConfigStorage |
||||
( |
||||
ConfigStorage(..) |
||||
) where |
||||
|
||||
import qualified Data.Text as T |
||||
import Dhall (FromDhall) |
||||
|
||||
class (Monad m) => ConfigStorage m where |
||||
loadConfig :: forall c. (FromDhall c) => T.Text -> m c |
||||
|
||||
|
||||
@ -0,0 +1,16 @@
@@ -0,0 +1,16 @@
|
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.RoboCom.Persistence |
||||
( |
||||
MonadPersistence(..) |
||||
) where |
||||
|
||||
import Data.Aeson |
||||
import Data.Default (Default) |
||||
import qualified Data.Text as T |
||||
|
||||
class (Monad m) => MonadPersistence m where |
||||
saveState :: forall s. (ToJSON s) => s -> T.Text -> m () |
||||
loadState :: forall s. (Default s, FromJSON s) => T.Text -> m s |
||||
|
||||
|
||||
@ -0,0 +1,117 @@
@@ -0,0 +1,117 @@
|
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE TypeSynonymInstances #-} |
||||
|
||||
module Test.Driver.Junction.QuoteThread |
||||
( |
||||
unitTests |
||||
) where |
||||
|
||||
import Test.Tasty |
||||
import Test.Tasty.HUnit |
||||
import Test.Tasty.QuickCheck as QC |
||||
import Test.Tasty.SmallCheck as SC |
||||
|
||||
import ATrade.Driver.Junction.QuoteThread (addSubscription, |
||||
startQuoteThread, |
||||
stopQuoteThread) |
||||
import ATrade.Logging (Message) |
||||
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
||||
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
||||
import ATrade.QuoteSource.Client (QuoteData (QDBar)) |
||||
import ATrade.QuoteSource.Server (QuoteSourceServerData (..), |
||||
startQuoteSourceServer, |
||||
stopQuoteSourceServer) |
||||
import ATrade.RoboCom.Types (BarSeries (bsBars), |
||||
BarSeriesId (BarSeriesId), |
||||
InstrumentParameters (InstrumentParameters)) |
||||
import ATrade.Types |
||||
import Colog.Core (LogAction (..)) |
||||
import Colog.Core.Class (HasLog (..)) |
||||
import Control.Concurrent (forkIO, threadDelay) |
||||
import Control.Concurrent.BoundedChan (newBoundedChan, readChan, |
||||
writeChan) |
||||
import Control.Exception (bracket) |
||||
import Control.Monad (forever) |
||||
import Control.Monad.Reader |
||||
import Data.IORef (IORef, newIORef, readIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import Data.Time (UTCTime (UTCTime), |
||||
fromGregorian) |
||||
import System.IO (BufferMode (LineBuffering), |
||||
hSetBuffering, stderr) |
||||
import System.ZMQ4 (withContext) |
||||
import Test.Mock.HistoryProvider (MockHistoryProvider, |
||||
mkMockHistoryProvider, |
||||
mockGetHistory) |
||||
import Test.Mock.TickerInfoProvider (MockTickerInfoProvider, |
||||
mkMockTickerInfoProvider, |
||||
mockGetInstrumentParameters) |
||||
|
||||
data TestEnv = |
||||
TestEnv |
||||
{ |
||||
historyProvider :: MockHistoryProvider, |
||||
tickerInfoProvider :: MockTickerInfoProvider |
||||
} |
||||
|
||||
type TestM = ReaderT TestEnv IO |
||||
|
||||
instance HistoryProvider TestM where |
||||
getHistory tid tf from to = do |
||||
hp <- asks historyProvider |
||||
liftIO $ mockGetHistory hp tid tf from to |
||||
|
||||
instance TickerInfoProvider TestM where |
||||
getInstrumentParameters tickers = do |
||||
tip <- asks tickerInfoProvider |
||||
liftIO $ mockGetInstrumentParameters tip tickers |
||||
|
||||
instance HasLog TestEnv Message TestM where |
||||
getLogAction env = LogAction $ \msg -> return () |
||||
|
||||
qsEndpoint = "inproc://qs" |
||||
|
||||
mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)] |
||||
where |
||||
bars = [] |
||||
|
||||
mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters "FOO" 10 0.1)] |
||||
|
||||
unitTests = testGroup "Driver.Junction.QuoteThread" [ |
||||
testSubscription |
||||
] |
||||
|
||||
testSubscription :: TestTree |
||||
testSubscription = testCase "Subscription" $ withContext $ \ctx -> do |
||||
barsRef <- newIORef M.empty |
||||
tiRef <- newIORef M.empty |
||||
serverChan <- newBoundedChan 2000 |
||||
let clientSecurityParams = defaultClientSecurityParams |
||||
bracket |
||||
(startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) |
||||
stopQuoteSourceServer $ \_ -> |
||||
bracket |
||||
(startQuoteThread barsRef tiRef ctx qsEndpoint clientSecurityParams (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider)) (LogAction $ \_ -> return ())) |
||||
|
||||
stopQuoteThread $ \qt -> do |
||||
chan <- newBoundedChan 2000 |
||||
addSubscription qt "FOO" (BarTimeframe 3600) chan |
||||
|
||||
forkIO $ forever $ threadDelay 50000 >> writeChan serverChan (QSSBar (BarTimeframe 3600, bar)) |
||||
|
||||
clientData <- readChan chan |
||||
assertEqual "Invalid client data" clientData (QDBar (BarTimeframe 3600, bar)) |
||||
|
||||
bars <- readIORef barsRef |
||||
case M.lookup (BarSeriesId "FOO" (BarTimeframe 3600)) bars of |
||||
Just series -> assertBool "Length should be >= 1" $ (not . null . bsBars) series |
||||
Nothing -> assertFailure "Bar Series should be present" |
||||
where |
||||
bar = |
||||
Bar { |
||||
barSecurity="FOO", barTimestamp=UTCTime (fromGregorian 2021 11 20) 7200, barOpen=10, barHigh=12, barLow=9, barClose=11, barVolume=100 |
||||
} |
||||
@ -0,0 +1,27 @@
@@ -0,0 +1,27 @@
|
||||
|
||||
module Test.Mock.HistoryProvider |
||||
( |
||||
MockHistoryProvider, |
||||
mkMockHistoryProvider, |
||||
mockGetHistory |
||||
) where |
||||
|
||||
import ATrade.Quotes.HistoryProvider |
||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) |
||||
import ATrade.Types (Bar (Bar, barTimestamp), |
||||
BarTimeframe (BarTimeframe), |
||||
TickerId) |
||||
import Control.Monad.IO.Class (MonadIO) |
||||
import qualified Data.Map.Strict as M |
||||
import Data.Time (UTCTime) |
||||
|
||||
data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar]) |
||||
|
||||
mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider |
||||
mkMockHistoryProvider = MockHistoryProvider |
||||
|
||||
mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
||||
mockGetHistory (MockHistoryProvider bars) tid tf from to = |
||||
case M.lookup (BarSeriesId tid tf) bars of |
||||
Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series |
||||
Nothing -> return [] |
||||
@ -0,0 +1,22 @@
@@ -0,0 +1,22 @@
|
||||
|
||||
module Test.Mock.TickerInfoProvider |
||||
( |
||||
MockTickerInfoProvider, |
||||
mkMockTickerInfoProvider, |
||||
mockGetInstrumentParameters |
||||
) where |
||||
|
||||
import ATrade.Quotes.TickerInfoProvider |
||||
import ATrade.RoboCom.Types (InstrumentParameters) |
||||
import ATrade.Types (TickerId) |
||||
import Control.Monad.IO.Class (MonadIO) |
||||
import qualified Data.Map.Strict as M |
||||
import Data.Maybe (catMaybes, mapMaybe) |
||||
|
||||
data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters) |
||||
|
||||
mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider |
||||
mkMockTickerInfoProvider = MockTickerInfoProvider |
||||
|
||||
mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters] |
||||
mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params) |
||||
Loading…
Reference in new issue