udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

cluster.c (14056B)


      1 #include <stdio.h>
      2 #include <stdlib.h>
      3 #include <string.h>
      4 #include <unistd.h>
      5 #include <fcntl.h>
      6 #include <sys/stat.h>
      7 #include <time.h>
      8 
      9 #include "cofyc/argparse.h"
     10 #include "rxi/log.h"
     11 
     12 #include "infrastructure/config.h"
     13 #include "common/resp.h"
     14 #include "../common.h"
     15 #include "domain/scheduler.h"
     16 #include "domain/config.h"
     17 #include "cluster.h"
     18 #include "interface/api/server.h"
     19 #include "domain/cluster/node.h"
     20 
     21 #define HEALTH_CHECK_INTERVAL 5 /* health-check period; not referenced in the visible code, presumably seconds - TODO confirm */
     22 #define MAX_FAILURES 3 /* consecutive_failures threshold consulted by cluster_health_check() */
     23 
     24 static cluster_node_t *cluster_nodes = NULL; /* head of the singly linked list of configured nodes (linked through ->next) */
     25 static const char *cluster_listen = NULL; /* "listen" value read from the [cluster] config section; not read in the visible code */
     26 
     27 typedef struct { /* state record for the cluster daemon */
     28   int64_t last_health_check; /* presumably the time of the most recent health check - TODO confirm; never read in the visible code */
     29 } cluster_udata_t;
     30 
     31 static void cluster_config_free(void) {
     32   cluster_node_t *node = cluster_nodes;
     33   while (node) {
     34     cluster_node_t *next = node->next;
     35     cluster_node_free(node);
     36     node = next;
     37   }
     38   cluster_nodes = NULL;
     39 }
     40 
     41 static int cluster_config_parse(void) {
     42   cluster_config_free();
     43 
     44   if (!global_cfg) return -1;
     45 
     46   resp_object *cluster_sec = resp_map_get(global_cfg, "cluster");
     47   if (!cluster_sec) {
     48     log_error("cluster: no [cluster] section in config");
     49     return -1;
     50   }
     51 
     52   const char *nodes_str = resp_map_get_string(cluster_sec, "nodes");
     53   if (!nodes_str) {
     54     log_error("cluster: no 'nodes' defined in [cluster] section");
     55     return -1;
     56   }
     57 
     58   cluster_listen = resp_map_get_string(cluster_sec, "listen");
     59 
     60   char *nodes_copy = strdup(nodes_str);
     61   char *saveptr;
     62   char *token = strtok_r(nodes_copy, ",", &saveptr);
     63 
     64   while (token) {
     65     while (*token == ' ') token++;
     66     char *end = token + strlen(token) - 1;
     67     while (end > token && *end == ' ') *end-- = '\0';
     68 
     69     char section_name[64];
     70     snprintf(section_name, sizeof(section_name), "cluster:node:%s", token);
     71 
     72     resp_object *node_sec = resp_map_get(global_cfg, section_name);
     73     if (!node_sec) {
     74       log_error("cluster: no [%s] section found", section_name);
     75       token = strtok_r(NULL, ",", &saveptr);
     76       continue;
     77     }
     78 
     79     const char *url = resp_map_get_string(node_sec, "url");
     80     const char *weight_str = resp_map_get_string(node_sec, "weight");
     81     const char *user = resp_map_get_string(node_sec, "user");
     82     const char *secret = resp_map_get_string(node_sec, "secret");
     83 
     84     if (!url) {
     85       log_error("cluster: no 'url' for node %s", token);
     86       token = strtok_r(NULL, ",", &saveptr);
     87       continue;
     88     }
     89 
     90     int weight = weight_str ? atoi(weight_str) : 1;
     91     if (weight <= 0) weight = 1;
     92 
     93     cluster_node_t *node = cluster_node_create(token, url, weight, user, secret);
     94     if (!node) {
     95       log_error("cluster: failed to create node %s", token);
     96       token = strtok_r(NULL, ",", &saveptr);
     97       continue;
     98     }
     99 
    100     node->next = cluster_nodes;
    101     cluster_nodes = node;
    102 
    103     log_info("cluster: added node %s (url=%s, weight=%d)", token, url, weight);
    104 
    105     token = strtok_r(NULL, ",", &saveptr);
    106   }
    107 
    108   free(nodes_copy);
    109   return 0;
    110 }
    111 
    112 static void cluster_health_check(void) {
    113   cluster_node_t *node = cluster_nodes;
    114   while (node) {
    115     int result = cluster_node_ping(node);
    116     if (result == 0) {
    117       if (!node->available) {
    118         log_info("cluster: node %s is now available", node->name);
    119         cluster_node_set_available(node, true);
    120       }
    121     } else {
    122       if (node->available || node->consecutive_failures >= MAX_FAILURES) {
    123         log_warn("cluster: node %s is unavailable (failures: %d)", node->name, node->consecutive_failures);
    124         cluster_node_set_available(node, false);
    125       }
    126     }
    127     node = node->next;
    128   }
    129 }
    130 
    131 static void cluster_refresh_session_counts(void) {
    132   cluster_node_t *node = cluster_nodes;
    133   while (node) {
    134     if (node->available) {
    135       int count = cluster_node_get_session_count(node);
    136       if (count >= 0) {
    137         node->session_count = count;
    138       }
    139     }
    140     node = node->next;
    141   }
    142 }
    143 
    144 static cluster_node_t *cluster_select_node(void) {
    145   cluster_node_t *best = NULL;
    146   cluster_node_t *node = cluster_nodes;
    147   while (node) {
    148     if (node->available) {
    149       if (!best || cluster_node_select_weighted_lowest(node, best)) {
    150         best = node;
    151       }
    152     }
    153     node = node->next;
    154   }
    155   return best;
    156 }
    157 
    158 static cluster_node_t *cluster_find_session_node(const char *session_id) {
    159   cluster_node_t *node = cluster_nodes;
    160   while (node) {
    161     if (node->available) {
    162       resp_object *resp = cluster_node_send_command(node, "session.info", session_id, NULL);
    163       if (resp && resp->type != RESPT_ERROR) {
    164         resp_free(resp);
    165         return node;
    166       }
    167       if (resp) resp_free(resp);
    168     }
    169     node = node->next;
    170   }
    171   return NULL;
    172 }
    173 
    174 static resp_object *cluster_forward_to_node(cluster_node_t *node, const char *cmd, resp_object *args) {
    175   if (!node || node->fd < 0) return NULL;
    176 
    177   resp_object *resp = cluster_node_send_command(node, cmd, NULL);
    178   if (resp) return resp;
    179 
    180   return NULL;
    181 }
    182 
    183 static resp_object *cluster_handle_command(const char *cmd, resp_object *args) {
    184   if (!cmd || !args) return NULL;
    185 
    186   if (strcmp(cmd, "session.create") == 0) {
    187     cluster_node_t *node = cluster_select_node();
    188     if (!node) {
    189       resp_object *err = resp_array_init();
    190       resp_array_append_simple(err, "ERR no available nodes");
    191       return err;
    192     }
    193 
    194     const char *session_id = NULL;
    195     const char *idle_expiry = NULL;
    196 
    197     if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) {
    198       session_id = args->u.arr.elem[1].u.s;
    199     }
    200     if (args->u.arr.n > 2 && args->u.arr.elem[2].type == RESPT_BULK) {
    201       idle_expiry = args->u.arr.elem[2].u.s;
    202     }
    203 
    204     resp_object *resp;
    205     if (idle_expiry) {
    206       resp = cluster_node_send_command(node, "session.create", session_id, idle_expiry, NULL);
    207     } else {
    208       resp = cluster_node_send_command(node, "session.create", session_id, NULL);
    209     }
    210 
    211     if (resp && resp->type != RESPT_ERROR) {
    212       node->session_count++;
    213     }
    214 
    215     return resp;
    216   }
    217 
    218   if (strcmp(cmd, "session.list") == 0) {
    219     resp_object *result = resp_array_init();
    220     cluster_node_t *node = cluster_nodes;
    221     while (node) {
    222       if (node->available) {
    223         resp_object *resp = cluster_node_send_command(node, "session.list", NULL);
    224         if (resp && resp->type == RESPT_ARRAY) {
    225           for (size_t i = 0; i < resp->u.arr.n; i++) {
    226             if (resp->u.arr.elem[i].type == RESPT_BULK) {
    227               resp_array_append_bulk(result, resp->u.arr.elem[i].u.s);
    228             }
    229           }
    230         }
    231         if (resp) resp_free(resp);
    232       }
    233       node = node->next;
    234     }
    235     return result;
    236   }
    237 
    238   if (strcmp(cmd, "session.count") == 0) {
    239     int total = 0;
    240     cluster_node_t *node = cluster_nodes;
    241     while (node) {
    242       if (node->available) {
    243         int count = cluster_node_get_session_count(node);
    244         if (count >= 0) {
    245           total += count;
    246         }
    247       }
    248       node = node->next;
    249     }
    250     resp_object *result = malloc(sizeof(resp_object));
    251     result->type = RESPT_INT;
    252     result->u.i = total;
    253     return result;
    254   }
    255 
    256   if (strcmp(cmd, "session.info") == 0) {
    257     if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) {
    258       resp_object *err = resp_array_init();
    259       resp_array_append_simple(err, "ERR wrong number of arguments");
    260       return err;
    261     }
    262 
    263     const char *session_id = args->u.arr.elem[1].u.s;
    264     cluster_node_t *node = cluster_find_session_node(session_id);
    265     if (!node) {
    266       resp_object *err = resp_array_init();
    267       resp_array_append_simple(err, "ERR session not found");
    268       return err;
    269     }
    270 
    271     return cluster_node_send_command(node, "session.info", session_id, NULL);
    272   }
    273 
    274   if (strcmp(cmd, "session.destroy") == 0) {
    275     if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) {
    276       resp_object *err = resp_array_init();
    277       resp_array_append_simple(err, "ERR wrong number of arguments");
    278       return err;
    279     }
    280 
    281     const char *session_id = args->u.arr.elem[1].u.s;
    282     cluster_node_t *node = cluster_find_session_node(session_id);
    283     if (!node) {
    284       resp_object *err = resp_array_init();
    285       resp_array_append_simple(err, "ERR session not found");
    286       return err;
    287     }
    288 
    289     resp_object *resp = cluster_node_send_command(node, "session.destroy", session_id, NULL);
    290     if (resp && resp->type != RESPT_ERROR) {
    291       node->session_count--;
    292     }
    293     return resp;
    294   }
    295 
    296   if (strcmp(cmd, "session.socket.create.listen") == 0 ||
    297       strcmp(cmd, "session.socket.create.connect") == 0 ||
    298       strcmp(cmd, "session.socket.destroy") == 0 ||
    299       strcmp(cmd, "session.forward.create") == 0 ||
    300       strcmp(cmd, "session.forward.destroy") == 0) {
    301 
    302     if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) {
    303       resp_object *err = resp_array_init();
    304       resp_array_append_simple(err, "ERR wrong number of arguments");
    305       return err;
    306     }
    307 
    308     const char *session_id = args->u.arr.elem[1].u.s;
    309     cluster_node_t *node = cluster_find_session_node(session_id);
    310     if (!node) {
    311       resp_object *err = resp_array_init();
    312       resp_array_append_simple(err, "ERR session not found");
    313       return err;
    314     }
    315 
    316     char **argv = malloc(sizeof(char *) * args->u.arr.n);
    317     for (size_t i = 0; i < args->u.arr.n; i++) {
    318       if (args->u.arr.elem[i].type == RESPT_BULK) {
    319         argv[i] = args->u.arr.elem[i].u.s ? strdup(args->u.arr.elem[i].u.s) : "";
    320       } else if (args->u.arr.elem[i].type == RESPT_INT) {
    321         char buf[32];
    322         snprintf(buf, sizeof(buf), "%lld", args->u.arr.elem[i].u.i);
    323         argv[i] = strdup(buf);
    324       } else {
    325         argv[i] = strdup("");
    326       }
    327     }
    328 
    329     resp_object *resp = cluster_node_send_command(node, cmd, NULL);
    330 
    331     for (size_t i = 0; i < args->u.arr.n; i++) {
    332       free(argv[i]);
    333     }
    334     free(argv);
    335 
    336     return resp;
    337   }
    338 
    339   if (strcmp(cmd, "session.forward.list") == 0) {
    340     if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) {
    341       resp_object *err = resp_array_init();
    342       resp_array_append_simple(err, "ERR wrong number of arguments");
    343       return err;
    344     }
    345 
    346     const char *session_id = args->u.arr.elem[1].u.s;
    347     cluster_node_t *node = cluster_find_session_node(session_id);
    348     if (!node) {
    349       resp_object *err = resp_array_init();
    350       resp_array_append_simple(err, "ERR session not found");
    351       return err;
    352     }
    353 
    354     return cluster_node_send_command(node, "session.forward.list", session_id, NULL);
    355   }
    356 
    357   if (strcmp(cmd, "system.load") == 0) {
    358     resp_object *result = resp_array_init();
    359     char buf[64];
    360 
    361     resp_array_append_bulk(result, "1min");
    362     resp_array_append_bulk(result, "0.00");
    363     resp_array_append_bulk(result, "5min");
    364     resp_array_append_bulk(result, "0.00");
    365     resp_array_append_bulk(result, "15min");
    366     resp_array_append_bulk(result, "0.00");
    367 
    368     return result;
    369   }
    370 
    371   if (strcmp(cmd, "ping") == 0) {
    372     resp_object *result = resp_array_init();
    373     resp_array_append_simple(result, "PONG");
    374     return result;
    375   }
    376 
    377   resp_object *err = resp_array_init();
    378   resp_array_append_simple(err, "ERR unknown command");
    379   return err;
    380 }
    381 
    /**
     * Detach the process from its controlling terminal.
     *
     * Forks, starts a new session with setsid(), forks again, changes the
     * working directory to "/" and points stdin/stdout/stderr at /dev/null.
     * Parent processes leave through _exit(0).
     *
     * /dev/null is opened before the standard descriptors are replaced, so if
     * the open fails descriptors 0-2 keep their current targets instead of
     * being left closed, where a later open() or socket() could reuse them.
     *
     * @return 0 in the surviving child, -1 when the first fork() fails.
     *         Failures after the first fork end the process with _exit(1).
     */
    static int do_daemonize(void) {
      pid_t pid = fork();
      if (pid < 0) {
        log_fatal("fork: %m");
        return -1;
      }
      if (pid > 0) {
        _exit(0);
      }

      if (setsid() < 0) {
        log_fatal("setsid: %m");
        _exit(1);
      }

      /* The second fork leaves a process that is not a session leader. */
      pid = fork();
      if (pid < 0) {
        log_fatal("fork: %m");
        _exit(1);
      }
      if (pid > 0) {
        _exit(0);
      }

      if (chdir("/") != 0) {
        log_warn("chdir: %m");
      }

      int fd = open("/dev/null", O_RDWR);
      if (fd < 0) {
        log_warn("open /dev/null: %m");
        return 0;
      }

      /* dup2() closes each target itself, so no separate close() pass. */
      (void)dup2(fd, STDIN_FILENO);
      (void)dup2(fd, STDOUT_FILENO);
      (void)dup2(fd, STDERR_FILENO);
      if (fd > STDERR_FILENO) {
        close(fd);
      }
      return 0;
    }
    415 
    416 static void register_domain_commands(void) {
    417   api_register_domain_cmd("session.create", cluster_handle_command);
    418   api_register_domain_cmd("session.list", cluster_handle_command);
    419   api_register_domain_cmd("session.info", cluster_handle_command);
    420   api_register_domain_cmd("session.destroy", cluster_handle_command);
    421   api_register_domain_cmd("session.socket.create.listen", cluster_handle_command);
    422   api_register_domain_cmd("session.socket.create.connect", cluster_handle_command);
    423   api_register_domain_cmd("session.socket.destroy", cluster_handle_command);
    424   api_register_domain_cmd("session.forward.list", cluster_handle_command);
    425   api_register_domain_cmd("session.forward.create", cluster_handle_command);
    426   api_register_domain_cmd("session.forward.destroy", cluster_handle_command);
    427   api_register_domain_cmd("session.count", cluster_handle_command);
    428   api_register_domain_cmd("system.load", cluster_handle_command);
    429   log_info("cluster: registered session.* commands");
    430 }
    431 
    432 int cli_cmd_cluster(int argc, const char **argv) {
    433   int daemonize_flag = 0;
    434   int no_daemonize_flag = 0;
    435 
    436   struct argparse argparse;
    437   struct argparse_option options[] = {
    438     OPT_HELP(),
    439     OPT_BOOLEAN('d', "daemonize", &daemonize_flag, "run in background", NULL, 0, 0),
    440     OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground", NULL, 0, 0),
    441     OPT_END(),
    442   };
    443   argparse_init(&argparse, options, (const char *const[]){"udphole cluster", NULL}, ARGPARSE_STOP_AT_NON_OPTION);
    444   argparse_parse(&argparse, argc, argv);
    445 
    446   if (!no_daemonize_flag && daemonize_flag) {
    447     do_daemonize();
    448   }
    449 
    450   domain_config_init();
    451 
    452   if (global_cfg) {
    453     resp_object *cfg_sec = resp_map_get(global_cfg, "udphole");
    454     if (cfg_sec) {
    455       const char *ports_str = resp_map_get_string(cfg_sec, "ports");
    456       if (ports_str) {
    457         int port_low = 7000, port_high = 7999;
    458         sscanf(ports_str, "%d-%d", &port_low, &port_high);
    459         domain_config_set_ports(port_low, port_high);
    460       }
    461       const char *advertise = resp_map_get_string(cfg_sec, "advertise");
    462       if (advertise) {
    463         domain_config_set_advertise(advertise);
    464       }
    465     }
    466   }
    467 
    468   if (cluster_config_parse() != 0) {
    469     log_error("cluster: failed to parse cluster config");
    470     return 1;
    471   }
    472 
    473   register_domain_commands();
    474 
    475   log_info("udphole: starting cluster daemon");
    476 
    477   cluster_udata_t *udata = calloc(1, sizeof(cluster_udata_t));
    478   udata->last_health_check = 0;
    479 
    480   domain_schedmod_pt_create(api_server_pt, NULL);
    481 
    482   log_info("udphole: cluster daemon started");
    483 
    484   return domain_schedmod_main();
    485 }