msg.hpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. /*
  2. Copyright (c) 2007-2016 Contributors as noted in the AUTHORS file
  3. This file is part of libzmq, the ZeroMQ core engine in C++.
  4. libzmq is free software; you can redistribute it and/or modify it under
  5. the terms of the GNU Lesser General Public License (LGPL) as published
  6. by the Free Software Foundation; either version 3 of the License, or
  7. (at your option) any later version.
  8. As a special exception, the Contributors give you permission to link
  9. this library with independent modules to produce an executable,
  10. regardless of the license terms of these independent modules, and to
  11. copy and distribute the resulting executable under terms of your choice,
  12. provided that you also meet, for each linked independent module, the
  13. terms and conditions of the license of that module. An independent
  14. module is a module which is not derived from or based on this library.
  15. If you modify this library, you must extend this exception to your
  16. version of the library.
  17. libzmq is distributed in the hope that it will be useful, but WITHOUT
  18. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
  19. FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
  20. License for more details.
  21. You should have received a copy of the GNU Lesser General Public License
  22. along with this program. If not, see <http://www.gnu.org/licenses/>.
  23. */
  24. #ifndef __ZMQ_MSG_HPP_INCLUDE__
  25. #define __ZMQ_MSG_HPP_INCLUDE__
  26. #include <stddef.h>
  27. #include <stdio.h>
  28. #include "config.hpp"
  29. #include "err.hpp"
  30. #include "fd.hpp"
  31. #include "atomic_counter.hpp"
  32. #include "metadata.hpp"
  //  Mask isolating the command-type field inside the msg_t flags byte.
  //  The value is 0x1c (binary 00011100), i.e. bits 2, 3 and 4 when the
  //  least significant bit is counted as bit 0.
  #define CMD_TYPE_MASK (0x04 | 0x08 | 0x10)
  //  Type of the callback used to deallocate message content: it receives
  //  the data pointer and an opaque hint.
  //  The typedef is given "C" language linkage so that it is the same type
  //  as zmq_free_fn defined in zmq.h.
  extern "C" {
  typedef void msg_free_fn (void *data_, void *hint_);
  }
  41. namespace zmq
  42. {
  //  Command names as byte strings: the first byte holds the length of the
  //  name that follows (6 for "CANCEL", 9 for "SUBSCRIBE"). The length byte
  //  is written as its own literal so the escape sequence can never absorb
  //  the first letter of the name.
  static const char cancel_cmd_name[] = "\x06" "CANCEL";
  static const char sub_cmd_name[] = "\x09" "SUBSCRIBE";

  //  Note that msg_t (declared below) needs to be explicitly constructed
  //  (init functions) and destructed (close function).
  47. class msg_t
  48. {
  49. public:
  50. // Shared message buffer. Message data are either allocated in one
  51. // continuous block along with this structure - thus avoiding one
  52. // malloc/free pair or they are stored in user-supplied memory.
  53. // In the latter case, ffn member stores pointer to the function to be
  54. // used to deallocate the data. If the buffer is actually shared (there
  55. // are at least 2 references to it) refcount member contains number of
  56. // references.
  57. struct content_t
  58. {
  59. void *data;
  60. size_t size;
  61. msg_free_fn *ffn;
  62. void *hint;
  63. zmq::atomic_counter_t refcnt;
  64. };
  65. // Message flags.
  66. enum
  67. {
  68. more = 1, // Followed by more parts
  69. command = 2, // Command frame (see ZMTP spec)
  70. // Command types, use only bits 2-5 and compare with ==, not bitwise,
  71. // a command can never be of more that one type at the same time
  72. ping = 4,
  73. pong = 8,
  74. subscribe = 12,
  75. cancel = 16,
  76. close_cmd = 20,
  77. credential = 32,
  78. routing_id = 64,
  79. shared = 128
  80. };
  81. bool check () const;
  82. int init ();
  83. int init (void *data_,
  84. size_t size_,
  85. msg_free_fn *ffn_,
  86. void *hint_,
  87. content_t *content_ = NULL);
  88. int init_size (size_t size_);
  89. int init_buffer (const void *buf_, size_t size_);
  90. int init_data (void *data_, size_t size_, msg_free_fn *ffn_, void *hint_);
  91. int init_external_storage (content_t *content_,
  92. void *data_,
  93. size_t size_,
  94. msg_free_fn *ffn_,
  95. void *hint_);
  96. int init_delimiter ();
  97. int init_join ();
  98. int init_leave ();
  99. int init_subscribe (const size_t size_, const unsigned char *topic);
  100. int init_cancel (const size_t size_, const unsigned char *topic);
  101. int close ();
  102. int move (msg_t &src_);
  103. int copy (msg_t &src_);
  104. void *data ();
  105. size_t size () const;
  106. unsigned char flags () const;
  107. void set_flags (unsigned char flags_);
  108. void reset_flags (unsigned char flags_);
  109. metadata_t *metadata () const;
  110. void set_metadata (metadata_t *metadata_);
  111. void reset_metadata ();
  112. bool is_routing_id () const;
  113. bool is_credential () const;
  114. bool is_delimiter () const;
  115. bool is_join () const;
  116. bool is_leave () const;
  117. bool is_ping () const;
  118. bool is_pong () const;
  119. bool is_close_cmd () const;
  120. // These are called on each message received by the session_base class,
  121. // so get them inlined to avoid the overhead of 2 function calls per msg
  122. bool is_subscribe () const
  123. {
  124. return (_u.base.flags & CMD_TYPE_MASK) == subscribe;
  125. }
  126. bool is_cancel () const
  127. {
  128. return (_u.base.flags & CMD_TYPE_MASK) == cancel;
  129. }
  130. size_t command_body_size () const;
  131. void *command_body ();
  132. bool is_vsm () const;
  133. bool is_cmsg () const;
  134. bool is_lmsg () const;
  135. bool is_zcmsg () const;
  136. uint32_t get_routing_id () const;
  137. int set_routing_id (uint32_t routing_id_);
  138. int reset_routing_id ();
  139. const char *group () const;
  140. int set_group (const char *group_);
  141. int set_group (const char *, size_t length_);
  142. // After calling this function you can copy the message in POD-style
  143. // refs_ times. No need to call copy.
  144. void add_refs (int refs_);
  145. // Removes references previously added by add_refs. If the number of
  146. // references drops to 0, the message is closed and false is returned.
  147. bool rm_refs (int refs_);
  148. void shrink (size_t new_size_);
  149. // Size in bytes of the largest message that is still copied around
  150. // rather than being reference-counted.
  151. enum
  152. {
  153. msg_t_size = 64
  154. };
  155. enum
  156. {
  157. max_vsm_size =
  158. msg_t_size - (sizeof (metadata_t *) + 3 + 16 + sizeof (uint32_t))
  159. };
  160. enum
  161. {
  162. ping_cmd_name_size = 5, // 4PING
  163. cancel_cmd_name_size = 7, // 6CANCEL
  164. sub_cmd_name_size = 10 // 9SUBSCRIBE
  165. };
  166. private:
  167. zmq::atomic_counter_t *refcnt ();
  168. // Different message types.
  169. enum type_t
  170. {
  171. type_min = 101,
  172. // VSM messages store the content in the message itself
  173. type_vsm = 101,
  174. // LMSG messages store the content in malloc-ed memory
  175. type_lmsg = 102,
  176. // Delimiter messages are used in envelopes
  177. type_delimiter = 103,
  178. // CMSG messages point to constant data
  179. type_cmsg = 104,
  180. // zero-copy LMSG message for v2_decoder
  181. type_zclmsg = 105,
  182. // Join message for radio_dish
  183. type_join = 106,
  184. // Leave message for radio_dish
  185. type_leave = 107,
  186. type_max = 107
  187. };
  188. enum group_type_t
  189. {
  190. group_type_short,
  191. group_type_long
  192. };
  193. struct long_group_t
  194. {
  195. char group[ZMQ_GROUP_MAX_LENGTH + 1];
  196. atomic_counter_t refcnt;
  197. };
  198. union group_t
  199. {
  200. unsigned char type;
  201. struct
  202. {
  203. unsigned char type;
  204. char group[15];
  205. } sgroup;
  206. struct
  207. {
  208. unsigned char type;
  209. long_group_t *content;
  210. } lgroup;
  211. };
  212. // Note that fields shared between different message types are not
  213. // moved to the parent class (msg_t). This way we get tighter packing
  214. // of the data. Shared fields can be accessed via 'base' member of
  215. // the union.
  216. union
  217. {
  218. struct
  219. {
  220. metadata_t *metadata;
  221. unsigned char unused[msg_t_size
  222. - (sizeof (metadata_t *) + 2
  223. + sizeof (uint32_t) + sizeof (group_t))];
  224. unsigned char type;
  225. unsigned char flags;
  226. uint32_t routing_id;
  227. group_t group;
  228. } base;
  229. struct
  230. {
  231. metadata_t *metadata;
  232. unsigned char data[max_vsm_size];
  233. unsigned char size;
  234. unsigned char type;
  235. unsigned char flags;
  236. uint32_t routing_id;
  237. group_t group;
  238. } vsm;
  239. struct
  240. {
  241. metadata_t *metadata;
  242. content_t *content;
  243. unsigned char
  244. unused[msg_t_size
  245. - (sizeof (metadata_t *) + sizeof (content_t *) + 2
  246. + sizeof (uint32_t) + sizeof (group_t))];
  247. unsigned char type;
  248. unsigned char flags;
  249. uint32_t routing_id;
  250. group_t group;
  251. } lmsg;
  252. struct
  253. {
  254. metadata_t *metadata;
  255. content_t *content;
  256. unsigned char
  257. unused[msg_t_size
  258. - (sizeof (metadata_t *) + sizeof (content_t *) + 2
  259. + sizeof (uint32_t) + sizeof (group_t))];
  260. unsigned char type;
  261. unsigned char flags;
  262. uint32_t routing_id;
  263. group_t group;
  264. } zclmsg;
  265. struct
  266. {
  267. metadata_t *metadata;
  268. void *data;
  269. size_t size;
  270. unsigned char unused[msg_t_size
  271. - (sizeof (metadata_t *) + sizeof (void *)
  272. + sizeof (size_t) + 2 + sizeof (uint32_t)
  273. + sizeof (group_t))];
  274. unsigned char type;
  275. unsigned char flags;
  276. uint32_t routing_id;
  277. group_t group;
  278. } cmsg;
  279. struct
  280. {
  281. metadata_t *metadata;
  282. unsigned char unused[msg_t_size
  283. - (sizeof (metadata_t *) + 2
  284. + sizeof (uint32_t) + sizeof (group_t))];
  285. unsigned char type;
  286. unsigned char flags;
  287. uint32_t routing_id;
  288. group_t group;
  289. } delimiter;
  290. } _u;
  291. };
  292. inline int close_and_return (zmq::msg_t *msg_, int echo_)
  293. {
  294. // Since we abort on close failure we preserve errno for success case.
  295. const int err = errno;
  296. const int rc = msg_->close ();
  297. errno_assert (rc == 0);
  298. errno = err;
  299. return echo_;
  300. }
  301. inline int close_and_return (zmq::msg_t msg_[], int count_, int echo_)
  302. {
  303. for (int i = 0; i < count_; i++)
  304. close_and_return (&msg_[i], 0);
  305. return echo_;
  306. }
  307. }
  308. #endif