norm_engine.hpp 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. #ifndef __ZMQ_NORM_ENGINE_HPP_INCLUDED__
  2. #define __ZMQ_NORM_ENGINE_HPP_INCLUDED__
  3. #if defined ZMQ_HAVE_NORM
  4. #if defined(ZMQ_HAVE_WINDOWS) && defined(ZMQ_IOTHREAD_POLLER_USE_EPOLL)
  5. #define ZMQ_USE_NORM_SOCKET_WRAPPER
  6. #endif
  7. #include "io_object.hpp"
  8. #include "i_engine.hpp"
  9. #include "options.hpp"
  10. #include "v2_decoder.hpp"
  11. #include "v2_encoder.hpp"
  12. #include <normApi.h>
  13. namespace zmq
  14. {
  15. class io_thread_t;
  16. class msg_t;
  17. class session_base_t;
  18. class norm_engine_t ZMQ_FINAL : public io_object_t, public i_engine
  19. {
  20. public:
  21. norm_engine_t (zmq::io_thread_t *parent_, const options_t &options_);
  22. ~norm_engine_t () ZMQ_FINAL;
  23. // create NORM instance, session, etc
  24. int init (const char *network_, bool send, bool recv);
  25. void shutdown ();
  26. bool has_handshake_stage () ZMQ_FINAL { return false; };
  27. // i_engine interface implementation.
  28. // Plug the engine to the session.
  29. void plug (zmq::io_thread_t *io_thread_,
  30. class session_base_t *session_) ZMQ_FINAL;
  31. // Terminate and deallocate the engine. Note that 'detached'
  32. // events are not fired on termination.
  33. void terminate () ZMQ_FINAL;
  34. // This method is called by the session to signalise that more
  35. // messages can be written to the pipe.
  36. bool restart_input () ZMQ_FINAL;
  37. // This method is called by the session to signalise that there
  38. // are messages to send available.
  39. void restart_output () ZMQ_FINAL;
  40. void zap_msg_available () ZMQ_FINAL {}
  41. const endpoint_uri_pair_t &get_endpoint () const ZMQ_FINAL;
  42. // i_poll_events interface implementation.
  43. // (we only need in_event() for NormEvent notification)
  44. // (i.e., don't have any output events or timers (yet))
  45. void in_event ();
  46. private:
  47. void unplug ();
  48. void send_data ();
  49. void recv_data (NormObjectHandle stream);
  50. enum
  51. {
  52. BUFFER_SIZE = 2048
  53. };
  54. // Used to keep track of streams from multiple senders
  55. class NormRxStreamState
  56. {
  57. public:
  58. NormRxStreamState (NormObjectHandle normStream,
  59. int64_t maxMsgSize,
  60. bool zeroCopy,
  61. int inBatchSize);
  62. ~NormRxStreamState ();
  63. NormObjectHandle GetStreamHandle () const { return norm_stream; }
  64. bool Init ();
  65. void SetRxReady (bool state) { rx_ready = state; }
  66. bool IsRxReady () const { return rx_ready; }
  67. void SetSync (bool state) { in_sync = state; }
  68. bool InSync () const { return in_sync; }
  69. // These are used to feed data to decoder
  70. // and its underlying "msg" buffer
  71. char *AccessBuffer () { return (char *) (buffer_ptr + buffer_count); }
  72. size_t GetBytesNeeded () const { return buffer_size - buffer_count; }
  73. void IncrementBufferCount (size_t count) { buffer_count += count; }
  74. msg_t *AccessMsg () { return zmq_decoder->msg (); }
  75. // This invokes the decoder "decode" method
  76. // returning 0 if more data is needed,
  77. // 1 if the message is complete, If an error
  78. // occurs the 'sync' is dropped and the
  79. // decoder re-initialized
  80. int Decode ();
  81. class List
  82. {
  83. public:
  84. List ();
  85. ~List ();
  86. void Append (NormRxStreamState &item);
  87. void Remove (NormRxStreamState &item);
  88. bool IsEmpty () const { return NULL == head; }
  89. void Destroy ();
  90. class Iterator
  91. {
  92. public:
  93. Iterator (const List &list);
  94. NormRxStreamState *GetNextItem ();
  95. private:
  96. NormRxStreamState *next_item;
  97. };
  98. friend class Iterator;
  99. private:
  100. NormRxStreamState *head;
  101. NormRxStreamState *tail;
  102. }; // end class zmq::norm_engine_t::NormRxStreamState::List
  103. friend class List;
  104. List *AccessList () { return list; }
  105. private:
  106. NormObjectHandle norm_stream;
  107. int64_t max_msg_size;
  108. bool zero_copy;
  109. int in_batch_size;
  110. bool in_sync;
  111. bool rx_ready;
  112. v2_decoder_t *zmq_decoder;
  113. bool skip_norm_sync;
  114. unsigned char *buffer_ptr;
  115. size_t buffer_size;
  116. size_t buffer_count;
  117. NormRxStreamState *prev;
  118. NormRxStreamState *next;
  119. NormRxStreamState::List *list;
  120. }; // end class zmq::norm_engine_t::NormRxStreamState
  121. const endpoint_uri_pair_t _empty_endpoint;
  122. session_base_t *zmq_session;
  123. options_t options;
  124. NormInstanceHandle norm_instance;
  125. handle_t norm_descriptor_handle;
  126. NormSessionHandle norm_session;
  127. bool is_sender;
  128. bool is_receiver;
  129. // Sender state
  130. msg_t tx_msg;
  131. v2_encoder_t zmq_encoder; // for tx messages (we use v2 for now)
  132. NormObjectHandle norm_tx_stream;
  133. bool tx_first_msg;
  134. bool tx_more_bit;
  135. bool zmq_output_ready; // zmq has msg(s) to send
  136. bool norm_tx_ready; // norm has tx queue vacancy
  137. // TBD - maybe don't need buffer if can access zmq message buffer directly?
  138. char tx_buffer[BUFFER_SIZE];
  139. unsigned int tx_index;
  140. unsigned int tx_len;
  141. // Receiver state
  142. // Lists of norm rx streams from remote senders
  143. bool zmq_input_ready; // zmq ready to receive msg(s)
  144. NormRxStreamState::List
  145. rx_pending_list; // rx streams waiting for data reception
  146. NormRxStreamState::List
  147. rx_ready_list; // rx streams ready for NormStreamRead()
  148. NormRxStreamState::List
  149. msg_ready_list; // rx streams w/ msg ready for push to zmq
  150. #ifdef ZMQ_USE_NORM_SOCKET_WRAPPER
  151. fd_t
  152. wrapper_read_fd; // filedescriptor used to read norm events through the wrapper
  153. DWORD wrapper_thread_id;
  154. HANDLE wrapper_thread_handle;
  155. #endif
  156. }; // end class norm_engine_t
  157. }
  158. #endif // ZMQ_HAVE_NORM
  159. #endif // !__ZMQ_NORM_ENGINE_HPP_INCLUDED__