3 changed files with 417 additions and 0 deletions
@ -0,0 +1,211 @@
@@ -0,0 +1,211 @@
|
||||
#!/usr/bin/env python3 |
||||
|
||||
import sys |
||||
import argparse |
||||
import zmq |
||||
import io |
||||
import json |
||||
import csv |
||||
import datetime |
||||
import struct |
||||
from pytz import timezone |
||||
|
||||
def timeframe_to_seconds(tf): |
||||
if tf == 'M1': |
||||
return 60 |
||||
elif tf == 'M5': |
||||
return 5 * 60 |
||||
elif tf == 'M15': |
||||
return 15 * 60 |
||||
elif tf == 'H1': |
||||
return 3600 |
||||
elif tf == 'D': |
||||
return 86400 |
||||
elif tf == 'W': |
||||
return 7 * 86400 |
||||
else: |
||||
raise ValueError('Invalid value') |
||||
|
||||
class BarAggregator: |
||||
def __init__(self, timeframe): |
||||
self.open_ = 0 |
||||
self.high = 0 |
||||
self.low = 0 |
||||
self.close = 0 |
||||
self.volume = 0 |
||||
self.timestamp = None |
||||
self.current_bar_number = None |
||||
self.timeframe = timeframe |
||||
|
||||
def push_bar(self, timestamp, open_, high, low, close, volume): |
||||
bar_number = timestamp.timestamp() // self.timeframe |
||||
if bar_number != self.current_bar_number: |
||||
b_open = self.open_ |
||||
b_high = self.high |
||||
b_low = self.low |
||||
b_close = self.close |
||||
b_volume = self.volume |
||||
if self.current_bar_number is not None: |
||||
b_timestamp = datetime.datetime.fromtimestamp(self.current_bar_number * self.timeframe) |
||||
|
||||
self.open_ = open_ |
||||
self.high = high |
||||
self.low = low |
||||
self.close = close |
||||
self.volume = volume |
||||
self.timestamp = timestamp |
||||
prev_bar_number = self.current_bar_number |
||||
self.current_bar_number = bar_number |
||||
|
||||
if prev_bar_number is not None: |
||||
return (b_timestamp, b_open, b_high, b_low, b_close, b_volume) |
||||
else: |
||||
self.high = max(high, self.high) |
||||
self.low = min(low, self.low) |
||||
self.close = close |
||||
self.volume += volume |
||||
return None |
||||
|
||||
def get_bar(self): |
||||
b_open = self.open_ |
||||
b_high = self.high |
||||
b_low = self.low |
||||
b_close = self.close |
||||
b_volume = self.volume |
||||
b_timestamp = datetime.datetime.fromtimestamp(self.timeframe * ( self.timestamp.timestamp() // self.timeframe)) |
||||
|
||||
return (b_timestamp, b_open, b_high, b_low, b_close, b_volume) |
||||
|
||||
def get_data(qhp, ticker, start_time, end_time, period, tz, timedelta): |
||||
rq = { |
||||
"ticker" : ticker, |
||||
"from" : start_time.strftime("%Y-%m-%dT%H:%M:%S"), |
||||
"to" : end_time.strftime("%Y-%m-%dT%H:%M:%S"), |
||||
"timeframe" : period |
||||
} |
||||
|
||||
qhp.send_multipart([bytes(json.dumps(rq), "utf-8")]) |
||||
resp = qhp.recv() |
||||
|
||||
if resp != b'OK': |
||||
errmsg = qhp.recv_string() |
||||
return None |
||||
|
||||
bar_count = 0 |
||||
result = [] |
||||
while True: |
||||
if qhp.getsockopt(zmq.RCVMORE) == 0: |
||||
break |
||||
rawdata = qhp.recv() |
||||
for line in struct.iter_unpack("<qddddQ", rawdata): |
||||
|
||||
timestamp = int(line[0]) |
||||
open_ = float(line[1]) |
||||
high = float(line[2]) |
||||
low = float(line[3]) |
||||
close = float(line[4]) |
||||
volume = int(line[5]) |
||||
dt = datetime.datetime.fromtimestamp(timestamp, tz) + timedelta |
||||
|
||||
bar_count += 1 |
||||
result.append((dt, open_, high, low, close, volume)) |
||||
|
||||
return result |
||||
|
||||
def write_to_file(writer, bars, ticker, period): |
||||
for bar in bars: |
||||
writer.writerow([ticker, period, bar[0].strftime("%Y%m%d"), bar[0].strftime("%H%M%S"), bar[1], bar[2], bar[3], bar[4], bar[5]]) |
||||
|
||||
def make_tickers_list(base, start_time, end_time, futures_interval): |
||||
result = [] |
||||
month = start_time.date().month |
||||
year = start_time.date().year |
||||
|
||||
while True: |
||||
if month % futures_interval == 0: |
||||
result.append(base + '-' + str(month) + '.' + str(year)[-2:]) |
||||
if month > end_time.date().month and year >= end_time.date().year: |
||||
break |
||||
|
||||
month += 1 |
||||
if month > 12: |
||||
month = 1 |
||||
year += 1 |
||||
|
||||
return result |
||||
|
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser(description='QHP client') |
||||
parser.add_argument('-o', '--output-file', action='store', dest='output_file', help='Output filename', required=True) |
||||
parser.add_argument('-p', '--timeframe', action='store', dest='timeframe', help='Data timeframe', required=True) |
||||
parser.add_argument('-q', '--qhp', action='store', dest='qhp', help='QHP endpoint', required=True) |
||||
parser.add_argument('-y', '--symbol', action='store', dest='symbol', help='Base symbol', required=True) |
||||
parser.add_argument('-f', '--from', action='store', dest='from_', help='Starting date', required=True) |
||||
parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True) |
||||
parser.add_argument('-r', '--rescale', action='store', dest='rescale', help='Rescale to timeframe') |
||||
parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)', required=False) |
||||
parser.add_argument('-i', '--futures-interval', action='store', dest='futures_interval', help='Futures interval between exprations in month', required=True) |
||||
parser.add_argument('-s', '--stitch-delta', action='store', dest='stitch_delta', help='Futures interval between exprations in month', required=True) |
||||
parser.add_argument('-e', '--replace-ticker', action='store', dest='replace_ticker', help='Replace ticker id in file', required=False) |
||||
|
||||
args = parser.parse_args() |
||||
|
||||
period = args.timeframe |
||||
symbol = args.symbol |
||||
filename = args.output_file |
||||
|
||||
ctx = zmq.Context.instance() |
||||
s = ctx.socket(zmq.REQ) |
||||
s.connect(args.qhp) |
||||
|
||||
start_time = datetime.datetime.strptime(args.from_, "%Y%m%d") |
||||
end_time = datetime.datetime.strptime(args.to, "%Y%m%d") |
||||
|
||||
timedelta = datetime.timedelta() |
||||
if args.time_delta: |
||||
timedelta = datetime.timedelta(seconds=int(args.time_delta)) |
||||
|
||||
delta = int(args.stitch_delta) |
||||
|
||||
agg = None |
||||
if args.rescale: |
||||
agg = BarAggregator(int(args.rescale)) |
||||
|
||||
data = {} |
||||
tickers = make_tickers_list(symbol, start_time, end_time, int(args.futures_interval)) |
||||
print("Tickers: {}".format(tickers)) |
||||
for ticker in tickers: |
||||
print("Requesting data: {}".format(ticker)) |
||||
bars = get_data(s, ticker, start_time, end_time, period, timezone('UTC'), timedelta) |
||||
|
||||
if len(bars) > 0: |
||||
data[ticker] = { 'bars' : bars } |
||||
print("Cutting off trailing data: {}".format(ticker)) |
||||
end_date = data[ticker]['bars'][-1][0] |
||||
cutoff_date = datetime.date.fromordinal(end_date.toordinal() - delta) |
||||
#cutoff_date_num = cutoff_date.year * 10000 + cutoff_date.month * 100 + cutoff_date.day |
||||
|
||||
data[ticker]['bars'] = [s for s in data[ticker]['bars'] if s[0].date() <= cutoff_date] |
||||
data[ticker]['end_date'] = cutoff_date |
||||
|
||||
prev_ticker = None |
||||
for k, v in sorted(data.items(), key=lambda x: x[1]['end_date']): |
||||
print("Cutting off starting data: {}".format(k)) |
||||
if prev_ticker is not None: |
||||
start_date = data[prev_ticker]['bars'][-1][0] |
||||
v['bars'] = [s for s in data[k]['bars'] if s[0] > start_date] |
||||
prev_ticker = k |
||||
|
||||
with open(args.output_file, 'w+') as f: |
||||
writer = csv.writer(f) |
||||
writer.writerow(['<TICKER>', '<PER>', '<DATE>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOLUME>']) |
||||
for k, v in sorted(data.items(), key=lambda x: x[1]['end_date']): |
||||
ticker = args.replace_ticker |
||||
if ticker is None: |
||||
ticker = k |
||||
write_to_file(writer, v['bars'], k, period) |
||||
|
||||
if __name__ == '__main__': |
||||
main() |
||||
|
||||
@ -0,0 +1,195 @@
@@ -0,0 +1,195 @@
|
||||
#!/usr/bin/env python3 |
||||
|
||||
import sys |
||||
import argparse |
||||
import zmq |
||||
import io |
||||
import json |
||||
import csv |
||||
import datetime |
||||
import struct |
||||
import termcolor |
||||
from pytz import timezone |
||||
|
||||
def sec_from_period(period): |
||||
if period == "M1": |
||||
return 60 |
||||
elif period == "M5": |
||||
return 60 * 5 |
||||
elif period == "M15": |
||||
return 60 * 15 |
||||
elif period == "M30": |
||||
return 60 * 30 |
||||
elif period == "H1": |
||||
return 60 * 60 |
||||
elif period == "D": |
||||
return 86400 |
||||
|
||||
def request_ticker_list(socket): |
||||
rq = { |
||||
"get_sec_list" : True, |
||||
} |
||||
|
||||
socket.send_multipart([bytes(json.dumps(rq), "utf-8")]) |
||||
resp = socket.recv() |
||||
|
||||
if resp != b'OK': |
||||
errmsg = s.recv_string() |
||||
print("Error:", errmsg) |
||||
sys.exit(1) |
||||
|
||||
|
||||
rawdata = b'' |
||||
while True: |
||||
if socket.getsockopt(zmq.RCVMORE) == 0: |
||||
break |
||||
rawdata += socket.recv() |
||||
|
||||
s = rawdata.decode('utf-8') |
||||
tickers = s.split(',') |
||||
|
||||
print("Got {} tickers".format(len(tickers))) |
||||
return tickers |
||||
|
||||
def get_month_by_code(code): |
||||
try: |
||||
mon = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'].index(code) |
||||
except ValueError: |
||||
return None |
||||
return mon + 1 |
||||
|
||||
def get_data(qhp, ticker, start_time, end_time, period, tz, timedelta): |
||||
rq = { |
||||
"ticker" : ticker, |
||||
"from" : start_time.strftime("%Y-%m-%dT%H:%M:%S"), |
||||
"to" : end_time.strftime("%Y-%m-%dT%H:%M:%S"), |
||||
"timeframe" : period |
||||
} |
||||
|
||||
qhp.send_multipart([bytes(json.dumps(rq), "utf-8")]) |
||||
resp = qhp.recv() |
||||
|
||||
if resp != b'OK': |
||||
errmsg = qhp.recv_string() |
||||
return None |
||||
|
||||
bar_count = 0 |
||||
result = [] |
||||
while True: |
||||
if qhp.getsockopt(zmq.RCVMORE) == 0: |
||||
break |
||||
rawdata = qhp.recv() |
||||
for line in struct.iter_unpack("<qddddQ", rawdata): |
||||
|
||||
timestamp = int(line[0]) |
||||
open_ = float(line[1]) |
||||
high = float(line[2]) |
||||
low = float(line[3]) |
||||
close = float(line[4]) |
||||
volume = int(line[5]) |
||||
dt = datetime.datetime.fromtimestamp(timestamp, tz) + timedelta |
||||
|
||||
bar_count += 1 |
||||
result.append((dt, open_, high, low, close, volume)) |
||||
|
||||
return result |
||||
|
||||
def upload_data(hap, data, ticker, period): |
||||
print("Uploading ticker: {}".format(ticker)) |
||||
serialized_bars = io.BytesIO() |
||||
min_dt = None |
||||
max_dt = None |
||||
|
||||
for bar in data: |
||||
dt = bar[0] |
||||
serialized_bars.write(struct.pack("<qddddQ", int(dt.timestamp()), bar[1], bar[2], bar[3], bar[4], bar[5])) |
||||
if min_dt is None: |
||||
min_dt = dt |
||||
else: |
||||
if dt < min_dt: |
||||
min_dt = dt |
||||
|
||||
if max_dt is None: |
||||
max_dt = dt |
||||
else: |
||||
if dt > max_dt: |
||||
max_dt = dt |
||||
|
||||
rq = { |
||||
"ticker" : ticker, |
||||
"start_time" : min_dt.strftime("%Y-%m-%dT%H:%M:%S"), |
||||
"end_time" : max_dt.strftime("%Y-%m-%dT%H:%M:%S"), |
||||
"timeframe_sec" : sec_from_period(period) |
||||
} |
||||
|
||||
raw_data = serialized_bars.getvalue() |
||||
|
||||
hap.send_multipart([bytes(json.dumps(rq), "utf-8"), raw_data]) |
||||
parts = hap.recv_multipart() |
||||
if parts[0] != b'OK': |
||||
return False |
||||
return True |
||||
|
||||
def convert_ticker(s, data): |
||||
if s.startswith("SPBFUT#"): |
||||
last_ts = data[-1][0] |
||||
year = int(s[-1]) |
||||
current_year = last_ts.date().year - 2000 |
||||
current_year_in_dec = current_year % 10 |
||||
current_dec = current_year - current_year_in_dec |
||||
current_mon = last_ts.date().month |
||||
mon = get_month_by_code(s[-2]) |
||||
if year >= current_year_in_dec: |
||||
return s[:-2] + ".{}-{}".format(mon, current_dec + year) |
||||
else: |
||||
return s[:-2] + ".{}-{}".format(mon, current_dec + 10 + year) |
||||
|
||||
else: |
||||
return s |
||||
|
||||
def main(): |
||||
parser = argparse.ArgumentParser(description='QHP-HAP transfer agent') |
||||
parser.add_argument('-q', '--qhp', action='store', dest='qhp', help='QHP endpoint', required=True) |
||||
parser.add_argument('-a', '--hap', action='store', dest='hap', help='HAP endpoint', required=True) |
||||
parser.add_argument('-f', '--from', action='store', dest='from_', help='Starting date', required=True) |
||||
parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True) |
||||
parser.add_argument('-p', '--period', action='store', dest='period', help='Timeframe', required=True) |
||||
parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)') |
||||
parser.add_argument('-z', '--timezone', action='store', dest='timezone', help='Timezone') |
||||
|
||||
args = parser.parse_args() |
||||
|
||||
start_time = datetime.datetime.strptime(args.from_, "%Y%m%d") |
||||
end_time = datetime.datetime.strptime(args.to, "%Y%m%d") |
||||
|
||||
ctx = zmq.Context.instance() |
||||
qhp = ctx.socket(zmq.REQ) |
||||
qhp.connect(args.qhp) |
||||
|
||||
hap = ctx.socket(zmq.REQ) |
||||
hap.connect(args.hap) |
||||
|
||||
tickers = request_ticker_list(qhp) |
||||
|
||||
tz = timezone('UTC') |
||||
if args.timezone is not None: |
||||
tz = timezone(args.timezone) |
||||
|
||||
timedelta = datetime.timedelta(seconds=0) |
||||
if args.time_delta is not None: |
||||
timedelta = datetime.timedelta(seconds=int(args.time_delta)) |
||||
|
||||
max_retries = 3 |
||||
for ticker in tickers: |
||||
for trynum in range(0, max_retries): |
||||
print("Requesting ticker from QHP: {}".format(ticker)) |
||||
data = get_data(qhp, ticker, start_time, end_time, args.period, tz, timedelta) |
||||
if data is not None: |
||||
if len(data) > 0: |
||||
upload_data(hap, data, convert_ticker(ticker, data), args.period) |
||||
break |
||||
else: |
||||
print("Timeout, retry {} of {}".format(trynum + 1, max_retries)) |
||||
|
||||
if __name__ == '__main__': |
||||
main() |
||||
Loading…
Reference in new issue