diff --git a/zeromq.lua b/zeromq.lua index ae1bf03..013bed7 100644 --- a/zeromq.lua +++ b/zeromq.lua @@ -1,94 +1,56 @@ -local alien = require("alien") - -libzmq = alien.load("/usr/lib/libzmq.so") - -local zmq_pollitem = alien.defstruct { - {"socket", "pointer"}, - {"fd", "int"}, - {"events", "short"}, - {"revents", "short"} -} - -local zmq_ctx_new = libzmq.zmq_ctx_new -zmq_ctx_new:types { ret = "pointer", abi="stdcall" } - -local zmq_ctx_term = libzmq.zmq_ctx_term -zmq_ctx_term:types { ret = "int", abi="stdcall", "pointer" } - -local zmq_bind = libzmq.zmq_bind -zmq_bind:types { ret = "int", abi="stdcall", "pointer", "string" } - -local zmq_connect = libzmq.zmq_connect -zmq_connect:types { ret = "int", abi="stdcall", "pointer", "string" } - -local zmq_close = libzmq.zmq_close -zmq_close:types { ret = "int", abi="stdcall", "pointer" } - -local zmq_disconnect = libzmq.zmq_disconnect -zmq_disconnect:types { ret = "int", abi="stdcall", "pointer", "string" } - -local zmq_setsockopt = libzmq.zmq_setsockopt -zmq_setsockopt:types { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" } - -local zmq_errno = libzmq.zmq_errno -zmq_errno:types { ret = "int", abi="stdcall" } - -local zmq_msg_close = libzmq.zmq_msg_close -zmq_msg_close:types { ret = "int", abi="stdcall", "pointer" } - -local zmq_msg_data = libzmq.zmq_msg_data -zmq_msg_data:types { ret = "pointer", abi="stdcall", "pointer" } - -local zmq_msg_get = libzmq.zmq_msg_get -zmq_msg_get:types { ret = "int", abi="stdcall", "pointer", "int" } - -local zmq_msg_init_size = libzmq.zmq_msg_init_size -zmq_msg_init_size:types { ret = "int", abi="stdcall", "pointer", "int" } - -local zmq_msg_init = libzmq.zmq_msg_init -zmq_msg_init:types { ret = "int", abi="stdcall", "pointer" } - -local zmq_msg_more = libzmq.zmq_msg_more -zmq_msg_more:types { ret = "int", abi="stdcall", "pointer" } - -local zmq_msg_recv = libzmq.zmq_msg_recv -zmq_msg_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } - -local zmq_msg_send = libzmq.zmq_msg_send -zmq_msg_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int" } - -local zmq_msg_set = libzmq.zmq_msg_set -zmq_msg_set:types { ret = "int", abi="stdcall", "pointer", "int", "int" } - -local zmq_msg_size = libzmq.zmq_msg_size -zmq_msg_size:types { ret = "int", abi="stdcall", "pointer" } - -local zmq_poll = libzmq.zmq_poll -zmq_poll:types { ret = "int", abi="stdcall", "pointer", "int", "long" } - -local zmq_recv = libzmq.zmq_recv -zmq_recv:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} - -local zmq_send = libzmq.zmq_send -zmq_send:types { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int"} - -local zmq_socket = libzmq.zmq_socket -zmq_socket:types { ret = "pointer", abi="stdcall", "pointer", "int"} - -local zmq_version = libzmq.zmq_version -zmq_version:types { ret = "void", abi="stdcall", "ref int", "ref int", "ref int"} - -local zmq_unbind = libzmq.zmq_unbind -zmq_unbind:types { ret = "int", abi="stdcall", "pointer", "string"} - -local function zmq_msg_new() - local buf = alien.buffer(64) - return buf -end - -local function zmq_msg_as_string(msg) - return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg)) -end +local ffi = require 'cffi' + +local libzmq = ffi.load("libzmq.so") +ffi.cdef [[ + +void *zmq_ctx_new (void); +int zmq_ctx_term (void *context_); +int zmq_ctx_shutdown (void *context_); +int zmq_ctx_set (void *context_, int option_, int optval_); +int zmq_ctx_get (void *context_, int option_); + +struct zmq_msg_t +{ +unsigned char _[64]; +}; + +struct zmq_pollitem +{ + void* socket; + int fd; + short events; + short revents; +}; + +int zmq_msg_init (struct zmq_msg_t *msg_); +int zmq_msg_init_size (struct zmq_msg_t *msg_, size_t size_); + +int zmq_msg_send (struct zmq_msg_t *msg_, void *s_, int flags_); +int zmq_msg_recv (struct zmq_msg_t *msg_, void *s_, int flags_); +int zmq_msg_close (struct zmq_msg_t *msg_); +int zmq_msg_move (struct zmq_msg_t *dest_, struct zmq_msg_t *src_); +int zmq_msg_copy (struct zmq_msg_t *dest_, struct zmq_msg_t *src_); +void *zmq_msg_data (struct zmq_msg_t *msg_); +size_t zmq_msg_size (const struct zmq_msg_t *msg_); +int zmq_msg_more (const struct zmq_msg_t *msg_); +int zmq_msg_get (const struct zmq_msg_t *msg_, int property_); +int zmq_msg_set (struct zmq_msg_t *msg_, int property_, int optval_); +const char *zmq_msg_gets (const struct zmq_msg_t *msg_, + const char *property_); +void *zmq_socket (void *, int type_); +int zmq_close (void *s_); +int zmq_setsockopt (void *s_, int option_, const void *optval_, size_t optvallen_); +int zmq_bind (void *s_, const char *addr_); +int zmq_connect (void *s_, const char *addr_); +int zmq_unbind (void *s_, const char *addr_); +int zmq_disconnect (void *s_, const char *addr_); +int zmq_send (void *s_, const void *buf_, size_t len_, int flags_); +int zmq_send_const (void *s_, const void *buf_, size_t len_, int flags_); +int zmq_recv (void *s_, void *buf_, size_t len_, int flags_); +int zmq_poll (struct zmq_pollitem *items_, int n_, long timeout_); +int zmq_errno(); + +]] local ZMQ_PAIR = 0 local ZMQ_PUB = 1 @@ -117,65 +79,69 @@ local ZMQ_ROUTER_MANDATORY = 33 local function check_if_error(value, success_value, err_value) if value == err_value then - return nil, zmq_errno() + return nil, libzmq.zmq_errno() else return success_value, nil end end local function ctx_term(ctx) - zmq_ctx_term(ctx.ctx) + libzmq.zmq_ctx_term(ctx.ctx) end local function socket_bind(socket, endpoint) return check_if_error( - zmq_bind(socket.socket, endpoint), nil, -1) + libzmq.zmq_bind(socket.socket, endpoint), nil, -1) end local function socket_connect(socket, endpoint) return check_if_error( - zmq_connect(socket.socket, endpoint), nil, -1) + libzmq.zmq_connect(socket.socket, endpoint), nil, -1) end local function socket_setsockopt_int(socket, option_name, value) - local arr = alien.array("int", {value}) + local arr = ffi.new("int[1]", value); + if not option_name then + return nil, -1 + end return check_if_error( - zmq_setsockopt(socket.socket, option_name, arr.buffer:topointer(), arr.size), + libzmq.zmq_setsockopt(socket.socket, option_name, arr, 4), nil, -1) end local function socket_setsockopt_str(socket, option_name, value) - local arr = alien.buffer(value) + local arr = ffi.new("uint8_t[?]", #value) + ffi.copy(arr, value, #value) return check_if_error( - zmq_setsockopt(socket.socket, option_name, value, #arr - 1), + libzmq.zmq_setsockopt(socket.socket, option_name, arr, #value), nil, -1) end local function socket_close(socket) return check_if_error( - zmq_close(socket.socket), nil, -1) + libzmq.zmq_close(socket.socket), nil, -1) end local function socket_disconnect(socket) return check_if_error( - zmq_disconnect(socket.socket), nil, -1) + libzmq.zmq_disconnect(socket.socket), nil, -1) end local function socket_send(socket, message) local rc = 0 if #message > 1 then for i = 1, #message - 1 do - rc = zmq_send(socket.socket, message[i], string.len(message[i]), + rc = libzmq.zmq_send(socket.socket, message[i], string.len(message[i]), ZMQ_SNDMORE) if rc < 0 then print(rc) - return nil, zmq_errno() + return nil, libzmq.zmq_errno() end end end - rc = zmq_send(socket.socket, message[#message], string.len(message[#message]), 0) + rc = libzmq.zmq_send(socket.socket, message[#message], string.len(message[#message]), 0) if rc < 0 then - return nil, zmq_errno() + return nil, libzmq.zmq_errno() end return nil, nil end @@ -185,25 +151,24 @@ local function socket_recv(socket) local msg = {} local more = false repeat - local part = zmq_msg_new() + local part = ffi.new("struct zmq_msg_t[1]") if not part then - return nil, zmq_errno() + return nil, -1 end - if not zmq_msg_init(part) then - return nil, zmq_errno() + if not libzmq.zmq_msg_init(part) then + return nil, libzmq.zmq_errno() end - local rc = zmq_msg_recv(part, socket.socket, 0) + local rc = libzmq.zmq_msg_recv(part, socket.socket, 0) if rc < 0 then - return nil, zmq_errno() + return nil, libzmq.zmq_errno() else - local len = zmq_msg_size(part) - local buf = alien.buffer(zmq_msg_data(part)) - msg[#msg + 1] = buf:tostring(len) - more = (zmq_msg_more(part) == 0) + local len = libzmq.zmq_msg_size(part) + msg[#msg + 1] = ffi.string(libzmq.zmq_msg_data(part), len) + more = (libzmq.zmq_msg_more(part) == 0) end - rc = zmq_msg_close(part) + rc = libzmq.zmq_msg_close(part) if rc == -1 then - return nil, zmq_errno() + return nil, libzmq.zmq_errno() end until more @@ -215,33 +180,27 @@ local function poller_add_item(poller, sock, event) end local function poller_poll(poller, timeout) - local events_buffer = alien.buffer(zmq_pollitem.size * #poller.items) - local ptr = 1 - for _, item in ipairs(poller.items) do - local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr)) - pollitem.socket = item.sock.socket - pollitem.fd = 0 - pollitem.events = item.event - pollitem.revents = 0 - ptr = ptr + zmq_pollitem.size + local events_buffer = ffi.new("struct zmq_pollitem[?]", #poller.items) + for i, item in ipairs(poller.items) do + events_buffer[i - 1].socket = item.sock.socket + events_buffer[i - 1].fd = 0 + events_buffer[i - 1].events = item.event + events_buffer[i - 1].revents = 0 end - local rc = zmq_poll(events_buffer:topointer(), #poller.items, timeout) + local rc = libzmq.zmq_poll(events_buffer, #poller.items, timeout) if rc < 0 then - return nil, zmq_errno() + return nil, libzmq.zmq_errno() end local result = {} local added = 0 - ptr = 1 for i = 1, #poller.items do if added == rc then break end - local pollitem = zmq_pollitem:new(events_buffer:topointer(ptr)) - if pollitem.revents ~= 0 then + if events_buffer[i - 1].revents ~= 0 then table.insert(result, poller.items[i]) added = added + 1 end - ptr = ptr + zmq_pollitem.size end return result end @@ -257,7 +216,7 @@ local function mk_poller(ctx) end local function mk_socket(ctx, socktype) - local sock = zmq_socket(ctx.ctx, socktype) + local sock = libzmq.zmq_socket(ctx.ctx, socktype) return check_if_error(sock, { ctx = ctx.ctx, @@ -274,7 +233,7 @@ local function mk_socket(ctx, socktype) end local function ctx_new() - local ctx = zmq_ctx_new() + local ctx = libzmq.zmq_ctx_new() return check_if_error(ctx, { ctx = ctx,