You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

394 lines
9.6 KiB

local alien = require("alien")
json = require "JSON"
date = require "date"
require "string"
struct = require "alien.struct"
w32sleep = alien.kernel32.Sleep
w32sleep:types { ret = "void", abi = "stdcall", "int" }
test_mode = false
test_file = nil
running = true
ctx = nil
callback_socket = nil
filter = {
["SPBFUT#RIH9"] = true,
["SPBFUT#MMH9"] = true,
["SPBFUT#USH9"] = true,
["SPBFUT#EuH9"] = true,
["SPBFUT#EDH9"] = true,
["SPBFUT#SiH9"] = true,
["SPBFUT#BRN9"] = true,
["SPBFUT#GDH9"] = true,
["SPBFUT#GZH9"] = true,
["SPBFUT#SRH9"] = true,
["TQBR#SBER"] = true,
["TQBR#GAZP"] = true,
["TQBR#LKOH"] = true,
["TQBR#NVTK"] = true,
["TQBR#MGNT"] = true,
["TQBR#GMKN"] = true,
["TQBR#ROSN"] = true,
["TQBR#VTBR"] = true,
["TQBR#SNGS"] = true,
["TQBR#MTSS"] = true,
["TQBR#ALRS"] = true,
["TQBR#TATN"] = true
}
data_sources = {}
bar_queue = {}
zmq_pollitem = alien.defstruct {
{"socket", "pointer"},
{"fd", "int"},
{"events", "short"},
{"revents", "short"}
}
zmq_ctx_new = alien.libzmq.zmq_ctx_new
zmq_ctx_new:types { ret = "pointer", abi="stdcall" }
zmq_ctx_term = alien.libzmq.zmq_ctx_term
zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" }
zmq_connect = alien.libzmq.zmq_connect
zmq_connect:types { ret = "int", abi="stdcall", "pointer", "string" }
zmq_bind = alien.libzmq.zmq_bind
zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" }
zmq_close = alien.libzmq.zmq_close
zmq_close:types { ret = "int", abi="stdcall", "pointer" }
zmq_disconnect = alien.libzmq.zmq_disconnect
zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" }
zmq_setsockopt = alien.libzmq.zmq_setsockopt
zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" }
zmq_errno = alien.libzmq.zmq_errno
zmq_errno:types { ret = "int", abi="stdcall" }
zmq_msg_close = alien.libzmq.zmq_msg_close
zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" }
zmq_msg_data = alien.libzmq.zmq_msg_data
zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" }
zmq_msg_get = alien.libzmq.zmq_msg_get
zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" }
zmq_msg_init_size = alien.libzmq.zmq_msg_init_size
zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" }
zmq_msg_init = alien.libzmq.zmq_msg_init
zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" }
zmq_msg_more = alien.libzmq.zmq_msg_more
zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" }
zmq_msg_recv = alien.libzmq.zmq_msg_recv
zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
zmq_msg_send = alien.libzmq.zmq_msg_send
zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
zmq_msg_set = alien.libzmq.zmq_msg_set
zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" }
zmq_msg_size = alien.libzmq.zmq_msg_size
zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" }
zmq_poll = alien.libzmq.zmq_poll
zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "int" }
zmq_recv = alien.libzmq.zmq_recv
zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
zmq_send = alien.libzmq.zmq_send
zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
zmq_socket = alien.libzmq.zmq_socket
zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"}
zmq_version = alien.libzmq.zmq_version
zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"}
zmq_unbind = alien.libzmq.zmq_unbind
zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"}
function zmq_msg_new()
local buf = alien.buffer(64)
return buf
end
function zmq_msg_as_string(msg)
return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg))
end
ZMQ_PAIR = 0
ZMQ_PUB = 1
ZMQ_SUB = 2
ZMQ_REQ = 3
ZMQ_REP = 4
ZMQ_DEALER = 5
ZMQ_ROUTER = 6
ZMQ_PULL = 7
ZMQ_PUSH = 8
ZMQ_XPUB = 9
ZMQ_XSUB = 10
ZMQ_STREAM = 11
ZMQ_POLLIN = 1
ZMQ_POLLOUT = 2
ZMQ_POLLERR = 4
ZMQ_DONTWAIT = 1
ZMQ_SNDMORE = 2
ZMQ_LINGER = 17
running = true
input_csv = nil
function open_csv(filename)
input_csv, err = io.open(filename, "r")
if input_csv == nil then
PrintDbgStr(err)
end
input_csv:read("*line") -- Discard header
end
function next_bar(ticker_id)
l = input_csv:read("*line")
pattern = "[^,],%d,(%d+),(%d+),([%d.]),([%d.]),([%d.]),([%d.]),(%d)"
date_,time_,open_,high_,low_,close_,volume_ = string.find(l, pattern)
open = tonumber(open_)
high = tonumber(high_)
low = tonumber(low_)
close = tonumber(close_)
volume = tonumber(volume_)
dt = {
year = tonumber(date_.sub(1, 4)),
month = tonumber(date_.sub(5, 6)),
day = tonumber(date_.sub(7, 8)),
hour = tonumber(time_.sub(1, 2)),
min = tonumber(time_.sub(3, 4)),
sec = tonumber(time_.sub(5, 6)),
ms = 0
}
return { ticker_id, dt, open, high, low, close, volume }
end
function to_unixtime(year, month, day)
return date.diff(date(year, month, day), date(1970, 1, 1)):spanseconds()
end
function to_unixtime_fine(year, month, day, hour, min, sec)
return date.diff(date(year, month, day, hour, min, sec), date(1970, 1, 1)):spanseconds()
end
function zmq_msg_from_string(str)
local msg = zmq_msg_new()
zmq_msg_init_size(msg, str:len())
buf = zmq_msg_data(msg)
bsource = alien.buffer(str)
alien.memcpy(buf, bsource, str:len())
return msg
end
function zmq_msg_empty()
local msg = zmq_msg_new()
zmq_msg_init(msg)
return msg
end
function header_from_bar(bar)
local msg = zmq_msg_new()
ticker_id = bar[1]
header = ticker_id .. ":60;"
return zmq_msg_from_string(header)
end
function deconstruct_datetime(dt)
dt_int = to_unixtime_fine(dt.year, dt.month, dt.day, dt.hour, dt.min, dt.sec)
dt_frac = dt.ms * 1000
return dt_int, dt_frac
end
function data_from_bar(bar)
local msg = zmq_msg_new()
dt = bar[2]
o = bar[3]
h = bar[4]
l = bar[5]
c = bar[6]
v = bar[7]
dt_int, dt_frac = deconstruct_datetime(dt)
end_of_bar_dt_int = dt_int + 60
s = struct.pack("<IILIddddI", 2, 60, end_of_bar_dt_int, dt_frac, o, h, l, c, v)
return zmq_msg_from_string(s)
end
function send_param_entry(sock, class, sec)
full_code = (class .. "#" .. sec)
s = struct.pack("<BBsdidiidii", 1, full_code:len(), full_code, getParamEx(class, sec, "LAST").param_value,
math.floor(getParamEx(class, sec, "VOLTODAY").param_value),
getParamEx(class, sec, "BID").param_value,
math.floor(getParamEx(class, sec, "BIDDEPTH").param_value),
math.floor(getParamEx(class, sec, "BIDDEPTHT").param_value),
getParamEx(class, sec, "OFFER").param_value,
math.floor(getParamEx(class, sec, "OFFERDEPTH").param_value),
math.floor(getParamEx(class, sec, "OFFERDEPTHT").param_value))
zmq_msg_send(zmq_msg_from_string(s), callback_socket, 0)
end
function dt_as_string(datetime)
mcs = 0
if datetime.mcs ~= nil then
mcs = datetime.mcs
end
sec = os.time(datetime)
sec = sec - 3 * 3600 -- Convert to UTC
dt = os.date("*t", sec)
return tostring(dt.year) .. "." ..
tostring(dt.month) .. "." ..
tostring(dt.day) .. " " ..
tostring(dt.hour) .. ":" ..
tostring(dt.min) .. ":" ..
tostring(dt.sec) .. "." ..
tostring(math.floor(mcs / 1000))
end
function send_all_trade_entry(sock, ticker_id, price, quantity, datetime, flags)
mcs = 0
if datetime.mcs ~= nil then
mcs = datetime.mcs
end
sec = os.time(datetime)
sec = sec - 3 * 3600 -- Convert to UTC
dt = os.date("*t", sec)
s = struct.pack("<BBsdiHBBBBBHi", 2,
ticker_id:len(),
ticker_id,
price,
quantity,
dt.year,
dt.month,
dt.day,
dt.hour,
dt.min,
dt.sec,
mcs,
flags)
zmq_msg_send(zmq_msg_from_string(s), callback_socket, 0)
end
function OnParam(class, sec)
local full_code = class .. "#" .. sec
if filter[full_code] ~= nil then
send_param_entry(sock, class, sec)
end
end
function OnAllTrade(alltrade)
local full_code = alltrade.class_code .. "#" .. alltrade.sec_code
if filter[full_code] ~= nil then
send_all_trade_entry(sock, full_code, alltrade.price, alltrade.qty, alltrade.datetime, alltrade.flags)
end
end
function OnStop()
running = false
end
function OnNewBar(ticker_id, source, i)
table.sinsert(bar_queue, {ticker_id, source:T(i), source:O(i), source:H(i), source:L(i), source:C(i), source:V(i)})
end
function unpack_classcode(s)
x, _ = s:find("#")
return s.sub(1, x-1), s.sub(x + 1)
end
function make_callback(ticker_id, source, callback)
local f = function(i)
callback(ticker_id, source, i)
end
return f
end
function main()
local status, err = pcall(pmain)
if not status then
message("Error: " .. err)
PrintDbgStr("Error: " .. err)
end
OnStop()
end
function pmain()
ctx = zmq_ctx_new()
sock = zmq_socket(ctx, ZMQ_PUB)
local linger = alien.array("int", {0})
zmq_setsockopt(sock, ZMQ_LINGER, linger.buffer, 4)
local rc = zmq_bind(sock, "tcp://127.0.0.1:5513")
if rc ~= 0 then
message("Bind error: " .. tostring(rc))
return
end
if not test_mode then
for k, v in filter do
class, code = unpack_classcode(k)
ds, Error = CreateDataSource(class, code, INTERVAL_M1)
if Error ~= nil and Error ~= "" then
message("Can't request stream: " .. k)
end
data_sources[k] = ds
ds:SetUpdateCallback(make_callback(k, ds, OnNewBar))
end
else
open_csv(test_file)
end
message("Starting main loop")
local last_action = os.time()
local prev_bar_timestamp = 0
while running do
local bar = next(bar_queue)
if bar ~= nil then
zmq_msg_send(header_from_bar(bar), sock, ZMQ_SNDMORE)
zmq_msg_send(data_from_bar(bar), sock, 0)
table.sremove(bar_queue, 1)
else
sleep(10)
end
if test_mode then
local now = os.clock()
if now - prev_bar_timestamp > 1 then
prev_bar_timestamp = now
local b = next_bar("TEST")
table.sinsert(bar_queue, b)
end
end
end
zmq_close(sock)
zmq_ctx_term(ctx)
end