|
|
|
|
local ffi = require 'cffi'
|
|
|
|
|
|
|
|
|
|
-- Handle to the ZeroMQ shared library, loaded by file name; every libzmq.*
-- call in this module goes through it.
local libzmq = ffi.load("libzmq.so")
|
|
|
|
|
-- C declarations for the subset of the libzmq API used by this module.
--   * context management: zmq_ctx_*
--   * message frames:     struct zmq_msg_t (declared as 64 opaque bytes) and zmq_msg_*
--   * sockets:            zmq_socket, zmq_close, zmq_setsockopt, bind/connect, send/recv
--   * polling:            struct zmq_pollitem and zmq_poll
--   * error reporting:    zmq_errno
-- All zero-argument functions are declared with an explicit (void) prototype.
ffi.cdef [[
void *zmq_ctx_new (void);
int zmq_ctx_term (void *context_);
int zmq_ctx_shutdown (void *context_);
int zmq_ctx_set (void *context_, int option_, int optval_);
int zmq_ctx_get (void *context_, int option_);

struct zmq_msg_t
{
    unsigned char _[64];
};

struct zmq_pollitem
{
    void* socket;
    int fd;
    short events;
    short revents;
};

int zmq_msg_init (struct zmq_msg_t *msg_);
int zmq_msg_init_size (struct zmq_msg_t *msg_, size_t size_);

int zmq_msg_send (struct zmq_msg_t *msg_, void *s_, int flags_);
int zmq_msg_recv (struct zmq_msg_t *msg_, void *s_, int flags_);
int zmq_msg_close (struct zmq_msg_t *msg_);
int zmq_msg_move (struct zmq_msg_t *dest_, struct zmq_msg_t *src_);
int zmq_msg_copy (struct zmq_msg_t *dest_, struct zmq_msg_t *src_);
void *zmq_msg_data (struct zmq_msg_t *msg_);
size_t zmq_msg_size (const struct zmq_msg_t *msg_);
int zmq_msg_more (const struct zmq_msg_t *msg_);
int zmq_msg_get (const struct zmq_msg_t *msg_, int property_);
int zmq_msg_set (struct zmq_msg_t *msg_, int property_, int optval_);
const char *zmq_msg_gets (const struct zmq_msg_t *msg_,
                          const char *property_);
void *zmq_socket (void *, int type_);
int zmq_close (void *s_);
int zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_);
int zmq_bind (void *s_, const char *addr_);
int zmq_connect (void *s_, const char *addr_);
int zmq_unbind (void *s_, const char *addr_);
int zmq_disconnect (void *s_, const char *addr_);
int zmq_send (void *s_, const void *buf_, size_t len_, int flags_);
int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_);
int zmq_recv (void *s_, void *buf_, size_t len_, int flags_);
int zmq_poll (struct zmq_pollitem *items_, int n_, long timeout_);
int zmq_errno (void);
]]
|
|
|
|
|
|
|
|
|
|
-- Numeric constants passed to libzmq, grouped by purpose.

-- Socket types (second argument of zmq_socket).
local ZMQ_PAIR, ZMQ_PUB, ZMQ_SUB, ZMQ_REQ = 0, 1, 2, 3
local ZMQ_REP, ZMQ_DEALER, ZMQ_ROUTER, ZMQ_PULL = 4, 5, 6, 7
local ZMQ_PUSH, ZMQ_XPUB, ZMQ_XSUB, ZMQ_STREAM = 8, 9, 10, 11

-- Event bits used in the events/revents fields of struct zmq_pollitem.
local ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR = 1, 2, 4

-- Send flag: further frames of the same message follow.
local ZMQ_SNDMORE = 2

-- Socket option identifiers (second argument of zmq_setsockopt).
local ZMQ_ROUTING_ID, ZMQ_LINGER = 5, 17
local ZMQ_RCVTIMEO, ZMQ_SNDTIMEO = 27, 28
local ZMQ_ROUTER_MANDATORY = 33
|
|
|
|
|
|
|
|
|
|
--- Translate a raw libzmq return value into this module's `result, err` pair.
-- @param value          value returned by the libzmq call
-- @param success_value  value handed back to the caller when the call succeeded
-- @param err_value      sentinel that signals failure: -1 for int-returning
--                       calls, nil for pointer-returning calls
-- @return success_value, nil on success; nil, zmq_errno() on failure
local function check_if_error(value, success_value, err_value)
    local failed = (value == err_value)

    -- Pointer-returning calls use nil as the failure sentinel. Under plain Lua
    -- the __eq metamethod is never consulted when comparing cdata with nil, so
    -- a NULL pointer would slip through; compare against the FFI's explicit
    -- null pointer constant when it provides one (LuaJIT's ffi has no such
    -- field, and there `ptr == nil` already works).
    if not failed and err_value == nil then
        local null = ffi.nullptr
        failed = (null ~= nil and value == null)
    end

    if failed then
        return nil, libzmq.zmq_errno()
    end
    return success_value, nil
end
|
|
|
|
|
|
|
|
|
|
--- Terminate a context created by ctx_new.
-- The return code of zmq_ctx_term is reported the same way as the socket
-- wrappers do instead of being discarded.
-- @param ctx  context wrapper table; its `ctx` field holds the libzmq handle
-- @return nil, nil on success; nil, errno when zmq_ctx_term returns -1
local function ctx_term(ctx)
    local rc = libzmq.zmq_ctx_term(ctx.ctx)
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Bind a socket to an endpoint via zmq_bind.
-- @param socket    socket wrapper table; its `socket` field holds the libzmq handle
-- @param endpoint  endpoint string handed to zmq_bind unchanged
-- @return nil, nil on success; nil, errno when zmq_bind returns -1
local function socket_bind(socket, endpoint)
    local rc = libzmq.zmq_bind(socket.socket, endpoint)
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Connect a socket to an endpoint via zmq_connect.
-- @param socket    socket wrapper table; its `socket` field holds the libzmq handle
-- @param endpoint  endpoint string handed to zmq_connect unchanged
-- @return nil, nil on success; nil, errno when zmq_connect returns -1
local function socket_connect(socket, endpoint)
    local rc = libzmq.zmq_connect(socket.socket, endpoint)
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Set an integer-valued socket option via zmq_setsockopt.
-- @param socket       socket wrapper table; its `socket` field holds the libzmq handle
-- @param option_name  numeric option identifier (e.g. one of the SOCKOPT_* exports)
-- @param value        integer option value
-- @return nil, nil on success; nil, -1 when option_name is missing;
--         nil, errno when zmq_setsockopt returns -1
local function socket_setsockopt_int(socket, option_name, value)
    -- Validate before allocating so a missing option costs nothing.
    if not option_name then
        return nil, -1
    end

    local optval = ffi.new("int[1]", value)
    -- Pass the real size of a C int rather than assuming it is 4 bytes.
    local rc = libzmq.zmq_setsockopt(socket.socket, option_name, optval,
                                     ffi.sizeof("int"))
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Set a byte-string socket option via zmq_setsockopt.
-- The Lua string is copied into a freshly allocated uint8_t buffer of the same
-- length, and that buffer is what libzmq receives.
-- @param socket       socket wrapper table; its `socket` field holds the libzmq handle
-- @param option_name  numeric option identifier
-- @param value        option value as a Lua string
-- @return nil, nil on success; nil, errno when zmq_setsockopt returns -1
local function socket_setsockopt_str(socket, option_name, value)
    local length = #value
    local buffer = ffi.new("uint8_t[?]", length)
    ffi.copy(buffer, value, length)

    local rc = libzmq.zmq_setsockopt(socket.socket, option_name, buffer, length)
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Close a socket via zmq_close.
-- @param socket  socket wrapper table; its `socket` field holds the libzmq handle
-- @return nil, nil on success; nil, errno when zmq_close returns -1
local function socket_close(socket)
    local rc = libzmq.zmq_close(socket.socket)
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Disconnect a socket from an endpoint via zmq_disconnect.
-- zmq_disconnect is declared with two parameters (socket and address), so the
-- endpoint must be supplied by the caller and forwarded.
-- @param socket    socket wrapper table; its `socket` field holds the libzmq handle
-- @param endpoint  endpoint string previously passed to connect
-- @return nil, nil on success; nil, -1 when endpoint is missing;
--         nil, errno when zmq_disconnect returns -1
local function socket_disconnect(socket, endpoint)
    -- Same "missing argument" convention as socket_setsockopt_int.
    if not endpoint then
        return nil, -1
    end
    local rc = libzmq.zmq_disconnect(socket.socket, endpoint)
    return check_if_error(rc, nil, -1)
end
|
|
|
|
|
|
|
|
|
|
--- Send a multipart message.
-- Every element of `message` is sent as one frame with zmq_send; all frames
-- except the last carry the ZMQ_SNDMORE flag.
-- @param socket   socket wrapper table; its `socket` field holds the libzmq handle
-- @param message  sequence of strings, one per frame
-- @return nil, nil on success; nil, -1 when `message` has no frames;
--         nil, errno as soon as a zmq_send call fails
local function socket_send(socket, message)
    local frame_count = #message
    if frame_count == 0 then
        -- Nothing to send; report it instead of crashing on a nil frame.
        return nil, -1
    end

    for i = 1, frame_count do
        local frame = message[i]
        local flags = 0
        if i < frame_count then
            flags = ZMQ_SNDMORE
        end
        local rc = libzmq.zmq_send(socket.socket, frame, string.len(frame), flags)
        if rc < 0 then
            return nil, libzmq.zmq_errno()
        end
    end

    return nil, nil
end
|
|
|
|
|
|
|
|
|
|
--- Receive one complete (possibly multipart) message.
-- Frames are read with zmq_msg_recv (flags 0) until zmq_msg_more reports that
-- no further frame follows; each frame is copied into a Lua string.
-- @param socket  socket wrapper table; its `socket` field holds the libzmq handle
-- @return sequence of frame strings, nil on success; nil, errno on failure
local function socket_recv(socket)
    local frames = {}
    -- One frame holder is enough: it is closed after every frame and
    -- re-initialised before the next one.
    local part = ffi.new("struct zmq_msg_t[1]")
    local has_more

    repeat
        -- zmq_msg_init returns an int; 0 is truthy in Lua, so it has to be
        -- compared explicitly rather than tested with `not`.
        if libzmq.zmq_msg_init(part) ~= 0 then
            return nil, libzmq.zmq_errno()
        end

        if libzmq.zmq_msg_recv(part, socket.socket, 0) < 0 then
            -- Capture errno first: the cleanup call below could overwrite it.
            local err = libzmq.zmq_errno()
            libzmq.zmq_msg_close(part)
            return nil, err
        end

        local size = libzmq.zmq_msg_size(part)
        frames[#frames + 1] = ffi.string(libzmq.zmq_msg_data(part), size)
        has_more = (libzmq.zmq_msg_more(part) ~= 0)

        if libzmq.zmq_msg_close(part) == -1 then
            return nil, libzmq.zmq_errno()
        end
    until not has_more

    return frames, nil
end
|
|
|
|
|
|
|
|
|
|
--- Register a socket with a poller.
-- @param poller  poller table; the new entry is appended to its `items` list
-- @param sock    socket wrapper to watch
-- @param event   event mask to watch for (stored as given)
local function poller_add_item(poller, sock, event)
    local entry = { sock = sock, event = event }
    local items = poller.items
    items[#items + 1] = entry
end
|
|
|
|
|
|
|
|
|
|
--- Poll every registered item once with zmq_poll.
-- A zmq_pollitem array is rebuilt from `poller.items` on each call (fd is
-- always 0, revents starts at 0), handed to zmq_poll, and the entries whose
-- revents came back non-zero are collected.
-- @param poller   poller table holding the `items` list filled by add
-- @param timeout  timeout value forwarded to zmq_poll unchanged
-- @return list of the registered entries ({ sock = ..., event = ... }) that
--         reported events, or nil, errno when zmq_poll returns a negative value
local function poller_poll(poller, timeout)
    local registered = poller.items
    local pollset = ffi.new("struct zmq_pollitem[?]", #registered)

    -- The C array is 0-based, the Lua list 1-based.
    for position, entry in ipairs(registered) do
        local slot = pollset[position - 1]
        slot.socket = entry.sock.socket
        slot.fd = 0
        slot.events = entry.event
        slot.revents = 0
    end

    local signalled = libzmq.zmq_poll(pollset, #registered, timeout)
    if signalled < 0 then
        return nil, libzmq.zmq_errno()
    end

    -- Stop scanning as soon as as many entries as zmq_poll counted were found.
    local ready = {}
    local position, total = 1, #registered
    while position <= total and #ready ~= signalled do
        if pollset[position - 1].revents ~= 0 then
            ready[#ready + 1] = registered[position]
        end
        position = position + 1
    end

    return ready
end
|
|
|
|
|
|
|
|
|
|
--- Create an empty poller object.
-- @param ctx  context wrapper the poller is created from (stored as `ctx`)
-- @return table with an empty `items` list plus the `add` and `poll` methods
local function mk_poller(ctx)
    return {
        items = {},
        ctx = ctx,
        poll = poller_poll,
        add = poller_add_item,
    }
end
|
|
|
|
|
|
|
|
|
|
--- Create a socket of the given type inside a context.
-- @param ctx       context wrapper table; its `ctx` field holds the libzmq handle
-- @param socktype  numeric socket type (e.g. one of the exported PUB/SUB/... values)
-- @return socket wrapper table (raw handle in `socket`, context handle in
--         `ctx`, plus the socket methods) and nil, or whatever check_if_error
--         reports when the handle equals its nil error sentinel
local function mk_socket(ctx, socktype)
    local handle = libzmq.zmq_socket(ctx.ctx, socktype)

    local wrapper = {
        ctx = ctx.ctx,
        socket = handle,
        -- connection management
        bind = socket_bind,
        connect = socket_connect,
        disconnect = socket_disconnect,
        close = socket_close,
        -- messaging
        send = socket_send,
        recv = socket_recv,
        -- options
        setsockopt_int = socket_setsockopt_int,
        setsockopt_str = socket_setsockopt_str,
    }

    return check_if_error(handle, wrapper, nil)
end
|
|
|
|
|
|
|
|
|
|
--- Create a new ZeroMQ context.
-- @return context wrapper table (raw handle in `ctx`, plus the `term`,
--         `socket` and `poller` methods) and nil, or whatever check_if_error
--         reports when the handle equals its nil error sentinel
local function ctx_new()
    local handle = libzmq.zmq_ctx_new()

    local wrapper = {
        ctx = handle,
        socket = mk_socket,
        poller = mk_poller,
        term = ctx_term,
    }

    return check_if_error(handle, wrapper, nil)
end
|
|
|
|
|
|
|
|
|
|
-- Public interface of the module: the context constructor plus the numeric
-- constants callers need for socket types, poll events and socket options.
local exports = {
    ctx_new = ctx_new,

    -- socket types
    PAIR = ZMQ_PAIR,
    PUB = ZMQ_PUB,
    SUB = ZMQ_SUB,
    REQ = ZMQ_REQ,
    REP = ZMQ_REP,
    DEALER = ZMQ_DEALER,
    ROUTER = ZMQ_ROUTER,
    PULL = ZMQ_PULL,
    PUSH = ZMQ_PUSH,
    XPUB = ZMQ_XPUB,
    XSUB = ZMQ_XSUB,
    STREAM = ZMQ_STREAM,

    -- poll events
    POLLIN = ZMQ_POLLIN,
    POLLOUT = ZMQ_POLLOUT,
    POLLERR = ZMQ_POLLERR,

    -- socket options
    SOCKOPT_ROUTING_ID = ZMQ_ROUTING_ID,
    SOCKOPT_LINGER = ZMQ_LINGER,
    SOCKOPT_RCVTIMEO = ZMQ_RCVTIMEO,
    SOCKOPT_SNDTIMEO = ZMQ_SNDTIMEO,
    SOCKOPT_ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY,
}

-- Besides being returned, the table is also published as a global: under the
-- name held in _REQUIREDNAME when that global is set, otherwise as `zmq`.
-- NOTE(review): this writes to the global environment on load; confirm that
-- callers still rely on it before removing.
local required_name = _REQUIREDNAME
if required_name ~= nil then
    _G[required_name] = exports
else
    zmq = exports
end

return exports
|