// Fragment of the reactor constructor (the signature line is not part of this
// listing). The member initializers keep the shared context, create
// async_handler_socket_ as a ROUTER socket on *context, and build unique_id as
// "reactor_" followed by the numeric value of the `this` pointer.
8 : context_(context), async_handler_socket_(*context, zmq::socket_type::router),
9 unique_id(
"reactor_" +
std::to_string((uintptr_t) this))
// Bind the ROUTER socket to the in-process endpoint "inproc://" + unique_id.
11 async_handler_socket_.bind(
"inproc://" +
unique_id);
// Fragment (presumably the body of add_socket; its signature is not visible
// here): stores `socket` in sockets_ under the key `name`.
16 sockets_.emplace(name, socket);
// Registers one handler for every origin listed in `origins`.
// A single handler_wrapper is created around `handler` (bound to this reactor)
// and the same shared wrapper is inserted into handlers_ once per origin.
// NOTE: the braces of the function body are missing from this listing.
19 void reactor::add_handler(
const std::vector<std::string> &origins, std::shared_ptr<handler_interface> handler)
21 auto wrapper = std::make_shared<handler_wrapper>(*
this, handler);
23 for (
auto &origin : origins) {
24 handlers_.emplace(origin, wrapper);
// Fragment (presumably the body of add_async_handler; signature not visible):
// same registration scheme as the synchronous variant, but the wrapper is an
// asynchronous_handler_wrapper constructed from the ZeroMQ context, the
// reactor's async_handler_socket_, this reactor and the handler.
30 auto wrapper = std::make_shared<asynchronous_handler_wrapper>(*context_, async_handler_socket_, *
this, handler);
// The one shared wrapper is stored under each origin.
32 for (
auto &origin : origins) {
33 handlers_.emplace(origin, wrapper);
// Fragment (presumably send_message; signature not visible): the message key
// selects the destination socket by the name it was registered under.
39 auto it = sockets_.find(message.
key);
// Only a registered socket receives the message; what happens for an unknown
// key is not visible in this listing.
42 if (it != std::end(sockets_)) {
43 it->second->send_message(message);
// Fragment (presumably process_message; signature not visible): looks up all
// handler wrappers registered under the message key ...
51 auto range = handlers_.equal_range(message.
key);
// ... and invokes each of them with the message, in container order.
53 for (
auto it = range.first; it != range.second; ++it) {
54 (*it->second)(message);
// Fragment of the reactor's polling loop (function signature and several
// lines, including the declaration of the index `i`, are missing from this
// listing).
//
// pollitems and pollitem_names are filled in lockstep below, so for the named
// sockets entry k of pollitems belongs to the socket called pollitem_names[k].
60 std::vector<zmq::pollitem_t> pollitems;
61 std::vector<std::string> pollitem_names;
// Initialize every registered socket and collect its poll item and name.
// NOTE(review): `auto it` iterates by value, so each map entry is copied;
// a reference would avoid that.
64 for (
auto it : sockets_) {
65 it.second->initialize();
66 pollitems.push_back(it.second->get_pollitem());
67 pollitem_names.push_back(it.first);
// Poll item for the asynchronous handler socket (readable events only).
// Presumably appended to pollitems after the named sockets - the enclosing
// call is not visible here.
72 zmq_pollitem_t{.socket = (
void *) async_handler_socket_, .fd = 0, .events = ZMQ_POLLIN, .revents = 0});
// Reset the flag, then loop until something stores true into it.
74 termination_flag_.store(
false);
77 while (!termination_flag_.load()) {
// Wait up to 100 ms for activity and measure how long the poll really took.
78 auto time_before_poll = std::chrono::system_clock::now();
79 zmq::poll(pollitems, std::chrono::milliseconds(100));
80 auto time_after_poll = std::chrono::system_clock::now();
82 auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(time_after_poll - time_before_poll);
// Handle every poll item that reported readable data.
85 for (
auto item : pollitems) {
86 if (item.revents & ZMQ_POLLIN) {
// Indices below pollitem_names.size() are named sockets: tag the message
// with the socket name and let that socket wrapper fill in the rest.
89 if (i < pollitem_names.size()) {
91 received_msg.
key = pollitem_names.at(i);
92 sockets_.at(received_msg.
key)->receive_message(received_msg);
// Otherwise (branch header not visible) the message is read frame by frame
// from the asynchronous handler socket.
98 zmq::message_t zmessage;
// The first frame is received and then overwritten by the next recv without
// being used in the visible lines - presumably the peer identity frame that
// a ROUTER socket prepends; TODO confirm.
101 async_handler_socket_.recv(&zmessage);
// Second frame: message key.
103 async_handler_socket_.recv(&zmessage);
104 received_msg.
key = std::string(static_cast<char *>(zmessage.data()), zmessage.size());
// Third frame: message identity.
106 async_handler_socket_.recv(&zmessage);
107 received_msg.
identity = std::string(static_cast<char *>(zmessage.data()), zmessage.size());
// Remaining frames, if any, are appended to the message data.
109 while (zmessage.more()) {
110 async_handler_socket_.recv(&zmessage);
111 received_msg.
data.emplace_back(static_cast<char *>(zmessage.data()), zmessage.size());
// The timer message carries the measured poll duration in milliseconds as a
// decimal string (construction and dispatch of timer_msg are not visible).
124 timer_msg.
data.push_back(std::to_string(elapsed_time.count()));
// Fragment (enclosing function not visible): raises the termination flag that
// the polling loop checks before each iteration.
134 termination_flag_.store(
true);
// Member initializer list of the handler_wrapper constructor (signature not
// visible here): stores the wrapped handler and the reactor reference.
138 : handler_(handler), reactor_(reactor_ref)
// Sentinel key: the worker thread compares the key of each received request
// against this value, and the wrapper sends it on reactor_socket_ right after
// its unique_id_ frame.
151 const std::string asynchronous_handler_wrapper::TERMINATE_MSG =
"TERMINATE";
// Fragment of the asynchronous_handler_wrapper constructor (first lines of the
// parameter list are missing from this listing).
154 zmq::socket_t &async_handler_socket,
156 std::shared_ptr<handler_interface> handler)
// Initializes the base wrapper, keeps a reference to the reactor's socket,
// derives unique_id_ from the numeric value of `this`, and keeps the context.
157 :
handler_wrapper(reactor_ref, handler), reactor_socket_(async_handler_socket),
158 unique_id_(
std::to_string((uintptr_t) this)), context_(context)
// Start the worker thread; it runs handler_thread() on this object.
160 worker_ = std::thread([
this]() { handler_thread(); });
// Fragment (presumably the destructor; enclosing lines not visible): sends a
// two-frame message through reactor_socket_ - the wrapper's unique_id_
// followed by TERMINATE_MSG as the final frame.
166 reactor_socket_.send(unique_id_.data(), unique_id_.size(), ZMQ_SNDMORE);
167 reactor_socket_.send(TERMINATE_MSG.data(), TERMINATE_MSG.size());
// Fragment (presumably operator(); signature not visible): forwards `message`
// through reactor_socket_ as a multipart message with the frame order
// unique_id_, key, identity, data[0], data[1], ...
175 reactor_socket_.send(unique_id_.data(), unique_id_.size(), ZMQ_SNDMORE);
176 reactor_socket_.send(message.
key.data(), message.
key.size(), ZMQ_SNDMORE);
177 reactor_socket_.send(message.
identity.data(), message.
identity.size(), ZMQ_SNDMORE);
// Every data element is its own frame; only the last one is sent without
// ZMQ_SNDMORE.
// NOTE(review): when message.data is empty nothing in the visible lines sends
// a closing frame, so the multipart message would stay open - confirm.
179 for (
auto it = std::begin(message.
data); it != std::end(message.
data); ++it) {
180 reactor_socket_.send(it->c_str(), it->size(), std::next(it) != std::end(message.
data) ? ZMQ_SNDMORE : 0);
// Worker thread body. Several lines (braces, the loop header, the socket
// connect call, the declaration of `request` and the callee at the end) are
// missing from this listing.
184 void asynchronous_handler_wrapper::handler_thread()
// Thread-local DEALER socket whose ZeroMQ identity is this wrapper's
// unique_id_.
186 zmq::socket_t socket(context_, zmq::socket_type::dealer);
187 socket.setsockopt(ZMQ_IDENTITY, unique_id_.data(), unique_id_.size());
192 zmq::message_t message;
// First frame: request key.
194 socket.recv(&message, 0);
195 request.
key = std::string(static_cast<char *>(message.data()), message.size());
// A key equal to TERMINATE_MSG is handled specially (the action itself is not
// visible here; presumably the thread stops).
197 if (request.
key == TERMINATE_MSG) {
// A message that ends after the key is also handled specially (action not
// visible here).
201 if (!message.more()) {
// Second frame: request identity.
205 socket.recv(&message, 0);
206 request.
identity = std::string(static_cast<char *>(message.data()), message.size());
// Remaining frames are appended to the request data.
208 while (message.more()) {
209 socket.recv(&message, 0);
210 request.
data.emplace_back(static_cast<char *>(message.data()), message.size());
// Arguments of a call whose callee is not visible: the parsed request and a
// callback that sends a response through this thread's socket.
214 request, [
this, &socket](
const message_container &response) { send_response(socket, response); });
// Sends `message` through `socket` as a multipart message: the key first, then
// one frame per data element. Original lines 221-222 are missing from this
// listing, so further frames may be sent between the key and the data.
218 void asynchronous_handler_wrapper::send_response(zmq::socket_t &socket,
const message_container &message)
220 socket.send(message.
key.data(), message.
key.size(), ZMQ_SNDMORE);
// Only the last data frame is sent without ZMQ_SNDMORE.
// NOTE(review): with empty message.data the visible code sends no closing
// frame after the key - confirm.
223 for (
auto it = std::begin(message.
data); it != std::end(message.
data); ++it) {
224 socket.send(it->c_str(), it->size(), std::next(it) != std::end(message.
data) ? ZMQ_SNDMORE : 0);
~asynchronous_handler_wrapper()
reactor(std::shared_ptr< zmq::context_t > context)
virtual void operator()(const message_container &message)
std::shared_ptr< handler_interface > handler_
handler_wrapper(reactor &reactor_ref, std::shared_ptr< handler_interface > handler)
virtual ~handler_wrapper()
virtual void operator()(const message_container &message)
void add_handler(const std::vector< std::string > &origins, std::shared_ptr< handler_interface > handler)
const std::string unique_id
static const std::string KEY_TIMER
void send_message(const message_container &message)
void add_async_handler(const std::vector< std::string > &origins, std::shared_ptr< handler_interface > handler)
asynchronous_handler_wrapper(zmq::context_t &context, zmq::socket_t &async_handler_socket, reactor &reactor_ref, std::shared_ptr< handler_interface > handler)
void process_message(const message_container &message)
std::vector< std::string > data
void add_socket(const std::string &name, std::shared_ptr< socket_wrapper_base > socket)