RedZMQ.cpp 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. //
  2. // RedZMQ.cpp
  3. // rebolt
  4. //
  5. // Created by zhuge on 2022/11/7.
  6. //
  7. #include "RedZMQ.h"
  8. #include "zmq.h"
  9. #include <unistd.h>
  10. #include <thread>
  11. #include <iostream>
  12. #include "assert.h"
  13. static int s_send(void *socket, const char *send_msg) {
  14. // 初始化一个zmq_msg_t对象, 分配的大小为string的大小
  15. zmq_msg_t msg;
  16. size_t send_msg_len = strlen(send_msg);
  17. zmq_msg_init_size(&msg, send_msg_len);
  18. memcpy(zmq_msg_data(&msg), send_msg, send_msg_len);
  19. // 发送数据
  20. int rc = zmq_msg_send(&msg, socket, 0);
  21. // 关闭zmq_msg_t对象
  22. zmq_msg_close(&msg);
  23. return rc;
  24. }
  25. static std::string s_recv(void *socket) {
  26. int rc;
  27. zmq_msg_t msg;
  28. zmq_msg_init(&msg);
  29. rc = zmq_msg_recv(&msg, socket, 0);
  30. if(rc == -1) {
  31. printf("zmq请求超时.\n");
  32. return "";
  33. }
  34. char *ret_msg = (char*)malloc(rc + 1);
  35. memcpy(ret_msg, zmq_msg_data(&msg), rc);
  36. zmq_msg_close(&msg);
  37. ret_msg[rc] = 0;
  38. std::string ret_string(ret_msg);
  39. free(ret_msg);
  40. return ret_string;
  41. }
  42. red_zmq::Pub::Pub(std::string endPoint)
  43. : _endPoint(endPoint) {
  44. this->_bind();
  45. }
  46. void red_zmq::Pub::_bind() {
  47. this->_context = zmq_ctx_new();
  48. this->_socket = zmq_socket(this->_context, ZMQ_PUB);
  49. assert(this->_socket);
  50. int rc = zmq_bind(this->_socket, _endPoint.c_str());
  51. if (rc != 0) {
  52. printf("Pub初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
  53. }
  54. assert(rc == 0);
  55. }
  56. int red_zmq::Pub::pubMsg(const char* sendMsg) {
  57. return s_send(this->_socket, sendMsg);
  58. }
  59. #pragma mark - sub
  60. red_zmq::Sub::Sub(const std::string& endPoint, const std::function<void(const std::string&)>& handleMsgCb)
  61. : _endPoint(endPoint), _handleMsgCb(handleMsgCb){
  62. this->_connect();
  63. }
  64. void red_zmq::Sub::_connect() {
  65. this->_context = zmq_ctx_new();
  66. this->_socket = zmq_socket(this->_context, ZMQ_SUB);
  67. assert(this->_socket);
  68. int rc = zmq_connect(this->_socket, _endPoint.c_str());
  69. if (rc != 0) {
  70. printf("Sub初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
  71. }
  72. assert(rc == 0);
  73. zmq_setsockopt(_socket, ZMQ_SUBSCRIBE, nullptr, 0);
  74. std::thread th(&red_zmq::Sub::_threadReceiveMsg, this);
  75. th.detach();
  76. }
  77. void red_zmq::Sub::_threadReceiveMsg() {
  78. while (true) {
  79. /*接收一帧,不会接收到地址帧*/
  80. int rc;
  81. zmq_msg_t msg;
  82. zmq_msg_init(&msg);
  83. rc = zmq_msg_recv(&msg, this->_socket, 0);
  84. if (rc == -1) {
  85. // 连接出现问题!!
  86. printf("Sub:%s接收广播错误: %s", _endPoint.c_str(), zmq_strerror(zmq_errno()));
  87. assert(false);
  88. break;
  89. }
  90. char *ret_msg = (char*)malloc(rc + 1);
  91. memcpy(ret_msg, zmq_msg_data(&msg), rc);
  92. zmq_msg_close(&msg);
  93. ret_msg[rc] = 0;
  94. if(_handleMsgCb) {
  95. _handleMsgCb(ret_msg);
  96. }
  97. usleep(1000);
  98. }
  99. }
  100. #pragma mark - req
  101. red_zmq::Req::Req(std::string endPoint)
  102. : _endPoint(endPoint) {
  103. _connect();
  104. }
  105. std::string red_zmq::Req::sendMsg(const char* sendMsg) {
  106. // 设置接收超时时间为5秒
  107. int receiveTimeout = 5000;
  108. zmq_setsockopt(this->_socket, ZMQ_RCVTIMEO, &receiveTimeout, sizeof(receiveTimeout));
  109. s_send(this->_socket, sendMsg);
  110. return s_recv(this->_socket);
  111. }
  112. void red_zmq::Req::_connect() {
  113. this->_context = zmq_ctx_new();
  114. this->_socket = zmq_socket(this->_context, ZMQ_REQ);
  115. assert(this->_socket);
  116. int rc = zmq_connect(this->_socket, _endPoint.c_str());
  117. if (rc != 0) {
  118. printf("Req 初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
  119. }
  120. assert(rc == 0);
  121. }
  122. #pragma mark - rep
  123. red_zmq::Rep::Rep(const std::string& endPoint, const std::function<std::string(const std::string&)>& handleMsgCb)
  124. : _endPoint(endPoint), _handleMsgCb(handleMsgCb) {
  125. this->_bind();
  126. }
  127. void red_zmq::Rep::_bind() {
  128. this->_context = zmq_ctx_new();
  129. this->_socket = zmq_socket(this->_context, ZMQ_REP);
  130. assert(this->_socket);
  131. int rc = zmq_bind(this->_socket, _endPoint.c_str());
  132. if (rc != 0) {
  133. printf("Rep初始化错误:%s: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
  134. }
  135. assert(rc == 0);
  136. std::thread th(&red_zmq::Rep::_threadReceiveMsg, this);
  137. th.detach();
  138. }
  139. void red_zmq::Rep::_threadReceiveMsg() {
  140. while (true) {
  141. /*接收一帧,不会接收到地址帧*/
  142. int rc;
  143. zmq_msg_t msg;
  144. zmq_msg_init(&msg);
  145. rc = zmq_msg_recv(&msg, this->_socket, 0);
  146. if (rc == -1) {
  147. // 连接出现问题!!
  148. printf("Rep:%s接收广播错误: %s\n", _endPoint.c_str(), zmq_strerror(zmq_errno()));
  149. assert(false);
  150. break;
  151. }
  152. char *ret_msg = (char*)malloc(rc + 1);
  153. memcpy(ret_msg, zmq_msg_data(&msg), rc);
  154. zmq_msg_close(&msg);
  155. ret_msg[rc] = 0;
  156. // rep收到消息后必须应答一下子
  157. if(_handleMsgCb) {
  158. std::string ret = _handleMsgCb(ret_msg);
  159. zmq_send(this->_socket, ret.c_str(), ret.size(), 0);
  160. } else {
  161. assert(false);
  162. }
  163. }
  164. }