cluster.c (14056B)
1 #include <stdio.h> 2 #include <stdlib.h> 3 #include <string.h> 4 #include <unistd.h> 5 #include <fcntl.h> 6 #include <sys/stat.h> 7 #include <time.h> 8 9 #include "cofyc/argparse.h" 10 #include "rxi/log.h" 11 12 #include "infrastructure/config.h" 13 #include "common/resp.h" 14 #include "../common.h" 15 #include "domain/scheduler.h" 16 #include "domain/config.h" 17 #include "cluster.h" 18 #include "interface/api/server.h" 19 #include "domain/cluster/node.h" 20 21 #define HEALTH_CHECK_INTERVAL 5 22 #define MAX_FAILURES 3 23 24 static cluster_node_t *cluster_nodes = NULL; 25 static const char *cluster_listen = NULL; 26 27 typedef struct { 28 int64_t last_health_check; 29 } cluster_udata_t; 30 31 static void cluster_config_free(void) { 32 cluster_node_t *node = cluster_nodes; 33 while (node) { 34 cluster_node_t *next = node->next; 35 cluster_node_free(node); 36 node = next; 37 } 38 cluster_nodes = NULL; 39 } 40 41 static int cluster_config_parse(void) { 42 cluster_config_free(); 43 44 if (!global_cfg) return -1; 45 46 resp_object *cluster_sec = resp_map_get(global_cfg, "cluster"); 47 if (!cluster_sec) { 48 log_error("cluster: no [cluster] section in config"); 49 return -1; 50 } 51 52 const char *nodes_str = resp_map_get_string(cluster_sec, "nodes"); 53 if (!nodes_str) { 54 log_error("cluster: no 'nodes' defined in [cluster] section"); 55 return -1; 56 } 57 58 cluster_listen = resp_map_get_string(cluster_sec, "listen"); 59 60 char *nodes_copy = strdup(nodes_str); 61 char *saveptr; 62 char *token = strtok_r(nodes_copy, ",", &saveptr); 63 64 while (token) { 65 while (*token == ' ') token++; 66 char *end = token + strlen(token) - 1; 67 while (end > token && *end == ' ') *end-- = '\0'; 68 69 char section_name[64]; 70 snprintf(section_name, sizeof(section_name), "cluster:node:%s", token); 71 72 resp_object *node_sec = resp_map_get(global_cfg, section_name); 73 if (!node_sec) { 74 log_error("cluster: no [%s] section found", section_name); 75 token = strtok_r(NULL, ",", &saveptr); 76 continue; 77 } 78 79 const char *url = resp_map_get_string(node_sec, "url"); 80 const char *weight_str = resp_map_get_string(node_sec, "weight"); 81 const char *user = resp_map_get_string(node_sec, "user"); 82 const char *secret = resp_map_get_string(node_sec, "secret"); 83 84 if (!url) { 85 log_error("cluster: no 'url' for node %s", token); 86 token = strtok_r(NULL, ",", &saveptr); 87 continue; 88 } 89 90 int weight = weight_str ? atoi(weight_str) : 1; 91 if (weight <= 0) weight = 1; 92 93 cluster_node_t *node = cluster_node_create(token, url, weight, user, secret); 94 if (!node) { 95 log_error("cluster: failed to create node %s", token); 96 token = strtok_r(NULL, ",", &saveptr); 97 continue; 98 } 99 100 node->next = cluster_nodes; 101 cluster_nodes = node; 102 103 log_info("cluster: added node %s (url=%s, weight=%d)", token, url, weight); 104 105 token = strtok_r(NULL, ",", &saveptr); 106 } 107 108 free(nodes_copy); 109 return 0; 110 } 111 112 static void cluster_health_check(void) { 113 cluster_node_t *node = cluster_nodes; 114 while (node) { 115 int result = cluster_node_ping(node); 116 if (result == 0) { 117 if (!node->available) { 118 log_info("cluster: node %s is now available", node->name); 119 cluster_node_set_available(node, true); 120 } 121 } else { 122 if (node->available || node->consecutive_failures >= MAX_FAILURES) { 123 log_warn("cluster: node %s is unavailable (failures: %d)", node->name, node->consecutive_failures); 124 cluster_node_set_available(node, false); 125 } 126 } 127 node = node->next; 128 } 129 } 130 131 static void cluster_refresh_session_counts(void) { 132 cluster_node_t *node = cluster_nodes; 133 while (node) { 134 if (node->available) { 135 int count = cluster_node_get_session_count(node); 136 if (count >= 0) { 137 node->session_count = count; 138 } 139 } 140 node = node->next; 141 } 142 } 143 144 static cluster_node_t *cluster_select_node(void) { 145 cluster_node_t *best = NULL; 146 cluster_node_t *node = cluster_nodes; 147 while (node) { 148 if (node->available) { 149 if (!best || cluster_node_select_weighted_lowest(node, best)) { 150 best = node; 151 } 152 } 153 node = node->next; 154 } 155 return best; 156 } 157 158 static cluster_node_t *cluster_find_session_node(const char *session_id) { 159 cluster_node_t *node = cluster_nodes; 160 while (node) { 161 if (node->available) { 162 resp_object *resp = cluster_node_send_command(node, "session.info", session_id, NULL); 163 if (resp && resp->type != RESPT_ERROR) { 164 resp_free(resp); 165 return node; 166 } 167 if (resp) resp_free(resp); 168 } 169 node = node->next; 170 } 171 return NULL; 172 } 173 174 static resp_object *cluster_forward_to_node(cluster_node_t *node, const char *cmd, resp_object *args) { 175 if (!node || node->fd < 0) return NULL; 176 177 resp_object *resp = cluster_node_send_command(node, cmd, NULL); 178 if (resp) return resp; 179 180 return NULL; 181 } 182 183 static resp_object *cluster_handle_command(const char *cmd, resp_object *args) { 184 if (!cmd || !args) return NULL; 185 186 if (strcmp(cmd, "session.create") == 0) { 187 cluster_node_t *node = cluster_select_node(); 188 if (!node) { 189 resp_object *err = resp_array_init(); 190 resp_array_append_simple(err, "ERR no available nodes"); 191 return err; 192 } 193 194 const char *session_id = NULL; 195 const char *idle_expiry = NULL; 196 197 if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) { 198 session_id = args->u.arr.elem[1].u.s; 199 } 200 if (args->u.arr.n > 2 && args->u.arr.elem[2].type == RESPT_BULK) { 201 idle_expiry = args->u.arr.elem[2].u.s; 202 } 203 204 resp_object *resp; 205 if (idle_expiry) { 206 resp = cluster_node_send_command(node, "session.create", session_id, idle_expiry, NULL); 207 } else { 208 resp = cluster_node_send_command(node, "session.create", session_id, NULL); 209 } 210 211 if (resp && resp->type != RESPT_ERROR) { 212 node->session_count++; 213 } 214 215 return resp; 216 } 217 218 if (strcmp(cmd, "session.list") == 0) { 219 resp_object *result = resp_array_init(); 220 cluster_node_t *node = cluster_nodes; 221 while (node) { 222 if (node->available) { 223 resp_object *resp = cluster_node_send_command(node, "session.list", NULL); 224 if (resp && resp->type == RESPT_ARRAY) { 225 for (size_t i = 0; i < resp->u.arr.n; i++) { 226 if (resp->u.arr.elem[i].type == RESPT_BULK) { 227 resp_array_append_bulk(result, resp->u.arr.elem[i].u.s); 228 } 229 } 230 } 231 if (resp) resp_free(resp); 232 } 233 node = node->next; 234 } 235 return result; 236 } 237 238 if (strcmp(cmd, "session.count") == 0) { 239 int total = 0; 240 cluster_node_t *node = cluster_nodes; 241 while (node) { 242 if (node->available) { 243 int count = cluster_node_get_session_count(node); 244 if (count >= 0) { 245 total += count; 246 } 247 } 248 node = node->next; 249 } 250 resp_object *result = malloc(sizeof(resp_object)); 251 result->type = RESPT_INT; 252 result->u.i = total; 253 return result; 254 } 255 256 if (strcmp(cmd, "session.info") == 0) { 257 if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { 258 resp_object *err = resp_array_init(); 259 resp_array_append_simple(err, "ERR wrong number of arguments"); 260 return err; 261 } 262 263 const char *session_id = args->u.arr.elem[1].u.s; 264 cluster_node_t *node = cluster_find_session_node(session_id); 265 if (!node) { 266 resp_object *err = resp_array_init(); 267 resp_array_append_simple(err, "ERR session not found"); 268 return err; 269 } 270 271 return cluster_node_send_command(node, "session.info", session_id, NULL); 272 } 273 274 if (strcmp(cmd, "session.destroy") == 0) { 275 if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { 276 resp_object *err = resp_array_init(); 277 resp_array_append_simple(err, "ERR wrong number of arguments"); 278 return err; 279 } 280 281 const char *session_id = args->u.arr.elem[1].u.s; 282 cluster_node_t *node = cluster_find_session_node(session_id); 283 if (!node) { 284 resp_object *err = resp_array_init(); 285 resp_array_append_simple(err, "ERR session not found"); 286 return err; 287 } 288 289 resp_object *resp = cluster_node_send_command(node, "session.destroy", session_id, NULL); 290 if (resp && resp->type != RESPT_ERROR) { 291 node->session_count--; 292 } 293 return resp; 294 } 295 296 if (strcmp(cmd, "session.socket.create.listen") == 0 || 297 strcmp(cmd, "session.socket.create.connect") == 0 || 298 strcmp(cmd, "session.socket.destroy") == 0 || 299 strcmp(cmd, "session.forward.create") == 0 || 300 strcmp(cmd, "session.forward.destroy") == 0) { 301 302 if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { 303 resp_object *err = resp_array_init(); 304 resp_array_append_simple(err, "ERR wrong number of arguments"); 305 return err; 306 } 307 308 const char *session_id = args->u.arr.elem[1].u.s; 309 cluster_node_t *node = cluster_find_session_node(session_id); 310 if (!node) { 311 resp_object *err = resp_array_init(); 312 resp_array_append_simple(err, "ERR session not found"); 313 return err; 314 } 315 316 char **argv = malloc(sizeof(char *) * args->u.arr.n); 317 for (size_t i = 0; i < args->u.arr.n; i++) { 318 if (args->u.arr.elem[i].type == RESPT_BULK) { 319 argv[i] = args->u.arr.elem[i].u.s ? strdup(args->u.arr.elem[i].u.s) : ""; 320 } else if (args->u.arr.elem[i].type == RESPT_INT) { 321 char buf[32]; 322 snprintf(buf, sizeof(buf), "%lld", args->u.arr.elem[i].u.i); 323 argv[i] = strdup(buf); 324 } else { 325 argv[i] = strdup(""); 326 } 327 } 328 329 resp_object *resp = cluster_node_send_command(node, cmd, NULL); 330 331 for (size_t i = 0; i < args->u.arr.n; i++) { 332 free(argv[i]); 333 } 334 free(argv); 335 336 return resp; 337 } 338 339 if (strcmp(cmd, "session.forward.list") == 0) { 340 if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { 341 resp_object *err = resp_array_init(); 342 resp_array_append_simple(err, "ERR wrong number of arguments"); 343 return err; 344 } 345 346 const char *session_id = args->u.arr.elem[1].u.s; 347 cluster_node_t *node = cluster_find_session_node(session_id); 348 if (!node) { 349 resp_object *err = resp_array_init(); 350 resp_array_append_simple(err, "ERR session not found"); 351 return err; 352 } 353 354 return cluster_node_send_command(node, "session.forward.list", session_id, NULL); 355 } 356 357 if (strcmp(cmd, "system.load") == 0) { 358 resp_object *result = resp_array_init(); 359 char buf[64]; 360 361 resp_array_append_bulk(result, "1min"); 362 resp_array_append_bulk(result, "0.00"); 363 resp_array_append_bulk(result, "5min"); 364 resp_array_append_bulk(result, "0.00"); 365 resp_array_append_bulk(result, "15min"); 366 resp_array_append_bulk(result, "0.00"); 367 368 return result; 369 } 370 371 if (strcmp(cmd, "ping") == 0) { 372 resp_object *result = resp_array_init(); 373 resp_array_append_simple(result, "PONG"); 374 return result; 375 } 376 377 resp_object *err = resp_array_init(); 378 resp_array_append_simple(err, "ERR unknown command"); 379 return err; 380 } 381 382 static int do_daemonize(void) { 383 pid_t pid = fork(); 384 if (pid < 0) { 385 log_fatal("fork: %m"); 386 return -1; 387 } 388 if (pid > 0) 389 _exit(0); 390 if (setsid() < 0) { 391 log_fatal("setsid: %m"); 392 _exit(1); 393 } 394 pid = fork(); 395 if (pid < 0) { 396 log_fatal("fork: %m"); 397 _exit(1); 398 } 399 if (pid > 0) 400 _exit(0); 401 if (chdir("/") != 0) {} 402 int fd; 403 for (fd = 0; fd < 3; fd++) 404 (void)close(fd); 405 fd = open("/dev/null", O_RDWR); 406 if (fd >= 0) { 407 dup2(fd, STDIN_FILENO); 408 dup2(fd, STDOUT_FILENO); 409 dup2(fd, STDERR_FILENO); 410 if (fd > 2) 411 close(fd); 412 } 413 return 0; 414 } 415 416 static void register_domain_commands(void) { 417 api_register_domain_cmd("session.create", cluster_handle_command); 418 api_register_domain_cmd("session.list", cluster_handle_command); 419 api_register_domain_cmd("session.info", cluster_handle_command); 420 api_register_domain_cmd("session.destroy", cluster_handle_command); 421 api_register_domain_cmd("session.socket.create.listen", cluster_handle_command); 422 api_register_domain_cmd("session.socket.create.connect", cluster_handle_command); 423 api_register_domain_cmd("session.socket.destroy", cluster_handle_command); 424 api_register_domain_cmd("session.forward.list", cluster_handle_command); 425 api_register_domain_cmd("session.forward.create", cluster_handle_command); 426 api_register_domain_cmd("session.forward.destroy", cluster_handle_command); 427 api_register_domain_cmd("session.count", cluster_handle_command); 428 api_register_domain_cmd("system.load", cluster_handle_command); 429 log_info("cluster: registered session.* commands"); 430 } 431 432 int cli_cmd_cluster(int argc, const char **argv) { 433 int daemonize_flag = 0; 434 int no_daemonize_flag = 0; 435 436 struct argparse argparse; 437 struct argparse_option options[] = { 438 OPT_HELP(), 439 OPT_BOOLEAN('d', "daemonize", &daemonize_flag, "run in background", NULL, 0, 0), 440 OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground", NULL, 0, 0), 441 OPT_END(), 442 }; 443 argparse_init(&argparse, options, (const char *const[]){"udphole cluster", NULL}, ARGPARSE_STOP_AT_NON_OPTION); 444 argparse_parse(&argparse, argc, argv); 445 446 if (!no_daemonize_flag && daemonize_flag) { 447 do_daemonize(); 448 } 449 450 domain_config_init(); 451 452 if (global_cfg) { 453 resp_object *cfg_sec = resp_map_get(global_cfg, "udphole"); 454 if (cfg_sec) { 455 const char *ports_str = resp_map_get_string(cfg_sec, "ports"); 456 if (ports_str) { 457 int port_low = 7000, port_high = 7999; 458 sscanf(ports_str, "%d-%d", &port_low, &port_high); 459 domain_config_set_ports(port_low, port_high); 460 } 461 const char *advertise = resp_map_get_string(cfg_sec, "advertise"); 462 if (advertise) { 463 domain_config_set_advertise(advertise); 464 } 465 } 466 } 467 468 if (cluster_config_parse() != 0) { 469 log_error("cluster: failed to parse cluster config"); 470 return 1; 471 } 472 473 register_domain_commands(); 474 475 log_info("udphole: starting cluster daemon"); 476 477 cluster_udata_t *udata = calloc(1, sizeof(cluster_udata_t)); 478 udata->last_health_check = 0; 479 480 domain_schedmod_pt_create(api_server_pt, NULL); 481 482 log_info("udphole: cluster daemon started"); 483 484 return domain_schedmod_main(); 485 }