MDS supporting tools
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

170 lines
5.8 KiB

#!/usr/bin/env python3
import sys
import argparse
import zmq
import io
import json
import csv
import datetime
import struct
def timeframe_to_seconds(tf):
    """Translate a timeframe code into its duration in seconds.

    Recognised codes are 'M1', 'M5', 'M15', 'H1', 'D' and 'W'.

    Raises:
        ValueError: if ``tf`` is not one of the recognised codes.
    """
    minute, hour, day = 60, 3600, 86400
    durations = {
        'M1': minute,
        'M5': 5 * minute,
        'M15': 15 * minute,
        'H1': hour,
        'D': day,
        'W': 7 * day,
    }
    try:
        return durations[tf]
    except (KeyError, TypeError):
        # TypeError: unhashable arguments are simply "not a known code".
        raise ValueError('Invalid value') from None
class BarAggregator:
    """Merge consecutive OHLCV bars into bars of a longer timeframe.

    Incoming bars are assigned to buckets numbered
    ``epoch_seconds // timeframe``. Naive datetimes are interpreted as UTC,
    so bucket boundaries do not depend on the timezone (or DST rules) of
    the machine running the code. Timestamps handed back to the caller are
    naive UTC datetimes marking the start of the bucket.
    """

    # Reference point for converting naive (UTC) datetimes to epoch seconds.
    _EPOCH = datetime.datetime(1970, 1, 1)

    def __init__(self, timeframe):
        """Create an aggregator producing bars ``timeframe`` seconds long."""
        self.open_ = 0
        self.high = 0
        self.low = 0
        self.close = 0
        self.volume = 0
        self.timestamp = None
        # None until the first bar has been pushed.
        self.current_bar_number = None
        self.timeframe = timeframe

    def _epoch_seconds(self, timestamp):
        """Return seconds since the Unix epoch, treating naive values as UTC."""
        if timestamp.utcoffset() is None:
            return (timestamp - self._EPOCH).total_seconds()
        return timestamp.timestamp()

    def push_bar(self, timestamp, open_, high, low, close, volume):
        """Feed one source bar into the aggregator.

        Returns:
            The completed bar as a tuple
            ``(timestamp, open, high, low, close, volume)`` when the pushed
            bar starts a new bucket and a previous bucket exists; otherwise
            ``None`` (first bar ever, or the bar was merged into the
            bucket currently being built).
        """
        bar_number = int(self._epoch_seconds(timestamp) // self.timeframe)

        if bar_number == self.current_bar_number:
            # Same bucket: extend the bar in progress.
            self.high = max(high, self.high)
            self.low = min(low, self.low)
            self.close = close
            self.volume += volume
            return None

        # Bucket changed: snapshot the finished bar (None if there is none
        # yet) before overwriting the state with the new bar.
        completed = self.get_bar()
        self.open_ = open_
        self.high = high
        self.low = low
        self.close = close
        self.volume = volume
        self.timestamp = timestamp
        self.current_bar_number = bar_number
        return completed

    def get_bar(self):
        """Return the bar currently being built without resetting it.

        Returns:
            ``(timestamp, open, high, low, close, volume)`` for the bucket
            in progress, or ``None`` if no bar has been pushed yet.
        """
        if self.current_bar_number is None:
            return None
        bar_start = self._EPOCH + datetime.timedelta(
            seconds=self.current_bar_number * self.timeframe)
        return (bar_start, self.open_, self.high, self.low, self.close,
                self.volume)
def main():
    """Download historical bars from a QHP endpoint and save them as CSV.

    Sends a JSON request (ticker, from, to, timeframe) over a ZeroMQ REQ
    socket. The first reply frame must be ``b'OK'``; every following frame
    is a packed array of ``<qddddQ`` records (timestamp, open, high, low,
    close, volume). Bars are optionally shifted by ``--time-delta`` seconds
    and re-aggregated to ``--rescale`` before being written.

    Exits with status 1 if the server does not answer ``OK``.
    """
    parser = argparse.ArgumentParser(description='QHP client')
    parser.add_argument('-o', '--output-file', action='store', dest='output_file', help='Output filename', required=True)
    parser.add_argument('-p', '--timeframe', action='store', dest='timeframe', help='Data timeframe', required=True)
    parser.add_argument('-q', '--qhp', action='store', dest='qhp', help='QHP endpoint', required=True)
    parser.add_argument('-y', '--symbol', action='store', dest='symbol', help='Symbol to download', required=True)
    parser.add_argument('-f', '--from', action='store', dest='from_', help='Starting date', required=True)
    parser.add_argument('-t', '--to', action='store', dest='to', help='Ending date', required=True)
    parser.add_argument('-r', '--rescale', action='store', dest='rescale', help='Rescale to timeframe')
    parser.add_argument('-d', '--time-delta', action='store', dest='time_delta', help='Add given time delta (in seconds)', required=False)

    args = parser.parse_args()

    period = args.timeframe
    symbol = args.symbol

    # Validate every argument before touching the network so a bad value
    # never leaves a connected socket behind.
    start_time = datetime.datetime.strptime(args.from_, "%Y%m%d")
    end_time = datetime.datetime.strptime(args.to, "%Y%m%d")

    shift = datetime.timedelta()
    if args.time_delta:
        shift = datetime.timedelta(seconds=int(args.time_delta))

    agg = None
    if args.rescale:
        try:
            agg = BarAggregator(_rescale_seconds(args.rescale))
        except ValueError as err:
            parser.error("invalid --rescale value {!r}: {}".format(args.rescale, err))

    rq = {
        "ticker" : symbol,
        "from" : start_time.strftime("%Y-%m-%dT%H:%M:%S"),
        "to" : end_time.strftime("%Y-%m-%dT%H:%M:%S"),
        "timeframe" : period
    }

    ctx = zmq.Context.instance()
    s = ctx.socket(zmq.REQ)
    try:
        s.connect(args.qhp)

        print("Sending request:", rq)
        s.send_multipart([bytes(json.dumps(rq), "utf-8")])

        print("Awaiting response")
        resp = s.recv()
        print(resp)

        if resp != b'OK':
            # Only read an error frame if the server actually sent one;
            # receiving on a REQ socket with no pending frame is an error.
            if s.getsockopt(zmq.RCVMORE):
                errmsg = s.recv_string()
            else:
                errmsg = resp.decode("utf-8", errors="replace")
            print("Error:", errmsg)
            sys.exit(1)

        line_count = 0
        with open(args.output_file, 'w', newline='') as f:
            writer = csv.writer(f)
            writer.writerow(['<TICKER>', '<PER>', '<DATE>', '<TIME>', '<OPEN>', '<HIGH>', '<LOW>', '<CLOSE>', '<VOLUME>'])

            # Remaining frames of the multipart reply each carry a chunk
            # of packed bar records.
            while s.getsockopt(zmq.RCVMORE):
                rawdata = s.recv()
                print("Got chunk: {} bytes".format(len(rawdata)))
                for ts, open_, high, low, close, volume in struct.iter_unpack("<qddddQ", rawdata):
                    # Naive UTC datetime, same value utcfromtimestamp() gave.
                    dt = datetime.datetime.fromtimestamp(
                        ts, tz=datetime.timezone.utc).replace(tzinfo=None) + shift
                    if agg:
                        mbar = agg.push_bar(dt, open_, high, low, close, volume)
                        if mbar is not None:
                            line_count += 1
                            writer.writerow(_csv_row(symbol, agg.timeframe, *mbar))
                    else:
                        line_count += 1
                        writer.writerow(_csv_row(symbol, period, dt, open_, high, low, close, volume))

            if agg:
                # Flush the bar that was still being built when data ended.
                mbar = agg.get_bar()
                if mbar is not None:
                    line_count += 1
                    writer.writerow(_csv_row(symbol, agg.timeframe, *mbar))

        print("Written {} lines".format(line_count))
    finally:
        # Always release the socket, including on sys.exit() and errors.
        s.close(linger=0)


def _rescale_seconds(value):
    """Parse ``--rescale``: a number of seconds or a code such as 'M5'."""
    try:
        seconds = int(value)
    except ValueError:
        seconds = timeframe_to_seconds(value)
    if seconds <= 0:
        raise ValueError('interval must be positive')
    return seconds


def _csv_row(symbol, period, timestamp, open_, high, low, close, volume):
    """Format one bar as a row matching the CSV header written by main()."""
    return [symbol, period, timestamp.strftime('%Y%m%d'), timestamp.strftime('%H%M%S'),
            str(open_), str(high), str(low), str(close), str(volume)]
# Run the downloader only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()