// // RedZMQ.cpp // rebolt // // Created by zhuge on 2022/11/7. // #include "RedZMQ.h" #include "zmq.h" #include #include #include #include "assert.h" static int s_send(void *socket, const char *send_msg) { // 初始化一个zmq_msg_t对象, 分配的大小为string的大小 zmq_msg_t msg; size_t send_msg_len = strlen(send_msg); zmq_msg_init_size(&msg, send_msg_len); memcpy(zmq_msg_data(&msg), send_msg, send_msg_len); // 发送数据 int rc = zmq_msg_send(&msg, socket, 0); // 关闭zmq_msg_t对象 zmq_msg_close(&msg); return rc; } static std::string s_recv(void *socket) { int rc; zmq_msg_t msg; zmq_msg_init(&msg); rc = zmq_msg_recv(&msg, socket, 0); if(rc == -1) { printf("zmq请求超时.\n"); return ""; } char *ret_msg = (char*)malloc(rc + 1); memcpy(ret_msg, zmq_msg_data(&msg), rc); zmq_msg_close(&msg); ret_msg[rc] = 0; std::string ret_string(ret_msg); free(ret_msg); return ret_string; } red_zmq::Pub::Pub(std::string endPoint) : _endPoint(endPoint) { this->_bind(); } void red_zmq::Pub::_bind() { this->_context = zmq_ctx_new(); this->_socket = zmq_socket(this->_context, ZMQ_PUB); assert(this->_socket); int rc = zmq_bind(this->_socket, _endPoint.c_str()); if (rc != 0) { printf("Pub初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno())); } assert(rc == 0); } int red_zmq::Pub::pubMsg(const char* sendMsg) { return s_send(this->_socket, sendMsg); } #pragma mark - sub red_zmq::Sub::Sub(const std::string& endPoint, const std::function& handleMsgCb) : _endPoint(endPoint), _handleMsgCb(handleMsgCb){ this->_connect(); } void red_zmq::Sub::_connect() { this->_context = zmq_ctx_new(); this->_socket = zmq_socket(this->_context, ZMQ_SUB); assert(this->_socket); int rc = zmq_connect(this->_socket, _endPoint.c_str()); if (rc != 0) { printf("Sub初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno())); } assert(rc == 0); zmq_setsockopt(_socket, ZMQ_SUBSCRIBE, nullptr, 0); std::thread th(&red_zmq::Sub::_threadReceiveMsg, this); th.detach(); } void red_zmq::Sub::_threadReceiveMsg() { while (true) { /*接收一帧,不会接收到地址帧*/ int rc; zmq_msg_t msg; zmq_msg_init(&msg); rc = zmq_msg_recv(&msg, this->_socket, 0); if (rc == -1) { // 连接出现问题!! printf("Sub:%s接收广播错误: %s", _endPoint.c_str(), zmq_strerror(zmq_errno())); assert(false); break; } char *ret_msg = (char*)malloc(rc + 1); memcpy(ret_msg, zmq_msg_data(&msg), rc); zmq_msg_close(&msg); ret_msg[rc] = 0; if(_handleMsgCb) { _handleMsgCb(ret_msg); } usleep(1000); } } #pragma mark - req red_zmq::Req::Req(std::string endPoint) : _endPoint(endPoint) { _connect(); } std::string red_zmq::Req::sendMsg(const char* sendMsg) { // 设置接收超时时间为5秒 int receiveTimeout = 5000; zmq_setsockopt(this->_socket, ZMQ_RCVTIMEO, &receiveTimeout, sizeof(receiveTimeout)); s_send(this->_socket, sendMsg); return s_recv(this->_socket); } void red_zmq::Req::_connect() { this->_context = zmq_ctx_new(); this->_socket = zmq_socket(this->_context, ZMQ_REQ); assert(this->_socket); int rc = zmq_connect(this->_socket, _endPoint.c_str()); if (rc != 0) { printf("Req 初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno())); } assert(rc == 0); } #pragma mark - rep red_zmq::Rep::Rep(const std::string& endPoint, const std::function& handleMsgCb) : _endPoint(endPoint), _handleMsgCb(handleMsgCb) { this->_bind(); } void red_zmq::Rep::_bind() { this->_context = zmq_ctx_new(); this->_socket = zmq_socket(this->_context, ZMQ_REP); assert(this->_socket); int rc = zmq_bind(this->_socket, _endPoint.c_str()); if (rc != 0) { printf("Rep初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno())); } assert(rc == 0); std::thread th(&red_zmq::Rep::_threadReceiveMsg, this); th.detach(); } void red_zmq::Rep::_threadReceiveMsg() { while (true) { /*接收一帧,不会接收到地址帧*/ int rc; zmq_msg_t msg; zmq_msg_init(&msg); rc = zmq_msg_recv(&msg, this->_socket, 0); if (rc == -1) { // 连接出现问题!! printf("Rep:%s接收广播错误: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno())); assert(false); break; } char *ret_msg = (char*)malloc(rc + 1); memcpy(ret_msg, zmq_msg_data(&msg), rc); zmq_msg_close(&msg); ret_msg[rc] = 0; // rep收到消息后必须应答一下子 if(_handleMsgCb) { std::string ret = _handleMsgCb(ret_msg); zmq_send(this->_socket, ret.c_str(), ret.size(), 0); } else { assert(false); } } }