local alien = require("alien") json = require "JSON" date = require "date" require "string" struct = require "alien.struct" w32sleep = alien.kernel32.Sleep w32sleep:types { ret = "void", abi = "stdcall", "int" } test_mode = false test_file = nil running = true ctx = nil callback_socket = nil filter = { ["SPBFUT#RIH9"] = true, ["SPBFUT#MMH9"] = true, ["SPBFUT#USH9"] = true, ["SPBFUT#EuH9"] = true, ["SPBFUT#EDH9"] = true, ["SPBFUT#SiH9"] = true, ["SPBFUT#BRN9"] = true, ["SPBFUT#GDH9"] = true, ["SPBFUT#GZH9"] = true, ["SPBFUT#SRH9"] = true, ["TQBR#SBER"] = true, ["TQBR#GAZP"] = true, ["TQBR#LKOH"] = true, ["TQBR#NVTK"] = true, ["TQBR#MGNT"] = true, ["TQBR#GMKN"] = true, ["TQBR#ROSN"] = true, ["TQBR#VTBR"] = true, ["TQBR#SNGS"] = true, ["TQBR#MTSS"] = true, ["TQBR#ALRS"] = true, ["TQBR#TATN"] = true } data_sources = {} bar_queue = {} zmq_pollitem = alien.defstruct { {"socket", "pointer"}, {"fd", "int"}, {"events", "short"}, {"revents", "short"} } zmq_ctx_new = alien.libzmq.zmq_ctx_new zmq_ctx_new:types { ret = "pointer", abi="stdcall" } zmq_ctx_term = alien.libzmq.zmq_ctx_term zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" } zmq_connect = alien.libzmq.zmq_connect zmq_connect:types { ret = "int", abi="stdcall", "pointer", "string" } zmq_bind = alien.libzmq.zmq_bind zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" } zmq_close = alien.libzmq.zmq_close zmq_close:types { ret = "int", abi="stdcall", "pointer" } zmq_disconnect = alien.libzmq.zmq_disconnect zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" } zmq_setsockopt = alien.libzmq.zmq_setsockopt zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" } zmq_errno = alien.libzmq.zmq_errno zmq_errno:types { ret = "int", abi="stdcall" } zmq_msg_close = alien.libzmq.zmq_msg_close zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" } zmq_msg_data = alien.libzmq.zmq_msg_data zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" } zmq_msg_get = alien.libzmq.zmq_msg_get zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" } zmq_msg_init_size = alien.libzmq.zmq_msg_init_size zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" } zmq_msg_init = alien.libzmq.zmq_msg_init zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" } zmq_msg_more = alien.libzmq.zmq_msg_more zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" } zmq_msg_recv = alien.libzmq.zmq_msg_recv zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } zmq_msg_send = alien.libzmq.zmq_msg_send zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } zmq_msg_set = alien.libzmq.zmq_msg_set zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" } zmq_msg_size = alien.libzmq.zmq_msg_size zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" } zmq_poll = alien.libzmq.zmq_poll zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "int" } zmq_recv = alien.libzmq.zmq_recv zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} zmq_send = alien.libzmq.zmq_send zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} zmq_socket = alien.libzmq.zmq_socket zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"} zmq_version = alien.libzmq.zmq_version zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"} zmq_unbind = alien.libzmq.zmq_unbind zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"} function zmq_msg_new() local buf = alien.buffer(64) return buf end function zmq_msg_as_string(msg) return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg)) end ZMQ_PAIR = 0 ZMQ_PUB = 1 ZMQ_SUB = 2 ZMQ_REQ = 3 ZMQ_REP = 4 ZMQ_DEALER = 5 ZMQ_ROUTER = 6 ZMQ_PULL = 7 ZMQ_PUSH = 8 ZMQ_XPUB = 9 ZMQ_XSUB = 10 ZMQ_STREAM = 11 ZMQ_POLLIN = 1 ZMQ_POLLOUT = 2 ZMQ_POLLERR = 4 ZMQ_DONTWAIT = 1 ZMQ_SNDMORE = 2 ZMQ_LINGER = 17 running = true input_csv = nil function open_csv(filename) input_csv, err = io.open(filename, "r") if input_csv == nil then PrintDbgStr(err) end input_csv:read("*line") -- Discard header end function next_bar(ticker_id) l = input_csv:read("*line") pattern = "[^,],%d,(%d+),(%d+),([%d.]),([%d.]),([%d.]),([%d.]),(%d)" date_,time_,open_,high_,low_,close_,volume_ = string.find(l, pattern) open = tonumber(open_) high = tonumber(high_) low = tonumber(low_) close = tonumber(close_) volume = tonumber(volume_) dt = { year = tonumber(date_.sub(1, 4)), month = tonumber(date_.sub(5, 6)), day = tonumber(date_.sub(7, 8)), hour = tonumber(time_.sub(1, 2)), min = tonumber(time_.sub(3, 4)), sec = tonumber(time_.sub(5, 6)), ms = 0 } return { ticker_id, dt, open, high, low, close, volume } end function to_unixtime(year, month, day) return date.diff(date(year, month, day), date(1970, 1, 1)):spanseconds() end function to_unixtime_fine(year, month, day, hour, min, sec) return date.diff(date(year, month, day, hour, min, sec), date(1970, 1, 1)):spanseconds() end function zmq_msg_from_string(str) local msg = zmq_msg_new() zmq_msg_init_size(msg, str:len()) buf = zmq_msg_data(msg) bsource = alien.buffer(str) alien.memcpy(buf, bsource, str:len()) return msg end function zmq_msg_empty() local msg = zmq_msg_new() zmq_msg_init(msg) return msg end function header_from_bar(bar) local msg = zmq_msg_new() ticker_id = bar[1] header = ticker_id .. ":60;" return zmq_msg_from_string(header) end function deconstruct_datetime(dt) dt_int = to_unixtime_fine(dt.year, dt.month, dt.day, dt.hour, dt.min, dt.sec) dt_frac = dt.ms * 1000 return dt_int, dt_frac end function data_from_bar(bar) local msg = zmq_msg_new() dt = bar[2] o = bar[3] h = bar[4] l = bar[5] c = bar[6] v = bar[7] dt_int, dt_frac = deconstruct_datetime(dt) end_of_bar_dt_int = dt_int + 60 s = struct.pack(" 1 then prev_bar_timestamp = now local b = next_bar("TEST") table.sinsert(bar_queue, b) end end end zmq_close(sock) zmq_ctx_term(ctx) end