--- Minimal ZeroMQ binding built on the Alien FFI library.
-- Exposes a context object that creates sockets and pollers. Most wrappers
-- report their outcome as the pair `value, errno`, where `errno` is nil on
-- success.

local alien = require("alien")

local libzmq = alien.load("/usr/lib/libzmq.so")
-- Earlier versions of this module leaked the library handle as a global;
-- keep publishing it (explicitly) so existing users are not broken.
_G.libzmq = libzmq

-- Memory layout used for each entry of the array handed to zmq_poll.
local zmq_pollitem = alien.defstruct {
  {"socket", "pointer"},
  {"fd", "int"},
  {"events", "short"},
  {"revents", "short"}
}

--- Look up a C function in libzmq and declare its Alien signature.
-- @tparam string name exported symbol name
-- @tparam table signature table passed unchanged to `fn:types`
-- @return the typed Alien function
local function bind_c(name, signature)
  local fn = libzmq[name]
  fn:types(signature)
  return fn
end

local zmq_ctx_new = bind_c("zmq_ctx_new", { ret = "pointer", abi="stdcall" })
local zmq_ctx_term = bind_c("zmq_ctx_term", { ret = "int", abi="stdcall", "pointer" })
local zmq_bind = bind_c("zmq_bind", { ret = "int", abi="stdcall", "pointer", "string" })
local zmq_connect = bind_c("zmq_connect", { ret = "int", abi="stdcall", "pointer", "string" })
local zmq_close = bind_c("zmq_close", { ret = "int", abi="stdcall", "pointer" })
local zmq_disconnect = bind_c("zmq_disconnect", { ret = "int", abi="stdcall", "pointer", "string" })
local zmq_setsockopt = bind_c("zmq_setsockopt", { ret = "int", abi="stdcall", "pointer", "int", "pointer", "int" })
local zmq_errno = bind_c("zmq_errno", { ret = "int", abi="stdcall" })
local zmq_msg_close = bind_c("zmq_msg_close", { ret = "int", abi="stdcall", "pointer" })
local zmq_msg_data = bind_c("zmq_msg_data", { ret = "pointer", abi="stdcall", "pointer" })
local zmq_msg_get = bind_c("zmq_msg_get", { ret = "int", abi="stdcall", "pointer", "int" })
local zmq_msg_init_size = bind_c("zmq_msg_init_size", { ret = "int", abi="stdcall", "pointer", "int" })
local zmq_msg_init = bind_c("zmq_msg_init", { ret = "int", abi="stdcall", "pointer" })
local zmq_msg_more = bind_c("zmq_msg_more", { ret = "int", abi="stdcall", "pointer" })
local zmq_msg_recv = bind_c("zmq_msg_recv", { ret = "int", abi="stdcall", "pointer", "pointer", "int" })
local zmq_msg_send = bind_c("zmq_msg_send", { ret = "int", abi="stdcall", "pointer", "pointer", "int" })
local zmq_msg_set = bind_c("zmq_msg_set", { ret = "int", abi="stdcall", "pointer", "int", "int" })
local zmq_msg_size = bind_c("zmq_msg_size", { ret = "int", abi="stdcall", "pointer" })
local zmq_poll = bind_c("zmq_poll", { ret = "int", abi="stdcall", "pointer", "int", "long" })
local zmq_recv = bind_c("zmq_recv", { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int" })
local zmq_send = bind_c("zmq_send", { ret = "int", abi="stdcall", "pointer", "pointer", "int", "int" })
local zmq_socket = bind_c("zmq_socket", { ret = "pointer", abi="stdcall", "pointer", "int" })
local zmq_version = bind_c("zmq_version", { ret = "void", abi="stdcall", "ref int", "ref int", "ref int" })
local zmq_unbind = bind_c("zmq_unbind", { ret = "int", abi="stdcall", "pointer", "string" })

--- Allocate raw storage for one zmq message structure.
-- NOTE(review): 64 bytes presumably matches sizeof(zmq_msg_t) of the
-- installed libzmq -- confirm against zmq.h.
-- @return an uninitialised Alien buffer
local function zmq_msg_new()
  return alien.buffer(64)
end

--- Copy the payload of an initialised message into a Lua string.
-- @param msg message buffer
-- @treturn string payload bytes
local function zmq_msg_as_string(msg)
  return alien.tostring(zmq_msg_data(msg), zmq_msg_size(msg))
end

-- Socket types.
local ZMQ_PAIR = 0
local ZMQ_PUB = 1
local ZMQ_SUB = 2
local ZMQ_REQ = 3
local ZMQ_REP = 4
local ZMQ_DEALER = 5
local ZMQ_ROUTER = 6
local ZMQ_PULL = 7
local ZMQ_PUSH = 8
local ZMQ_XPUB = 9
local ZMQ_XSUB = 10
local ZMQ_STREAM = 11

-- Poll event flags.
local ZMQ_POLLIN = 1
local ZMQ_POLLOUT = 2
local ZMQ_POLLERR = 4

-- Send flags.
local ZMQ_SNDMORE = 2

-- Socket options.
local ZMQ_ROUTING_ID = 5
local ZMQ_SUBSCRIBE = 6
local ZMQ_UNSUBSCRIBE = 7
local ZMQ_LINGER = 17
local ZMQ_RCVTIMEO = 27
local ZMQ_SNDTIMEO = 28
local ZMQ_ROUTER_MANDATORY = 33

--- Translate a C return value into the module's `value, errno` convention.
-- @param value value returned by the C call
-- @param success_value what to hand back when the call succeeded
-- @param err_value sentinel that marks failure (e.g. -1, or nil for pointers)
-- @return `success_value, nil` on success; `nil, errno` on failure
local function check_if_error(value, success_value, err_value)
  if value == err_value then
    return nil, zmq_errno()
  end
  return success_value, nil
end

--- Terminate a context.
-- @tparam table ctx context object created by `ctx_new`
-- @return `nil, nil` on success; `nil, errno` on failure
local function ctx_term(ctx)
  return check_if_error(zmq_ctx_term(ctx.ctx), nil, -1)
end

--- Bind the socket to an endpoint.
-- @tparam table socket socket object
-- @tparam string endpoint endpoint passed to zmq_bind
-- @return `nil, nil` on success; `nil, errno` on failure
local function socket_bind(socket, endpoint)
  return check_if_error(zmq_bind(socket.socket, endpoint), nil, -1)
end

--- Connect the socket to an endpoint.
-- @tparam table socket socket object
-- @tparam string endpoint endpoint passed to zmq_connect
-- @return `nil, nil` on success; `nil, errno` on failure
local function socket_connect(socket, endpoint)
  return check_if_error(zmq_connect(socket.socket, endpoint), nil, -1)
end

--- Set an integer-valued socket option.
-- @tparam table socket socket object
-- @tparam number option_name numeric option id (see the SOCKOPT_* exports)
-- @tparam number value option value
-- @return `nil, nil` on success; `nil, errno` on failure
local function socket_setsockopt_int(socket, option_name, value)
  -- A one-element C int array provides addressable storage for the value.
  local arr = alien.array("int", {value})
  return check_if_error(
    zmq_setsockopt(socket.socket, option_name, arr.buffer:topointer(), arr.size),
    nil, -1)
end

--- Set a string-valued socket option.
-- @tparam table socket socket object
-- @tparam number option_name numeric option id (see the SOCKOPT_* exports)
-- @tparam string value option value; its byte length is sent as the size
-- @return `nil, nil` on success; `nil, errno` on failure
local function socket_setsockopt_str(socket, option_name, value)
  -- The length is taken from the Lua string directly; no scratch buffer
  -- is needed because the string itself is what gets passed to C.
  return check_if_error(
    zmq_setsockopt(socket.socket, option_name, value, #value),
    nil, -1)
end

--- Close the socket.
-- @tparam table socket socket object
-- @return `nil, nil` on success; `nil, errno` on failure
local function socket_close(socket)
  return check_if_error(zmq_close(socket.socket), nil, -1)
end

--- Disconnect the socket from an endpoint.
-- @tparam table socket socket object
-- @tparam string endpoint endpoint previously given to `connect`;
--   zmq_disconnect takes it as its second argument
-- @return `nil, nil` on success; `nil, errno` on failure
local function socket_disconnect(socket, endpoint)
  return check_if_error(zmq_disconnect(socket.socket, endpoint), nil, -1)
end

--- Send a multipart message.
-- Every frame but the last is sent with the SNDMORE flag.
-- @tparam table socket socket object
-- @tparam table message non-empty array of frames
-- @return `nil, nil` on success; `nil, errno` as soon as a frame fails
-- @raise if `message` contains no frames
local function socket_send(socket, message)
  local count = #message
  if count == 0 then
    error("socket_send: message must contain at least one frame", 2)
  end
  for i = 1, count do
    local frame = message[i]
    local flags = 0
    if i < count then
      flags = ZMQ_SNDMORE
    end
    local rc = zmq_send(socket.socket, frame, string.len(frame), flags)
    if rc < 0 then
      return nil, zmq_errno()
    end
  end
  return nil, nil
end

--- Receive a complete multipart message.
-- @tparam table socket socket object
-- @return array of frame strings and nil on success; `nil, errno` on
--   failure (frames received before the failure are discarded)
local function socket_recv(socket)
  local msg = {}
  local last_part = false
  repeat
    local part = zmq_msg_new()
    -- The C call returns an integer; 0 is truthy in Lua, so compare explicitly.
    if zmq_msg_init(part) ~= 0 then
      return nil, zmq_errno()
    end
    if zmq_msg_recv(part, socket.socket, 0) < 0 then
      -- Read errno before closing: the close call could overwrite it.
      local errno = zmq_errno()
      zmq_msg_close(part)
      return nil, errno
    end
    local len = zmq_msg_size(part)
    local buf = alien.buffer(zmq_msg_data(part))
    msg[#msg + 1] = buf:tostring(len)
    last_part = (zmq_msg_more(part) == 0)
    if zmq_msg_close(part) == -1 then
      return nil, zmq_errno()
    end
  until last_part
  return msg, nil
end

--- Register a socket with the poller.
-- @tparam table poller poller object
-- @tparam table sock socket object
-- @tparam number event event mask (POLLIN / POLLOUT / POLLERR)
local function poller_add_item(poller, sock, event)
  poller.items[#poller.items + 1] = { sock = sock, event = event }
end

--- Poll all registered sockets.
-- @tparam table poller poller object
-- @tparam number timeout timeout forwarded to zmq_poll
-- @return array of the registered items whose `revents` is non-zero,
--   or `nil, errno` on failure
local function poller_poll(poller, timeout)
  local item_count = #poller.items
  local events_buffer = alien.buffer(zmq_pollitem.size * item_count)

  -- Offsets start at 1 here, as in the original code.
  -- NOTE(review): relies on Alien buffer offsets being 1-based -- confirm.
  local offset = 1
  for _, item in ipairs(poller.items) do
    local pollitem = zmq_pollitem:new(events_buffer:topointer(offset))
    pollitem.socket = item.sock.socket
    pollitem.fd = 0
    pollitem.events = item.event
    pollitem.revents = 0
    offset = offset + zmq_pollitem.size
  end

  local rc = zmq_poll(events_buffer:topointer(), item_count, timeout)
  if rc < 0 then
    return nil, zmq_errno()
  end

  -- rc is used as the number of signalled items, so stop scanning once
  -- that many have been collected.
  local result = {}
  local added = 0
  offset = 1
  for i = 1, item_count do
    if added == rc then
      break
    end
    local pollitem = zmq_pollitem:new(events_buffer:topointer(offset))
    if pollitem.revents ~= 0 then
      result[#result + 1] = poller.items[i]
      added = added + 1
    end
    offset = offset + zmq_pollitem.size
  end
  return result
end

--- Create a poller object.
-- @tparam table ctx context object
-- @treturn table poller with `add` and `poll` methods
local function mk_poller(ctx)
  return {
    ctx = ctx,
    items = {},
    add = poller_add_item,
    poll = poller_poll,
  }
end

--- Create a socket object in the given context.
-- @tparam table ctx context object
-- @tparam number socktype one of the exported socket type constants
-- @return socket object and nil on success; `nil, errno` when zmq_socket
--   yields nil
local function mk_socket(ctx, socktype)
  local sock = zmq_socket(ctx.ctx, socktype)
  return check_if_error(sock, {
    ctx = ctx.ctx,
    socket = sock,
    bind = socket_bind,
    connect = socket_connect,
    close = socket_close,
    disconnect = socket_disconnect,
    send = socket_send,
    recv = socket_recv,
    setsockopt_int = socket_setsockopt_int,
    setsockopt_str = socket_setsockopt_str,
  }, nil)
end

--- Create a new context object.
-- @return context object and nil on success; `nil, errno` when
--   zmq_ctx_new yields nil
local function ctx_new()
  local ctx = zmq_ctx_new()
  return check_if_error(ctx, {
    ctx = ctx,
    term = ctx_term,
    socket = mk_socket,
    poller = mk_poller,
  }, nil)
end

-- Public module table.
local P = {
  ctx_new = ctx_new,
  PUB = ZMQ_PUB,
  SUB = ZMQ_SUB,
  REQ = ZMQ_REQ,
  REP = ZMQ_REP,
  ROUTER = ZMQ_ROUTER,
  DEALER = ZMQ_DEALER,
  PAIR = ZMQ_PAIR,
  PULL = ZMQ_PULL,
  PUSH = ZMQ_PUSH,
  XPUB = ZMQ_XPUB,
  XSUB = ZMQ_XSUB,
  STREAM = ZMQ_STREAM,
  POLLIN = ZMQ_POLLIN,
  POLLOUT = ZMQ_POLLOUT,
  POLLERR = ZMQ_POLLERR,
  SOCKOPT_RCVTIMEO = ZMQ_RCVTIMEO,
  SOCKOPT_SNDTIMEO = ZMQ_SNDTIMEO,
  SOCKOPT_LINGER = ZMQ_LINGER,
  SOCKOPT_ROUTING_ID = ZMQ_ROUTING_ID,
  SOCKOPT_ROUTER_MANDATORY = ZMQ_ROUTER_MANDATORY,
  SOCKOPT_SUBSCRIBE = ZMQ_SUBSCRIBE,
  SOCKOPT_UNSUBSCRIBE = ZMQ_UNSUBSCRIBE,
}

-- Legacy registration kept for existing users: publish the module under
-- the name given by _REQUIREDNAME when it is set, else as the global `zmq`.
if _REQUIREDNAME == nil then
  _G.zmq = P
else
  _G[_REQUIREDNAME] = P
end

return P