Browse Source

zeromq: use ffi instead of lua

master
Denis Tereshkin 4 years ago
parent
commit
efacb19799
  1. 227
      zeromq.lua

227
zeromq.lua

@ -1,94 +1,56 @@
local alien = require("alien") local ffi = require 'cffi'
libzmq = alien.load("/usr/lib/libzmq.so") local libzmq = ffi.load("libzmq.so")
ffi.cdef [[
local zmq_pollitem = alien.defstruct {
{"socket", "pointer"}, void *zmq_ctx_new (void);
{"fd", "int"}, int zmq_ctx_term (void *context_);
{"events", "short"}, int zmq_ctx_shutdown (void *context_);
{"revents", "short"} int zmq_ctx_set (void *context_, int option_, int optval_);
} int zmq_ctx_get (void *context_, int option_);
local zmq_ctx_new = libzmq.zmq_ctx_new struct zmq_msg_t
zmq_ctx_new:types { ret = "pointer", abi="stdcall" } {
unsigned char _[64];
local zmq_ctx_term = libzmq.zmq_ctx_term };
zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" }
struct zmq_pollitem
local zmq_bind = libzmq.zmq_bind {
zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" } void* socket;
int fd;
local zmq_connect = libzmq.zmq_connect short events;
zmq_connect:types { ret = "int", abi="stdcall", "pointer", "string" } short revents;
};
local zmq_close = libzmq.zmq_close
zmq_close:types { ret = "int", abi="stdcall", "pointer" } int zmq_msg_init (struct zmq_msg_t *msg_);
int zmq_msg_init_size (struct zmq_msg_t *msg_, size_t size_);
local zmq_disconnect = libzmq.zmq_disconnect
zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" } int zmq_msg_send (struct zmq_msg_t *msg_, void *s_, int flags_);
int zmq_msg_recv (struct zmq_msg_t *msg_, void *s_, int flags_);
local zmq_setsockopt = libzmq.zmq_setsockopt int zmq_msg_close (struct zmq_msg_t *msg_);
zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" } int zmq_msg_move (struct zmq_msg_t *dest_, struct zmq_msg_t *src_);
int zmq_msg_copy (struct zmq_msg_t *dest_, struct zmq_msg_t *src_);
local zmq_errno = libzmq.zmq_errno void *zmq_msg_data (struct zmq_msg_t *msg_);
zmq_errno:types { ret = "int", abi="stdcall" } size_t zmq_msg_size (const struct zmq_msg_t *msg_);
int zmq_msg_more (const struct zmq_msg_t *msg_);
local zmq_msg_close = libzmq.zmq_msg_close int zmq_msg_get (const struct zmq_msg_t *msg_, int property_);
zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" } int zmq_msg_set (struct zmq_msg_t *msg_, int property_, int optval_);
const char *zmq_msg_gets (const struct zmq_msg_t *msg_,
local zmq_msg_data = libzmq.zmq_msg_data const char *property_);
zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" } void *zmq_socket (void *, int type_);
int zmq_close (void *s_);
local zmq_msg_get = libzmq.zmq_msg_get int zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_);
zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" } int zmq_bind (void *s_, const char *addr_);
int zmq_connect (void *s_, const char *addr_);
local zmq_msg_init_size = libzmq.zmq_msg_init_size int zmq_unbind (void *s_, const char *addr_);
zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" } int zmq_disconnect (void *s_, const char *addr_);
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_);
local zmq_msg_init = libzmq.zmq_msg_init int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_);
zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" } int zmq_recv (void *s_, void *buf_, size_t len_, int flags_);
int zmq_poll (struct zmq_pollitem *items_, int n_, long timeout_);
local zmq_msg_more = libzmq.zmq_msg_more int zmq_errno();
zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" }
]]
local zmq_msg_recv = libzmq.zmq_msg_recv
zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
local zmq_msg_send = libzmq.zmq_msg_send
zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" }
local zmq_msg_set = libzmq.zmq_msg_set
zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" }
local zmq_msg_size = libzmq.zmq_msg_size
zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" }
local zmq_poll = libzmq.zmq_poll
zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "long" }
local zmq_recv = libzmq.zmq_recv
zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
local zmq_send = libzmq.zmq_send
zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"}
local zmq_socket = libzmq.zmq_socket
zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"}
local zmq_version = libzmq.zmq_version
zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"}
local zmq_unbind = libzmq.zmq_unbind
zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"}
local function zmq_msg_new()
local buf = alien.buffer(64)
return buf
end
local function zmq_msg_as_string(msg)
return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg))
end
local ZMQ_PAIR = 0 local ZMQ_PAIR = 0
local ZMQ_PUB = 1 local ZMQ_PUB = 1
@ -117,65 +79,69 @@ local ZMQ_ROUTER_MANDATORY = 33
local function check_if_error(value, success_value, err_value) local function check_if_error(value, success_value, err_value)
if value == err_value then if value == err_value then
return nil, zmq_errno() return nil, libzmq.zmq_errno()
else else
return success_value, nil return success_value, nil
end end
end end
local function ctx_term(ctx) local function ctx_term(ctx)
zmq_ctx_term(ctx.ctx) libzmq.zmq_ctx_term(ctx.ctx)
end end
local function socket_bind(socket, endpoint) local function socket_bind(socket, endpoint)
return check_if_error( return check_if_error(
zmq_bind(socket.socket, endpoint), nil, -1) libzmq.zmq_bind(socket.socket, endpoint), nil, -1)
end end
local function socket_connect(socket, endpoint) local function socket_connect(socket, endpoint)
return check_if_error( return check_if_error(
zmq_connect(socket.socket, endpoint), nil, -1) libzmq.zmq_connect(socket.socket, endpoint), nil, -1)
end end
local function socket_setsockopt_int(socket, option_name, value) local function socket_setsockopt_int(socket, option_name, value)
local arr = alien.array("int", {value}) local arr = ffi.new("int[1]", value);
if not option_name then
return nil, -1
end
return check_if_error( return check_if_error(
zmq_setsockopt(socket.socket, option_name, arr.buffer:topointer(), arr.size), libzmq.zmq_setsockopt(socket.socket, option_name, arr, 4),
nil, -1) nil, -1)
end end
local function socket_setsockopt_str(socket, option_name, value) local function socket_setsockopt_str(socket, option_name, value)
local arr = alien.buffer(value) local arr = ffi.new("uint8_t[?]", #value)
ffi.copy(arr, value, #value)
return check_if_error( return check_if_error(
zmq_setsockopt(socket.socket, option_name, value, #arr - 1), libzmq.zmq_setsockopt(socket.socket, option_name, arr, #value),
nil, -1) nil, -1)
end end
local function socket_close(socket) local function socket_close(socket)
return check_if_error( return check_if_error(
zmq_close(socket.socket), nil, -1) libzmq.zmq_close(socket.socket), nil, -1)
end end
local function socket_disconnect(socket) local function socket_disconnect(socket)
return check_if_error( return check_if_error(
zmq_disconnect(socket.socket), nil, -1) libzmq.zmq_disconnect(socket.socket), nil, -1)
end end
local function socket_send(socket, message) local function socket_send(socket, message)
local rc = 0 local rc = 0
if #message > 1 then if #message > 1 then
for i = 1, #message - 1 do for i = 1, #message - 1 do
rc = zmq_send(socket.socket, message[i], string.len(message[i]), rc = libzmq.zmq_send(socket.socket, message[i], string.len(message[i]),
ZMQ_SNDMORE) ZMQ_SNDMORE)
if rc < 0 then if rc < 0 then
print(rc) print(rc)
return nil, zmq_errno() return nil, libzmq.zmq_errno()
end end
end end
end end
rc = zmq_send(socket.socket, message[#message], string.len(message[#message]), 0) rc = libzmq.zmq_send(socket.socket, message[#message], string.len(message[#message]), 0)
if rc < 0 then if rc < 0 then
return nil, zmq_errno() return nil, libzmq.zmq_errno()
end end
return nil, nil return nil, nil
end end
@ -185,25 +151,24 @@ local function socket_recv(socket)
local msg = {} local msg = {}
local more = false local more = false
repeat repeat
local part = zmq_msg_new() local part = ffi.new("struct zmq_msg_t[1]")
if not part then if not part then
return nil, zmq_errno() return nil, -1
end end
if not zmq_msg_init(part) then if not libzmq.zmq_msg_init(part) then
return nil, zmq_errno() return nil, libzmq.zmq_errno()
end end
local rc = zmq_msg_recv(part, socket.socket, 0) local rc = libzmq.zmq_msg_recv(part, socket.socket, 0)
if rc < 0 then if rc < 0 then
return nil, zmq_errno() return nil, libzmq.zmq_errno()
else else
local len = zmq_msg_size(part) local len = libzmq.zmq_msg_size(part)
local buf = alien.buffer(zmq_msg_data(part)) msg[#msg + 1] = ffi.string(libzmq.zmq_msg_data(part), len)
msg[#msg + 1] = buf:tostring(len) more = (libzmq.zmq_msg_more(part) == 0)
more = (zmq_msg_more(part) == 0)
end end
rc = zmq_msg_close(part) rc = libzmq.zmq_msg_close(part)
if rc == -1 then if rc == -1 then
return nil, zmq_errno() return nil, libzmq.zmq_errno()
end end
until more until more
@ -215,33 +180,27 @@ local function poller_add_item(poller, sock, event)
end end
local function poller_poll(poller, timeout) local function poller_poll(poller, timeout)
local events_buffer = alien.buffer(zmq_pollitem.size * #poller.items) local events_buffer = ffi.new("struct zmq_pollitem[?]", #poller.items)
local ptr = 1 for i, item in ipairs(poller.items) do
for _, item in ipairs(poller.items) do events_buffer[i - 1].socket = item.sock.socket
local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr)) events_buffer[i - 1].fd = 0
pollitem.socket = item.sock.socket events_buffer[i - 1].events = item.event
pollitem.fd = 0 events_buffer[i - 1].revents = 0
pollitem.events = item.event
pollitem.revents = 0
ptr = ptr + zmq_pollitem.size
end end
local rc = zmq_poll(events_buffer:topointer(), #poller.items, timeout) local rc = libzmq.zmq_poll(events_buffer, #poller.items, timeout)
if rc < 0 then if rc < 0 then
return nil, zmq_errno() return nil, libzmq.zmq_errno()
end end
local result = {} local result = {}
local added = 0 local added = 0
ptr = 1
for i = 1, #poller.items do for i = 1, #poller.items do
if added == rc then if added == rc then
break break
end end
local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr)) if events_buffer[i - 1].revents ~= 0 then
if pollitem.revents ~= 0 then
table.insert(result, poller.items[i]) table.insert(result, poller.items[i])
added = added + 1 added = added + 1
end end
ptr = ptr + zmq_pollitem.size
end end
return result return result
end end
@ -257,7 +216,7 @@ local function mk_poller(ctx)
end end
local function mk_socket(ctx, socktype) local function mk_socket(ctx, socktype)
local sock = zmq_socket(ctx.ctx, socktype) local sock = libzmq.zmq_socket(ctx.ctx, socktype)
return check_if_error(sock, return check_if_error(sock,
{ {
ctx = ctx.ctx, ctx = ctx.ctx,
@ -274,7 +233,7 @@ local function mk_socket(ctx, socktype)
end end
local function ctx_new() local function ctx_new()
local ctx = zmq_ctx_new() local ctx = libzmq.zmq_ctx_new()
return check_if_error(ctx, return check_if_error(ctx,
{ {
ctx = ctx, ctx = ctx,

Loading…
Cancel
Save