ReCodEx - Task Broker
ReCodEx is a complex programmer testing solution, primarily targeted at technical universities. It is highly customizable and based on modern technologies.
broker_handler.cpp
1 #include "broker_handler.h"
2 #include "../broker_connect.h"
3 #include "../notifier/reactor_status_notifier.h"
4 
5 broker_handler::broker_handler(std::shared_ptr<const broker_config> config,
6  std::shared_ptr<worker_registry> workers,
7  std::shared_ptr<spdlog::logger> logger)
8  : workers_(workers), logger_(logger), config_(config)
9 {
10  if (logger_ == nullptr) {
11  logger_ = helpers::create_null_logger();
12  }
13 
14  client_commands_.register_command(
15  "eval", [this](const std::string &identity, const std::vector<std::string> &message, response_cb respond) {
16  process_client_eval(identity, message, respond);
17  });
18 
19  worker_commands_.register_command(
20  "init", [this](const std::string &identity, const std::vector<std::string> &message, response_cb respond) {
21  process_worker_init(identity, message, respond);
22  });
23 
24  worker_commands_.register_command(
25  "done", [this](const std::string &identity, const std::vector<std::string> &message, response_cb respond) {
26  process_worker_done(identity, message, respond);
27  });
28 
29  worker_commands_.register_command(
30  "ping", [this](const std::string &identity, const std::vector<std::string> &message, response_cb respond) {
31  process_worker_ping(identity, message, respond);
32  });
33 
34  worker_commands_.register_command(
35  "progress", [this](const std::string &identity, const std::vector<std::string> &message, response_cb respond) {
36  process_worker_progress(identity, message, respond);
37  });
38 }
39 
41 {
42  if (message.key == broker_connect::KEY_WORKERS) {
43  auto worker = workers_->find_worker_by_identity(message.identity);
44 
45  if (worker != nullptr) {
46  worker->liveness = config_->get_max_worker_liveness();
47  worker_timers_[worker] = std::chrono::milliseconds(0);
48  }
49 
50  worker_commands_.call_function(message.data.at(0), message.identity, message.data, respond);
51  }
52 
53  if (message.key == broker_connect::KEY_CLIENTS) {
54  client_commands_.call_function(message.data.at(0), message.identity, message.data, respond);
55  }
56 
57  if (message.key == broker_connect::KEY_TIMER) {
58  process_timer(message, respond);
59  }
60 }
61 
62 void broker_handler::process_client_eval(
63  const std::string &identity, const std::vector<std::string> &message, response_cb respond)
64 {
65  // At first, let client know that we are alive and well
66  respond(message_container(broker_connect::KEY_CLIENTS, identity, {"ack"}));
67 
68  // Get job identification and parse headers
69  std::string job_id = message.at(1);
70  request::headers_t headers;
71 
72  // Load headers terminated by an empty frame
73  auto it = std::begin(message) + 2;
74 
75  while (true) {
76  // End of headers
77  if (it->size() == 0) {
78  ++it;
79  break;
80  }
81 
82  // Unexpected end of message - do nothing and return
83  if (std::next(it) == std::end(message)) {
84  logger_->warn() << "Unexpected end of message from frontend. Skipped.";
85  return;
86  }
87 
88  // Parse header, save it and continue
89  size_t pos = it->find('=');
90  size_t value_size = it->size() - (pos + 1);
91 
92  headers.emplace(it->substr(0, pos), it->substr(pos + 1, value_size));
93  ++it;
94  }
95 
96  worker_registry::worker_ptr worker = workers_->find_worker(headers);
97 
98  if (worker != nullptr) {
99  logger_->debug(" - incomming job {}", job_id);
100 
101  // Forward remaining messages to the worker without actually understanding them
102  std::vector<std::string> additional_data;
103  for (; it != std::end(message); ++it) {
104  additional_data.push_back(*it);
105  }
106  job_request_data request_data(job_id, additional_data);
107 
108  auto eval_request = std::make_shared<request>(headers, request_data);
109  worker->enqueue_request(eval_request);
110 
111  if (!assign_queued_request(worker, respond)) {
112  logger_->debug() << " - saved to queue for worker '" << worker->get_description() << "'";
113  }
114 
115  respond(message_container(broker_connect::KEY_CLIENTS, identity, {"accept"}));
116  workers_->deprioritize_worker(worker);
117  } else {
118  respond(message_container(broker_connect::KEY_CLIENTS, identity, {"reject"}));
119  logger_->error() << "Request '" << job_id << "' rejected. No worker available for headers:";
120  for (auto &header : headers) {
121  logger_->error() << " - " << header.first << ": " << header.second;
122  }
123  }
124 }
125 
126 void broker_handler::process_worker_init(
127  const std::string &identity, const std::vector<std::string> &message, response_cb respond)
128 {
130 
131  // first let us know that message arrived (logging moved from main loop)
132  logger_->debug() << "Received message 'init' from workers";
133 
134  // There must be at least one argument
135  if (message.size() < 2) {
136  logger_->warn() << "Init command without argument. Nothing to do.";
137  return;
138  }
139 
140  std::string hwgroup = message.at(1);
141  request::headers_t headers;
142 
143  auto message_it = std::begin(message) + 2;
144  for (; message_it != std::end(message); ++message_it) {
145  auto &header = *message_it;
146 
147  if (header == "") {
148  break;
149  }
150 
151  size_t pos = header.find('=');
152  size_t value_size = header.size() - (pos + 1);
153 
154  headers.emplace(header.substr(0, pos), header.substr(pos + 1, value_size));
155  }
156 
157  // Check if we know a worker with given identity
158  worker_registry::worker_ptr current_worker = workers_->find_worker_by_identity(identity);
159 
160  if (current_worker != nullptr) {
161  if (current_worker->headers_equal(headers)) {
162  // We don't have to update anything
163  return;
164  }
165 
166  status_notifier.error(
167  "Received two different INIT messages from the same worker (" + current_worker->get_description() + ")");
168  }
169 
170 
171  // Create a new worker with the basic information
172  auto new_worker = worker_registry::worker_ptr(new worker(identity, hwgroup, headers));
173 
174  // Load additional information
175  for (; message_it != std::end(message); ++message_it) {
176  auto &header = *message_it;
177 
178  size_t pos = header.find('=');
179  size_t value_size = header.size() - (pos + 1);
180  auto key = header.substr(0, pos);
181  auto value = header.substr(pos + 1, value_size);
182 
183  if (key == "description") {
184  new_worker->description = value;
185  } else if (key == "current_job") {
186  auto current_request = worker::request_ptr(new request(job_request_data(value)));
187  new_worker->enqueue_request(current_request);
188  new_worker->next_request();
189  }
190  }
191 
192  // Insert the worker into the registry
193  workers_->add_worker(new_worker);
194 
195  // Start an idle timer for our new worker
196  worker_timers_.emplace(new_worker, std::chrono::milliseconds(0));
197 
198  if (logger_->should_log(spdlog::level::debug)) {
199  std::stringstream ss;
200  std::copy(message.begin() + 1, message.end(), std::ostream_iterator<std::string>(ss, " "));
201  logger_->debug(" - added new worker {} with headers: {}", new_worker->get_description(), ss.str());
202  }
203 }
204 
205 void broker_handler::process_worker_done(
206  const std::string &identity, const std::vector<std::string> &message, response_cb respond)
207 {
209 
210  // first let us know that message arrived (logging moved from main loop)
211  logger_->debug() << "Received message 'done' from workers";
212 
213  worker_registry::worker_ptr worker = workers_->find_worker_by_identity(identity);
214 
215  if (worker == nullptr) {
216  logger_->warn() << "Got 'done' message from an unknown worker";
217  return;
218  }
219 
220  if (message.size() < 3) {
221  logger_->error("Got 'done' message with not enough arguments from worker {}", worker->get_description());
222  return;
223  }
224 
225  std::shared_ptr<const request> current = worker->get_current_request();
226 
227  if (message.at(1) != current->data.get_job_id()) {
228  logger_->error() << "Got 'done' message from worker {} with mismatched job id - {} (message) vs. {} (worker)",
229  worker->get_description(), message.at(1), current->data.get_job_id();
230  return;
231  }
232 
233  auto status = message.at(2);
234 
235  if (status == "OK") {
236  // notify frontend that job ended successfully and complete it internally
237  status_notifier.job_done(message.at(1));
238  worker->complete_request();
239 
240  if (!assign_queued_request(worker, respond)) {
241  logger_->debug(" - worker {} is now free", worker->get_description());
242  }
243  } else if (status == "INTERNAL_ERROR") {
244  if (message.size() != 4) {
245  logger_->warn(
246  "Invalid number of arguments in a 'done' message with status 'INTERNAL_FAILURE' from worker {}",
247  worker->get_description());
248  return;
249  }
250 
251  auto failed_request = worker->cancel_request();
252 
253  if (!failed_request->data.is_complete()) {
254  status_notifier.rejected_job(
255  failed_request->data.get_job_id(), "Job failed with '" + message.at(3) + "' and cannot be reassigned");
256  } else if (check_failure_count(failed_request, status_notifier)) {
257  reassign_request(failed_request, respond);
258  } else {
259  assign_queued_request(worker, respond);
260  }
261  } else if (status == "FAILED") {
262  if (message.size() != 4) {
263  logger_->warn("Invalid number of arguments in a 'done' message with status 'FAILED' from worker {}",
264  worker->get_description());
265  return;
266  }
267 
268  status_notifier.job_failed(message.at(1), message.at(3));
269 
270  worker->cancel_request();
271  assign_queued_request(worker, respond);
272  } else {
273  logger_->warn() << "Received unexpected status code {} from worker {}", status, worker->get_description();
274  }
275 }
276 
277 void broker_handler::process_worker_ping(
278  const std::string &identity, const std::vector<std::string> &message, handler_interface::response_cb respond)
279 {
280  // first let us know that message arrived (logging moved from main loop)
281  // logger_->debug() << "Received message 'ping' from workers";
282 
283  worker_registry::worker_ptr worker = workers_->find_worker_by_identity(identity);
284 
285  if (worker == nullptr) {
286  respond(message_container(broker_connect::KEY_WORKERS, identity, {"intro"}));
287  return;
288  }
289 
290  respond(message_container(broker_connect::KEY_WORKERS, identity, {"pong"}));
291 }
292 
293 void broker_handler::process_worker_progress(
294  const std::string &identity, const std::vector<std::string> &message, handler_interface::response_cb respond)
295 {
296  // first let us know that message arrived (logging moved from main loop)
297  // logger_->debug() << "Received message 'progress' from workers";
298 
299  std::vector<std::string> monitor_message;
300  monitor_message.resize(message.size() - 1);
301  std::copy(message.begin() + 1, message.end(), monitor_message.begin());
302 
304 }
305 
306 void broker_handler::process_timer(const message_container &message, handler_interface::response_cb respond)
307 {
308  std::chrono::milliseconds time(std::stoll(message.data.front()));
309  std::list<worker_registry::worker_ptr> to_remove;
310 
311  for (auto worker : workers_->get_workers()) {
312  if (worker_timers_.find(worker) == std::end(worker_timers_)) {
313  worker_timers_[worker] = std::chrono::milliseconds(0);
314  }
315 
316  worker_timers_[worker] += time;
317 
318  if (worker_timers_[worker] > config_->get_worker_ping_interval()) {
319  worker->liveness -= 1;
320  worker_timers_[worker] = std::chrono::milliseconds(0);
321 
322  if (worker->liveness <= 0) {
323  to_remove.push_back(worker);
324  }
325  }
326  }
327 
329 
330  for (auto worker : to_remove) {
331  logger_->notice("Worker {} expired", worker->get_description());
332 
333  workers_->remove_worker(worker);
334  auto requests = worker->terminate();
335  std::vector<worker::request_ptr> unassigned_requests;
336 
337  for (auto request : *requests) {
338  if (!request->data.is_complete()) {
339  status_notifier.rejected_job(
340  request->data.get_job_id(), "Worker timed out and its job cannot be reassigned");
341  continue;
342  }
343 
344  if (!check_failure_count(request, status_notifier)) {
345  continue;
346  }
347 
348  if (!reassign_request(request, respond)) {
349  unassigned_requests.push_back(request);
350  }
351  }
352 
353  // there are requests which cannot be assigned, notify frontend about it
354  if (!unassigned_requests.empty()) {
355  std::string error_message = "Worker " + worker->get_description() + " dieded";
356 
357  for (auto request : unassigned_requests) {
358  status_notifier.rejected_job(request->data.get_job_id(), error_message);
359  }
360  }
361  }
362 }
363 
364 bool broker_handler::reassign_request(worker::request_ptr request, handler_interface::response_cb respond)
365 {
366  logger_->debug(
367  " - reassigning job {} ({} attempts already failed)", request->data.get_job_id(), request->failure_count);
368  worker_registry::worker_ptr substitute_worker = workers_->find_worker(request->headers);
369 
370  if (substitute_worker == nullptr) {
371  return false;
372  }
373 
374  substitute_worker->enqueue_request(request);
375  if (!assign_queued_request(substitute_worker, respond)) {
376  logger_->debug(
377  " - job {} queued for worker {}", request->data.get_job_id(), substitute_worker->get_description());
378  }
379 
380  return true;
381 }
382 
383 bool broker_handler::assign_queued_request(worker_registry::worker_ptr worker, handler_interface::response_cb respond)
384 {
385  if (worker->next_request()) {
386  respond(message_container(
388  logger_->debug(
389  " - job {} sent to worker {}", worker->get_current_request()->data.get_job_id(), worker->get_description());
390  return true;
391  }
392 
393  return false;
394 }
395 
396 bool broker_handler::check_failure_count(worker::request_ptr request, status_notifier_interface &status_notifier)
397 {
398  if (request->failure_count >= config_->get_max_request_failures()) {
399  status_notifier.job_failed(request->data.get_job_id(),
400  "Job was reassigned too many (" + std::to_string(request->failure_count - 1) + ") times");
401  return false;
402  }
403 
404  return true;
405 }
size_t failure_count
Definition: worker.h:89
std::shared_ptr< request > request_ptr
Definition: worker.h:151
broker_handler(std::shared_ptr< const broker_config > config, std::shared_ptr< worker_registry > workers, std::shared_ptr< spdlog::logger > logger)
static const std::string KEY_STATUS_NOTIFIER
size_t liveness
Definition: worker.h:180
virtual void job_failed(const std::string &job_id, const std::string &desc="")
std::function< void(const message_container &)> response_cb
virtual void job_done(const std::string &job_id)
const std::string & get_job_id() const
Definition: worker.h:60
static const std::string MONITOR_IDENTITY
void call_function(const std::string &command, const std::string &identity, const std::vector< std::string > &message, handler_interface::response_cb respond)
virtual void rejected_job(const std::string &job_id, const std::string &desc="")
virtual bool next_request()
Definition: worker.cpp:133
virtual void error(const std::string &desc)
virtual std::shared_ptr< std::vector< request_ptr > > terminate()
Definition: worker.cpp:151
const headers_t headers
Definition: worker.h:83
std::shared_ptr< worker > worker_ptr
void on_request(const message_container &message, response_cb respond)
std::multimap< std::string, std::string > headers_t
Definition: worker.h:80
Definition: worker.h:147
bool is_complete() const
Definition: worker.h:51
virtual std::shared_ptr< const request > get_current_request() const
Definition: worker.cpp:146
static const std::string KEY_WORKERS
virtual void job_failed(const std::string &job_id, const std::string &desc="")=0
static const std::string KEY_CLIENTS
bool register_command(const std::string &command, callback_fn callback)
static const std::string KEY_MONITOR
static const std::string KEY_TIMER
Definition: worker.h:78
const std::string identity
Definition: worker.h:171
std::vector< std::string > data
const job_request_data data
Definition: worker.h:86
std::string get_description() const
Definition: worker.cpp:191