Denis Tereshkin 5 years ago
parent
commit
1b0af52d20
  1. 63
      hap_csv_upload.py
  2. 211
      qhp-download-futures.py
  3. 5
      qhp-download.py
  4. 223
      qhp-hap-transfer.py
  5. 81
      stitch_futures.py

63
hap_csv_upload.py

@ -8,7 +8,9 @@ import json
import csv import csv
import datetime import datetime
import struct import struct
import re
from pytz import timezone
def sec_from_period(period): def sec_from_period(period):
if period == "M1": if period == "M1":
@ -24,21 +26,35 @@ def sec_from_period(period):
elif period == "D": elif period == "D":
return 86400 return 86400
def get_month_code(month):
if month < 1 or month > 12:
return None
codes = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z']
return codes[month - 1]
def main(): def main():
parser = argparse.ArgumentParser(description='Finam quote downloader') parser = argparse.ArgumentParser(description='Finam quote downloader')
parser.add_argument('-i', '--input-file', action='store', dest='input_file', help='Input filename', required=True) parser.add_argument('-i', '--input-file', action='store', dest='input_file', help='Input filename', required=True)
parser.add_argument('-p', '--timeframe', action='store', dest='timeframe', help='Data timeframe', required=True) parser.add_argument('-p', '--timeframe', action='store', dest='timeframe', help='Data timeframe', required=True)
parser.add_argument('-o', '--hap', action='store', dest='hap', help='HAP endpoint', required=True) parser.add_argument('-o', '--hap', action='store', dest='hap', help='HAP endpoint', required=True)
parser.add_argument('-y', '--hap-symbol', action='store', dest='hap_symbol', help='HAP symbol', required=True) parser.add_argument('-y', '--hap-symbol', action='store', dest='hap_symbol', help='HAP symbol', required=True)
parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Time delta (hours)') parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Time delta (seconds)')
parser.add_argument('-f', '--force-from', action='store', dest='force_from', help='Force period start') parser.add_argument('-f', '--force-from', action='store', dest='force_from', help='Force period start')
parser.add_argument('-t', '--force-to', action='store', dest='force_to', help='Force period end') parser.add_argument('-t', '--force-to', action='store', dest='force_to', help='Force period end')
parser.add_argument('-z', '--timezone', action='store', dest='timezone', help='Timestamps timezone')
args = parser.parse_args() args = parser.parse_args()
period = args.timeframe period = args.timeframe
utc_tz = timezone('UTC')
if args.timezone is None:
tz = utc_tz
else:
tz = timezone(args.timezone)
out_symbol = args.hap_symbol out_symbol = args.hap_symbol
ctx = zmq.Context.instance() ctx = zmq.Context.instance()
@ -52,11 +68,18 @@ def main():
time_delta = datetime.timedelta(seconds=int(args.time_delta)) time_delta = datetime.timedelta(seconds=int(args.time_delta))
print('Applying delta:', time_delta) print('Applying delta:', time_delta)
line_count = 0 line_count = 0
ticker = None
with open(args.input_file, 'r') as f: with open(args.input_file, 'r') as f:
reader = csv.reader(f, delimiter=',') reader = csv.reader(f, delimiter=',')
next(reader) next(reader)
for line in reader: for line in reader:
line_count += 1 line_count += 1
if ticker is None:
ticker = line[0]
elif ticker != line[0]:
print('Different tickers in file, aborting')
break
date = line[2] date = line[2]
time = line[3] time = line[3]
open_ = line[4] open_ = line[4]
@ -72,8 +95,9 @@ def main():
minute = int(time[2:4]) minute = int(time[2:4])
second = int(time[4:6]) second = int(time[4:6])
dt = datetime.datetime(year, month, day, hour, minute, second, 0, datetime.timezone.utc) - time_delta dt = datetime.datetime(year, month, day, hour, minute, second, 0, utc_tz).astimezone(tz) - time_delta
dt = dt.astimezone(utc_tz)
serialized_bars.write(struct.pack("<qddddQ", int(dt.timestamp()), float(open_), float(high), float(low), float(close), int(volume))) serialized_bars.write(struct.pack("<qddddQ", int(dt.timestamp()), float(open_), float(high), float(low), float(close), int(volume)))
if min_dt is None: if min_dt is None:
@ -88,15 +112,40 @@ def main():
if dt > max_dt: if dt > max_dt:
max_dt = dt max_dt = dt
if line_count > 0:
if args.force_from is not None: if args.force_from is not None:
min_dt = datetime.datetime.strptime(args.force_from, "%Y%m%d") min_dt = datetime.datetime.strptime(args.force_from, "%Y%m%d")
if args.force_to is not None: if args.force_to is not None:
max_dt = datetime.datetime.strptime(args.force_to, "%Y%m%d") max_dt = datetime.datetime.strptime(args.force_to, "%Y%m%d")
out_ticker = out_symbol
if out_symbol[0] == '@':
base = out_symbol[1:]
matches = re.match('^([^-]+)-(\\d+)\\.(\\d+)$', ticker)
if not matches:
print('Invalid ticker id in file')
return
year_code = matches.group(3)[-1]
month_code = get_month_code(int(matches.group(2)))
out_ticker = base + month_code + year_code
elif out_symbol[0] == '~':
base = out_symbol[1:]
matches = re.match('^([^-]+)-(\\d+)\\.(\\d+)$', ticker)
if not matches:
print('Invalid ticker id in file')
return
year_code = matches.group(3)
month_code = matches.group(2)
out_ticker = base + "-" + month_code + '.' + year_code
print("Resulting ticker: {}".format(out_ticker))
rq = { rq = {
"ticker" : out_symbol, "ticker" : out_ticker,
"start_time" : min_dt.strftime("%Y-%m-%dT%H:%M:%S"), "start_time" : min_dt.strftime("%Y-%m-%dT%H:%M:%S"),
"end_time" : max_dt.strftime("%Y-%m-%dT%H:%M:%S"), "end_time" : max_dt.strftime("%Y-%m-%dT%H:%M:%S"),
"timeframe_sec" : sec_from_period(period) "timeframe_sec" : sec_from_period(period)
@ -109,8 +158,12 @@ def main():
s.send_multipart([bytes(json.dumps(rq), "utf-8"), raw_data]) s.send_multipart([bytes(json.dumps(rq), "utf-8"), raw_data])
parts = s.recv_multipart() parts = s.recv_multipart()
print("Response:", parts) print("Response:", parts)
return True
if __name__ == '__main__': if __name__ == '__main__':
main() ret = main()
if ret is None:
sys.exit(1)

211
qhp-download-futures.py

@ -0,0 +1,211 @@
#!/usr/bin/env python3
import sys
import argparse
import zmq
import io
import json
import csv
import datetime
import struct
from pytz import timezone
def timeframe_to_seconds(tf):
if tf == 'M1':
return 60
elif tf == 'M5':
return 5 * 60
elif tf == 'M15':
return 15 * 60
elif tf == 'H1':
return 3600
elif tf == 'D':
return 86400
elif tf == 'W':
return 7 * 86400
else:
raise ValueError('Invalid value')
class BarAggregator:
def __init__(self, timeframe):
self.open_ = 0
self.high = 0
self.low = 0
self.close = 0
self.volume = 0
self.timestamp = None
self.current_bar_number = None
self.timeframe = timeframe
def push_bar(self, timestamp, open_, high, low, close, volume):
bar_number = timestamp.timestamp() // self.timeframe
if bar_number != self.current_bar_number:
b_open = self.open_
b_high = self.high
b_low = self.low
b_close = self.close
b_volume = self.volume
if self.current_bar_number is not None:
b_timestamp = datetime.datetime.fromtimestamp(self.current_bar_number * self.timeframe)
self.open_ = open_
self.high = high
self.low = low
self.close = close
self.volume = volume
self.timestamp = timestamp
prev_bar_number = self.current_bar_number
self.current_bar_number = bar_number
if prev_bar_number is not None:
return (b_timestamp, b_open, b_high, b_low, b_close, b_volume)
else:
self.high = max(high, self.high)
self.low = min(low, self.low)
self.close = close
self.volume += volume
return None
def get_bar(self):
b_open = self.open_
b_high = self.high
b_low = self.low
b_close = self.close
b_volume = self.volume
b_timestamp = datetime.datetime.fromtimestamp(self.timeframe * ( self.timestamp.timestamp() // self.timeframe))
return (b_timestamp, b_open, b_high, b_low, b_close, b_volume)
def get_data(qhp, ticker, start_time, end_time, period, tz, timedelta):
rq = {
"ticker" : ticker,
"from" : start_time.strftime("%Y-%m-%dT%H:%M:%S"),
"to" : end_time.strftime("%Y-%m-%dT%H:%M:%S"),
"timeframe" : period
}
qhp.send_multipart([bytes(json.dumps(rq), "utf-8")])
resp = qhp.recv()
if resp != b'OK':
errmsg = qhp.recv_string()
return None
bar_count = 0
result = []
while True:
if qhp.getsockopt(zmq.RCVMORE) == 0:
break
rawdata = qhp.recv()
for line in struct.iter_unpack("<qddddQ", rawdata):
timestamp = int(line[0])
open_ = float(line[1])
high = float(line[2])
low = float(line[3])
close = float(line[4])
volume = int(line[5])
dt = datetime.datetime.fromtimestamp(timestamp, tz) + timedelta
bar_count += 1
result.append((dt, open_, high, low, close, volume))
return result
def write_to_file(writer, bars, ticker, period):
for bar in bars:
writer.writerow([ticker, period, bar[0].strftime("%Y%m%d"), bar[0].strftime("%H%M%S"), bar[1], bar[2], bar[3], bar[4], bar[5]])
def make_tickers_list(base, start_time, end_time, futures_interval):
result = []
month = start_time.date().month
year = start_time.date().year
while True:
if month % futures_interval == 0:
result.append(base + '-' + str(month) + '.' + str(year)[-2:])
if month > end_time.date().month and year >= end_time.date().year:
break
month += 1
if month > 12:
month = 1
year += 1
return result
def main():
parser = argparse.ArgumentParser(description='QHP client')
parser.add_argument('-o', '--output-file', action='store', dest='output_file', help='Output filename', required=True)
parser.add_argument('-p', '--timeframe', action='store', dest='timeframe', help='Data timeframe', required=True)
parser.add_argument('-q', '--qhp', action='store', dest='qhp', help='QHP endpoint', required=True)
parser.add_argument('-y', '--symbol', action='store', dest='symbol', help='Base symbol', required=True)
parser.add_argument('-f', '--from', action='store', dest='from_', help='Starting date', required=True)
parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True)
parser.add_argument('-r', '--rescale', action='store', dest='rescale', help='Rescale to timeframe')
parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)', required=False)
parser.add_argument('-i', '--futures-interval', action='store', dest='futures_interval', help='Futures interval between exprations in month', required=True)
parser.add_argument('-s', '--stitch-delta', action='store', dest='stitch_delta', help='Futures interval between exprations in month', required=True)
parser.add_argument('-e', '--replace-ticker', action='store', dest='replace_ticker', help='Replace ticker id in file', required=False)
args = parser.parse_args()
period = args.timeframe
symbol = args.symbol
filename = args.output_file
ctx = zmq.Context.instance()
s = ctx.socket(zmq.REQ)
s.connect(args.qhp)
start_time = datetime.datetime.strptime(args.from_, "%Y%m%d")
end_time = datetime.datetime.strptime(args.to, "%Y%m%d")
timedelta = datetime.timedelta()
if args.time_delta:
timedelta = datetime.timedelta(seconds=int(args.time_delta))
delta = int(args.stitch_delta)
agg = None
if args.rescale:
agg = BarAggregator(int(args.rescale))
data = {}
tickers = make_tickers_list(symbol, start_time, end_time, int(args.futures_interval))
print("Tickers: {}".format(tickers))
for ticker in tickers:
print("Requesting data: {}".format(ticker))
bars = get_data(s, ticker, start_time, end_time, period, timezone('UTC'), timedelta)
if len(bars) > 0:
data[ticker] = { 'bars' : bars }
print("Cutting off trailing data: {}".format(ticker))
end_date = data[ticker]['bars'][-1][0]
cutoff_date = datetime.date.fromordinal(end_date.toordinal() - delta)
#cutoff_date_num = cutoff_date.year * 10000 + cutoff_date.month * 100 + cutoff_date.day
data[ticker]['bars'] = [s for s in data[ticker]['bars'] if s[0].date() <= cutoff_date]
data[ticker]['end_date'] = cutoff_date
prev_ticker = None
for k, v in sorted(data.items(), key=lambda x: x[1]['end_date']):
print("Cutting off starting data: {}".format(k))
if prev_ticker is not None:
start_date = data[prev_ticker]['bars'][-1][0]
v['bars'] = [s for s in data[k]['bars'] if s[0] > start_date]
prev_ticker = k
with open(args.output_file, 'w+') as f:
writer = csv.writer(f)
writer.writerow(['<TICKER>', '<PER>', '<DATE>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOLUME>'])
for k, v in sorted(data.items(), key=lambda x: x[1]['end_date']):
ticker = args.replace_ticker
if ticker is None:
ticker = k
write_to_file(writer, v['bars'], k, period)
if __name__ == '__main__':
main()

5
qhp-download.py

@ -85,6 +85,7 @@ def main():
parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True) parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True)
parser.add_argument('-r', '--rescale', action='store', dest='rescale', help='Rescale to timeframe') parser.add_argument('-r', '--rescale', action='store', dest='rescale', help='Rescale to timeframe')
parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)', required=False) parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)', required=False)
parser.add_argument('-c', '--replace-ticker', action='store', dest='replace_ticker', help='Resulting symbol')
args = parser.parse_args() args = parser.parse_args()
@ -92,6 +93,8 @@ def main():
symbol = args.symbol symbol = args.symbol
filename = args.output_file filename = args.output_file
replace_ticker = args.replace_ticker
ctx = zmq.Context.instance() ctx = zmq.Context.instance()
s = ctx.socket(zmq.REQ) s = ctx.socket(zmq.REQ)
s.connect(args.qhp) s.connect(args.qhp)
@ -125,6 +128,8 @@ def main():
print("Error:", errmsg) print("Error:", errmsg)
sys.exit(1) sys.exit(1)
if replace_ticker is not None:
symbol = replace_ticker
line_count = 0 line_count = 0
with open(args.output_file, 'w', newline='') as f: with open(args.output_file, 'w', newline='') as f:

223
qhp-hap-transfer.py

@ -0,0 +1,223 @@
#!/usr/bin/env python3
import sys
import argparse
import zmq
import io
import json
import csv
import datetime
import struct
import re
from pytz import timezone
def sec_from_period(period):
if period == "M1":
return 60
elif period == "M5":
return 60 * 5
elif period == "M15":
return 60 * 15
elif period == "M30":
return 60 * 30
elif period == "H1":
return 60 * 60
elif period == "D":
return 86400
def request_ticker_list(socket):
rq = {
"get_sec_list" : True,
}
socket.send_multipart([bytes(json.dumps(rq), "utf-8")])
resp = socket.recv()
if resp != b'OK':
errmsg = s.recv_string()
print("Error:", errmsg)
sys.exit(1)
rawdata = b''
while True:
if socket.getsockopt(zmq.RCVMORE) == 0:
break
rawdata += socket.recv()
s = rawdata.decode('utf-8')
tickers = s.split(',')
print("Got {} tickers".format(len(tickers)))
return tickers
def get_month_by_code(code):
try:
mon = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'].index(code)
except ValueError:
return None
return mon + 1
def get_data(qhp, ticker, start_time, end_time, period, tz, timedelta):
rq = {
"ticker" : ticker,
"from" : start_time.strftime("%Y-%m-%dT%H:%M:%S"),
"to" : end_time.strftime("%Y-%m-%dT%H:%M:%S"),
"timeframe" : period
}
qhp.send_multipart([bytes(json.dumps(rq), "utf-8")])
resp = qhp.recv()
if resp != b'OK':
errmsg = qhp.recv_string()
return None
bar_count = 0
result = []
while True:
if qhp.getsockopt(zmq.RCVMORE) == 0:
break
rawdata = qhp.recv()
for line in struct.iter_unpack("<qddddQ", rawdata):
timestamp = int(line[0])
open_ = float(line[1])
high = float(line[2])
low = float(line[3])
close = float(line[4])
volume = int(line[5])
dt = datetime.datetime.fromtimestamp(timestamp, tz) + timedelta
bar_count += 1
result.append((dt, open_, high, low, close, volume))
return result
def upload_data(hap, data, ticker, period):
print("Uploading ticker: {}".format(ticker))
serialized_bars = io.BytesIO()
min_dt = None
max_dt = None
for bar in data:
dt = bar[0]
serialized_bars.write(struct.pack("<qddddQ", int(dt.timestamp()), bar[1], bar[2], bar[3], bar[4], bar[5]))
if min_dt is None:
min_dt = dt
else:
if dt < min_dt:
min_dt = dt
if max_dt is None:
max_dt = dt
else:
if dt > max_dt:
max_dt = dt
rq = {
"ticker" : ticker,
"start_time" : min_dt.strftime("%Y-%m-%dT%H:%M:%S"),
"end_time" : max_dt.strftime("%Y-%m-%dT%H:%M:%S"),
"timeframe_sec" : sec_from_period(period)
}
raw_data = serialized_bars.getvalue()
hap.send_multipart([bytes(json.dumps(rq), "utf-8"), raw_data])
parts = hap.recv_multipart()
if parts[0] != b'OK':
return False
return True
def convert_ticker(s, data):
if s.startswith("SPBFUT#"):
last_ts = data[-1][0]
year = int(s[-1])
current_year = last_ts.date().year - 2000
current_year_in_dec = current_year % 10
current_dec = current_year - current_year_in_dec
current_mon = last_ts.date().month
mon = get_month_by_code(s[-2])
if year >= current_year_in_dec:
return s[:-2] + ".{}-{}".format(mon, current_dec + year)
else:
return s[:-2] + ".{}-{}".format(mon, current_dec + 10 + year)
else:
return s
def load_blacklist(filename):
result = []
with open(filename, 'r') as f:
lines = f.readlines()
for line in lines:
line = line.strip()
if line != "":
result.append(re.compile(line))
return result
def allow_ticker(blacklist, ticker):
for rx in blacklist:
if rx.match(ticker):
return False
return True
def main():
parser = argparse.ArgumentParser(description='QHP-HAP transfer agent')
parser.add_argument('-q', '--qhp', action='store', dest='qhp', help='QHP endpoint', required=True)
parser.add_argument('-a', '--hap', action='store', dest='hap', help='HAP endpoint', required=True)
parser.add_argument('-f', '--from', action='store', dest='from_', help='Starting date', required=True)
parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True)
parser.add_argument('-p', '--period', action='store', dest='period', help='Timeframe', required=True)
parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)')
parser.add_argument('-z', '--timezone', action='store', dest='timezone', help='Timezone')
parser.add_argument('-b', '--blacklist-file', action='store', dest='blacklist_file', help='File with blacklisted tickers')
args = parser.parse_args()
start_time = datetime.datetime.strptime(args.from_, "%Y%m%d")
end_time = datetime.datetime.strptime(args.to, "%Y%m%d")
ctx = zmq.Context.instance()
qhp = ctx.socket(zmq.REQ)
qhp.connect(args.qhp)
hap = ctx.socket(zmq.REQ)
hap.connect(args.hap)
tickers = request_ticker_list(qhp)
tz = timezone('UTC')
if args.timezone is not None:
tz = timezone(args.timezone)
timedelta = datetime.timedelta(seconds=0)
if args.time_delta is not None:
timedelta = datetime.timedelta(seconds=int(args.time_delta))
blacklist = []
if args.blacklist_file is not None:
blacklist = load_blacklist(args.blacklist_file)
max_retries = 3
for ticker in tickers:
for trynum in range(0, max_retries):
if allow_ticker(blacklist, ticker):
print("Requesting ticker from QHP: {}".format(ticker))
data = get_data(qhp, ticker, start_time, end_time, args.period, tz, timedelta)
if data is not None:
if len(data) > 0:
upload_data(hap, data, convert_ticker(ticker, data), args.period)
break
else:
print("Timeout, retry {} of {}".format(trynum + 1, max_retries))
else:
print("Skipping blacklisted ticker: {}".format(ticker))
if __name__ == '__main__':
main()

81
stitch_futures.py

@ -0,0 +1,81 @@
#!/usr/bin/env python3
import sys
import argparse
import csv
import os
import datetime
def parse_date(x):
return datetime.datetime.strptime(x, '%Y%m%d').date()
def read_file(f):
reader = csv.reader(f, delimiter=',')
ticker = None
next(reader)
result = { 'bars' : [] }
for line in reader:
result['bars'].append(line)
if ticker is None:
ticker = line[0]
result['ticker'] = ticker
return result
def write_to_file(writer, bars, ticker):
for bar in bars:
if ticker is not None:
bar[0] = ticker
writer.writerow(bar)
def main():
parser = argparse.ArgumentParser(description='Stitch futures')
parser.add_argument('-i', '--input-directory', action='store', dest='input_directory', help='Input directory', required=True)
parser.add_argument('-o', '--output-file', action='store', dest='output_file', help='Output filename', required=True)
parser.add_argument('-d', '--stitch-delta', action='store', dest='stitch_delta', help='Offset at which stitching occurs (days)', required=False)
parser.add_argument('-t', '--ticker', action='store', dest='replace_ticker', help='Replace ticker')
args = parser.parse_args()
input_directory = args.input_directory
output_file = args.output_file
delta = int(args.stitch_delta)
ticker = args.replace_ticker
data = []
for filename in os.listdir(input_directory):
full_name = os.path.join(input_directory, filename)
print("Reading {}".format(full_name))
with open(full_name, 'r') as f:
data.append(read_file(f))
for f in data:
print("Cutting off trailing data: {}".format(f['ticker']))
end_date = parse_date(f['bars'][-1][2])
cutoff_date = datetime.date.fromordinal(end_date.toordinal() - delta)
cutoff_date_num = cutoff_date.year * 10000 + cutoff_date.month * 100 + cutoff_date.day
f['bars'] = [s for s in f['bars'] if int(s[2]) <= cutoff_date_num]
f['end_date'] = cutoff_date
data.sort(key=lambda x: x['end_date'])
for i in range(1, len(data)):
print("Cutting off starting data: {}".format(data[i]['ticker']))
start_date = parse_date(data[i - 1]['bars'][-1][2])
start_date_num = start_date.year * 10000 + start_date.month * 100 + start_date.day
data[i]['bars'] = [s for s in data[i]['bars'] if int(s[2]) > start_date_num]
with open(args.output_file, 'w+') as f:
writer = csv.writer(f)
writer.writerow(['<TICKER>', '<PER>', '<DATE>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOLUME>'])
for d in data:
write_to_file(writer, d['bars'], ticker)
if __name__ == '__main__':
main()
Loading…
Cancel
Save