ReCodEx - Task Broker
ReCodEx is a complex programmer testing solution, primarily targeted at technical universities. It is highly customizable and based on modern technologies.
broker_connect.cpp
1 #include "broker_connect.h"
2 
3 const std::string broker_connect::KEY_WORKERS = "workers";
4 const std::string broker_connect::KEY_CLIENTS = "clients";
5 const std::string broker_connect::KEY_MONITOR = "monitor";
6 const std::string broker_connect::KEY_STATUS_NOTIFIER = "status_notifier";
7 
8 // FIXME This must be equal to reactor::KEY_TIMER, but we can't assign that directly
9 const std::string broker_connect::KEY_TIMER = "timer";
10 
11 const std::string broker_connect::MONITOR_IDENTITY = "recodex-monitor";
12 
13 broker_connect::broker_connect(std::shared_ptr<const broker_config> config,
14  std::shared_ptr<zmq::context_t> context,
15  std::shared_ptr<worker_registry> router,
16  std::shared_ptr<spdlog::logger> logger)
17  : config_(config), logger_(logger), workers_(router), reactor_(context)
18 {
19  if (logger_ == nullptr) {
20  logger_ = helpers::create_null_logger();
21  }
22 
23  auto clients_endpoint = "tcp://" + config_->get_client_address() + ":" + std::to_string(config_->get_client_port());
24  logger_->debug() << "Binding clients to " + clients_endpoint;
25 
26  auto workers_endpoint = "tcp://" + config_->get_worker_address() + ":" + std::to_string(config_->get_worker_port());
27  logger_->debug() << "Binding workers to " + workers_endpoint;
28 
29  auto monitor_endpoint =
30  "tcp://" + config_->get_monitor_address() + ":" + std::to_string(config_->get_monitor_port());
31  logger_->debug() << "Binding monitor to " + monitor_endpoint;
32 
33  reactor_.add_socket(KEY_WORKERS, std::make_shared<router_socket_wrapper>(context, workers_endpoint, true));
34  reactor_.add_socket(KEY_CLIENTS, std::make_shared<router_socket_wrapper>(context, clients_endpoint, true));
35  reactor_.add_socket(KEY_MONITOR, std::make_shared<router_socket_wrapper>(context, monitor_endpoint, false));
36 
37  reactor_.add_handler(
38  {KEY_CLIENTS, KEY_WORKERS, KEY_TIMER}, std::make_shared<broker_handler>(config_, workers_, logger_));
39  reactor_.add_async_handler(
40  {KEY_STATUS_NOTIFIER}, std::make_shared<status_notifier_handler>(config_->get_notifier_config(), logger_));
41 }
42 
44 {
45  reactor_.start_loop();
46  logger_->emerg() << "The main loop terminated";
47 }
static const std::string KEY_STATUS_NOTIFIER
static const std::string MONITOR_IDENTITY
void start_loop()
Definition: reactor.cpp:58
void add_handler(const std::vector< std::string > &origins, std::shared_ptr< handler_interface > handler)
Definition: reactor.cpp:19
static const std::string KEY_WORKERS
broker_connect(std::shared_ptr< const broker_config > config, std::shared_ptr< zmq::context_t > context, std::shared_ptr< worker_registry > router, std::shared_ptr< spdlog::logger > logger=nullptr)
static const std::string KEY_CLIENTS
void add_async_handler(const std::vector< std::string > &origins, std::shared_ptr< handler_interface > handler)
Definition: reactor.cpp:28
static const std::string KEY_MONITOR
static const std::string KEY_TIMER
void add_socket(const std::string &name, std::shared_ptr< socket_wrapper_base > socket)
Definition: reactor.cpp:14