You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
279 lines
7.7 KiB
279 lines
7.7 KiB
local ffi = require 'cffi' |
|
|
|
local libzmq = ffi.load("libzmq.so") |
|
-- C declarations for the part of the libzmq API that is reachable through
-- the `libzmq` handle: context management, message objects, socket calls,
-- polling and errno retrieval.
--
-- `struct zmq_msg_t` is declared as opaque 64-byte storage; its contents are
-- only ever touched through the zmq_msg_* functions.
ffi.cdef [[
    void *zmq_ctx_new (void);
    int zmq_ctx_term (void *context_);
    int zmq_ctx_shutdown (void *context_);
    int zmq_ctx_set (void *context_, int option_, int optval_);
    int zmq_ctx_get (void *context_, int option_);

    struct zmq_msg_t
    {
        unsigned char _[64];
    };

    struct zmq_pollitem
    {
        void* socket;
        int fd;
        short events;
        short revents;
    };

    int zmq_msg_init (struct zmq_msg_t *msg_);
    int zmq_msg_init_size (struct zmq_msg_t *msg_, size_t size_);

    int zmq_msg_send (struct zmq_msg_t *msg_, void *s_, int flags_);
    int zmq_msg_recv (struct zmq_msg_t *msg_, void *s_, int flags_);
    int zmq_msg_close (struct zmq_msg_t *msg_);
    int zmq_msg_move (struct zmq_msg_t *dest_, struct zmq_msg_t *src_);
    int zmq_msg_copy (struct zmq_msg_t *dest_, struct zmq_msg_t *src_);
    void *zmq_msg_data (struct zmq_msg_t *msg_);
    size_t zmq_msg_size (const struct zmq_msg_t *msg_);
    int zmq_msg_more (const struct zmq_msg_t *msg_);
    int zmq_msg_get (const struct zmq_msg_t *msg_, int property_);
    int zmq_msg_set (struct zmq_msg_t *msg_, int property_, int optval_);
    const char *zmq_msg_gets (const struct zmq_msg_t *msg_,
                              const char *property_);
    void *zmq_socket (void *, int type_);
    int zmq_close (void *s_);
    int zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_);
    int zmq_bind (void *s_, const char *addr_);
    int zmq_connect (void *s_, const char *addr_);
    int zmq_unbind (void *s_, const char *addr_);
    int zmq_disconnect (void *s_, const char *addr_);
    int zmq_send (void *s_, const void *buf_, size_t len_, int flags_);
    int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_);
    int zmq_recv (void *s_, void *buf_, size_t len_, int flags_);
    int zmq_poll (struct zmq_pollitem *items_, int n_, long timeout_);
    int zmq_errno();
]]
|
|
|
-- Socket type identifiers (handed to zmq_socket as `type_`).
local ZMQ_PAIR, ZMQ_PUB, ZMQ_SUB, ZMQ_REQ = 0, 1, 2, 3
local ZMQ_REP, ZMQ_DEALER, ZMQ_ROUTER, ZMQ_PULL = 4, 5, 6, 7
local ZMQ_PUSH, ZMQ_XPUB, ZMQ_XSUB, ZMQ_STREAM = 8, 9, 10, 11

-- Event bits stored in the `events` field of a poll item.
local ZMQ_POLLIN, ZMQ_POLLOUT, ZMQ_POLLERR = 1, 2, 4

-- Send flag used for every part of a multipart message except the last.
local ZMQ_SNDMORE = 2

-- Option identifiers for zmq_setsockopt.
local ZMQ_ROUTING_ID = 5
local ZMQ_LINGER = 17
local ZMQ_RCVTIMEO = 27
local ZMQ_SNDTIMEO = 28
local ZMQ_ROUTER_MANDATORY = 33
|
|
|
--- Turn a raw libzmq return value into this module's `result, errno` pair.
-- @param value          value returned by the libzmq call
-- @param success_value  what to hand back when the call succeeded (may be nil)
-- @param err_value      sentinel that marks failure: -1 for int-returning
--                       calls, nil for pointer-returning calls
-- @return `success_value, nil` on success; `nil, errno` on failure, where
--         errno comes from zmq_errno()
local function check_if_error(value, success_value, err_value)
    local failed = (value == err_value)
    -- With cffi on plain Lua a NULL pointer cdata is a userdata, and `==`
    -- between a userdata and nil never consults __eq, so it is always false.
    -- When the caller's failure sentinel is nil (pointer-returning calls),
    -- also compare against cffi's dedicated NULL constant.
    if not failed and err_value == nil then
        failed = (value == ffi.nullptr)
    end
    if failed then
        return nil, libzmq.zmq_errno()
    end
    return success_value, nil
end
|
|
|
--- Terminate the context wrapped by `ctx`.
-- The result of zmq_ctx_term is reported the same way as the socket
-- wrappers report theirs, instead of being dropped.
-- @param ctx  context object (table holding the raw context in `ctx.ctx`)
-- @return `nil, nil` on success; `nil, errno` when zmq_ctx_term returns -1
local function ctx_term(ctx)
    return check_if_error(libzmq.zmq_ctx_term(ctx.ctx), nil, -1)
end
|
|
|
--- Bind the socket to `endpoint` via zmq_bind.
-- @param socket    socket object (raw handle in `socket.socket`)
-- @param endpoint  endpoint string passed to zmq_bind unchanged
-- @return `nil, nil` on success; `nil, errno` when zmq_bind returns -1
local function socket_bind(socket, endpoint)
    local rc = libzmq.zmq_bind(socket.socket, endpoint)
    return check_if_error(rc, nil, -1)
end
|
|
|
--- Connect the socket to `endpoint` via zmq_connect.
-- @param socket    socket object (raw handle in `socket.socket`)
-- @param endpoint  endpoint string passed to zmq_connect unchanged
-- @return `nil, nil` on success; `nil, errno` when zmq_connect returns -1
local function socket_connect(socket, endpoint)
    local rc = libzmq.zmq_connect(socket.socket, endpoint)
    return check_if_error(rc, nil, -1)
end
|
|
|
--- Set an integer-valued socket option.
-- The value is boxed into a one-element C `int` array so that its address
-- can be handed to zmq_setsockopt; the option length is passed as 4.
-- @param socket       socket object (raw handle in `socket.socket`)
-- @param option_name  numeric option identifier; a falsy value is rejected
-- @param value        integer value for the option
-- @return `nil, -1` when `option_name` is missing; otherwise `nil, nil` on
--         success or `nil, errno` when zmq_setsockopt returns -1
local function socket_setsockopt_int(socket, option_name, value)
    local boxed = ffi.new("int[1]", value)
    if option_name then
        local rc = libzmq.zmq_setsockopt(socket.socket, option_name, boxed, 4)
        return check_if_error(rc, nil, -1)
    end
    return nil, -1
end
|
|
|
--- Set a byte-string-valued socket option.
-- The Lua string is copied into a freshly allocated `uint8_t` buffer of the
-- same length, and that buffer plus its length go to zmq_setsockopt.
-- @param socket       socket object (raw handle in `socket.socket`)
-- @param option_name  numeric option identifier
-- @param value        option value as a Lua string
-- @return `nil, nil` on success; `nil, errno` when zmq_setsockopt returns -1
local function socket_setsockopt_str(socket, option_name, value)
    local nbytes = #value
    local buf = ffi.new("uint8_t[?]", nbytes)
    ffi.copy(buf, value, nbytes)
    local rc = libzmq.zmq_setsockopt(socket.socket, option_name, buf, nbytes)
    return check_if_error(rc, nil, -1)
end
|
|
|
--- Close the socket via zmq_close.
-- @param socket  socket object (raw handle in `socket.socket`)
-- @return `nil, nil` on success; `nil, errno` when zmq_close returns -1
local function socket_close(socket)
    local rc = libzmq.zmq_close(socket.socket)
    return check_if_error(rc, nil, -1)
end
|
|
|
--- Disconnect the socket from `endpoint` via zmq_disconnect.
-- zmq_disconnect is declared as `(void *s_, const char *addr_)`, so the
-- endpoint that was previously connected must be supplied.
-- @param socket    socket object (raw handle in `socket.socket`)
-- @param endpoint  endpoint string to disconnect from
-- @return `nil, -1` when `endpoint` is not a string; otherwise `nil, nil`
--         on success or `nil, errno` when zmq_disconnect returns -1
local function socket_disconnect(socket, endpoint)
    -- Reject a missing endpoint up front rather than handing the C call an
    -- argument it cannot convert.
    if type(endpoint) ~= "string" then
        return nil, -1
    end
    return check_if_error(
        libzmq.zmq_disconnect(socket.socket, endpoint), nil, -1)
end
|
|
|
--- Send a message made of one or more string parts.
-- Every part except the last is sent with ZMQ_SNDMORE; the last part is
-- sent with flags 0.
-- @param socket   socket object (raw handle in `socket.socket`)
-- @param message  array of strings (one entry per part); a plain string is
--                 accepted as a single-part message
-- @return `nil, nil` on success; `nil, -1` when `message` has no parts;
--         `nil, errno` as soon as a zmq_send call returns a negative value
local function socket_send(socket, message)
    if type(message) == "string" then
        message = { message }
    end

    local count = #message
    if count == 0 then
        -- Nothing to send; indexing message[0] would crash on nil.
        return nil, -1
    end

    for i = 1, count do
        local part = message[i]
        local flags = (i < count) and ZMQ_SNDMORE or 0
        if libzmq.zmq_send(socket.socket, part, #part, flags) < 0 then
            return nil, libzmq.zmq_errno()
        end
    end
    return nil, nil
end
|
|
|
--- Receive one complete (possibly multipart) message.
-- Parts are read with zmq_msg_recv (flags 0) until zmq_msg_more reports
-- that no further part follows.
-- @param socket  socket object (raw handle in `socket.socket`)
-- @return array of strings (one per part) and nil on success;
--         `nil, errno` if initialising, receiving or closing a part fails
local function socket_recv(socket)
    local parts = {}

    while true do
        local part = ffi.new("struct zmq_msg_t[1]")

        -- zmq_msg_init returns an int; 0 is truthy in Lua, so the result
        -- has to be compared explicitly.
        if libzmq.zmq_msg_init(part) ~= 0 then
            return nil, libzmq.zmq_errno()
        end

        if libzmq.zmq_msg_recv(part, socket.socket, 0) < 0 then
            -- Read errno before closing so the close call cannot mask it,
            -- then release the initialised message.
            local err = libzmq.zmq_errno()
            libzmq.zmq_msg_close(part)
            return nil, err
        end

        parts[#parts + 1] = ffi.string(
            libzmq.zmq_msg_data(part), libzmq.zmq_msg_size(part))
        local has_more = libzmq.zmq_msg_more(part) ~= 0

        if libzmq.zmq_msg_close(part) == -1 then
            return nil, libzmq.zmq_errno()
        end

        if not has_more then
            return parts, nil
        end
    end
end
|
|
|
--- Register a socket with the poller.
-- @param poller  poller object; the entry is appended to `poller.items`
-- @param sock    socket object to watch
-- @param event   event mask to watch for
local function poller_add_item(poller, sock, event)
    local entry = { sock = sock, event = event }
    table.insert(poller.items, entry)
end
|
|
|
--- Poll every registered item once.
-- A C array of `struct zmq_pollitem` is filled from `poller.items`, handed
-- to zmq_poll, and the entries whose `revents` came back non-zero are
-- collected.
-- @param poller   poller object holding the registered `items`
-- @param timeout  timeout value passed to zmq_poll unchanged
-- @return array of the registered entries (`{sock=..., event=...}`) that
--         reported events; `nil, errno` when zmq_poll returns a negative value
local function poller_poll(poller, timeout)
    local registered = poller.items
    local pollset = ffi.new("struct zmq_pollitem[?]", #registered)

    -- The C array is 0-based, the Lua list 1-based.
    for position, entry in ipairs(registered) do
        local slot = position - 1
        pollset[slot].socket = entry.sock.socket
        pollset[slot].fd = 0
        pollset[slot].events = entry.event
        pollset[slot].revents = 0
    end

    local ready_count = libzmq.zmq_poll(pollset, #registered, timeout)
    if ready_count < 0 then
        return nil, libzmq.zmq_errno()
    end

    -- Stop scanning once as many entries were collected as zmq_poll reported.
    local ready = {}
    local position = 1
    while position <= #registered and #ready ~= ready_count do
        if pollset[position - 1].revents ~= 0 then
            ready[#ready + 1] = registered[position]
        end
        position = position + 1
    end
    return ready
end
|
|
|
--- Create an empty poller object.
-- @param ctx  context object, stored as-is in the `ctx` field
-- @return table with an empty `items` list and the `add` / `poll` methods
local function mk_poller(ctx)
    local poller = {}
    poller.ctx = ctx
    poller.items = {}
    poller.add = poller_add_item
    poller.poll = poller_poll
    return poller
end
|
|
|
--- Create a socket of the given type on a context.
-- The raw handle from zmq_socket is wrapped in a table carrying the socket
-- methods; check_if_error decides, with nil as the failure sentinel, whether
-- the wrapper or an errno is returned.
-- @param ctx       context object (raw context in `ctx.ctx`)
-- @param socktype  numeric socket type passed to zmq_socket
-- @return socket object and nil, or `nil, errno` as decided by check_if_error
local function mk_socket(ctx, socktype)
    local handle = libzmq.zmq_socket(ctx.ctx, socktype)
    local wrapper = {
        ctx = ctx.ctx,
        socket = handle,
        bind = socket_bind,
        connect = socket_connect,
        close = socket_close,
        disconnect = socket_disconnect,
        send = socket_send,
        recv = socket_recv,
        setsockopt_int = socket_setsockopt_int,
        setsockopt_str = socket_setsockopt_str,
    }
    return check_if_error(handle, wrapper, nil)
end
|
|
|
--- Create a new context object.
-- The raw context from zmq_ctx_new is wrapped in a table carrying the
-- `term`, `socket` and `poller` methods; check_if_error decides, with nil as
-- the failure sentinel, whether the wrapper or an errno is returned.
-- @return context object and nil, or `nil, errno` as decided by check_if_error
local function ctx_new()
    local raw = libzmq.zmq_ctx_new()
    local wrapper = {
        ctx = raw,
        term = ctx_term,
        socket = mk_socket,
        poller = mk_poller,
    }
    return check_if_error(raw, wrapper, nil)
end
|
|
|
-- Public module table: the context constructor plus the numeric constants
-- callers need for socket types, poll events and socket options.
local P = {
    ctx_new = ctx_new,

    -- socket types
    PAIR = ZMQ_PAIR,
    PUB = ZMQ_PUB,
    SUB = ZMQ_SUB,
    REQ = ZMQ_REQ,
    REP = ZMQ_REP,
    DEALER = ZMQ_DEALER,
    ROUTER = ZMQ_ROUTER,
    PULL = ZMQ_PULL,
    PUSH = ZMQ_PUSH,
    XPUB = ZMQ_XPUB,
    XSUB = ZMQ_XSUB,
    STREAM = ZMQ_STREAM,

    -- poll event bits
    POLLIN = ZMQ_POLLIN,
    POLLOUT = ZMQ_POLLOUT,
    POLLERR = ZMQ_POLLERR,

    -- socket options
    SOCKOPT_ROUTING_ID = ZMQ_ROUTING_ID,
    SOCKOPT_LINGER = ZMQ_LINGER,
    SOCKOPT_RCVTIMEO = ZMQ_RCVTIMEO,
    SOCKOPT_SNDTIMEO = ZMQ_SNDTIMEO,
    SOCKOPT_ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY,
}
|
|
|
-- Besides returning the module table, publish it as a global: under the
-- name held in _REQUIREDNAME when that global is set, otherwise as `zmq`.
if _REQUIREDNAME ~= nil then
    _G[_REQUIREDNAME] = P
else
    zmq = P
end

return P
|
|
|