Browse Source

Import qtis & zeromq tests

master
Denis Tereshkin 4 years ago
commit
2b43f2016b
  1. 222
      qtis.lua
  2. 87
      tests/test_zeromq.lua
  3. 320
      zeromq.lua

222
qtis.lua

@ -0,0 +1,222 @@ @@ -0,0 +1,222 @@
local alien = require("alien")
json = require "JSON"
date = require "date"
require "string"
struct = require "alien.struct"
w32sleep = alien.kernel32.Sleep
w32sleep:types { ret = "void", abi = "stdcall", "int" }
zmq_pollitem = alien.defstruct {
{"socket", "pointer"},
{"fd", "int"},
{"events", "short"},
{"revents", "short"}
}
zmq_ctx_new = alien.libzmq.zmq_ctx_new
zmq_ctx_new:types { ret = "pointer", abi="stdcall" }
zmq_ctx_term = alien.libzmq.zmq_ctx_term
zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" }
zmq_bind = alien.libzmq.zmq_bind
zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" }
zmq_close = alien.libzmq.zmq_close
zmq_close:types { ret = "int", abi="stdcall", "pointer" }
zmq_disconnect = alien.libzmq.zmq_disconnect
zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" }
zmq_setsockopt = alien.libzmq.zmq_setsockopt
zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" }
zmq_getsockopt = alien.libzmq.zmq_setsockopt
zmq_getsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" }
zmq_errno = alien.libzmq.zmq_errno
zmq_errno:types { ret = "int", abi="stdcall" }
zmq_msg_close = alien.libzmq.zmq_msg_close
zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" }
zmq_msg_data = alien.libzmq.zmq_msg_data
zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" }
zmq_msg_get = alien.libzmq.zmq_msg_get
zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" }
zmq_msg_init_size = alien.libzmq.zmq_msg_init_size
zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" }
zmq_msg_init = alien.libzmq.zmq_msg_init
zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" }
zmq_msg_more = alien.libzmq.zmq_msg_more
zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" }
zmq_msg_recv = alien.libzmq.zmq_msg_recv
zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
zmq_msg_send = alien.libzmq.zmq_msg_send
zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
zmq_msg_set = alien.libzmq.zmq_msg_set
zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" }
zmq_msg_size = alien.libzmq.zmq_msg_size
zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" }
zmq_poll = alien.libzmq.zmq_poll
zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "int" }
zmq_recv = alien.libzmq.zmq_recv
zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
zmq_send = alien.libzmq.zmq_send
zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
zmq_socket = alien.libzmq.zmq_socket
zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"}
zmq_version = alien.libzmq.zmq_version
zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"}
zmq_unbind = alien.libzmq.zmq_unbind
zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"}
function zmq_msg_new()
local buf = alien.buffer(64)
return buf
end
function zmq_msg_as_string(msg)
return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg))
end
ZMQ_PAIR = 0
ZMQ_PUB = 1
ZMQ_SUB = 2
ZMQ_REQ = 3
ZMQ_REP = 4
ZMQ_DEALER = 5
ZMQ_ROUTER = 6
ZMQ_PULL = 7
ZMQ_PUSH = 8
ZMQ_XPUB = 9
ZMQ_XSUB = 10
ZMQ_STREAM = 11
ZMQ_POLLIN = 1
ZMQ_POLLOUT = 2
ZMQ_POLLERR = 4
ZMQ_SNDMORE = 2
ZMQ_LINGER = 17
running = true
function zmq_msg_from_string(str)
local msg = zmq_msg_new()
zmq_msg_init_size(msg, str:len())
buf = zmq_msg_data(msg)
bsource = alien.buffer(str)
alien.memcpy(buf, bsource, str:len())
return msg
end
function zmq_msg_empty()
local msg = zmq_msg_new()
zmq_msg_init(msg)
return msg
end
function get_data(class, code)
local lot_size = tonumber(getParamEx(class, code, "LOTSIZE").param_value)
local tick_size = tonumber(getParamEx(class, code, "SEC_PRICE_STEP").param_value)
local tick_value = tonumber(getParamEx(class, code, "STEPPRICET").param_value)
local long_name = getParamEx(class, code, "LONGNAME").param_image
return { ticker = class .. "#" .. code,
lot_size = lot_size,
tick_size = tick_size,
tick_value = tick_value,
long_name = long_name }
end
function handle_message(msg)
message("Incoming message")
local rq = json:decode(zmq_msg_as_string(msg))
local class, code = string.match(rq.ticker, "(%w+)#(%w+)")
message("Requested: " .. rq.ticker)
local data = get_data(class, code)
if data == nil then
message("Error")
return zmq_msg_from_string("ERROR: can't get data: (" .. class .. "/" .. code .. ")" ), zmq_msg_empty()
end
message("Returning data")
return zmq_msg_from_string("OK"), zmq_msg_from_string(json:encode(data))
end
ctx = nil
sock = nil
function OnStop()
if sock then
zmq_close(sock)
sock = nil
end
if ctx then
zmq_ctx_term(ctx)
ctx = nil
end
end
function main()
local status, err = pcall(pmain)
if not status then
message("Error: " .. err)
end
OnStop()
end
function pmain()
ctx = zmq_ctx_new()
sock = zmq_socket(ctx, ZMQ_REP)
local linger = alien.array("int", {0})
zmq_setsockopt(sock, ZMQ_LINGER, linger.buffer, 4)
local rc = zmq_bind(sock, "tcp://*:5523")
if rc ~= 0 then
message("Bind error: " .. tostring(rc))
return
end
while running do
local pi = zmq_pollitem:new()
pi.socket = sock
pi.fd = 0
pi.events = ZMQ_POLLIN
pi.revents = 0
rc = zmq_poll(pi(), 1, 60000)
if rc > 0 and pi.revents == ZMQ_POLLIN then
message("Incoming")
local msg = zmq_msg_new()
zmq_msg_init(msg)
rc = zmq_msg_recv(msg, sock, 0)
if rc == -1 then
running = false
message("rc: " .. tostring(rc) .. "; " .. tostring(zmq_errno()))
else
local outmsg_header, outmsg_data = handle_message(msg)
zmq_msg_send(outmsg_header, sock, ZMQ_SNDMORE)
zmq_msg_send(outmsg_data, sock, 0)
end
zmq_msg_close(msg)
end
end
end

87
tests/test_zeromq.lua

@ -0,0 +1,87 @@ @@ -0,0 +1,87 @@
require "luarocks.loader"
package.path = package.path .. ";../?.lua"
local lu = require("luaunit")
local zmq = require("zeromq")
function test_context_creation()
local ctx, err = zmq.ctx_new()
lu.assertNil(err)
ctx:term()
end
function test_req_rep()
local err, msg
local ctx = zmq.ctx_new()
local rep = ctx:socket(zmq.REP)
rep:bind("inproc://foo")
rep:setsockopt_int(zmq.RCVTIMEO, 5000)
local req = ctx:socket(zmq.REQ)
req:connect("inproc://foo")
_, err = req:send({ "foo", "bar" })
lu.assertNil(err)
msg, err = rep:recv()
lu.assertNil(err)
lu.assertEquals(#msg, 2)
lu.assertEquals(msg[1], "foo")
lu.assertEquals(msg[2], "bar")
end
function test_router_pair()
local err, msg
local ctx = zmq.ctx_new()
local server = ctx:socket(zmq.ROUTER)
server:setsockopt_str(zmq.SOCKOPT_ROUTING_ID, "server")
server:setsockopt_int(zmq.SOCKOPT_RCVTIMEO, 1000)
server:bind("inproc://foo")
local client = ctx:socket(zmq.ROUTER)
client:setsockopt_str(zmq.SOCKOPT_ROUTING_ID, "client")
client:setsockopt_int(zmq.SOCKOPT_ROUTER_MANDATORY, 1)
client:connect("inproc://foo")
repeat
_, err = client:send({ "server", "foo" })
until not err
msg, err = server:recv()
lu.assertNil(err)
lu.assertEquals(#msg, 2)
lu.assertEquals(msg[1], "client")
lu.assertEquals(msg[2], "foo")
end
function test_poll()
local err, msg
local ctx = zmq.ctx_new()
local rep = ctx:socket(zmq.REP)
rep:bind("inproc://foo")
rep:setsockopt_int(zmq.RCVTIMEO, 5000)
local req = ctx:socket(zmq.REQ)
req:connect("inproc://foo")
_, err = req:send({ "foo", "bar" })
lu.assertNil(err)
local poller = ctx:poller()
local poll_result
poller:add(rep, zmq.POLLIN)
poll_result, err = poller:poll(1500)
lu.assertNil(err)
lu.assertEquals(#poll_result, 1)
if poll_result[1] == rep then
msg, err = rep:recv()
lu.assertNil(err)
lu.assertEquals(#msg, 2)
lu.assertEquals(msg[1], "foo")
lu.assertEquals(msg[2], "bar")
end
end
os.exit(lu.LuaUnit.run())

320
zeromq.lua

@ -0,0 +1,320 @@ @@ -0,0 +1,320 @@
local alien = require("alien")
libzmq = alien.load("/usr/lib/libzmq.so")
local zmq_pollitem = alien.defstruct {
{"socket", "pointer"},
{"fd", "int"},
{"events", "short"},
{"revents", "short"}
}
local zmq_ctx_new = libzmq.zmq_ctx_new
zmq_ctx_new:types { ret = "pointer", abi="stdcall" }
local zmq_ctx_term = libzmq.zmq_ctx_term
zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" }
local zmq_bind = libzmq.zmq_bind
zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" }
local zmq_connect = libzmq.zmq_connect
zmq_connect:types { ret = "int", abi="stdcall", "pointer", "string" }
local zmq_close = libzmq.zmq_close
zmq_close:types { ret = "int", abi="stdcall", "pointer" }
local zmq_disconnect = libzmq.zmq_disconnect
zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" }
local zmq_setsockopt = libzmq.zmq_setsockopt
zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" }
local zmq_errno = libzmq.zmq_errno
zmq_errno:types { ret = "int", abi="stdcall" }
local zmq_msg_close = libzmq.zmq_msg_close
zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" }
local zmq_msg_data = libzmq.zmq_msg_data
zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" }
local zmq_msg_get = libzmq.zmq_msg_get
zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" }
local zmq_msg_init_size = libzmq.zmq_msg_init_size
zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" }
local zmq_msg_init = libzmq.zmq_msg_init
zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" }
local zmq_msg_more = libzmq.zmq_msg_more
zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" }
local zmq_msg_recv = libzmq.zmq_msg_recv
zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
local zmq_msg_send = libzmq.zmq_msg_send
zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
local zmq_msg_set = libzmq.zmq_msg_set
zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" }
local zmq_msg_size = libzmq.zmq_msg_size
zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" }
local zmq_poll = libzmq.zmq_poll
zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "long" }
local zmq_recv = libzmq.zmq_recv
zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
local zmq_send = libzmq.zmq_send
zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
local zmq_socket = libzmq.zmq_socket
zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"}
local zmq_version = libzmq.zmq_version
zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"}
local zmq_unbind = libzmq.zmq_unbind
zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"}
local function zmq_msg_new()
local buf = alien.buffer(64)
return buf
end
local function zmq_msg_as_string(msg)
return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg))
end
local ZMQ_PAIR = 0
local ZMQ_PUB = 1
local ZMQ_SUB = 2
local ZMQ_REQ = 3
local ZMQ_REP = 4
local ZMQ_DEALER = 5
local ZMQ_ROUTER = 6
local ZMQ_PULL = 7
local ZMQ_PUSH = 8
local ZMQ_XPUB = 9
local ZMQ_XSUB = 10
local ZMQ_STREAM = 11
local ZMQ_POLLIN = 1
local ZMQ_POLLOUT = 2
local ZMQ_POLLERR = 4
local ZMQ_SNDMORE = 2
local ZMQ_ROUTING_ID = 5
local ZMQ_LINGER = 17
local ZMQ_RCVTIMEO = 27
local ZMQ_SNDTIMEO = 28
local ZMQ_ROUTER_MANDATORY = 33
local function check_if_error(value, success_value, err_value)
if value == err_value then
return nil, zmq_errno()
else
return success_value, nil
end
end
local function ctx_term(ctx)
zmq_ctx_term(ctx.ctx)
end
local function socket_bind(socket, endpoint)
return check_if_error(
zmq_bind(socket.socket, endpoint), nil, -1)
end
local function socket_connect(socket, endpoint)
return check_if_error(
zmq_connect(socket.socket, endpoint), nil, -1)
end
local function socket_setsockopt_int(socket, option_name, value)
local arr = alien.array("int", {value})
return check_if_error(
zmq_setsockopt(socket.socket, option_name, arr.buffer:topointer(), arr.size),
nil, -1)
end
local function socket_setsockopt_str(socket, option_name, value)
local arr = alien.buffer(value)
return check_if_error(
zmq_setsockopt(socket.socket, option_name, value, #arr - 1),
nil, -1)
end
local function socket_close(socket)
return check_if_error(
zmq_close(socket.socket), nil, -1)
end
local function socket_disconnect(socket)
return check_if_error(
zmq_disconnect(socket.socket), nil, -1)
end
local function socket_send(socket, message)
local rc = 0
if #message > 1 then
for i = 1, #message - 1 do
rc = zmq_send(socket.socket, message[i], string.len(message[i]),
ZMQ_SNDMORE)
if rc < 0 then
print(rc)
return nil, zmq_errno()
end
end
end
rc = zmq_send(socket.socket, message[#message], string.len(message[#message]), 0)
if rc < 0 then
return nil, zmq_errno()
end
return nil, nil
end
local function socket_recv(socket)
local msg = {}
local more = false
repeat
local part = zmq_msg_new()
if not part then
return nil, zmq_errno()
end
if not zmq_msg_init(part) then
return nil, zmq_errno()
end
local rc = zmq_msg_recv(part, socket.socket, 0)
if rc < 0 then
return nil, zmq_errno()
else
local len = zmq_msg_size(part)
local buf = alien.buffer(zmq_msg_data(part))
msg[#msg + 1] = buf:tostring(len)
more = (zmq_msg_more(part) == 0)
end
rc = zmq_msg_close(part)
if rc == -1 then
return nil, zmq_errno()
end
until more
return msg, nil
end
local function poller_add_item(poller, sock, event)
table.insert(poller.items, { sock = sock, event = event })
end
local function poller_poll(poller, timeout)
local events_buffer = alien.buffer(zmq_pollitem.size * #poller.items)
local ptr = 1
for _, item in ipairs(poller.items) do
local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr))
pollitem.socket = item.sock.socket
pollitem.fd = 0
pollitem.events = item.event
pollitem.revents = 0
ptr = ptr + zmq_pollitem.size
end
local rc = zmq_poll(events_buffer:topointer(), #poller.items, timeout)
if rc < 0 then
return nil, zmq_errno()
end
local result = {}
local added = 0
ptr = 1
for i = 1, #poller.items do
if added == rc then
break
end
local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr))
if pollitem.revents ~= 0 then
table.insert(result, poller.items[i])
added = added + 1
end
ptr = ptr + zmq_pollitem.size
end
return result
end
local function mk_poller(ctx)
local p = {
ctx = ctx,
items = {},
add = poller_add_item,
poll = poller_poll,
}
return p
end
local function mk_socket(ctx, socktype)
local sock = zmq_socket(ctx.ctx, socktype)
return check_if_error(sock,
{
ctx = ctx.ctx,
socket = sock,
bind = socket_bind,
connect = socket_connect,
close = socket_close,
disconnect = socket_disconnect,
send = socket_send,
recv = socket_recv,
setsockopt_int = socket_setsockopt_int,
setsockopt_str = socket_setsockopt_str,
}, nil)
end
local function ctx_new()
local ctx = zmq_ctx_new()
return check_if_error(ctx,
{
ctx = ctx,
term = ctx_term,
socket = mk_socket,
poller = mk_poller,
}, nil)
end
local P =
{
ctx_new = ctx_new,
PUB = ZMQ_PUB,
SUB = ZMQ_SUB,
REQ = ZMQ_REQ,
REP = ZMQ_REP,
ROUTER = ZMQ_ROUTER,
DEALER = ZMQ_DEALER,
PAIR = ZMQ_PAIR,
PULL = ZMQ_PULL,
PUSH = ZMQ_PUSH,
XPUB = ZMQ_XPUB,
XSUB = ZMQ_XSUB,
STREAM = ZMQ_STREAM,
POLLIN = ZMQ_POLLIN,
POLLOUT = ZMQ_POLLOUT,
POLLERR = ZMQ_POLLERR,
SOCKOPT_RCVTIMEO = ZMQ_RCVTIMEO,
SOCKOPT_SNDTIMEO = ZMQ_SNDTIMEO,
SOCKOPT_LINGER = ZMQ_LINGER,
SOCKOPT_ROUTING_ID = ZMQ_ROUTING_ID,
SOCKOPT_ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY,
}
if _REQUIREDNAME == nil then
zmq = P
else
_G[_REQUIREDNAME] = P
end
return P
Loading…
Cancel
Save