''' ''' import copy import random import numpy import talib from scipy.integrate.tests.test_odeint_jac import rhs class Signal: def __init__(self): pass def calculate(self, series): pass def get_text(self): pass def mutate(self, factor): return copy.deepcopy(self) class PriceComparisonSignalGenerator: def __init__(self): pass def generate(self): lhs = random.randint(PriceComparisonSignal.OPEN, PriceComparisonSignal.CLOSE) lhs_shift = random.randint(0, 10) rhs = random.randint(PriceComparisonSignal.OPEN, PriceComparisonSignal.CLOSE) rhs_shift = random.randint(0, 10) return PriceComparisonSignal(lhs, lhs_shift, rhs, rhs_shift) def id(self): return 'Price' class PriceComparisonSignal(Signal): OPEN = 0 HIGH = 1 LOW = 2 CLOSE = 3 def __init__(self, lhs, lhs_shift, rhs, rhs_shift): self.lhs = lhs self.lhs_shift = lhs_shift self.rhs = rhs self.rhs_shift = rhs_shift def calculate(self, series): result = [] for i in range(0, series.length()): result.append(self.calculate_at_index(series, i)) return result def calculate_at_index(self, series, index): try: if self.lhs == PriceComparisonSignal.OPEN: lhs = series.get_open(index - self.lhs_shift) elif self.lhs == PriceComparisonSignal.HIGH: lhs = series.get_high(index - self.lhs_shift) elif self.lhs == PriceComparisonSignal.LOW: lhs = series.get_low(index - self.lhs_shift) elif self.lhs == PriceComparisonSignal.CLOSE: lhs = series.get_close(index - self.lhs_shift) else: raise Exception('Invalid lhs type: ' + str(self.lhs)) if self.rhs == PriceComparisonSignal.OPEN: rhs = series.get_open(index - self.rhs_shift) elif self.rhs == PriceComparisonSignal.HIGH: rhs = series.get_high(index - self.rhs_shift) elif self.rhs == PriceComparisonSignal.LOW: rhs = series.get_low(index - self.rhs_shift) elif self.rhs == PriceComparisonSignal.CLOSE: rhs = series.get_close(index - self.rhs_shift) else: raise Exception('Invalid lhs type') return lhs < rhs except IndexError: return False def get_text(self): return self.component_to_str(self.lhs) + '[' + str(self.lhs_shift) + '] < ' + self.component_to_str(self.rhs) + '[' + str(self.rhs_shift) + ']' def component_to_str(self, component): if component == PriceComparisonSignal.OPEN: return "open" elif component == PriceComparisonSignal.HIGH: return "high" elif component == PriceComparisonSignal.LOW: return "low" elif component == PriceComparisonSignal.CLOSE: return "close" else: return "??" def mutate(self, factor): mutation_type = random.randint(0, 3) if mutation_type == 0: self.lhs = random.randint(PriceComparisonSignal.OPEN, PriceComparisonSignal.CLOSE) elif mutation_type == 1: self.rhs = random.randint(PriceComparisonSignal.OPEN, PriceComparisonSignal.CLOSE) elif mutation_type == 2: self.lhs_shift = max(0, self.lhs_shift + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) elif mutation_type == 3: self.rhs_shift = max(0, self.rhs_shift + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) class RsiSignalGenerator: def __init__(self): pass def generate(self): shift = random.randint(0, 5) period = random.randint(2, 30) threshold = random.randrange(1, 9) * 10 ineq_type = random.randint(RsiSignal.LT, RsiSignal.GT) return RsiSignal(period, threshold, ineq_type, shift) def id(self): return 'RSI' class RsiSignal(Signal): LT = 0 GT = 1 def __init__(self, period, threshold, inequality_type, shift): self.period = period self.threshold = threshold self.inequality_type = inequality_type self.shift = shift if inequality_type == RsiSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): closes = numpy.array([series.get_close(i) for i in range(0, series.length())]) rsi = talib.RSI(closes, self.period) result = [self.calc_signal(v) for v in rsi] if self.shift == 0: return result else: return [False] * self.shift + result[:-self.shift] def calc_signal(self, value): if self.inequality_type == RsiSignal.LT: return value < self.threshold else: return value > self.threshold def get_text(self): return "rsi(c, {:d})[{:d}] {:s} {:d}".format(self.period, self.shift, self.inequality_sign_str, int(self.threshold)) def mutate(self, factor): mutation_type = random.randint(0, 2) if mutation_type == 0: self.period = max(2, self.period + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) elif mutation_type == 1: self.threshold = random.randrange(1, 9) * 10 elif mutation_type == 2: random.randint(RsiSignal.LT, RsiSignal.GT) class AtrSignalGenerator: def __init__(self): pass def generate(self): period = random.randint(2, 30) threshold = random.randint(1, 30) * 0.001 ineq_type = random.randint(AtrSignal.LT, AtrSignal.GT) return AtrSignal(period, threshold, ineq_type) def id(self): return 'ATR' class AtrSignal(Signal): LT = 0 GT = 1 def __init__(self, period, threshold_factor, inequality_type): self.period = period self.threshold_factor = threshold_factor self.inequality_type = inequality_type if inequality_type == AtrSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): closes = numpy.array([series.get_close(i) for i in range(0, series.length())]) highs = numpy.array([series.get_high(i) for i in range(0, series.length())]) lows = numpy.array([series.get_low(i) for i in range(0, series.length())]) atr = talib.ATR(highs, lows, closes, self.period) result = [self.calc_signal(v, c) for (v, c) in zip(atr, closes)] return result def calc_signal(self, value, close): if self.inequality_type == AtrSignal.LT: return value < self.threshold_factor * close else: return value > self.threshold_factor * close def get_text(self): return "atr(" + str(self.period) + ') ' + self.inequality_sign_str + ' close[0] * ' + "{:.3f}".format(self.threshold_factor) def mutate(self, factor): mutation_type = random.randint(0, 2) if mutation_type == 0: self.period = max(2, self.period + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) elif mutation_type == 1: self.threshold_factor = random.randint(1, 30) * 0.001 elif mutation_type == 2: random.randint(AtrSignal.LT, AtrSignal.GT) class AtrDeltaSignalGenerator: def __init__(self): pass def generate(self): period = random.randint(2, 30) threshold = random.randint(1, 15) * 0.2 ineq_type = random.randint(AtrDeltaSignal.LT, AtrDeltaSignal.GT) sign = random.randint(AtrDeltaSignal.PLUS, AtrDeltaSignal.MINUS) return AtrDeltaSignal(period, threshold, ineq_type, sign) def id(self): return 'ATR Delta' class AtrDeltaSignal(Signal): LT = 0 GT = 1 PLUS = 0 MINUS = 1 def __init__(self, period, threshold_factor, inequality_type, sign): self.period = period self.threshold_factor = threshold_factor self.inequality_type = inequality_type if inequality_type == AtrDeltaSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' self.sign = sign if self.sign == AtrDeltaSignal.PLUS: self.sign_str = '+' else: self.sign_str = '-' def calculate(self, series): closes = numpy.array([series.get_close(i) for i in range(0, series.length())]) highs = numpy.array([series.get_high(i) for i in range(0, series.length())]) lows = numpy.array([series.get_low(i) for i in range(0, series.length())]) atr = talib.ATR(highs, lows, closes, self.period) result = [False] for i in range(1, len(closes)): result.append(self.calc_signal(atr[i], closes[i], closes[i - 1])) return result def calc_signal(self, value, c1, c2): if self.sign == AtrDeltaSignal.PLUS: if self.inequality_type == AtrDeltaSignal.LT: return c1 < c2 + value * self.threshold_factor else: return c1 > c2 + value * self.threshold_factor else: if self.inequality_type == AtrDeltaSignal.LT: return c1 < c2 - value * self.threshold_factor else: return c1 > c2 - value * self.threshold_factor def get_text(self): return 'close[0] {:s} close[1] {:s} atr({:d}) * {:f}'.format(self.inequality_sign_str, self.sign_str, self.period, self.threshold_factor) def mutate(self, factor): mutation_type = random.randint(0, 2) if mutation_type == 0: self.period = max(2, self.period + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) elif mutation_type == 1: self.threshold_factor = random.randint(1, 30) * 0.001 elif mutation_type == 2: random.randint(AtrDeltaSignal.LT, AtrDeltaSignal.GT) class DayOfWeekSignalGenerator: def __init__(self): pass def generate(self): dow = random.randint(0, 6) return DayOfWeekSignal(dow) def id(self): return 'Day of week' class DayOfWeekSignal(Signal): def __init__(self, day_of_week): self.day_of_week = day_of_week def calculate(self, series): result = [] for i in range(0, series.length()): result.append(series.get_dt(i).date().weekday() == self.day_of_week) return result def get_text(self): return "day_of_week == " + self.dow_str(self.day_of_week) def dow_str(self, dow): return ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday'][dow] def mutate(self, factor): self.day_of_week = random.randint(0, 6) class DayOfMonthSignalGenerator: def __init__(self): pass def generate(self): month_day = random.randint(1, 31) ineq_type = random.randint(DayOfMonthSignal.LT, DayOfMonthSignal.GT) return DayOfMonthSignal(month_day, ineq_type) def id(self): return 'Day of Month' class DayOfMonthSignal(Signal): LT = 0 GT = 1 def __init__(self, day_of_month, inequality_type): self.day_of_month = day_of_month self.inequality_type = inequality_type if inequality_type == CrtdrSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): result = [] for i in range(0, series.length()): if self.inequality_type == DayOfMonthSignal.LT: result.append(series.get_dt(i).date().day < self.day_of_month) else: result.append(series.get_dt(i).date().day > self.day_of_month) return result def get_text(self): return "day_of_month {:s} {:d}".format(self.inequality_sign_str, self.day_of_month) def mutate(self, factor): self.day_of_month = random.randint(1, 31) class CrtdrSignalGenerator: def __init__(self): pass def generate(self): shift = random.randint(0, 10) ineq_type = random.randint(CrtdrSignal.LT, CrtdrSignal.GT) threshold = 0.05 * random.randint(1, 19) return CrtdrSignal(shift, threshold, ineq_type) def id(self): return 'CRTDR' class CrtdrSignal(Signal): LT = 0 GT = 1 def __init__(self, shift, threshold, inequality_type): self.shift = shift self.threshold = threshold self.inequality_type = inequality_type if inequality_type == CrtdrSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): result = [] for i in range(0, series.length()): try: h = series.get_high(i - self.shift) l = series.get_low(i - self.shift) c = series.get_close(i - self.shift) if h > l: if self.inequality_type == CrtdrSignal.LT: result.append((c - l) / (h - l) < self.threshold) else: result.append((c - l) / (h - l) > self.threshold) else: result.append(False) except IndexError: result.append(False) return result def get_text(self): return 'crtdr[{:d}] {:s} {:.2f}'.format(self.shift, self.inequality_sign_str, self.threshold) def mutate(self, factor): mutation_type = random.randint(0, 2) if mutation_type == 0: self.shift = max(0, self.shift + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) elif mutation_type == 1: self.threshold = 0.05 * random.randint(1, 19) elif mutation_type == 2: random.randint(CrtdrSignal.LT, CrtdrSignal.GT) class SmaSignalGenerator: def __init__(self): pass def generate(self): lhs = random.randint(SmaSignal.OPEN, SmaSignal.CLOSE) period = random.randint(2, 30) rhs = random.randint(SmaSignal.OPEN, SmaSignal.CLOSE) ineq_sign = random.randint(SmaSignal.LT, SmaSignal.GT) return SmaSignal(period, lhs, rhs, ineq_sign) def id(self): return 'SMA' class SmaSignal(Signal): OPEN = 0 HIGH = 1 LOW = 2 CLOSE = 3 LT = 0 GT = 1 def __init__(self, period, lhs, rhs, inequality_type): self.period = period self.lhs = lhs self.rhs = rhs self.inequality_type = inequality_type if inequality_type == SmaSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): if self.lhs == SmaSignal.OPEN: lhs = numpy.array([series.get_open(i) for i in range(0, series.length())]) elif self.lhs == SmaSignal.HIGH: lhs = numpy.array([series.get_high(i) for i in range(0, series.length())]) elif self.lhs == SmaSignal.LOW: lhs = numpy.array([series.get_low(i) for i in range(0, series.length())]) elif self.lhs == SmaSignal.CLOSE: lhs = numpy.array([series.get_close(i) for i in range(0, series.length())]) if self.rhs == SmaSignal.OPEN: rhs = numpy.array([series.get_open(i) for i in range(0, series.length())]) elif self.rhs == SmaSignal.HIGH: rhs = numpy.array([series.get_high(i) for i in range(0, series.length())]) elif self.rhs == SmaSignal.LOW: rhs = numpy.array([series.get_low(i) for i in range(0, series.length())]) elif self.rhs == SmaSignal.CLOSE: rhs = numpy.array([series.get_close(i) for i in range(0, series.length())]) rhs_sma = talib.SMA(rhs, self.period) result = [self.calc_signal(l, r) for (l, r) in zip(lhs, rhs_sma)] return result def calc_signal(self, l, r): if self.inequality_type == SmaSignal.LT: return l < r else: return l > r def get_text(self): return '{:s}[0] {:s} sma({:s}, {:d})'.format(self.component_to_str(self.lhs), self.inequality_sign_str, self.component_to_str(self.rhs), self.period) def component_to_str(self, component): if component == SmaSignal.OPEN: return "open" elif component == SmaSignal.HIGH: return "high" elif component == SmaSignal.LOW: return "low" elif component == SmaSignal.CLOSE: return "close" else: return "??" class CciSignalGenerator: def __init__(self): pass def generate(self): period = random.randint(2, 30) shift = random.randint(0, 5) threshold = random.randint(1, 30) * 10 ineq_type = random.randint(CciSignal.LT, CciSignal.GT) return CciSignal(shift, period, threshold, ineq_type) def id(self): return 'CCI' class CciSignal(Signal): LT = 0 GT = 1 def __init__(self, shift, period, threshold, inequality_type): self.shift = shift self.period = period self.threshold = threshold self.inequality_type = inequality_type if inequality_type == CciSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): closes = numpy.array([series.get_close(i) for i in range(0, series.length())]) highs = numpy.array([series.get_high(i) for i in range(0, series.length())]) lows = numpy.array([series.get_low(i) for i in range(0, series.length())]) cci = talib.CCI(highs, lows, closes, self.period) result = [self.calc_signal(v) for v in cci] if self.shift == 0: return result else: return [False] * self.shift + result[:-self.shift] def calc_signal(self, value): if self.inequality_type == CciSignal.LT: return value < self.threshold else: return value > self.threshold def get_text(self): return "cci({:d})[{:d}] {:s} {:f}".format(self.period, self.shift, self.inequality_sign_str, self.threshold) def mutate(self, factor): mutation_type = random.randint(0, 2) if mutation_type == 0: self.period = max(2, self.period + random.randint(-int(10 * factor + 1), int(10 * factor + 1))) elif mutation_type == 1: self.threshold_factor = random.randint(1, 30) * 0.001 elif mutation_type == 2: random.randint(AtrSignal.LT, AtrSignal.GT) class BbandsSignalGenerator: def __init__(self): pass def generate(self): lhs = random.randint(BbandsSignal.OPEN, BbandsSignal.CLOSE) period = random.randint(2, 30) rhs = random.randint(BbandsSignal.OPEN, BbandsSignal.CLOSE) ineq_sign = random.randint(BbandsSignal.LT, BbandsSignal.GT) dev = random.randint(1, 6) * 0.5 band_type = random.randint(BbandsSignal.UP, BbandsSignal.DOWN) return BbandsSignal(period, lhs, rhs, dev, ineq_sign, band_type) def id(self): return 'Bollinger bands' class BbandsSignal(Signal): OPEN = 0 HIGH = 1 LOW = 2 CLOSE = 3 LT = 0 GT = 1 UP = 0 DOWN = 1 def __init__(self, period, lhs, rhs, dev, inequality_type, band_type): self.period = period self.lhs = lhs self.rhs = rhs self.dev = dev self.inequality_type = inequality_type self.band_type = band_type if inequality_type == SmaSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' if inequality_type == BbandsSignal.UP: self.band_type_str = 'up' else: self.band_type_str = 'down' def calculate(self, series): if self.lhs == BbandsSignal.OPEN: lhs = numpy.array([series.get_open(i) for i in range(0, series.length())]) elif self.lhs == BbandsSignal.HIGH: lhs = numpy.array([series.get_high(i) for i in range(0, series.length())]) elif self.lhs == BbandsSignal.LOW: lhs = numpy.array([series.get_low(i) for i in range(0, series.length())]) elif self.lhs == BbandsSignal.CLOSE: lhs = numpy.array([series.get_close(i) for i in range(0, series.length())]) if self.rhs == BbandsSignal.OPEN: rhs = numpy.array([series.get_open(i) for i in range(0, series.length())]) elif self.rhs == BbandsSignal.HIGH: rhs = numpy.array([series.get_high(i) for i in range(0, series.length())]) elif self.rhs == BbandsSignal.LOW: rhs = numpy.array([series.get_low(i) for i in range(0, series.length())]) elif self.rhs == BbandsSignal.CLOSE: rhs = numpy.array([series.get_close(i) for i in range(0, series.length())]) (upband, _, downband) = talib.BBANDS(rhs, self.period, self.dev, self.dev) if self.band_type == BbandsSignal.UP: band = upband else: band = downband result = [self.calc_signal(l, r) for (l, r) in zip(lhs, band)] return result def calc_signal(self, l, r): if self.inequality_type == BbandsSignal.LT: return l < r else: return l > r def get_text(self): return '{:s}[0] {:s} bband({:s}, {:s}, {:.2f}, {:d})'.format(self.component_to_str(self.lhs), self.inequality_sign_str, self.component_to_str(self.rhs), self.band_type_str, self.dev, self.period) def component_to_str(self, component): if component == BbandsSignal.OPEN: return "open" elif component == BbandsSignal.HIGH: return "high" elif component == BbandsSignal.LOW: return "low" elif component == BbandsSignal.CLOSE: return "close" else: return "??" class PivotPointsSignalGenerator: def __init__(self): pass def generate(self): lhs = random.randint(PivotPointsSignal.OPEN, PivotPointsSignal.R3) lhs_shift = random.randint(0, 10) rhs = random.randint(PivotPointsSignal.OPEN, PivotPointsSignal.R3) rhs_shift = random.randint(0, 10) return PivotPointsSignal(lhs, lhs_shift, rhs, rhs_shift) def id(self): return 'Pivot points' class PivotPointsSignal(Signal): OPEN = 0 HIGH = 1 LOW = 2 CLOSE = 3 PP = 4 S1 = 5 S2 = 6 S3 = 7 R1 = 8 R2 = 9 R3 = 10 def __init__(self, lhs, lhs_shift, rhs, rhs_shift): self.lhs = lhs self.lhs_shift = lhs_shift self.rhs = rhs self.rhs_shift = rhs_shift def calculate(self, series): result = [] for i in range(0, series.length()): lhs = self.calculate_signal(series, self.lhs, i - self.lhs_shift) rhs = self.calculate_signal(series, self.rhs, i - self.rhs_shift) if i - self.lhs_shift >= 0 and i - self.rhs_shift >= 0: result.append(lhs < rhs) else: result.append(False) return result def calculate_signal(self, series, signal_type, ix): if signal_type == PivotPointsSignal.OPEN: return series.get_open(ix) elif signal_type == PivotPointsSignal.HIGH: return series.get_high(ix) elif signal_type == PivotPointsSignal.LOW: return series.get_low(ix) elif signal_type == PivotPointsSignal.CLOSE: return series.get_close(ix) elif signal_type == PivotPointsSignal.PP: return (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 elif signal_type == PivotPointsSignal.S1: pp = (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 return pp * 2 - series.get_high(ix) elif signal_type == PivotPointsSignal.S2: pp = (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 return pp - series.get_high(ix) + series.get_low(ix) elif signal_type == PivotPointsSignal.S3: pp = (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 return pp - 2 * series.get_high(ix) + 2 * series.get_low(ix) elif signal_type == PivotPointsSignal.R1: pp = (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 return pp * 2 - series.get_low(ix) elif signal_type == PivotPointsSignal.R2: pp = (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 return pp + series.get_high(ix) - series.get_low(ix) elif signal_type == PivotPointsSignal.R3: pp = (series.get_close(ix) + series.get_high(ix) + series.get_low(ix)) / 3 return pp + 2 * series.get_high(ix) - 2 * series.get_low(ix) def get_text(self): return "{:s}[{:d}] < {:s}[{:d}]".format(self.component_str(self.lhs), self.lhs_shift, self.component_str(self.rhs), self.rhs_shift) def component_str(self, comp_id): if comp_id == PivotPointsSignal.OPEN: return "open" elif comp_id == PivotPointsSignal.HIGH: return "high" elif comp_id == PivotPointsSignal.LOW: return "low" elif comp_id == PivotPointsSignal.CLOSE: return "close" elif comp_id == PivotPointsSignal.PP: return "pivot" elif comp_id == PivotPointsSignal.S1: return "s1" elif comp_id == PivotPointsSignal.S2: return "s2" elif comp_id == PivotPointsSignal.S3: return "s3" elif comp_id == PivotPointsSignal.R1: return "r1" elif comp_id == PivotPointsSignal.R2: return "r2" elif comp_id == PivotPointsSignal.R3: return "r3" class StochasticSignalGenerator: def __init__(self): pass def generate(self): shift = random.randint(0, 3) period = random.randint(2, 30) period2 = random.randint(2, 30) threshold = random.randrange(1, 9) * 10 ineq_type = random.randint(StochasticSignal.LT, StochasticSignal.GT) return StochasticSignal(period, period2, threshold, ineq_type, shift) def id(self): return 'Stochastic' class StochasticSignal(Signal): LT = 0 GT = 1 def __init__(self, period, period2, threshold, inequality_type, shift): self.period = period self.period2 = period2 self.threshold = threshold self.inequality_type = inequality_type self.shift = shift if inequality_type == StochasticSignal.LT: self.inequality_sign_str = '<' else: self.inequality_sign_str = '>' def calculate(self, series): closes = numpy.array([series.get_close(i) for i in range(0, series.length())]) highs = numpy.array([series.get_high(i) for i in range(0, series.length())]) lows = numpy.array([series.get_low(i) for i in range(0, series.length())]) stoch_k, stoch_d = talib.STOCHF(highs, lows, closes, self.period, self.period2) result = [self.calc_signal(v) for v in stoch_k] if self.shift == 0: return result else: return [False] * self.shift + result[:-self.shift] def calc_signal(self, value): if self.inequality_type == StochasticSignal.LT: return value < self.threshold else: return value > self.threshold def get_text(self): return "stoch(c, {:d})[{:d}] {:s} {:d}".format(self.period, self.shift, self.inequality_sign_str, int(self.threshold)) class IntradayBarNumberSignalGenerator: def __init__(self, max_ibn): self.max_ibn = max_ibn def generate(self): ibn = random.randint(0, self.max_ibn) ineq_type = random.randint(IntradayBarNumberSignal.LT, IntradayBarNumberSignal.EQ) return IntradayBarNumberSignal(ibn, ineq_type) def id(self): return 'IBN' class IntradayBarNumberSignal(Signal): LT = 0 GT = 1 EQ = 2 def __init__(self, ibn, inequality_type): self.ibn = ibn self.inequality_type = inequality_type if inequality_type == IntradayBarNumberSignal.LT: self.inequality_sign_str = '<' elif inequality_type == IntradayBarNumberSignal.GT: self.inequality_sign_str = '>' else: self.inequality_sign_str = '==' def calculate(self, series): ibn = [] cur_date = None for i in range(0, series.length()): if series.get_dt(i).date() != cur_date: cur_date = series.get_dt(i).date() ctr = 0 else: ctr += 1 ibn.append(ctr) result = [] for i in ibn: result.append(self.calc_signal(i)) return result def calc_signal(self, ibn): if self.inequality_type == IntradayBarNumberSignal.LT: return ibn < self.ibn elif self.inequality_type == IntradayBarNumberSignal.GT: return ibn > self.ibn elif self.inequality_type == IntradayBarNumberSignal.EQ: return ibn == self.ibn def get_text(self): return "ibn {:s} {:d}".format(self.inequality_sign_str, self.ibn)