ReCodEx - Task Broker
ReCodEx is complex programmer testing solution, primary targeted to technical universities. It's highly customizable and based on modern technologies.
reactor.cpp
1 #include <thread>
2 
3 #include "reactor.h"
4 
5 const std::string reactor::KEY_TIMER = "timer";
6 
7 reactor::reactor(std::shared_ptr<zmq::context_t> context)
8  : context_(context), async_handler_socket_(*context, zmq::socket_type::router),
9  unique_id("reactor_" + std::to_string((uintptr_t) this))
10 {
11  async_handler_socket_.bind("inproc://" + unique_id);
12 }
13 
14 void reactor::add_socket(const std::string &name, std::shared_ptr<socket_wrapper_base> socket)
15 {
16  sockets_.emplace(name, socket);
17 }
18 
19 void reactor::add_handler(const std::vector<std::string> &origins, std::shared_ptr<handler_interface> handler)
20 {
21  auto wrapper = std::make_shared<handler_wrapper>(*this, handler);
22 
23  for (auto &origin : origins) {
24  handlers_.emplace(origin, wrapper);
25  }
26 }
27 
28 void reactor::add_async_handler(const std::vector<std::string> &origins, std::shared_ptr<handler_interface> handler)
29 {
30  auto wrapper = std::make_shared<asynchronous_handler_wrapper>(*context_, async_handler_socket_, *this, handler);
31 
32  for (auto &origin : origins) {
33  handlers_.emplace(origin, wrapper);
34  }
35 }
36 
38 {
39  auto it = sockets_.find(message.key);
40 
41  // If there is a matching socket, send the message there. If not, let the reactor handle it
42  if (it != std::end(sockets_)) {
43  it->second->send_message(message);
44  } else {
45  process_message(message);
46  }
47 }
48 
50 {
51  auto range = handlers_.equal_range(message.key);
52 
53  for (auto it = range.first; it != range.second; ++it) {
54  (*it->second)(message);
55  }
56 }
57 
59 {
60  std::vector<zmq::pollitem_t> pollitems;
61  std::vector<std::string> pollitem_names;
62 
63  // Poll all registered sockets
64  for (auto it : sockets_) {
65  it.second->initialize();
66  pollitems.push_back(it.second->get_pollitem());
67  pollitem_names.push_back(it.first);
68  }
69 
70  // Also poll the internal socket for asynchronous communication
71  pollitems.push_back(
72  zmq_pollitem_t{.socket = (void *) async_handler_socket_, .fd = 0, .events = ZMQ_POLLIN, .revents = 0});
73 
74  termination_flag_.store(false);
75 
76  // Enter the poll loop
77  while (!termination_flag_.load()) {
78  auto time_before_poll = std::chrono::system_clock::now();
79  zmq::poll(pollitems, std::chrono::milliseconds(100));
80  auto time_after_poll = std::chrono::system_clock::now();
81 
82  auto elapsed_time = std::chrono::duration_cast<std::chrono::milliseconds>(time_after_poll - time_before_poll);
83 
84  size_t i = 0;
85  for (auto item : pollitems) {
86  if (item.revents & ZMQ_POLLIN) {
87  message_container received_msg;
88 
89  if (i < pollitem_names.size()) {
90  // message came from a registered socket, fill in its key
91  received_msg.key = pollitem_names.at(i);
92  sockets_.at(received_msg.key)->receive_message(received_msg);
93 
94  // messages from sockets must go through a handler
95  process_message(received_msg);
96  } else {
97  // message from the asynchronous handler socket
98  zmq::message_t zmessage;
99 
100  // this is the async handler identity - we don't care about that
101  async_handler_socket_.recv(&zmessage);
102 
103  async_handler_socket_.recv(&zmessage);
104  received_msg.key = std::string(static_cast<char *>(zmessage.data()), zmessage.size());
105 
106  async_handler_socket_.recv(&zmessage);
107  received_msg.identity = std::string(static_cast<char *>(zmessage.data()), zmessage.size());
108 
109  while (zmessage.more()) {
110  async_handler_socket_.recv(&zmessage);
111  received_msg.data.emplace_back(static_cast<char *>(zmessage.data()), zmessage.size());
112  }
113 
114  // messages from async handlers might be destined to a socket
115  send_message(received_msg);
116  }
117  }
118 
119  ++i;
120  }
121 
122  message_container timer_msg;
123  timer_msg.key = KEY_TIMER;
124  timer_msg.data.push_back(std::to_string(elapsed_time.count()));
125 
126  process_message(timer_msg);
127  }
128 
129  handlers_.clear();
130 }
131 
133 {
134  termination_flag_.store(true);
135 }
136 
137 handler_wrapper::handler_wrapper(reactor &reactor_ref, std::shared_ptr<handler_interface> handler)
138  : handler_(handler), reactor_(reactor_ref)
139 {
140 }
141 
143 {
144 }
145 
147 {
148  handler_->on_request(message, [this](const message_container &response) { reactor_.send_message(response); });
149 }
150 
151 const std::string asynchronous_handler_wrapper::TERMINATE_MSG = "TERMINATE";
152 
154  zmq::socket_t &async_handler_socket,
155  reactor &reactor_ref,
156  std::shared_ptr<handler_interface> handler)
157  : handler_wrapper(reactor_ref, handler), reactor_socket_(async_handler_socket),
158  unique_id_(std::to_string((uintptr_t) this)), context_(context)
159 {
160  worker_ = std::thread([this]() { handler_thread(); });
161 }
162 
164 {
165  // Send a message to terminate the worker thread
166  reactor_socket_.send(unique_id_.data(), unique_id_.size(), ZMQ_SNDMORE);
167  reactor_socket_.send(TERMINATE_MSG.data(), TERMINATE_MSG.size());
168 
169  // Join it
170  worker_.join();
171 }
172 
174 {
175  reactor_socket_.send(unique_id_.data(), unique_id_.size(), ZMQ_SNDMORE);
176  reactor_socket_.send(message.key.data(), message.key.size(), ZMQ_SNDMORE);
177  reactor_socket_.send(message.identity.data(), message.identity.size(), ZMQ_SNDMORE);
178 
179  for (auto it = std::begin(message.data); it != std::end(message.data); ++it) {
180  reactor_socket_.send(it->c_str(), it->size(), std::next(it) != std::end(message.data) ? ZMQ_SNDMORE : 0);
181  }
182 }
183 
184 void asynchronous_handler_wrapper::handler_thread()
185 {
186  zmq::socket_t socket(context_, zmq::socket_type::dealer);
187  socket.setsockopt(ZMQ_IDENTITY, unique_id_.data(), unique_id_.size());
188  socket.connect("inproc://" + reactor_.unique_id);
189 
190  while (true) {
192  zmq::message_t message;
193 
194  socket.recv(&message, 0);
195  request.key = std::string(static_cast<char *>(message.data()), message.size());
196 
197  if (request.key == TERMINATE_MSG) {
198  return;
199  }
200 
201  if (!message.more()) {
202  continue;
203  }
204 
205  socket.recv(&message, 0);
206  request.identity = std::string(static_cast<char *>(message.data()), message.size());
207 
208  while (message.more()) {
209  socket.recv(&message, 0);
210  request.data.emplace_back(static_cast<char *>(message.data()), message.size());
211  }
212 
213  handler_->on_request(
214  request, [this, &socket](const message_container &response) { send_response(socket, response); });
215  }
216 }
217 
218 void asynchronous_handler_wrapper::send_response(zmq::socket_t &socket, const message_container &message)
219 {
220  socket.send(message.key.data(), message.key.size(), ZMQ_SNDMORE);
221  socket.send(message.identity.data(), message.identity.size(), ZMQ_SNDMORE);
222 
223  for (auto it = std::begin(message.data); it != std::end(message.data); ++it) {
224  socket.send(it->c_str(), it->size(), std::next(it) != std::end(message.data) ? ZMQ_SNDMORE : 0);
225  }
226 }
reactor(std::shared_ptr< zmq::context_t > context)
Definition: reactor.cpp:7
STL namespace.
virtual void operator()(const message_container &message)
Definition: reactor.cpp:146
std::shared_ptr< handler_interface > handler_
Definition: reactor.h:42
handler_wrapper(reactor &reactor_ref, std::shared_ptr< handler_interface > handler)
Definition: reactor.cpp:137
virtual ~handler_wrapper()
Definition: reactor.cpp:142
virtual void operator()(const message_container &message)
Definition: reactor.cpp:173
void start_loop()
Definition: reactor.cpp:58
void add_handler(const std::vector< std::string > &origins, std::shared_ptr< handler_interface > handler)
Definition: reactor.cpp:19
const std::string unique_id
Definition: reactor.h:134
static const std::string KEY_TIMER
Definition: reactor.h:129
void terminate()
Definition: reactor.cpp:132
void send_message(const message_container &message)
Definition: reactor.cpp:37
void add_async_handler(const std::vector< std::string > &origins, std::shared_ptr< handler_interface > handler)
Definition: reactor.cpp:28
asynchronous_handler_wrapper(zmq::context_t &context, zmq::socket_t &async_handler_socket, reactor &reactor_ref, std::shared_ptr< handler_interface > handler)
Definition: reactor.cpp:153
Definition: worker.h:78
void process_message(const message_container &message)
Definition: reactor.cpp:49
std::vector< std::string > data
void add_socket(const std::string &name, std::shared_ptr< socket_wrapper_base > socket)
Definition: reactor.cpp:14
reactor & reactor_
Definition: reactor.h:45