commit 2b43f2016b11697cb8279c8dbdb054d9e3d3c602 Author: Denis Tereshkin Date: Sun May 29 14:51:44 2022 +0700 Import qtis & zeromq tests diff --git a/qtis.lua b/qtis.lua new file mode 100644 index 0000000..27a80ce --- /dev/null +++ b/qtis.lua @@ -0,0 +1,222 @@ +local alien = require("alien") +json = require "JSON" +date = require "date" +require "string" +struct = require "alien.struct" + +w32sleep = alien.kernel32.Sleep +w32sleep:types { ret = "void", abi = "stdcall", "int" } + + +zmq_pollitem = alien.defstruct { + {"socket", "pointer"}, + {"fd", "int"}, + {"events", "short"}, + {"revents", "short"} +} + +zmq_ctx_new = alien.libzmq.zmq_ctx_new +zmq_ctx_new:types { ret = "pointer", abi="stdcall" } + +zmq_ctx_term = alien.libzmq.zmq_ctx_term +zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" } + +zmq_bind = alien.libzmq.zmq_bind +zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" } + +zmq_close = alien.libzmq.zmq_close +zmq_close:types { ret = "int", abi="stdcall", "pointer" } + +zmq_disconnect = alien.libzmq.zmq_disconnect +zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" } + +zmq_setsockopt = alien.libzmq.zmq_setsockopt +zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" } + +zmq_getsockopt = alien.libzmq.zmq_setsockopt +zmq_getsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" } + +zmq_errno = alien.libzmq.zmq_errno +zmq_errno:types { ret = "int", abi="stdcall" } + +zmq_msg_close = alien.libzmq.zmq_msg_close +zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" } + +zmq_msg_data = alien.libzmq.zmq_msg_data +zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" } + +zmq_msg_get = alien.libzmq.zmq_msg_get +zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" } + +zmq_msg_init_size = alien.libzmq.zmq_msg_init_size +zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" } + +zmq_msg_init = alien.libzmq.zmq_msg_init +zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" } + +zmq_msg_more = alien.libzmq.zmq_msg_more +zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" } + +zmq_msg_recv = alien.libzmq.zmq_msg_recv +zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } + +zmq_msg_send = alien.libzmq.zmq_msg_send +zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } + +zmq_msg_set = alien.libzmq.zmq_msg_set +zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" } + +zmq_msg_size = alien.libzmq.zmq_msg_size +zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" } + +zmq_poll = alien.libzmq.zmq_poll +zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "int" } + +zmq_recv = alien.libzmq.zmq_recv +zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} + +zmq_send = alien.libzmq.zmq_send +zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} + +zmq_socket = alien.libzmq.zmq_socket +zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"} + +zmq_version = alien.libzmq.zmq_version +zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"} + +zmq_unbind = alien.libzmq.zmq_unbind +zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"} + +function zmq_msg_new() + local buf = alien.buffer(64) + return buf +end + +function zmq_msg_as_string(msg) + return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg)) +end + +ZMQ_PAIR = 0 +ZMQ_PUB = 1 +ZMQ_SUB = 2 +ZMQ_REQ = 3 +ZMQ_REP = 4 +ZMQ_DEALER = 5 +ZMQ_ROUTER = 6 +ZMQ_PULL = 7 +ZMQ_PUSH = 8 +ZMQ_XPUB = 9 +ZMQ_XSUB = 10 +ZMQ_STREAM = 11 + +ZMQ_POLLIN = 1 +ZMQ_POLLOUT = 2 +ZMQ_POLLERR = 4 + +ZMQ_SNDMORE = 2 +ZMQ_LINGER = 17 + +running = true + +function zmq_msg_from_string(str) + local msg = zmq_msg_new() + zmq_msg_init_size(msg, str:len()) + buf = zmq_msg_data(msg) + bsource = alien.buffer(str) + alien.memcpy(buf, bsource, str:len()) + return msg +end + +function zmq_msg_empty() + local msg = zmq_msg_new() + zmq_msg_init(msg) + return msg +end + +function get_data(class, code) + local lot_size = tonumber(getParamEx(class, code, "LOTSIZE").param_value) + local tick_size = tonumber(getParamEx(class, code, "SEC_PRICE_STEP").param_value) + local tick_value = tonumber(getParamEx(class, code, "STEPPRICET").param_value) + local long_name = getParamEx(class, code, "LONGNAME").param_image + + return { ticker = class .. "#" .. code, + lot_size = lot_size, + tick_size = tick_size, + tick_value = tick_value, + long_name = long_name } +end + +function handle_message(msg) + message("Incoming message") + local rq = json:decode(zmq_msg_as_string(msg)) + local class, code = string.match(rq.ticker, "(%w+)#(%w+)") + message("Requested: " .. rq.ticker) + + local data = get_data(class, code) + if data == nil then + message("Error") + return zmq_msg_from_string("ERROR: can't get data: (" .. class .. "/" .. code .. ")" ), zmq_msg_empty() + end + + message("Returning data") + return zmq_msg_from_string("OK"), zmq_msg_from_string(json:encode(data)) +end + +ctx = nil +sock = nil + +function OnStop() + if sock then + zmq_close(sock) + sock = nil + end + if ctx then + zmq_ctx_term(ctx) + ctx = nil + end +end + +function main() + local status, err = pcall(pmain) + if not status then + message("Error: " .. err) + end + OnStop() +end + +function pmain() + ctx = zmq_ctx_new() + sock = zmq_socket(ctx, ZMQ_REP) + local linger = alien.array("int", {0}) + zmq_setsockopt(sock, ZMQ_LINGER, linger.buffer, 4) + local rc = zmq_bind(sock, "tcp://*:5523") + if rc ~= 0 then + message("Bind error: " .. tostring(rc)) + return + end + + while running do + local pi = zmq_pollitem:new() + pi.socket = sock + pi.fd = 0 + pi.events = ZMQ_POLLIN + pi.revents = 0 + + rc = zmq_poll(pi(), 1, 60000) + if rc > 0 and pi.revents == ZMQ_POLLIN then + message("Incoming") + local msg = zmq_msg_new() + zmq_msg_init(msg) + rc = zmq_msg_recv(msg, sock, 0) + if rc == -1 then + running = false + message("rc: " .. tostring(rc) .. "; " .. tostring(zmq_errno())) + else + local outmsg_header, outmsg_data = handle_message(msg) + zmq_msg_send(outmsg_header, sock, ZMQ_SNDMORE) + zmq_msg_send(outmsg_data, sock, 0) + end + zmq_msg_close(msg) + end + end +end diff --git a/tests/test_zeromq.lua b/tests/test_zeromq.lua new file mode 100644 index 0000000..49e893a --- /dev/null +++ b/tests/test_zeromq.lua @@ -0,0 +1,87 @@ +require "luarocks.loader" +package.path = package.path .. ";../?.lua" + +local lu = require("luaunit") +local zmq = require("zeromq") + +function test_context_creation() + local ctx, err = zmq.ctx_new() + lu.assertNil(err) + ctx:term() +end + +function test_req_rep() + local err, msg + local ctx = zmq.ctx_new() + local rep = ctx:socket(zmq.REP) + rep:bind("inproc://foo") + rep:setsockopt_int(zmq.RCVTIMEO, 5000) + + local req = ctx:socket(zmq.REQ) + req:connect("inproc://foo") + + _, err = req:send({ "foo", "bar" }) + lu.assertNil(err) + + msg, err = rep:recv() + lu.assertNil(err) + + lu.assertEquals(#msg, 2) + lu.assertEquals(msg[1], "foo") + lu.assertEquals(msg[2], "bar") +end + +function test_router_pair() + local err, msg + local ctx = zmq.ctx_new() + local server = ctx:socket(zmq.ROUTER) + server:setsockopt_str(zmq.SOCKOPT_ROUTING_ID, "server") + server:setsockopt_int(zmq.SOCKOPT_RCVTIMEO, 1000) + server:bind("inproc://foo") + + local client = ctx:socket(zmq.ROUTER) + client:setsockopt_str(zmq.SOCKOPT_ROUTING_ID, "client") + client:setsockopt_int(zmq.SOCKOPT_ROUTER_MANDATORY, 1) + client:connect("inproc://foo") + + repeat + _, err = client:send({ "server", "foo" }) + until not err + + msg, err = server:recv() + lu.assertNil(err) + + lu.assertEquals(#msg, 2) + lu.assertEquals(msg[1], "client") + lu.assertEquals(msg[2], "foo") +end + +function test_poll() + local err, msg + local ctx = zmq.ctx_new() + local rep = ctx:socket(zmq.REP) + rep:bind("inproc://foo") + rep:setsockopt_int(zmq.RCVTIMEO, 5000) + + local req = ctx:socket(zmq.REQ) + req:connect("inproc://foo") + + _, err = req:send({ "foo", "bar" }) + lu.assertNil(err) + + local poller = ctx:poller() + local poll_result + poller:add(rep, zmq.POLLIN) + poll_result, err = poller:poll(1500) + lu.assertNil(err) + lu.assertEquals(#poll_result, 1) + if poll_result[1] == rep then + msg, err = rep:recv() + lu.assertNil(err) + lu.assertEquals(#msg, 2) + lu.assertEquals(msg[1], "foo") + lu.assertEquals(msg[2], "bar") + end + +end +os.exit(lu.LuaUnit.run()) diff --git a/zeromq.lua b/zeromq.lua new file mode 100644 index 0000000..ae1bf03 --- /dev/null +++ b/zeromq.lua @@ -0,0 +1,320 @@ +local alien = require("alien") + +libzmq = alien.load("/usr/lib/libzmq.so") + +local zmq_pollitem = alien.defstruct { + {"socket", "pointer"}, + {"fd", "int"}, + {"events", "short"}, + {"revents", "short"} +} + +local zmq_ctx_new = libzmq.zmq_ctx_new +zmq_ctx_new:types { ret = "pointer", abi="stdcall" } + +local zmq_ctx_term = libzmq.zmq_ctx_term +zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" } + +local zmq_bind = libzmq.zmq_bind +zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" } + +local zmq_connect = libzmq.zmq_connect +zmq_connect:types { ret = "int", abi="stdcall", "pointer", "string" } + +local zmq_close = libzmq.zmq_close +zmq_close:types { ret = "int", abi="stdcall", "pointer" } + +local zmq_disconnect = libzmq.zmq_disconnect +zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" } + +local zmq_setsockopt = libzmq.zmq_setsockopt +zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" } + +local zmq_errno = libzmq.zmq_errno +zmq_errno:types { ret = "int", abi="stdcall" } + +local zmq_msg_close = libzmq.zmq_msg_close +zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" } + +local zmq_msg_data = libzmq.zmq_msg_data +zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" } + +local zmq_msg_get = libzmq.zmq_msg_get +zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" } + +local zmq_msg_init_size = libzmq.zmq_msg_init_size +zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" } + +local zmq_msg_init = libzmq.zmq_msg_init +zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" } + +local zmq_msg_more = libzmq.zmq_msg_more +zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" } + +local zmq_msg_recv = libzmq.zmq_msg_recv +zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } + +local zmq_msg_send = libzmq.zmq_msg_send +zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } + +local zmq_msg_set = libzmq.zmq_msg_set +zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" } + +local zmq_msg_size = libzmq.zmq_msg_size +zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" } + +local zmq_poll = libzmq.zmq_poll +zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "long" } + +local zmq_recv = libzmq.zmq_recv +zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} + +local zmq_send = libzmq.zmq_send +zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} + +local zmq_socket = libzmq.zmq_socket +zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"} + +local zmq_version = libzmq.zmq_version +zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"} + +local zmq_unbind = libzmq.zmq_unbind +zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"} + +local function zmq_msg_new() + local buf = alien.buffer(64) + return buf +end + +local function zmq_msg_as_string(msg) + return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg)) +end + +local ZMQ_PAIR = 0 +local ZMQ_PUB = 1 +local ZMQ_SUB = 2 +local ZMQ_REQ = 3 +local ZMQ_REP = 4 +local ZMQ_DEALER = 5 +local ZMQ_ROUTER = 6 +local ZMQ_PULL = 7 +local ZMQ_PUSH = 8 +local ZMQ_XPUB = 9 +local ZMQ_XSUB = 10 +local ZMQ_STREAM = 11 + +local ZMQ_POLLIN = 1 +local ZMQ_POLLOUT = 2 +local ZMQ_POLLERR = 4 + +local ZMQ_SNDMORE = 2 + +local ZMQ_ROUTING_ID = 5 +local ZMQ_LINGER = 17 +local ZMQ_RCVTIMEO = 27 +local ZMQ_SNDTIMEO = 28 +local ZMQ_ROUTER_MANDATORY = 33 + +local function check_if_error(value, success_value, err_value) + if value == err_value then + return nil, zmq_errno() + else + return success_value, nil + end +end + +local function ctx_term(ctx) + zmq_ctx_term(ctx.ctx) +end + +local function socket_bind(socket, endpoint) + return check_if_error( + zmq_bind(socket.socket, endpoint), nil, -1) +end + +local function socket_connect(socket, endpoint) + return check_if_error( + zmq_connect(socket.socket, endpoint), nil, -1) +end + +local function socket_setsockopt_int(socket, option_name, value) + local arr = alien.array("int", {value}) + return check_if_error( + zmq_setsockopt(socket.socket, option_name, arr.buffer:topointer(), arr.size), + nil, -1) +end + +local function socket_setsockopt_str(socket, option_name, value) + local arr = alien.buffer(value) + return check_if_error( + zmq_setsockopt(socket.socket, option_name, value, #arr - 1), + nil, -1) +end + +local function socket_close(socket) + return check_if_error( + zmq_close(socket.socket), nil, -1) +end + +local function socket_disconnect(socket) + return check_if_error( + zmq_disconnect(socket.socket), nil, -1) +end + +local function socket_send(socket, message) + local rc = 0 + if #message > 1 then + for i = 1, #message - 1 do + rc = zmq_send(socket.socket, message[i], string.len(message[i]), + ZMQ_SNDMORE) + if rc < 0 then + print(rc) + return nil, zmq_errno() + end + end + end + rc = zmq_send(socket.socket, message[#message], string.len(message[#message]), 0) + if rc < 0 then + return nil, zmq_errno() + end + return nil, nil +end + +local function socket_recv(socket) + + local msg = {} + local more = false + repeat + local part = zmq_msg_new() + if not part then + return nil, zmq_errno() + end + if not zmq_msg_init(part) then + return nil, zmq_errno() + end + local rc = zmq_msg_recv(part, socket.socket, 0) + if rc < 0 then + return nil, zmq_errno() + else + local len = zmq_msg_size(part) + local buf = alien.buffer(zmq_msg_data(part)) + msg[#msg + 1] = buf:tostring(len) + more = (zmq_msg_more(part) == 0) + end + rc = zmq_msg_close(part) + if rc == -1 then + return nil, zmq_errno() + end + until more + + return msg, nil +end + +local function poller_add_item(poller, sock, event) + table.insert(poller.items, { sock = sock, event = event }) +end + +local function poller_poll(poller, timeout) + local events_buffer = alien.buffer(zmq_pollitem.size * #poller.items) + local ptr = 1 + for _, item in ipairs(poller.items) do + local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr)) + pollitem.socket = item.sock.socket + pollitem.fd = 0 + pollitem.events = item.event + pollitem.revents = 0 + ptr = ptr + zmq_pollitem.size + end + local rc = zmq_poll(events_buffer:topointer(), #poller.items, timeout) + if rc < 0 then + return nil, zmq_errno() + end + local result = {} + local added = 0 + ptr = 1 + for i = 1, #poller.items do + if added == rc then + break + end + local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr)) + if pollitem.revents ~= 0 then + table.insert(result, poller.items[i]) + added = added + 1 + end + ptr = ptr + zmq_pollitem.size + end + return result +end + +local function mk_poller(ctx) + local p = { + ctx = ctx, + items = {}, + add = poller_add_item, + poll = poller_poll, + } + return p +end + +local function mk_socket(ctx, socktype) + local sock = zmq_socket(ctx.ctx, socktype) + return check_if_error(sock, + { + ctx = ctx.ctx, + socket = sock, + bind = socket_bind, + connect = socket_connect, + close = socket_close, + disconnect = socket_disconnect, + send = socket_send, + recv = socket_recv, + setsockopt_int = socket_setsockopt_int, + setsockopt_str = socket_setsockopt_str, + }, nil) +end + +local function ctx_new() + local ctx = zmq_ctx_new() + return check_if_error(ctx, + { + ctx = ctx, + term = ctx_term, + socket = mk_socket, + poller = mk_poller, + }, nil) +end + +local P = + { + ctx_new = ctx_new, + PUB = ZMQ_PUB, + SUB = ZMQ_SUB, + REQ = ZMQ_REQ, + REP = ZMQ_REP, + ROUTER = ZMQ_ROUTER, + DEALER = ZMQ_DEALER, + PAIR = ZMQ_PAIR, + PULL = ZMQ_PULL, + PUSH = ZMQ_PUSH, + XPUB = ZMQ_XPUB, + XSUB = ZMQ_XSUB, + STREAM = ZMQ_STREAM, + + POLLIN = ZMQ_POLLIN, + POLLOUT = ZMQ_POLLOUT, + POLLERR = ZMQ_POLLERR, + + SOCKOPT_RCVTIMEO = ZMQ_RCVTIMEO, + SOCKOPT_SNDTIMEO = ZMQ_SNDTIMEO, + SOCKOPT_LINGER = ZMQ_LINGER, + SOCKOPT_ROUTING_ID = ZMQ_ROUTING_ID, + SOCKOPT_ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY, + } + +if _REQUIREDNAME == nil then + zmq = P +else + _G[_REQUIREDNAME] = P +end + +return P