// broker_handler constructor (its parameter list continues after the includes on the next line).
// What it does:
// - Stores the worker registry, logger and broker config (see the member-initializer list).
// - Substitutes a null logger when none is supplied.
// - Registers one callback per command name: "eval", "init", "done", "ping" and "progress".
// NOTE(review): the registration call itself is not visible here; presumably it is
// register_command(command, callback). Confirm.
1 #include "broker_handler.h" 2 #include "../broker_connect.h" 3 #include "../notifier/reactor_status_notifier.h" 6 std::shared_ptr<worker_registry> workers,
7 std::shared_ptr<spdlog::logger> logger)
8 : workers_(workers), logger_(logger), config_(config)
// A null logger is substituted so the unconditional logger_-> calls in the handlers
// never dereference nullptr.
10 if (logger_ ==
nullptr) {
11 logger_ = helpers::create_null_logger();
// Each lambda below captures `this` and only forwards (identity, message, respond)
// to the matching process_* member function.
// NOTE(review): the lambdas hold a raw `this`; whatever stores them must not outlive
// this handler.
15 "eval", [
this](
const std::string &identity,
const std::vector<std::string> &message,
response_cb respond) {
16 process_client_eval(identity, message, respond);
20 "init", [
this](
const std::string &identity,
const std::vector<std::string> &message,
response_cb respond) {
21 process_worker_init(identity, message, respond);
25 "done", [
this](
const std::string &identity,
const std::vector<std::string> &message,
response_cb respond) {
26 process_worker_done(identity, message, respond);
30 "ping", [
this](
const std::string &identity,
const std::vector<std::string> &message,
response_cb respond) {
31 process_worker_ping(identity, message, respond);
35 "progress", [
this](
const std::string &identity,
const std::vector<std::string> &message,
response_cb respond) {
36 process_worker_progress(identity, message, respond);
// Fragment of the message dispatcher (presumably broker_handler::on_request; confirm).
// Resets `worker`'s entry in worker_timers_ to zero. operator[] inserts the entry if it is absent.
47 worker_timers_[
worker] = std::chrono::milliseconds(0);
// Forwards the message to process_timer (presumably only for timer-originated messages; confirm).
58 process_timer(message, respond);
// Handles an "eval" request from a client.
// Message layout:
// - message[1] is the job id.
// - The following frames are "key=value" headers, presumably terminated by an empty frame.
// - The frames after that are collected as additional job data.
// If a worker matching the headers is found:
// - the new request is enqueued on it;
// - it is dispatched via assign_queued_request if possible (otherwise it stays queued);
// - deprioritize_worker is called on that worker.
// Otherwise the rejection and every header are logged at error level.
62 void broker_handler::process_client_eval(
63 const std::string &identity,
const std::vector<std::string> &message,
response_cb respond)
// message.at(1) throws std::out_of_range when the job-id frame is missing.
69 std::string job_id = message.at(1);
73 auto it = std::begin(message) + 2;
// An empty frame presumably marks the end of the header section (handling not visible here).
77 if (it->size() == 0) {
// No frame follows the separator: the message is logged as truncated and skipped.
83 if (std::next(it) == std::end(message)) {
84 logger_->warn() <<
"Unexpected end of message from frontend. Skipped.";
// Split the header at the first '=' into key and value.
// NOTE(review): if the frame contains no '=', pos is npos, so pos + 1 wraps to 0.
// Both key and value then become the whole frame. Confirm whether lines not shown
// here reject such frames.
89 size_t pos = it->find(
'=');
90 size_t value_size = it->size() - (pos + 1);
92 headers.emplace(it->substr(0, pos), it->substr(pos + 1, value_size));
98 if (worker !=
nullptr) {
// NOTE(review): "incomming" is a typo in the runtime log text (left unchanged here).
99 logger_->debug(
" - incomming job {}", job_id);
// Every remaining frame is collected as additional job data.
102 std::vector<std::string> additional_data;
103 for (; it != std::end(message); ++it) {
104 additional_data.push_back(*it);
108 auto eval_request = std::make_shared<request>(headers, request_data);
109 worker->enqueue_request(eval_request);
// A false return presumably means the worker is busy; the request then waits in its queue.
111 if (!assign_queued_request(worker, respond)) {
112 logger_->debug() <<
" - saved to queue for worker '" << worker->get_description() <<
"'";
116 workers_->deprioritize_worker(worker);
// No matching worker: log the rejected job id followed by each header it asked for.
119 logger_->error() <<
"Request '" << job_id <<
"' rejected. No worker available for headers:";
120 for (
auto &header : headers) {
121 logger_->error() <<
" - " << header.first <<
": " << header.second;
// Handles an "init" message from a worker.
// Message layout:
// - message[1] is the worker's hardware group.
// - The following frames are "key=value" headers.
// - After the header section, further "key=value" frames may set "description" or
//   announce a "current_job" the worker is already processing.
// If a worker already exists (presumably for this identity; confirm) and its headers
// differ, an error is reported through the status notifier.
// The new worker is added to the registry, given a zeroed timer, and logged at debug level.
126 void broker_handler::process_worker_init(
127 const std::string &identity,
const std::vector<std::string> &message,
response_cb respond)
132 logger_->debug() <<
"Received message 'init' from workers";
// Without a hardware-group frame there is nothing to register.
135 if (message.size() < 2) {
136 logger_->warn() <<
"Init command without argument. Nothing to do.";
140 std::string hwgroup = message.at(1);
143 auto message_it = std::begin(message) + 2;
144 for (; message_it != std::end(message); ++message_it) {
145 auto &header = *message_it;
// Split the header at the first '='.
// NOTE(review): a frame without '=' gives pos == npos, so pos + 1 wraps to 0 and both
// key and value become the whole frame. Confirm lines not shown here guard this.
151 size_t pos = header.find(
'=');
152 size_t value_size = header.size() - (pos + 1);
154 headers.emplace(header.substr(0, pos), header.substr(pos + 1, value_size));
160 if (current_worker !=
nullptr) {
161 if (current_worker->headers_equal(headers)) {
// The same worker re-sent INIT with different headers: report it as an error.
166 status_notifier.
error(
167 "Received two different INIT messages from the same worker (" + current_worker->get_description() +
")");
// This loop resumes where the header loop stopped (message_it is not reset), so it
// only sees the frames after the header section.
175 for (; message_it != std::end(message); ++message_it) {
176 auto &header = *message_it;
178 size_t pos = header.find(
'=');
179 size_t value_size = header.size() - (pos + 1);
180 auto key = header.substr(0, pos);
181 auto value = header.substr(pos + 1, value_size);
183 if (key ==
"description") {
184 new_worker->description = value;
185 }
else if (key ==
"current_job") {
// The worker reports a job it is already running: it is queued and made current.
187 new_worker->enqueue_request(current_request);
188 new_worker->next_request();
193 workers_->add_worker(new_worker);
// emplace leaves any existing timer entry for this key untouched.
196 worker_timers_.emplace(new_worker, std::chrono::milliseconds(0));
// The frame dump is only built when debug logging is enabled.
// It covers message[1..]: the hardware group followed by all remaining frames.
198 if (logger_->should_log(spdlog::level::debug)) {
199 std::stringstream ss;
200 std::copy(message.begin() + 1, message.end(), std::ostream_iterator<std::string>(ss,
" "));
201 logger_->debug(
" - added new worker {} with headers: {}", new_worker->get_description(), ss.str());
// Handles a "done" message from a worker.
// Message layout:
// - message[1] is the job id.
// - message[2] is the status: "OK", "INTERNAL_ERROR" or "FAILED".
// - For the two failure statuses, message[3] carries a description.
// Any other status is logged as unexpected.
205 void broker_handler::process_worker_done(
206 const std::string &identity,
const std::vector<std::string> &message,
response_cb respond)
211 logger_->debug() <<
"Received message 'done' from workers";
215 if (worker ==
nullptr) {
216 logger_->warn() <<
"Got 'done' message from an unknown worker";
// At least job id and status are required.
220 if (message.size() < 3) {
221 logger_->error(
"Got 'done' message with not enough arguments from worker {}", worker->get_description());
// NOTE(review): `current` is dereferenced below with no null check in the visible lines.
// Confirm that a worker sending "done" always has a current request.
225 std::shared_ptr<const request> current = worker->get_current_request();
227 if (message.at(1) != current->data.get_job_id()) {
// NOTE(review): BUG. This is `<<` followed by commas, so the comma operator evaluates and
// discards the three arguments and the "{}" placeholders are logged literally.
// Intended form:
//   logger_->error("...", worker->get_description(), message.at(1), current->data.get_job_id());
228 logger_->error() <<
"Got 'done' message from worker {} with mismatched job id - {} (message) vs. {} (worker)",
229 worker->get_description(), message.at(1), current->data.get_job_id();
233 auto status = message.at(2);
// Success: notify, complete the request, then hand the worker its next queued request if any.
235 if (status ==
"OK") {
237 status_notifier.
job_done(message.at(1));
238 worker->complete_request();
240 if (!assign_queued_request(worker, respond)) {
241 logger_->debug(
" - worker {} is now free", worker->get_description());
243 }
else if (status ==
"INTERNAL_ERROR") {
// NOTE(review): the log text below says 'INTERNAL_FAILURE', but this branch handles
// status "INTERNAL_ERROR".
244 if (message.size() != 4) {
246 "Invalid number of arguments in a 'done' message with status 'INTERNAL_FAILURE' from worker {}",
247 worker->get_description());
// Take the request back from the worker.
// - If is_complete() is false, it is reported as failed and "cannot be reassigned".
// - Otherwise it is reassigned when check_failure_count allows.
// Either way the worker then moves on to its next queued request.
251 auto failed_request = worker->cancel_request();
253 if (!failed_request->data.is_complete()) {
255 failed_request->data.get_job_id(),
"Job failed with '" + message.at(3) +
"' and cannot be reassigned");
256 }
else if (check_failure_count(failed_request, status_notifier)) {
257 reassign_request(failed_request, respond);
259 assign_queued_request(worker, respond);
261 }
else if (status ==
"FAILED") {
262 if (message.size() != 4) {
263 logger_->warn(
"Invalid number of arguments in a 'done' message with status 'FAILED' from worker {}",
264 worker->get_description());
// Job-level failure: report it, drop the request, and move the worker to its next queued request.
268 status_notifier.
job_failed(message.at(1), message.at(3));
270 worker->cancel_request();
271 assign_queued_request(worker, respond);
// NOTE(review): BUG. Same comma-operator problem as above: `status` and the worker
// description are discarded and "{}" is logged literally. Use
// logger_->warn("...", status, worker->get_description()).
273 logger_->warn() <<
"Received unexpected status code {} from worker {}", status, worker->get_description();
// Handles a "ping" message from a worker. Only a fragment of the body is visible here.
// NOTE(review): the body presumably looks the worker up by identity and handles the
// unknown-worker case below. Confirm.
277 void broker_handler::process_worker_ping(
285 if (worker ==
nullptr) {
// Handles a "progress" message from a worker.
// Copies every frame after the first into monitor_message, presumably to be forwarded
// to the monitor (confirm).
293 void broker_handler::process_worker_progress(
299 std::vector<std::string> monitor_message;
// NOTE(review): if message can be empty, message.size() - 1 wraps around (size_t) and
// begin() + 1 is out of range. Verify that a guard in the lines not shown rules this out.
300 monitor_message.resize(message.size() - 1);
301 std::copy(message.begin() + 1, message.end(), monitor_message.begin());
// Timer tick handler (presumably broker_handler::process_timer; confirm).
// message.data.front() holds the elapsed time in milliseconds.
// Steps:
// 1. The elapsed time is added to every registered worker's timer.
// 2. A timer that exceeds the configured ping interval is reset to zero.
// 3. Workers collected in to_remove are removed from the registry.
// 4. Their requests are reassigned when check_failure_count allows.
// 5. Requests that no worker accepted get a second attempt on a substitute worker.
// NOTE(review): std::stoll throws std::invalid_argument / std::out_of_range for
// non-numeric or overflowing text, and front() on an empty data vector is undefined.
308 std::chrono::milliseconds time(std::stoll(message.
data.front()));
309 std::list<worker_registry::worker_ptr> to_remove;
311 for (
auto worker : workers_->get_workers()) {
// Workers without a timer entry start at zero.
// NOTE(review): each worker_timers_[worker] repeats the map lookup. Binding one
// reference to the entry would avoid that.
312 if (worker_timers_.find(
worker) == std::end(worker_timers_)) {
313 worker_timers_[
worker] = std::chrono::milliseconds(0);
316 worker_timers_[
worker] += time;
// Ping interval exceeded: reset the timer.
// The condition deciding whether the worker is also queued for removal is not visible
// here (presumably a liveness counter; confirm).
318 if (worker_timers_[
worker] > config_->get_worker_ping_interval()) {
320 worker_timers_[
worker] = std::chrono::milliseconds(0);
323 to_remove.push_back(
worker);
330 for (
auto worker : to_remove) {
333 workers_->remove_worker(
worker);
// `requests` presumably comes from the removed worker's terminate() (confirm).
// - Requests rejected by check_failure_count are skipped.
// - Requests reassign_request cannot place are kept for the substitute pass below.
335 std::vector<worker::request_ptr> unassigned_requests;
337 for (
auto request : *requests) {
344 if (!check_failure_count(
request, status_notifier)) {
348 if (!reassign_request(
request, respond)) {
349 unassigned_requests.push_back(
request);
354 if (!unassigned_requests.empty()) {
357 for (
auto request : unassigned_requests) {
// If no substitute worker exists, the request is presumably rejected (confirm).
370 if (substitute_worker ==
nullptr) {
374 substitute_worker->enqueue_request(
request);
375 if (!assign_queued_request(substitute_worker, respond)) {
std::shared_ptr< request > request_ptr
broker_handler(std::shared_ptr< const broker_config > config, std::shared_ptr< worker_registry > workers, std::shared_ptr< spdlog::logger > logger)
static const std::string KEY_STATUS_NOTIFIER
virtual void job_failed(const std::string &job_id, const std::string &desc="")
std::function< void(const message_container &)> response_cb
virtual void job_done(const std::string &job_id)
const std::string & get_job_id() const
static const std::string MONITOR_IDENTITY
void call_function(const std::string &command, const std::string &identity, const std::vector< std::string > &message, handler_interface::response_cb respond)
virtual void rejected_job(const std::string &job_id, const std::string &desc="")
virtual bool next_request()
virtual void error(const std::string &desc)
virtual std::shared_ptr< std::vector< request_ptr > > terminate()
std::shared_ptr< worker > worker_ptr
void on_request(const message_container &message, response_cb respond)
std::multimap< std::string, std::string > headers_t
virtual std::shared_ptr< const request > get_current_request() const
static const std::string KEY_WORKERS
virtual void job_failed(const std::string &job_id, const std::string &desc="")=0
static const std::string KEY_CLIENTS
bool register_command(const std::string &command, callback_fn callback)
static const std::string KEY_MONITOR
static const std::string KEY_TIMER
const std::string identity
std::vector< std::string > data
const job_request_data data
std::string get_description() const