123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199 |
- //
- // RedZMQ.cpp
- // rebolt
- //
- // Created by zhuge on 2022/11/7.
- //
- #include "RedZMQ.h"
- #include "zmq.h"
- #include <unistd.h>
- #include <thread>
- #include <iostream>
- #include "assert.h"
- static int s_send(void *socket, const char *send_msg) {
- // 初始化一个zmq_msg_t对象, 分配的大小为string的大小
- zmq_msg_t msg;
- size_t send_msg_len = strlen(send_msg);
- zmq_msg_init_size(&msg, send_msg_len);
- memcpy(zmq_msg_data(&msg), send_msg, send_msg_len);
- // 发送数据
- int rc = zmq_msg_send(&msg, socket, 0);
- // 关闭zmq_msg_t对象
- zmq_msg_close(&msg);
- return rc;
- }
- static std::string s_recv(void *socket) {
- int rc;
- zmq_msg_t msg;
- zmq_msg_init(&msg);
-
- rc = zmq_msg_recv(&msg, socket, 0);
- if(rc == -1) {
- printf("zmq请求超时.\n");
- return "";
- }
- char *ret_msg = (char*)malloc(rc + 1);
- memcpy(ret_msg, zmq_msg_data(&msg), rc);
- zmq_msg_close(&msg);
-
- ret_msg[rc] = 0;
-
- std::string ret_string(ret_msg);
- free(ret_msg);
- return ret_string;
- }
- red_zmq::Pub::Pub(std::string endPoint)
- : _endPoint(endPoint) {
- this->_bind();
- }
- void red_zmq::Pub::_bind() {
- this->_context = zmq_ctx_new();
- this->_socket = zmq_socket(this->_context, ZMQ_PUB);
- assert(this->_socket);
- int rc = zmq_bind(this->_socket, _endPoint.c_str());
- if (rc != 0) {
- printf("Pub初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
- }
- assert(rc == 0);
- }
- int red_zmq::Pub::pubMsg(const char* sendMsg) {
- return s_send(this->_socket, sendMsg);
- }
- #pragma mark - sub
- red_zmq::Sub::Sub(const std::string& endPoint, const std::function<void(const std::string&)>& handleMsgCb)
- : _endPoint(endPoint), _handleMsgCb(handleMsgCb){
- this->_connect();
- }
- void red_zmq::Sub::_connect() {
- this->_context = zmq_ctx_new();
- this->_socket = zmq_socket(this->_context, ZMQ_SUB);
- assert(this->_socket);
- int rc = zmq_connect(this->_socket, _endPoint.c_str());
- if (rc != 0) {
- printf("Sub初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
- }
- assert(rc == 0);
-
- zmq_setsockopt(_socket, ZMQ_SUBSCRIBE, nullptr, 0);
-
- std::thread th(&red_zmq::Sub::_threadReceiveMsg, this);
- th.detach();
- }
- void red_zmq::Sub::_threadReceiveMsg() {
- while (true) {
- /*接收一帧,不会接收到地址帧*/
- int rc;
- zmq_msg_t msg;
- zmq_msg_init(&msg);
-
- rc = zmq_msg_recv(&msg, this->_socket, 0);
- if (rc == -1) {
- // 连接出现问题!!
- printf("Sub:%s接收广播错误: %s", _endPoint.c_str(), zmq_strerror(zmq_errno()));
- assert(false);
- break;
- }
-
- char *ret_msg = (char*)malloc(rc + 1);
- memcpy(ret_msg, zmq_msg_data(&msg), rc);
- zmq_msg_close(&msg);
-
- ret_msg[rc] = 0;
-
- if(_handleMsgCb) {
- _handleMsgCb(ret_msg);
- }
- usleep(1000);
- }
- }
- #pragma mark - req
- red_zmq::Req::Req(std::string endPoint)
- : _endPoint(endPoint) {
- _connect();
- }
- std::string red_zmq::Req::sendMsg(const char* sendMsg) {
- // 设置接收超时时间为5秒
- int receiveTimeout = 5000;
- zmq_setsockopt(this->_socket, ZMQ_RCVTIMEO, &receiveTimeout, sizeof(receiveTimeout));
- s_send(this->_socket, sendMsg);
- return s_recv(this->_socket);
- }
- void red_zmq::Req::_connect() {
- this->_context = zmq_ctx_new();
- this->_socket = zmq_socket(this->_context, ZMQ_REQ);
- assert(this->_socket);
- int rc = zmq_connect(this->_socket, _endPoint.c_str());
- if (rc != 0) {
- printf("Req 初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
- }
- assert(rc == 0);
- }
- #pragma mark - rep
- red_zmq::Rep::Rep(const std::string& endPoint, const std::function<std::string(const std::string&)>& handleMsgCb)
- : _endPoint(endPoint), _handleMsgCb(handleMsgCb) {
- this->_bind();
- }
- void red_zmq::Rep::_bind() {
- this->_context = zmq_ctx_new();
- this->_socket = zmq_socket(this->_context, ZMQ_REP);
- assert(this->_socket);
- int rc = zmq_bind(this->_socket, _endPoint.c_str());
- if (rc != 0) {
- printf("Rep初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
- }
- assert(rc == 0);
-
- std::thread th(&red_zmq::Rep::_threadReceiveMsg, this);
- th.detach();
- }
- void red_zmq::Rep::_threadReceiveMsg() {
- while (true) {
- /*接收一帧,不会接收到地址帧*/
- int rc;
- zmq_msg_t msg;
- zmq_msg_init(&msg);
-
- rc = zmq_msg_recv(&msg, this->_socket, 0);
- if (rc == -1) {
- // 连接出现问题!!
- printf("Rep:%s接收广播错误: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
- assert(false);
- break;
- }
-
- char *ret_msg = (char*)malloc(rc + 1);
- memcpy(ret_msg, zmq_msg_data(&msg), rc);
- zmq_msg_close(&msg);
-
- ret_msg[rc] = 0;
- // rep收到消息后必须应答一下子
- if(_handleMsgCb) {
- std::string ret = _handleMsgCb(ret_msg);
- zmq_send(this->_socket, ret.c_str(), ret.size(), 0);
- } else {
- assert(false);
- }
- }
- }
|