28 changed files with 1290 additions and 672 deletions
@ -1,58 +1,338 @@
@@ -1,58 +1,338 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
{-# LANGUAGE DuplicateRecordFields #-} |
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
|
||||
module ATrade.Driver.Junction |
||||
( |
||||
junctionMain |
||||
) where |
||||
|
||||
import ATrade.Driver.Junction.Types (StrategyDescriptor (..), |
||||
StrategyInstance (..), |
||||
StrategyInstanceDescriptor (..)) |
||||
import Data.Aeson (decode) |
||||
import qualified Data.ByteString as B |
||||
import ATrade.Broker.Client (BrokerClientHandle, |
||||
startBrokerClient, |
||||
stopBrokerClient) |
||||
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification), |
||||
NotificationSqnum (unNotificationSqnum), |
||||
getNotificationSqnum) |
||||
import ATrade.Driver.Junction.BrokerService (BrokerService, |
||||
getNotifications, |
||||
mkBrokerService) |
||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..), |
||||
ProgramOptions (ProgramOptions, configPath)) |
||||
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription, removeSubscription), |
||||
QuoteSubscription (QuoteSubscription), |
||||
SubscriptionId (SubscriptionId)) |
||||
import ATrade.Driver.Junction.QuoteThread (DownloaderEnv (DownloaderEnv), |
||||
QuoteThreadHandle, |
||||
withQThread) |
||||
import qualified ATrade.Driver.Junction.QuoteThread as QT |
||||
import ATrade.Driver.Junction.RobotDriverThread (RobotDriverHandle, RobotEnv (RobotEnv), |
||||
RobotM (..), |
||||
createRobotDriverThread, |
||||
onStrategyInstance, |
||||
postNotificationEvent) |
||||
import ATrade.Driver.Junction.Types (StrategyDescriptorE (StrategyDescriptorE), |
||||
StrategyInstance (strategyInstanceId), |
||||
StrategyInstanceDescriptor (..), |
||||
confStrategy, |
||||
confTickers, |
||||
strategyState, |
||||
strategyTimers, |
||||
tickerId, |
||||
timeframe) |
||||
import ATrade.Logging (Message, Severity (Debug, Error, Info, Trace, Warning), |
||||
fmtMessage, |
||||
logWarning, |
||||
logWith) |
||||
import ATrade.Quotes.QHP (mkQHPHandle) |
||||
import ATrade.RoboCom.ConfigStorage (ConfigStorage (loadConfig)) |
||||
import ATrade.RoboCom.Monad (StrategyEnvironment (..)) |
||||
import ATrade.RoboCom.Persistence (MonadPersistence (loadState, saveState)) |
||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
||||
Bars) |
||||
import ATrade.Types (ClientSecurityParams (ClientSecurityParams), |
||||
OrderId, |
||||
Trade (tradeOrderId)) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction, |
||||
logTextStdout, |
||||
(>$<)) |
||||
import Colog.Actions (logTextHandle) |
||||
import Control.Concurrent (threadDelay) |
||||
import Control.Exception.Safe (MonadThrow, |
||||
bracket) |
||||
import Control.Monad (forM_, forever) |
||||
import Control.Monad.Extra (whenM) |
||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||
import Control.Monad.Reader (MonadReader, ReaderT (runReaderT), |
||||
asks) |
||||
import Data.Aeson (eitherDecode, |
||||
encode) |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.IORef |
||||
import Data.Default (Default (def)) |
||||
import Data.Foldable (traverse_) |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
newIORef, |
||||
readIORef) |
||||
import Data.List.NonEmpty (NonEmpty ((:|))) |
||||
import qualified Data.Map.Strict as M |
||||
import Data.Set (notMember) |
||||
import qualified Data.Set as S |
||||
import qualified Data.Text as T |
||||
import Data.Text.Encoding (encodeUtf8) |
||||
import Data.Text.IO (readFile) |
||||
import Data.Time (getCurrentTime) |
||||
import Data.Time.Clock.POSIX (getPOSIXTime) |
||||
import Database.Redis (ConnectInfo (..), |
||||
Connection, |
||||
PortID (UnixSocket), |
||||
checkedConnect, |
||||
defaultConnectInfo, |
||||
get, mset, |
||||
runRedis) |
||||
import Dhall (auto, input) |
||||
import Options.Applicative (Parser, |
||||
execParser, |
||||
fullDesc, header, |
||||
help, helper, |
||||
info, long, |
||||
metavar, progDesc, |
||||
short, strOption, |
||||
(<**>)) |
||||
import Prelude hiding (log, |
||||
readFile) |
||||
import System.IO (BufferMode (LineBuffering), |
||||
Handle, |
||||
IOMode (AppendMode), |
||||
hSetBuffering, |
||||
openFile, |
||||
withFile) |
||||
import System.ZMQ4 (withContext) |
||||
import System.ZMQ4.ZAP (loadCertificateFromFile) |
||||
|
||||
load :: T.Text -> IO B.ByteString |
||||
load = undefined |
||||
data JunctionEnv = |
||||
JunctionEnv |
||||
{ |
||||
peRedisSocket :: Connection, |
||||
peConfigPath :: FilePath, |
||||
peQuoteThread :: QuoteThreadHandle, |
||||
peBroker :: BrokerClientHandle, |
||||
peRobots :: IORef (M.Map T.Text RobotDriverHandle), |
||||
peLogAction :: LogAction JunctionM Message |
||||
} |
||||
|
||||
newtype JunctionM a = JunctionM { unJunctionM :: ReaderT JunctionEnv IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader JunctionEnv, MonadIO, MonadThrow) |
||||
|
||||
instance HasLog JunctionEnv Message JunctionM where |
||||
getLogAction = peLogAction |
||||
setLogAction a e = e { peLogAction = a } |
||||
|
||||
instance ConfigStorage JunctionM where |
||||
loadConfig key = do |
||||
basePath <- asks peConfigPath |
||||
let path = basePath <> "/" <> T.unpack key -- TODO fix path construction |
||||
liftIO $ readFile path >>= input auto |
||||
|
||||
instance MonadPersistence JunctionM where |
||||
saveState newState key = do |
||||
conn <- asks peRedisSocket |
||||
now <- liftIO getPOSIXTime |
||||
res <- liftIO $ runRedis conn $ mset [(encodeUtf8 key, BL.toStrict $ encode newState), |
||||
(encodeUtf8 (key <> ":last_store") , encodeUtf8 . T.pack . show $ now)] |
||||
case res of |
||||
Left _ -> logWarning "Junction " "Unable to save state" |
||||
Right _ -> return () |
||||
|
||||
loadState key = do |
||||
conn <- asks peRedisSocket |
||||
res <- liftIO $ runRedis conn $ get (encodeUtf8 key) |
||||
-- TODO: just chain eithers |
||||
case res of |
||||
Left _ -> do |
||||
logWarning "Junction" "Unable to load state" |
||||
return def |
||||
Right maybeRawState -> |
||||
case maybeRawState of |
||||
Just rawState -> case eitherDecode $ BL.fromStrict rawState of |
||||
Left _ -> do |
||||
logWarning "Junction" "Unable to decode state" |
||||
return def |
||||
Right decodedState -> return decodedState |
||||
Nothing -> do |
||||
logWarning "Junction" "Unable to decode state" |
||||
return def |
||||
|
||||
junctionMain :: M.Map T.Text StrategyDescriptor -> IO () |
||||
instance QuoteStream JunctionM where |
||||
addSubscription (QuoteSubscription ticker timeframe) chan = do |
||||
qt <- asks peQuoteThread |
||||
QT.addSubscription qt ticker timeframe chan |
||||
return (SubscriptionId 0) -- TODO subscription Ids |
||||
removeSubscription _ = undefined |
||||
|
||||
logger :: (MonadIO m) => Handle -> LogAction m Message |
||||
logger h = fmtMessage >$< (logTextStdout <> logTextHandle h) |
||||
|
||||
junctionMain :: M.Map T.Text StrategyDescriptorE -> IO () |
||||
junctionMain descriptors = do |
||||
parseOptions |
||||
instanceDescriptors <- undefined |
||||
strategies <- mkStrategies instanceDescriptors |
||||
opts <- parseOptions |
||||
|
||||
let initialLogger = fmtMessage >$< logTextStdout |
||||
|
||||
logWith initialLogger Info "Junction" $ "Reading config from: " <> (T.pack . show) (configPath opts) |
||||
|
||||
cfg <- readFile (configPath opts) >>= input auto |
||||
|
||||
withFile (logBasePath cfg <> "/all.log") AppendMode $ \h -> do |
||||
|
||||
start strategies |
||||
let log = logWith (logger h) |
||||
|
||||
barsMap <- newIORef M.empty |
||||
|
||||
log Info "Junction" $ "Connecting to redis: " <> redisSocket cfg |
||||
redis <- checkedConnect (defaultConnectInfo { connectPort = UnixSocket (T.unpack $ redisSocket cfg) }) |
||||
log Info "Junction" "redis: connected" |
||||
withContext $ \ctx -> do |
||||
log Debug "Junction" "0mq context created" |
||||
let downloaderLogAction = logger h |
||||
let downloaderEnv = DownloaderEnv (mkQHPHandle ctx (qhpEndpoint cfg)) ctx (qtisEndpoint cfg) downloaderLogAction |
||||
robotsMap <- newIORef M.empty |
||||
ordersMap <- newIORef M.empty |
||||
handledNotifications <- newIORef S.empty |
||||
withBroker cfg ctx robotsMap ordersMap handledNotifications (logger h) $ \bro -> |
||||
withQThread downloaderEnv barsMap cfg ctx (logger h) $ \qt -> do |
||||
broService <- mkBrokerService bro ordersMap |
||||
let junctionLogAction = logger h |
||||
let env = |
||||
JunctionEnv |
||||
{ |
||||
peRedisSocket = redis, |
||||
peConfigPath = robotsConfigsPath cfg, |
||||
peQuoteThread = qt, |
||||
peBroker = bro, |
||||
peRobots = robotsMap, |
||||
peLogAction = junctionLogAction |
||||
} |
||||
withJunction env $ do |
||||
startRobots h cfg barsMap broService |
||||
forever $ do |
||||
notifications <- liftIO $ getNotifications broService |
||||
forM_ notifications (liftIO . handleBrokerNotification robotsMap ordersMap handledNotifications (logger h)) |
||||
saveRobots |
||||
liftIO $ threadDelay 1000000 |
||||
where |
||||
parseOptions = undefined |
||||
|
||||
mkStrategies :: [StrategyInstanceDescriptor] -> IO [StrategyInstance] |
||||
mkStrategies = mapM mkStrategy |
||||
|
||||
mkStrategy :: StrategyInstanceDescriptor -> IO StrategyInstance |
||||
mkStrategy desc = do |
||||
sState <- load (stateKey desc) |
||||
sCfg <- load (configKey desc) |
||||
case M.lookup (strategyId desc) descriptors of |
||||
Just (StrategyDescriptor _sName sCallback _sDefState) -> |
||||
case (decode $ BL.fromStrict sCfg, decode $ BL.fromStrict sState) of |
||||
(Just pCfg, Just pState) -> do |
||||
cfgRef <- newIORef pCfg |
||||
stateRef <- newIORef pState |
||||
return $ StrategyInstance |
||||
saveRobots :: JunctionM () |
||||
saveRobots = do |
||||
robotsMap <- asks peRobots >>= (liftIO . readIORef) |
||||
traverse_ saveRobotState robotsMap |
||||
|
||||
saveRobotState :: RobotDriverHandle -> JunctionM () |
||||
saveRobotState handle = onStrategyInstance handle $ \inst -> do |
||||
currentState <- liftIO $ readIORef (strategyState inst) |
||||
saveState currentState (strategyInstanceId inst) |
||||
currentTimers <- liftIO $ readIORef (strategyTimers inst) |
||||
saveState currentTimers (strategyInstanceId inst <> ":timers") |
||||
|
||||
startRobots :: Handle -> ProgramConfiguration -> IORef Bars -> BrokerService -> JunctionM () |
||||
startRobots logHandle cfg barsMap broService = forM_ (instances cfg) $ \inst -> do |
||||
now <- liftIO getCurrentTime |
||||
case M.lookup (strategyBaseName inst) descriptors of |
||||
Just (StrategyDescriptorE desc) -> do |
||||
bigConf <- loadConfig (configKey inst) |
||||
case confTickers bigConf of |
||||
(firstTicker:restTickers) -> do |
||||
rConf <- liftIO $ newIORef (confStrategy bigConf) |
||||
rState <- loadState (stateKey inst) >>= liftIO . newIORef |
||||
rTimers <- loadState (stateKey inst <> ":timers") >>= liftIO . newIORef |
||||
localH <- liftIO $ openFile (logBasePath cfg <> "/" <> T.unpack (strategyId inst) <> ".log") AppendMode |
||||
liftIO $ hSetBuffering localH LineBuffering |
||||
let robotLogAction = logger logHandle <> (fmtMessage >$< logTextHandle localH) |
||||
stratEnv <- liftIO $ newIORef StrategyEnvironment |
||||
{ |
||||
strategyInstanceId = strategyName desc, |
||||
strategyEventCallback = sCallback, |
||||
strategyState = stateRef, |
||||
strategyConfig = cfgRef |
||||
_seInstanceId = strategyId inst, |
||||
_seAccount = accountId inst, |
||||
_seVolume = 1, |
||||
_seLastTimestamp = now |
||||
} |
||||
_ -> undefined |
||||
_ -> undefined |
||||
let robotEnv = RobotEnv rState rConf rTimers barsMap stratEnv robotLogAction broService (toBarSeriesId <$> (firstTicker :| restTickers)) |
||||
robot <- createRobotDriverThread inst desc (flip runReaderT robotEnv . unRobotM) bigConf rConf rState rTimers |
||||
robotsMap' <- asks peRobots |
||||
liftIO $ atomicModifyIORef' robotsMap' (\s -> (M.insert (strategyId inst) robot s, ())) |
||||
_ -> logWith (logger logHandle) Error (strategyId inst) $ "No tickers configured !!!" |
||||
Nothing -> error "Unknown strategy" |
||||
|
||||
toBarSeriesId t = BarSeriesId (tickerId t) (timeframe t) |
||||
|
||||
withJunction :: JunctionEnv -> JunctionM () -> IO () |
||||
withJunction env = (`runReaderT` env) . unJunctionM |
||||
|
||||
handleBrokerNotification :: IORef (M.Map T.Text RobotDriverHandle) -> |
||||
IORef (M.Map OrderId T.Text) -> |
||||
IORef (S.Set NotificationSqnum) -> |
||||
LogAction IO Message -> |
||||
Notification -> |
||||
IO () |
||||
handleBrokerNotification robotsRef ordersMapRef handled logger notification= do |
||||
logWith logger Trace "Junction" $ "Incoming notification: " <> (T.pack . show . unNotificationSqnum . getNotificationSqnum) notification |
||||
whenM (notMember (getNotificationSqnum notification) <$> readIORef handled) $ do |
||||
robotsMap <- readIORef robotsRef |
||||
ordersMap <- readIORef ordersMapRef |
||||
|
||||
case getNotificationTarget robotsMap ordersMap notification of |
||||
Just robot -> postNotificationEvent robot notification |
||||
Nothing -> do |
||||
logWith logger Warning "Junction" $ "Unknown order: " <> (T.pack . show) (notificationOrderId notification) |
||||
logWith logger Debug "Junction" $ "Ordermap: " <> (T.pack . show) (M.toList ordersMap) |
||||
|
||||
atomicModifyIORef' handled (\s -> (S.insert (getNotificationSqnum notification) s, ())) |
||||
|
||||
getNotificationTarget :: M.Map T.Text RobotDriverHandle -> M.Map OrderId T.Text -> Notification -> Maybe RobotDriverHandle |
||||
getNotificationTarget robotsMap ordersMap notification = do |
||||
robotId <- M.lookup (notificationOrderId notification) ordersMap |
||||
M.lookup robotId robotsMap |
||||
|
||||
notificationOrderId (OrderNotification _ oid _) = oid |
||||
notificationOrderId (TradeNotification _ trade) = tradeOrderId trade |
||||
|
||||
withBroker cfg ctx robotsMap ordersMap handled logger f = do |
||||
securityParameters <- loadBrokerSecurityParameters cfg |
||||
bracket |
||||
(startBrokerClient |
||||
"broker" |
||||
ctx |
||||
(brokerEndpoint cfg) |
||||
(brokerNotificationEndpoint cfg) |
||||
[handleBrokerNotification robotsMap ordersMap handled logger] |
||||
securityParameters |
||||
logger) |
||||
stopBrokerClient f |
||||
|
||||
start = undefined |
||||
loadBrokerSecurityParameters cfg = |
||||
case (brokerClientCert cfg, brokerServerCert cfg) of |
||||
(Just clientCertPath, Just serverCertPath) -> do |
||||
eClientCert <- loadCertificateFromFile clientCertPath |
||||
eServerCert <- loadCertificateFromFile serverCertPath |
||||
case (eClientCert, eServerCert) of |
||||
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) |
||||
(_, _) -> return $ ClientSecurityParams Nothing Nothing |
||||
|
||||
_ -> return $ ClientSecurityParams Nothing Nothing |
||||
|
||||
parseOptions = execParser options |
||||
options = info (optionsParser <**> helper) |
||||
(fullDesc <> |
||||
progDesc "Robocom-zero junction mode driver" <> |
||||
header "robocom-zero-junction") |
||||
|
||||
optionsParser :: Parser ProgramOptions |
||||
optionsParser = ProgramOptions |
||||
<$> strOption |
||||
(long "config" <> |
||||
short 'c' <> |
||||
metavar "FILENAME" <> |
||||
help "Configuration file path") |
||||
|
||||
|
||||
@ -0,0 +1,56 @@
@@ -0,0 +1,56 @@
|
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
|
||||
module ATrade.Driver.Junction.BrokerService |
||||
( |
||||
BrokerService, |
||||
mkBrokerService, |
||||
submitOrder, |
||||
cancelOrder, |
||||
getNotifications |
||||
) where |
||||
|
||||
import qualified ATrade.Broker.Client as Bro |
||||
import ATrade.Broker.Protocol (Notification (..)) |
||||
import ATrade.Logging (Message, logDebug) |
||||
import ATrade.Types (Order (..), OrderId) |
||||
import Colog (WithLog) |
||||
import Control.Monad.IO.Class (MonadIO (liftIO)) |
||||
import Control.Monad.Reader.Class (MonadReader) |
||||
import Data.IORef (IORef, atomicModifyIORef', |
||||
newIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
|
||||
data BrokerService = |
||||
BrokerService |
||||
{ |
||||
broker :: Bro.BrokerClientHandle, |
||||
orderMap :: IORef (M.Map OrderId T.Text), |
||||
orderIdCounter :: IORef OrderId |
||||
} |
||||
|
||||
mkBrokerService :: Bro.BrokerClientHandle -> IORef (M.Map OrderId T.Text) -> IO BrokerService |
||||
mkBrokerService h om = BrokerService h om <$> newIORef 1 |
||||
|
||||
submitOrder :: (MonadIO m, WithLog env Message m, MonadReader env m) => BrokerService -> T.Text -> Order -> m OrderId |
||||
submitOrder service identity order = do |
||||
oid <- nextOrderId service |
||||
logDebug "BrokerService" $ "New order, id: " <> (T.pack . show) oid |
||||
liftIO $ atomicModifyIORef' (orderMap service) (\s -> (M.insert oid identity s, ())) |
||||
_ <- liftIO $ Bro.submitOrder (broker service) order { orderId = oid } |
||||
return oid |
||||
where |
||||
nextOrderId srv = liftIO $ atomicModifyIORef' (orderIdCounter srv) (\s -> (s + 1, s)) |
||||
|
||||
cancelOrder :: BrokerService -> OrderId -> IO () |
||||
cancelOrder service oid = do |
||||
_ <- Bro.cancelOrder (broker service) oid |
||||
return () |
||||
|
||||
getNotifications :: BrokerService -> IO [Notification] |
||||
getNotifications service = do |
||||
v <- Bro.getNotifications (broker service) |
||||
case v of |
||||
Left _ -> return [] |
||||
Right n -> return n |
||||
@ -0,0 +1,37 @@
@@ -0,0 +1,37 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
|
||||
module ATrade.Driver.Junction.ProgramConfiguration |
||||
( |
||||
ProgramOptions(..), |
||||
ProgramConfiguration(..) |
||||
) where |
||||
import ATrade.Driver.Junction.Types (StrategyInstanceDescriptor) |
||||
import qualified Data.Text as T |
||||
import Dhall (FromDhall) |
||||
import GHC.Generics (Generic) |
||||
|
||||
newtype ProgramOptions = |
||||
ProgramOptions |
||||
{ |
||||
configPath :: FilePath |
||||
} |
||||
|
||||
data ProgramConfiguration = |
||||
ProgramConfiguration |
||||
{ |
||||
brokerEndpoint :: T.Text, |
||||
brokerNotificationEndpoint :: T.Text, |
||||
brokerServerCert :: Maybe FilePath, |
||||
brokerClientCert :: Maybe FilePath, |
||||
quotesourceEndpoint :: T.Text, |
||||
quotesourceServerCert :: Maybe FilePath, |
||||
quotesourceClientCert :: Maybe FilePath, |
||||
qhpEndpoint :: T.Text, |
||||
qtisEndpoint :: T.Text, |
||||
redisSocket :: T.Text, |
||||
robotsConfigsPath :: FilePath, |
||||
logBasePath :: FilePath, |
||||
instances :: [StrategyInstanceDescriptor] |
||||
} deriving (Generic, Show) |
||||
|
||||
instance FromDhall ProgramConfiguration |
||||
@ -0,0 +1,27 @@
@@ -0,0 +1,27 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
|
||||
module ATrade.Driver.Junction.QuoteStream |
||||
( |
||||
QuoteSubscription(..), |
||||
QuoteStream(..), |
||||
SubscriptionId(..) |
||||
) where |
||||
|
||||
import ATrade.QuoteSource.Client (QuoteData) |
||||
import ATrade.Types (BarTimeframe, TickerId) |
||||
import Control.Concurrent.BoundedChan (BoundedChan) |
||||
import Data.Hashable (Hashable) |
||||
import GHC.Generics (Generic) |
||||
|
||||
data QuoteSubscription = |
||||
QuoteSubscription TickerId BarTimeframe |
||||
deriving (Generic, Eq) |
||||
|
||||
instance Hashable BarTimeframe |
||||
instance Hashable QuoteSubscription |
||||
|
||||
newtype SubscriptionId = SubscriptionId { unSubscriptionId :: Int } |
||||
|
||||
class (Monad m) => QuoteStream m where |
||||
addSubscription :: QuoteSubscription -> BoundedChan QuoteData -> m SubscriptionId |
||||
removeSubscription :: SubscriptionId -> m () |
||||
@ -0,0 +1,228 @@
@@ -0,0 +1,228 @@
|
||||
{-# LANGUAGE DeriveGeneric #-} |
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE LambdaCase #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE ScopedTypeVariables #-} |
||||
{-# LANGUAGE TypeSynonymInstances #-} |
||||
|
||||
module ATrade.Driver.Junction.QuoteThread |
||||
( |
||||
QuoteThreadHandle, |
||||
startQuoteThread, |
||||
stopQuoteThread, |
||||
addSubscription, |
||||
DownloaderM, |
||||
DownloaderEnv(..), |
||||
runDownloaderM, |
||||
withQThread |
||||
) where |
||||
|
||||
import ATrade.Driver.Junction.ProgramConfiguration (ProgramConfiguration (..)) |
||||
import ATrade.Driver.Junction.QuoteStream (QuoteSubscription (..)) |
||||
import ATrade.Logging (Message) |
||||
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
||||
import ATrade.Quotes.QHP (QHPHandle, requestHistoryFromQHP) |
||||
import ATrade.Quotes.QTIS (TickerInfo (tiLotSize, tiTickSize, tiTicker), |
||||
qtisGetTickersInfo) |
||||
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
||||
import ATrade.QuoteSource.Client (QuoteData (QDBar, QDTick), |
||||
QuoteSourceClientHandle, |
||||
quoteSourceClientSubscribe, |
||||
startQuoteSourceClient, |
||||
stopQuoteSourceClient) |
||||
import ATrade.RoboCom.Types (Bar (barSecurity), |
||||
BarSeries (..), |
||||
BarSeriesId (BarSeriesId), |
||||
Bars, |
||||
InstrumentParameters (InstrumentParameters)) |
||||
import ATrade.Types (BarTimeframe (BarTimeframe), |
||||
ClientSecurityParams (ClientSecurityParams), |
||||
Tick (security), |
||||
TickerId) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction, |
||||
WithLog) |
||||
import Control.Concurrent (ThreadId, forkIO, |
||||
killThread) |
||||
import Control.Concurrent.BoundedChan (BoundedChan, |
||||
newBoundedChan, |
||||
readChan, |
||||
writeChan) |
||||
import Control.Exception.Safe (MonadMask, |
||||
MonadThrow, |
||||
bracket) |
||||
import Control.Monad (forM, forever) |
||||
import Control.Monad.Reader (MonadIO (liftIO), ReaderT (runReaderT), |
||||
lift) |
||||
import Control.Monad.Reader.Class (MonadReader, asks) |
||||
import qualified Data.HashMap.Strict as HM |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
newIORef, |
||||
readIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import Data.Time (addUTCTime, |
||||
getCurrentTime) |
||||
import System.ZMQ4 (Context) |
||||
import System.ZMQ4.ZAP (loadCertificateFromFile) |
||||
|
||||
|
||||
data QuoteThreadHandle = QuoteThreadHandle ThreadId ThreadId QuoteThreadEnv |
||||
|
||||
data QuoteThreadEnv = |
||||
QuoteThreadEnv |
||||
{ |
||||
bars :: IORef Bars, |
||||
endpoints :: IORef (HM.HashMap QuoteSubscription [BoundedChan QuoteData]), |
||||
qsclient :: QuoteSourceClientHandle, |
||||
paramsCache :: IORef (M.Map TickerId InstrumentParameters), |
||||
downloaderChan :: BoundedChan QuoteSubscription |
||||
} |
||||
|
||||
startQuoteThread :: (MonadIO m, |
||||
MonadIO m1, |
||||
WithLog DownloaderEnv Message m1, |
||||
HistoryProvider m1, |
||||
TickerInfoProvider m1) => |
||||
IORef Bars -> |
||||
Context -> |
||||
T.Text -> |
||||
ClientSecurityParams -> |
||||
(m1 () -> IO ()) -> |
||||
LogAction IO Message -> |
||||
m QuoteThreadHandle |
||||
startQuoteThread barsRef ctx ep secparams downloadThreadRunner logger = do |
||||
chan <- liftIO $ newBoundedChan 2000 |
||||
dChan <- liftIO $ newBoundedChan 2000 |
||||
qsc <- liftIO $ startQuoteSourceClient chan [] ctx ep secparams logger |
||||
env <- liftIO $ QuoteThreadEnv barsRef <$> newIORef HM.empty <*> pure qsc <*> newIORef M.empty <*> pure dChan |
||||
tid <- liftIO . forkIO $ quoteThread env chan |
||||
downloaderTid <- liftIO . forkIO $ downloadThreadRunner (downloaderThread env dChan) |
||||
return $ QuoteThreadHandle tid downloaderTid env |
||||
where |
||||
downloaderThread env chan = forever $ do |
||||
QuoteSubscription tickerid tf <- liftIO $ readChan chan |
||||
paramsMap <- liftIO $ readIORef $ paramsCache env |
||||
mbParams <- case M.lookup tickerid paramsMap of |
||||
Nothing -> do |
||||
paramsList <- getInstrumentParameters [tickerid] |
||||
case paramsList of |
||||
(params:_) -> liftIO $ atomicModifyIORef' (paramsCache env) (\m -> (M.insert tickerid params m, Just params)) |
||||
_ -> return Nothing |
||||
Just params -> return $ Just params |
||||
barsMap <- liftIO $ readIORef (bars env) |
||||
case M.lookup (BarSeriesId tickerid tf) barsMap of |
||||
Just _ -> return () -- already downloaded |
||||
Nothing -> case mbParams of |
||||
Just params -> do |
||||
now <- liftIO getCurrentTime |
||||
barsData <- getHistory tickerid tf ((-86400 * 60) `addUTCTime` now) now |
||||
let barSeries = BarSeries tickerid tf barsData params |
||||
liftIO $ atomicModifyIORef' (bars env) (\m -> (M.insert (BarSeriesId tickerid tf) barSeries m, ())) |
||||
_ -> return () -- TODO log |
||||
|
||||
|
||||
quoteThread env chan = flip runReaderT env $ forever $ do |
||||
qssData <- lift $ readChan chan |
||||
case qssData of |
||||
QDBar (tf, bar) -> do |
||||
barsRef' <- asks bars |
||||
lift $ atomicModifyIORef' barsRef' (\x -> (updateBarsMap x bar tf, ())) |
||||
_ -> return () -- TODO pass to bar aggregator |
||||
let key = case qssData of |
||||
QDTick tick -> QuoteSubscription (security tick) (BarTimeframe 0) |
||||
QDBar (tf, bar) -> QuoteSubscription (barSecurity bar) tf |
||||
subs <- asks endpoints >>= (lift . readIORef) |
||||
case HM.lookup key subs of |
||||
Just clientChannels -> do |
||||
lift $ mapM_ (`writeChan` qssData) clientChannels |
||||
Nothing -> return () |
||||
|
||||
stopQuoteThread :: (MonadIO m) => QuoteThreadHandle -> m () |
||||
stopQuoteThread (QuoteThreadHandle tid dtid env) = liftIO $ do |
||||
killThread tid |
||||
killThread dtid |
||||
stopQuoteSourceClient (qsclient env) |
||||
|
||||
addSubscription :: (MonadIO m) => QuoteThreadHandle -> TickerId -> BarTimeframe -> BoundedChan QuoteData -> m () |
||||
addSubscription (QuoteThreadHandle _ _ env) tid tf chan = liftIO $ do |
||||
writeChan (downloaderChan env) (QuoteSubscription tid tf) |
||||
atomicModifyIORef' (endpoints env) (\m -> (doAddSubscription m tid, ())) |
||||
quoteSourceClientSubscribe (qsclient env) [(tid, BarTimeframe 0)] |
||||
where |
||||
doAddSubscription m tickerid = |
||||
let m1 = HM.alter (\case |
||||
Just chans -> Just (chan : chans) |
||||
_ -> Just [chan]) (QuoteSubscription tickerid tf) m in |
||||
HM.alter (\case |
||||
Just chans -> Just (chan : chans) |
||||
_ -> Just [chan]) (QuoteSubscription tickerid (BarTimeframe 0)) m1 |
||||
|
||||
updateBarsMap :: Bars -> Bar -> BarTimeframe -> Bars |
||||
updateBarsMap barsMap bar tf = M.adjust (addToSeries bar) (BarSeriesId (barSecurity bar) tf) barsMap |
||||
|
||||
addToSeries :: Bar -> BarSeries -> BarSeries |
||||
addToSeries bar series = series { bsBars = bar : bsBars series } |
||||
|
||||
data DownloaderEnv = |
||||
DownloaderEnv |
||||
{ |
||||
qhp :: QHPHandle, |
||||
downloaderContext :: Context, |
||||
downloaderQtisEndpoint :: T.Text, |
||||
logAction :: LogAction DownloaderM Message |
||||
} |
||||
|
||||
newtype DownloaderM a = DownloaderM { unDownloaderM :: ReaderT DownloaderEnv IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader DownloaderEnv, MonadIO, MonadThrow) |
||||
|
||||
instance HasLog DownloaderEnv Message DownloaderM where |
||||
getLogAction = logAction |
||||
setLogAction a e = e { logAction = a } |
||||
|
||||
instance HistoryProvider DownloaderM where |
||||
getHistory tid tf from to = do |
||||
q <- asks qhp |
||||
requestHistoryFromQHP q tid tf from to |
||||
|
||||
instance TickerInfoProvider DownloaderM where |
||||
getInstrumentParameters tickers = do |
||||
ctx <- asks downloaderContext |
||||
ep <- asks downloaderQtisEndpoint |
||||
tis <- forM tickers (qtisGetTickersInfo ctx ep) |
||||
pure $ convert `fmap` tis |
||||
where |
||||
convert ti = InstrumentParameters |
||||
(tiTicker ti) |
||||
(fromInteger $ tiLotSize ti) |
||||
(tiTickSize ti) |
||||
|
||||
withQThread :: DownloaderEnv -> IORef Bars -> ProgramConfiguration -> Context -> LogAction IO Message -> (QuoteThreadHandle -> IO ()) -> IO () |
||||
withQThread env barsMap cfg ctx logger f = do |
||||
securityParameters <- loadSecurityParameters |
||||
bracket |
||||
(startQuoteThread |
||||
barsMap |
||||
ctx |
||||
(quotesourceEndpoint cfg) |
||||
securityParameters |
||||
(runDownloaderM env) |
||||
logger) |
||||
stopQuoteThread f |
||||
where |
||||
loadSecurityParameters = |
||||
case (quotesourceClientCert cfg, quotesourceServerCert cfg) of |
||||
(Just clientCertPath, Just serverCertPath) -> do |
||||
eClientCert <- loadCertificateFromFile clientCertPath |
||||
eServerCert <- loadCertificateFromFile serverCertPath |
||||
case (eClientCert, eServerCert) of |
||||
(Right clientCert, Right serverCert) -> return $ ClientSecurityParams (Just clientCert) (Just serverCert) |
||||
(_, _) -> return $ ClientSecurityParams Nothing Nothing |
||||
|
||||
_ -> return $ ClientSecurityParams Nothing Nothing |
||||
|
||||
runDownloaderM :: DownloaderEnv -> DownloaderM () -> IO () |
||||
runDownloaderM env = (`runReaderT` env) . unDownloaderM |
||||
@ -0,0 +1,194 @@
@@ -0,0 +1,194 @@
|
||||
{-# LANGUAGE ExistentialQuantification #-} |
||||
{-# LANGUAGE FlexibleContexts #-} |
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE MultiParamTypeClasses #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.Driver.Junction.RobotDriverThread |
||||
( |
||||
createRobotDriverThread, |
||||
RobotEnv(..), |
||||
RobotM(..), |
||||
RobotDriverHandle, |
||||
onStrategyInstance, |
||||
postNotificationEvent) where |
||||
|
||||
import ATrade.Broker.Protocol (Notification (OrderNotification, TradeNotification)) |
||||
import qualified ATrade.Driver.Junction.BrokerService as Bro |
||||
import ATrade.Driver.Junction.QuoteStream (QuoteStream (addSubscription), |
||||
QuoteSubscription (QuoteSubscription)) |
||||
import ATrade.Driver.Junction.Types (BigConfig, |
||||
StrategyDescriptor, |
||||
StrategyInstance (StrategyInstance, strategyEventCallback), |
||||
StrategyInstanceDescriptor (configKey), |
||||
confStrategy, |
||||
confTickers, |
||||
eventCallback, stateKey, |
||||
strategyId, tickerId, |
||||
timeframe) |
||||
import ATrade.Logging (Message, log, logDebug, |
||||
logInfo, logWarning) |
||||
import ATrade.QuoteSource.Client (QuoteData (..)) |
||||
import ATrade.RoboCom.ConfigStorage (ConfigStorage) |
||||
import ATrade.RoboCom.Monad (Event (NewBar, NewTick, NewTrade, OrderSubmitted, OrderUpdate), |
||||
MonadRobot (..), |
||||
StrategyEnvironment (StrategyEnvironment, _seInstanceId, _seLastTimestamp)) |
||||
import ATrade.RoboCom.Persistence (MonadPersistence) |
||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), |
||||
Bars) |
||||
import ATrade.Types (Order (orderId), OrderId, |
||||
OrderState, Trade) |
||||
import Colog (HasLog (getLogAction, setLogAction), |
||||
LogAction) |
||||
import Control.Concurrent (ThreadId, forkIO) |
||||
import Control.Concurrent.BoundedChan (BoundedChan, |
||||
newBoundedChan, readChan, |
||||
writeChan) |
||||
import Control.Exception.Safe (MonadThrow) |
||||
import Control.Monad (forM_, forever, void) |
||||
import Control.Monad.IO.Class (MonadIO, liftIO) |
||||
import Control.Monad.Reader (MonadReader (local), |
||||
ReaderT, asks) |
||||
import Data.Aeson (FromJSON, ToJSON) |
||||
import Data.Default |
||||
import Data.IORef (IORef, |
||||
atomicModifyIORef', |
||||
readIORef, writeIORef) |
||||
import Data.List.NonEmpty (NonEmpty) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text.Lazy as TL |
||||
import Data.Time (UTCTime, getCurrentTime) |
||||
import Dhall (FromDhall) |
||||
import Prelude hiding (log) |
||||
|
||||
data RobotDriverHandle = forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => |
||||
RobotDriverHandle (StrategyInstance c s) ThreadId ThreadId (BoundedChan RobotDriverEvent) |
||||
|
||||
data RobotDriverRequest |
||||
|
||||
data RobotDriverEvent = |
||||
EventRequest RobotDriverRequest |
||||
| QuoteEvent QuoteData |
||||
| NewTradeEvent Trade |
||||
| OrderEvent OrderId OrderState |
||||
|
||||
|
||||
robotDriverThread :: (MonadIO m, |
||||
MonadRobot m c s) => |
||||
StrategyInstance c s -> |
||||
BoundedChan RobotDriverEvent -> |
||||
m () |
||||
|
||||
robotDriverThread inst eventQueue = |
||||
forever $ liftIO (readChan eventQueue) >>= handleEvent |
||||
where |
||||
handleEvent (EventRequest _) = return () |
||||
handleEvent (QuoteEvent d) = |
||||
case d of |
||||
QDTick tick -> strategyEventCallback inst (NewTick tick) |
||||
QDBar (tf, bar) -> strategyEventCallback inst (NewBar (tf, bar)) |
||||
handleEvent (NewTradeEvent trade) = strategyEventCallback inst (NewTrade trade) |
||||
handleEvent (OrderEvent oid newState) = strategyEventCallback inst (OrderUpdate oid newState) |
||||
|
||||
createRobotDriverThread :: (MonadIO m1, |
||||
ConfigStorage m1, |
||||
MonadPersistence m1, |
||||
QuoteStream m1, |
||||
Default s, |
||||
FromJSON s, |
||||
ToJSON s, |
||||
FromDhall c, |
||||
MonadIO m, |
||||
MonadReader (RobotEnv c s) m, |
||||
MonadRobot m c s) => |
||||
StrategyInstanceDescriptor |
||||
-> StrategyDescriptor c s |
||||
-> (m () -> IO ()) |
||||
-> BigConfig c |
||||
-> IORef c |
||||
-> IORef s |
||||
-> IORef [UTCTime] |
||||
-> m1 RobotDriverHandle |
||||
|
||||
createRobotDriverThread instDesc strDesc runner bigConf rConf rState rTimers = do |
||||
eventQueue <- liftIO $ newBoundedChan 2000 |
||||
|
||||
let inst = StrategyInstance (strategyId instDesc) (eventCallback strDesc) rState rConf rTimers |
||||
|
||||
quoteQueue <- liftIO $ newBoundedChan 2000 |
||||
forM_ (confTickers bigConf) (\x -> addSubscription (QuoteSubscription (tickerId x) (timeframe x)) quoteQueue) |
||||
qthread <- liftIO . forkIO $ forever $ passQuoteEvents eventQueue quoteQueue |
||||
|
||||
driver <- liftIO . forkIO $ runner $ robotDriverThread inst eventQueue |
||||
return $ RobotDriverHandle inst driver qthread eventQueue |
||||
|
||||
where |
||||
passQuoteEvents eventQueue quoteQueue = do |
||||
v <- readChan quoteQueue |
||||
writeChan eventQueue (QuoteEvent v) |
||||
|
||||
onStrategyInstance :: RobotDriverHandle -> forall r. (forall c s. (FromDhall c, Default s, FromJSON s, ToJSON s) => StrategyInstance c s -> r) -> r |
||||
onStrategyInstance (RobotDriverHandle inst _ _ _) f = f inst |
||||
|
||||
data RobotEnv c s = |
||||
RobotEnv |
||||
{ |
||||
stateRef :: IORef s, |
||||
configRef :: IORef c, |
||||
timersRef :: IORef [UTCTime], |
||||
bars :: IORef Bars, |
||||
env :: IORef StrategyEnvironment, |
||||
logAction :: LogAction (RobotM c s) Message, |
||||
brokerService :: Bro.BrokerService, |
||||
tickers :: NonEmpty BarSeriesId |
||||
} |
||||
|
||||
newtype RobotM c s a = RobotM { unRobotM :: ReaderT (RobotEnv c s) IO a } |
||||
deriving (Functor, Applicative, Monad, MonadReader (RobotEnv c s), MonadIO, MonadThrow) |
||||
|
||||
instance HasLog (RobotEnv c s) Message (RobotM c s) where |
||||
getLogAction = logAction |
||||
setLogAction a e = e { logAction = a } |
||||
|
||||
instance MonadRobot (RobotM c s) c s where |
||||
submitOrder order = do |
||||
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
||||
bro <- asks brokerService |
||||
Bro.submitOrder bro instId order |
||||
|
||||
cancelOrder oid = do |
||||
bro <- asks brokerService |
||||
liftIO . void $ Bro.cancelOrder bro oid |
||||
|
||||
appendToLog s t = do |
||||
instId <- _seInstanceId <$> (asks env >>= liftIO . readIORef) |
||||
log s instId $ TL.toStrict t |
||||
|
||||
setupTimer t = do |
||||
ref <- asks timersRef |
||||
liftIO $ atomicModifyIORef' ref (\s -> (t : s, ())) |
||||
|
||||
enqueueIOAction = undefined |
||||
getConfig = asks configRef >>= liftIO . readIORef |
||||
getState = asks stateRef >>= liftIO . readIORef |
||||
setState newState = asks stateRef >>= liftIO . flip writeIORef newState |
||||
getEnvironment = do |
||||
ref <- asks env |
||||
now <- liftIO getCurrentTime |
||||
liftIO $ atomicModifyIORef' ref (\e -> (e { _seLastTimestamp = now }, e { _seLastTimestamp = now})) |
||||
|
||||
getTicker tid tf = do |
||||
b <- asks bars >>= liftIO . readIORef |
||||
return $ M.lookup (BarSeriesId tid tf) b |
||||
|
||||
getAvailableTickers = asks tickers |
||||
|
||||
postNotificationEvent :: (MonadIO m) => RobotDriverHandle -> Notification -> m () |
||||
postNotificationEvent (RobotDriverHandle _ _ _ eventQueue) notification = liftIO $ |
||||
case notification of |
||||
OrderNotification _ oid state -> writeChan eventQueue (OrderEvent oid state) |
||||
TradeNotification _ trade -> writeChan eventQueue (NewTradeEvent trade) |
||||
|
||||
|
||||
@ -1,361 +0,0 @@
@@ -1,361 +0,0 @@
|
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE TypeSynonymInstances #-} |
||||
|
||||
module ATrade.Quotes.Finam ( |
||||
downloadFinamSymbols, |
||||
Symbol(..), |
||||
Period(..), |
||||
DateFormat(..), |
||||
TimeFormat(..), |
||||
FieldSeparator(..), |
||||
RequestParams(..), |
||||
defaultParams, |
||||
downloadQuotes, |
||||
parseQuotes, |
||||
downloadAndParseQuotes, |
||||
Row(..) |
||||
) where |
||||
|
||||
import ATrade.Types |
||||
import Control.Error.Util |
||||
import Control.Exception |
||||
import Control.Lens |
||||
import Control.Monad |
||||
import qualified Data.ByteString as B |
||||
import qualified Data.ByteString.Char8 as B8 |
||||
import qualified Data.ByteString.Lazy as BL |
||||
import Data.Csv hiding (Options) |
||||
import Data.List |
||||
import qualified Data.Map as M |
||||
import Data.Maybe |
||||
import qualified Data.Text as T |
||||
import qualified Data.Text.ICU.Convert as TC |
||||
import Data.Time.Calendar |
||||
import Data.Time.Clock |
||||
import Data.Time.Format |
||||
import qualified Data.Vector as V |
||||
import Network.Wreq |
||||
import Safe |
||||
import System.Log.Logger |
||||
import Text.Parsec |
||||
import Text.ParserCombinators.Parsec.Number |
||||
|
||||
data Period = |
||||
PeriodTick | |
||||
Period1Min | |
||||
Period5Min | |
||||
Period10Min | |
||||
Period15Min | |
||||
Period30Min | |
||||
PeriodHour | |
||||
PeriodDay | |
||||
PeriodWeek | |
||||
PeriodMonth |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum Period where |
||||
fromEnum PeriodTick = 1 |
||||
fromEnum Period1Min = 2 |
||||
fromEnum Period5Min = 3 |
||||
fromEnum Period10Min = 4 |
||||
fromEnum Period15Min = 5 |
||||
fromEnum Period30Min = 6 |
||||
fromEnum PeriodHour = 7 |
||||
fromEnum PeriodDay = 8 |
||||
fromEnum PeriodWeek = 9 |
||||
fromEnum PeriodMonth = 10 |
||||
|
||||
toEnum 1 = PeriodTick |
||||
toEnum 2 = Period1Min |
||||
toEnum 3 = Period5Min |
||||
toEnum 4 = Period10Min |
||||
toEnum 5 = Period15Min |
||||
toEnum 6 = Period30Min |
||||
toEnum 7 = PeriodHour |
||||
toEnum 8 = PeriodDay |
||||
toEnum 9 = PeriodWeek |
||||
toEnum 10 = PeriodMonth |
||||
toEnum _ = PeriodDay |
||||
|
||||
data DateFormat = |
||||
FormatYYYYMMDD | |
||||
FormatYYMMDD | |
||||
FormatDDMMYY | |
||||
FormatDD_MM_YY | |
||||
FormatMM_DD_YY |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum DateFormat where |
||||
fromEnum FormatYYYYMMDD = 1 |
||||
fromEnum FormatYYMMDD = 2 |
||||
fromEnum FormatDDMMYY = 3 |
||||
fromEnum FormatDD_MM_YY = 4 |
||||
fromEnum FormatMM_DD_YY = 5 |
||||
|
||||
toEnum 1 = FormatYYYYMMDD |
||||
toEnum 2 = FormatYYMMDD |
||||
toEnum 3 = FormatDDMMYY |
||||
toEnum 4 = FormatDD_MM_YY |
||||
toEnum 5 = FormatMM_DD_YY |
||||
toEnum _ = FormatYYYYMMDD |
||||
|
||||
|
||||
data TimeFormat = |
||||
FormatHHMMSS | |
||||
FormatHHMM | |
||||
FormatHH_MM_SS | |
||||
FormatHH_MM |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum TimeFormat where |
||||
fromEnum FormatHHMMSS = 1 |
||||
fromEnum FormatHHMM = 2 |
||||
fromEnum FormatHH_MM_SS = 3 |
||||
fromEnum FormatHH_MM = 4 |
||||
|
||||
toEnum 1 = FormatHHMMSS |
||||
toEnum 2 = FormatHHMM |
||||
toEnum 3 = FormatHH_MM_SS |
||||
toEnum 4 = FormatHH_MM |
||||
toEnum _ = FormatHHMMSS |
||||
|
||||
data FieldSeparator = |
||||
SeparatorComma | |
||||
SeparatorPeriod | |
||||
SeparatorSemicolon | |
||||
SeparatorTab | |
||||
SeparatorSpace |
||||
deriving (Show, Eq) |
||||
|
||||
instance Enum FieldSeparator where |
||||
fromEnum SeparatorComma = 1 |
||||
fromEnum SeparatorPeriod = 2 |
||||
fromEnum SeparatorSemicolon = 3 |
||||
fromEnum SeparatorTab = 4 |
||||
fromEnum SeparatorSpace = 5 |
||||
|
||||
toEnum 1 = SeparatorComma |
||||
toEnum 2 = SeparatorPeriod |
||||
toEnum 3 = SeparatorSemicolon |
||||
toEnum 4 = SeparatorTab |
||||
toEnum 5 = SeparatorSpace |
||||
toEnum _ = SeparatorComma |
||||
|
||||
data RequestParams = RequestParams { |
||||
ticker :: T.Text, |
||||
startDate :: Day, |
||||
endDate :: Day, |
||||
period :: Period, |
||||
dateFormat :: DateFormat, |
||||
timeFormat :: TimeFormat, |
||||
fieldSeparator :: FieldSeparator, |
||||
includeHeader :: Bool, |
||||
fillEmpty :: Bool |
||||
} |
||||
|
||||
defaultParams :: RequestParams |
||||
defaultParams = RequestParams { |
||||
ticker = "", |
||||
startDate = fromGregorian 1970 1 1, |
||||
endDate = fromGregorian 1970 1 1, |
||||
period = PeriodDay, |
||||
dateFormat = FormatYYYYMMDD, |
||||
timeFormat = FormatHHMMSS, |
||||
fieldSeparator = SeparatorComma, |
||||
includeHeader = True, |
||||
fillEmpty = False |
||||
} |
||||
|
||||
data Symbol = Symbol { |
||||
symCode :: T.Text, |
||||
symName :: T.Text, |
||||
symId :: Integer, |
||||
symMarketCode :: Integer, |
||||
symMarketName :: T.Text |
||||
} |
||||
deriving (Show, Eq) |
||||
|
||||
data Row = Row { |
||||
rowTicker :: T.Text, |
||||
rowTime :: UTCTime, |
||||
rowOpen :: Price, |
||||
rowHigh :: Price, |
||||
rowLow :: Price, |
||||
rowClose :: Price, |
||||
rowVolume :: Integer |
||||
} deriving (Show, Eq) |
||||
|
||||
instance FromField Price where |
||||
parseField s = fromDouble <$> (parseField s :: Parser Double) |
||||
|
||||
instance FromRecord Row where |
||||
parseRecord v |
||||
| length v == 9 = do |
||||
tkr <- v .! 0 |
||||
date <- v .! 2 |
||||
time <- v .! 3 |
||||
dt <- addUTCTime (-3 * 3600) <$> (parseDt date time) |
||||
open <- v .! 4 |
||||
high <- v .! 5 |
||||
low <- v .! 6 |
||||
close <- v .! 7 |
||||
vol <- v .! 8 |
||||
return $ Row tkr dt open high low close vol |
||||
| otherwise = mzero |
||||
where |
||||
parseDt :: B.ByteString -> B.ByteString -> Parser UTCTime |
||||
parseDt d t = case parseTimeM True defaultTimeLocale "%Y%m%d %H%M%S" $ B8.unpack d ++ " " ++ B8.unpack t of |
||||
Just dt -> return dt |
||||
Nothing -> fail "Unable to parse date/time" |
||||
|
||||
downloadAndParseQuotes :: RequestParams -> IO (Maybe [Row]) |
||||
downloadAndParseQuotes requestParams = downloadAndParseQuotes' 3 |
||||
where |
||||
downloadAndParseQuotes' iter = do |
||||
raw <- downloadQuotes requestParams `catch` (\e -> do |
||||
debugM "History" $ "exception: " ++ show (e :: SomeException) |
||||
return Nothing) |
||||
case raw of |
||||
Just r -> return $ parseQuotes r |
||||
Nothing -> if iter <= 0 then return Nothing else downloadAndParseQuotes' (iter - 1) |
||||
|
||||
parseQuotes :: B.ByteString -> Maybe [Row] |
||||
parseQuotes csvData = case decode HasHeader $ BL.fromStrict csvData of |
||||
Left _ -> Nothing |
||||
Right d -> Just $ V.toList d |
||||
|
||||
downloadQuotes :: RequestParams -> IO (Maybe B.ByteString) |
||||
downloadQuotes requestParams = do |
||||
symbols <- downloadFinamSymbols |
||||
case requestUrl symbols requestParams of |
||||
Just (url, options') -> do |
||||
resp <- getWith options' url |
||||
return $ Just $ BL.toStrict $ resp ^. responseBody |
||||
Nothing -> return Nothing |
||||
|
||||
requestUrl :: [Symbol] -> RequestParams -> Maybe (String, Options) |
||||
requestUrl symbols requestParams = case getFinamCode symbols (ticker requestParams) of |
||||
Just (sym, market) -> Just ("http://export.finam.ru/export9.out", getOptions sym market) |
||||
Nothing -> Nothing |
||||
where |
||||
getOptions sym market = defaults & |
||||
param "market" .~ [T.pack . show $ market] & |
||||
param "f" .~ [ticker requestParams] & |
||||
param "e" .~ [".csv"] & |
||||
param "dtf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
||||
param "tmf" .~ [T.pack . show . fromEnum . dateFormat $ requestParams] & |
||||
param "MSOR" .~ ["0"] & |
||||
param "mstime" .~ ["on"] & |
||||
param "mstimever" .~ ["1"] & |
||||
param "sep" .~ [T.pack . show . fromEnum . fieldSeparator $ requestParams] & |
||||
param "sep2" .~ ["1"] & |
||||
param "at" .~ [if includeHeader requestParams then "1" else "0"] & |
||||
param "fsp" .~ [if fillEmpty requestParams then "1" else "0"] & |
||||
param "p" .~ [T.pack . show . fromEnum $ period requestParams] & |
||||
param "em" .~ [T.pack . show $ sym ] & |
||||
param "df" .~ [T.pack . show $ dayFrom] & |
||||
param "mf" .~ [T.pack . show $ (monthFrom - 1)] & |
||||
param "yf" .~ [T.pack . show $ yearFrom] & |
||||
param "dt" .~ [T.pack . show $ dayTo] & |
||||
param "mt" .~ [T.pack . show $ (monthTo - 1)] & |
||||
param "yt" .~ [T.pack . show $ yearTo] & |
||||
param "code" .~ [ticker requestParams] & |
||||
param "datf" .~ if period requestParams == PeriodTick then ["11"] else ["1"] |
||||
(yearFrom, monthFrom, dayFrom) = toGregorian $ startDate requestParams |
||||
(yearTo, monthTo, dayTo) = toGregorian $ endDate requestParams |
||||
|
||||
getFinamCode :: [Symbol] -> T.Text -> Maybe (Integer, Integer) |
||||
getFinamCode symbols tickerCode = case find (\x -> symCode x == tickerCode && symMarketCode x `notElem` archives) symbols of |
||||
Just sym -> Just (symId sym, symMarketCode sym) |
||||
Nothing -> Nothing |
||||
|
||||
downloadFinamSymbols :: IO [Symbol] |
||||
downloadFinamSymbols = do |
||||
conv <- TC.open "cp1251" Nothing |
||||
result <- get "http://www.finam.ru/cache/icharts/icharts.js" |
||||
if result ^. responseStatus . statusCode == 200 |
||||
then return $ parseSymbols . T.lines $ TC.toUnicode conv $ BL.toStrict $ result ^. responseBody |
||||
else return [] |
||||
where |
||||
parseSymbols :: [T.Text] -> [Symbol] |
||||
parseSymbols strs = zipWith5 Symbol codes names ids marketCodes marketNames |
||||
where |
||||
getWithParser parser pos = fromMaybe [] $ do |
||||
s <- T.unpack <$> strs `atMay` pos |
||||
hush $ parse parser "" s |
||||
|
||||
ids :: [Integer] |
||||
ids = getWithParser intlist 0 |
||||
|
||||
names :: [T.Text] |
||||
names = T.pack <$> getWithParser strlist 1 |
||||
|
||||
codes :: [T.Text] |
||||
codes = T.pack <$> getWithParser strlist 2 |
||||
|
||||
marketCodes :: [Integer] |
||||
marketCodes = getWithParser intlist 3 |
||||
|
||||
marketNames :: [T.Text] |
||||
marketNames = fmap (\code -> fromMaybe "" $ M.lookup code codeToName) marketCodes |
||||
|
||||
intlist = do |
||||
_ <- string "var" |
||||
spaces |
||||
skipMany1 alphaNum |
||||
spaces |
||||
_ <- char '=' |
||||
spaces |
||||
_ <- char '[' |
||||
manyTill (do |
||||
i <- int |
||||
_ <- char ',' <|> char ']' |
||||
return i) (char '\'' <|> char ';') |
||||
|
||||
strlist = do |
||||
_ <- string "var" |
||||
spaces |
||||
skipMany1 alphaNum |
||||
spaces |
||||
_ <- char '=' |
||||
spaces |
||||
_ <- char '[' |
||||
(char '\'' >> manyTill ((char '\\' >> char '\'') <|> anyChar) (char '\'')) `sepBy` char ',' |
||||
|
||||
codeToName :: M.Map Integer T.Text |
||||
codeToName = M.fromList [ |
||||
(200, "МосБиржа топ"), |
||||
(1 , "МосБиржа акции"), |
||||
(14 , "МосБиржа фьючерсы"), |
||||
(41, "Курс рубля"), |
||||
(45, "МосБиржа валютный рынок"), |
||||
(2, "МосБиржа облигации"), |
||||
(12, "МосБиржа внесписочные облигации"), |
||||
(29, "МосБиржа пифы"), |
||||
(8, "Расписки"), |
||||
(6, "Мировые Индексы"), |
||||
(24, "Товары"), |
||||
(5, "Мировые валюты"), |
||||
(25, "Акции США(BATS)"), |
||||
(7, "Фьючерсы США"), |
||||
(27, "Отрасли экономики США"), |
||||
(26, "Гособлигации США"), |
||||
(28, "ETF"), |
||||
(30, "Индексы мировой экономики"), |
||||
(3, "РТС"), |
||||
(20, "RTS Board"), |
||||
(10, "РТС-GAZ"), |
||||
(17, "ФОРТС Архив"), |
||||
(31, "Сырье Архив"), |
||||
(38, "RTS Standard Архив"), |
||||
(16, "ММВБ Архив"), |
||||
(18, "РТС Архив"), |
||||
(9, "СПФБ Архив"), |
||||
(32, "РТС-BOARD Архив"), |
||||
(39, "Расписки Архив"), |
||||
(-1, "Отрасли") ] |
||||
|
||||
|
||||
archives :: [Integer] |
||||
archives = [3, 8, 16, 17, 18, 31, 32, 38, 39, 517] |
||||
@ -0,0 +1,12 @@
@@ -0,0 +1,12 @@
|
||||
|
||||
module ATrade.Quotes.HistoryProvider |
||||
( |
||||
HistoryProvider(..) |
||||
) where |
||||
|
||||
import ATrade.RoboCom.Types (Bar) |
||||
import ATrade.Types (BarTimeframe, TickerId) |
||||
import Data.Time (UTCTime) |
||||
|
||||
class (Monad m) => HistoryProvider m where |
||||
getHistory :: TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
||||
@ -0,0 +1,12 @@
@@ -0,0 +1,12 @@
|
||||
|
||||
module ATrade.Quotes.TickerInfoProvider |
||||
( |
||||
TickerInfoProvider(..) |
||||
) where |
||||
|
||||
import ATrade.RoboCom.Types (InstrumentParameters) |
||||
import ATrade.Types (TickerId) |
||||
|
||||
class (Monad m) => TickerInfoProvider m where |
||||
getInstrumentParameters :: [TickerId] -> m [InstrumentParameters] |
||||
|
||||
@ -0,0 +1,14 @@
@@ -0,0 +1,14 @@
|
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.RoboCom.ConfigStorage |
||||
( |
||||
ConfigStorage(..) |
||||
) where |
||||
|
||||
import qualified Data.Text as T |
||||
import Dhall (FromDhall) |
||||
|
||||
class (Monad m) => ConfigStorage m where |
||||
loadConfig :: forall c. (FromDhall c) => T.Text -> m c |
||||
|
||||
|
||||
@ -0,0 +1,16 @@
@@ -0,0 +1,16 @@
|
||||
{-# LANGUAGE RankNTypes #-} |
||||
|
||||
module ATrade.RoboCom.Persistence |
||||
( |
||||
MonadPersistence(..) |
||||
) where |
||||
|
||||
import Data.Aeson |
||||
import Data.Default (Default) |
||||
import qualified Data.Text as T |
||||
|
||||
class (Monad m) => MonadPersistence m where |
||||
saveState :: forall s. (ToJSON s) => s -> T.Text -> m () |
||||
loadState :: forall s. (Default s, FromJSON s) => T.Text -> m s |
||||
|
||||
|
||||
@ -0,0 +1,112 @@
@@ -0,0 +1,112 @@
|
||||
{-# LANGUAGE FlexibleInstances #-} |
||||
{-# LANGUAGE GeneralizedNewtypeDeriving #-} |
||||
{-# LANGUAGE OverloadedStrings #-} |
||||
{-# LANGUAGE TypeSynonymInstances #-} |
||||
|
||||
module Test.Driver.Junction.QuoteThread |
||||
( |
||||
unitTests |
||||
) where |
||||
|
||||
import Test.Tasty |
||||
import Test.Tasty.HUnit |
||||
import Test.Tasty.QuickCheck as QC |
||||
import Test.Tasty.SmallCheck as SC |
||||
|
||||
import ATrade.Driver.Junction.QuoteThread (addSubscription, |
||||
startQuoteThread, |
||||
stopQuoteThread) |
||||
import ATrade.Quotes.HistoryProvider (HistoryProvider (..)) |
||||
import ATrade.Quotes.TickerInfoProvider (TickerInfoProvider (..)) |
||||
import ATrade.QuoteSource.Client (QuoteData (QDBar)) |
||||
import ATrade.QuoteSource.Server (QuoteSourceServerData (..), |
||||
startQuoteSourceServer, |
||||
stopQuoteSourceServer) |
||||
import ATrade.RoboCom.Types (BarSeries (bsBars), |
||||
BarSeriesId (BarSeriesId), |
||||
InstrumentParameters (InstrumentParameters)) |
||||
import ATrade.Types |
||||
import Control.Concurrent (forkIO, threadDelay) |
||||
import Control.Concurrent.BoundedChan (newBoundedChan, readChan, |
||||
writeChan) |
||||
import Control.Exception (bracket) |
||||
import Control.Monad (forever) |
||||
import Control.Monad.Reader |
||||
import Data.IORef (newIORef, readIORef) |
||||
import qualified Data.Map.Strict as M |
||||
import qualified Data.Text as T |
||||
import Data.Time (UTCTime (UTCTime), |
||||
fromGregorian) |
||||
import System.IO (BufferMode (LineBuffering), |
||||
hSetBuffering, stderr) |
||||
import System.Log.Formatter |
||||
import System.Log.Handler (setFormatter) |
||||
import System.Log.Handler.Simple |
||||
import System.Log.Logger |
||||
import System.ZMQ4 (withContext) |
||||
import Test.Mock.HistoryProvider (MockHistoryProvider, |
||||
mkMockHistoryProvider, |
||||
mockGetHistory) |
||||
import Test.Mock.TickerInfoProvider (MockTickerInfoProvider, |
||||
mkMockTickerInfoProvider, |
||||
mockGetInstrumentParameters) |
||||
|
||||
data TestEnv = |
||||
TestEnv |
||||
{ |
||||
historyProvider :: MockHistoryProvider, |
||||
tickerInfoProvider :: MockTickerInfoProvider |
||||
} |
||||
|
||||
type TestM = ReaderT TestEnv IO |
||||
|
||||
instance HistoryProvider TestM where |
||||
getHistory tid tf from to = do |
||||
hp <- asks historyProvider |
||||
liftIO $ mockGetHistory hp tid tf from to |
||||
|
||||
instance TickerInfoProvider TestM where |
||||
getInstrumentParameters tickers = do |
||||
tip <- asks tickerInfoProvider |
||||
liftIO $ mockGetInstrumentParameters tip tickers |
||||
|
||||
qsEndpoint = "inproc://qs" |
||||
|
||||
mockHistoryProvider = mkMockHistoryProvider $ M.fromList [(BarSeriesId "FOO" (BarTimeframe 3600), bars)] |
||||
where |
||||
bars = [] |
||||
|
||||
mockTickerInfoProvider = mkMockTickerInfoProvider $ M.fromList [("FOO", InstrumentParameters 10 0.1)] |
||||
|
||||
unitTests = testGroup "Driver.Junction.QuoteThread" [ |
||||
testSubscription |
||||
] |
||||
|
||||
testSubscription :: TestTree |
||||
testSubscription = testCase "Subscription" $ withContext $ \ctx -> do |
||||
barsRef <- newIORef M.empty |
||||
serverChan <- newBoundedChan 2000 |
||||
bracket |
||||
(startQuoteSourceServer serverChan ctx qsEndpoint defaultServerSecurityParams) |
||||
stopQuoteSourceServer $ \_ -> |
||||
bracket |
||||
(startQuoteThread barsRef ctx qsEndpoint Nothing Nothing (`runReaderT` (TestEnv mockHistoryProvider mockTickerInfoProvider))) |
||||
|
||||
stopQuoteThread $ \qt -> do |
||||
chan <- newBoundedChan 2000 |
||||
addSubscription qt "FOO" (BarTimeframe 3600) chan |
||||
|
||||
forkIO $ forever $ threadDelay 50000 >> writeChan serverChan (QSSBar (BarTimeframe 3600, bar)) |
||||
|
||||
clientData <- readChan chan |
||||
assertEqual "Invalid client data" clientData (QDBar (BarTimeframe 3600, bar)) |
||||
|
||||
bars <- readIORef barsRef |
||||
case M.lookup (BarSeriesId "FOO" (BarTimeframe 3600)) bars of |
||||
Just series -> assertBool "Length should be >= 1" $ (not . null . bsBars) series |
||||
Nothing -> assertFailure "Bar Series should be present" |
||||
where |
||||
bar = |
||||
Bar { |
||||
barSecurity="FOO", barTimestamp=UTCTime (fromGregorian 2021 11 20) 7200, barOpen=10, barHigh=12, barLow=9, barClose=11, barVolume=100 |
||||
} |
||||
@ -0,0 +1,27 @@
@@ -0,0 +1,27 @@
|
||||
|
||||
module Test.Mock.HistoryProvider |
||||
( |
||||
MockHistoryProvider, |
||||
mkMockHistoryProvider, |
||||
mockGetHistory |
||||
) where |
||||
|
||||
import ATrade.Quotes.HistoryProvider |
||||
import ATrade.RoboCom.Types (BarSeriesId (BarSeriesId), Bars) |
||||
import ATrade.Types (Bar (Bar, barTimestamp), |
||||
BarTimeframe (BarTimeframe), |
||||
TickerId) |
||||
import Control.Monad.IO.Class (MonadIO) |
||||
import qualified Data.Map.Strict as M |
||||
import Data.Time (UTCTime) |
||||
|
||||
data MockHistoryProvider = MockHistoryProvider (M.Map BarSeriesId [Bar]) |
||||
|
||||
mkMockHistoryProvider :: M.Map BarSeriesId [Bar] -> MockHistoryProvider |
||||
mkMockHistoryProvider = MockHistoryProvider |
||||
|
||||
mockGetHistory :: (MonadIO m) => MockHistoryProvider -> TickerId -> BarTimeframe -> UTCTime -> UTCTime -> m [Bar] |
||||
mockGetHistory (MockHistoryProvider bars) tid tf from to = |
||||
case M.lookup (BarSeriesId tid tf) bars of |
||||
Just series -> return $ filter (\bar -> (barTimestamp bar >= from) && (barTimestamp bar <= to)) series |
||||
Nothing -> return [] |
||||
@ -0,0 +1,22 @@
@@ -0,0 +1,22 @@
|
||||
|
||||
module Test.Mock.TickerInfoProvider |
||||
( |
||||
MockTickerInfoProvider, |
||||
mkMockTickerInfoProvider, |
||||
mockGetInstrumentParameters |
||||
) where |
||||
|
||||
import ATrade.Quotes.TickerInfoProvider |
||||
import ATrade.RoboCom.Types (InstrumentParameters) |
||||
import ATrade.Types (TickerId) |
||||
import Control.Monad.IO.Class (MonadIO) |
||||
import qualified Data.Map.Strict as M |
||||
import Data.Maybe (catMaybes, mapMaybe) |
||||
|
||||
data MockTickerInfoProvider = MockTickerInfoProvider (M.Map TickerId InstrumentParameters) |
||||
|
||||
mkMockTickerInfoProvider :: (M.Map TickerId InstrumentParameters) -> MockTickerInfoProvider |
||||
mkMockTickerInfoProvider = MockTickerInfoProvider |
||||
|
||||
mockGetInstrumentParameters :: MockTickerInfoProvider -> [TickerId] -> IO [InstrumentParameters] |
||||
mockGetInstrumentParameters (MockTickerInfoProvider params) = return . mapMaybe (`M.lookup` params) |
||||
Loading…
Reference in new issue