udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

commit 78ff0eda87ead32a12a8b26fd0d2f5129161ef94
parent a604588cb3646c5b3f2e8b6af5adc488cbc226f3
Author: Robin Bron <robin.bron@yourhosting.nl>
Date:   Mon,  2 Mar 2026 09:24:38 +0100

No clustering for now

Diffstat:
MMakefile | 4----
Msrc/common/resp.h | 15+++++++++++++++
Dsrc/domain/cluster/node.c | 270-------------------------------------------------------------------------------
Dsrc/domain/cluster/node.h | 43-------------------------------------------
Msrc/interface/api/server.c | 12+++++++++---
Dsrc/interface/cli/command/cluster.c | 482-------------------------------------------------------------------------------
Dsrc/interface/cli/command/cluster.h | 6------
Msrc/interface/cli/main.c | 7-------
Dtest/cluster-basic.js | 84-------------------------------------------------------------------------------
Dtest/cluster-integration.js | 107-------------------------------------------------------------------------------
Dtest/config-cluster-node1.ini | 9---------
Dtest/config-cluster-node2.ini | 9---------
Dtest/config-cluster.ini | 19-------------------
13 files changed, 24 insertions(+), 1043 deletions(-)

diff --git a/Makefile b/Makefile @@ -118,10 +118,6 @@ test: $(BIN) @sleep 1 @node test/listen-relearn-unix.js @sleep 1 - @node test/cluster-basic.js - @sleep 1 - @node test/cluster-integration.js - @sleep 1 @node test/connect-drop-unknown.js .PHONY: clean diff --git a/src/common/resp.h b/src/common/resp.h @@ -20,29 +20,44 @@ struct resp_object { }; void resp_free(resp_object *o); +/* Takes ownership: frees the object and all nested data */ resp_object *resp_deep_copy(const resp_object *o); +/* Returns new object: caller owns the result, must call resp_free() */ resp_object *resp_map_get(const resp_object *o, const char *key); +/* Returns pointer into o: caller must NOT call resp_free() on result */ const char *resp_map_get_string(const resp_object *o, const char *key); +/* Returns pointer into o: caller must NOT free the result */ void resp_map_set(resp_object *map, const char *key, resp_object *value); +/* Takes ownership of value */ resp_object *resp_read(int fd); +/* Returns new object: caller owns the result, must call resp_free() */ resp_object *resp_read_buf(const char *buf, size_t len); +/* Returns new object: caller owns the result, must call resp_free() */ int resp_encode_array(int argc, const resp_object *const *argv, char **out_buf, size_t *out_len); +/* Returns allocated string in out_buf: caller must free() the string */ int resp_serialize(const resp_object *o, char **out_buf, size_t *out_len); +/* Returns allocated string in out_buf: caller must free() the string */ resp_object *resp_array_init(void); +/* Returns new array object: caller owns the result, must call resp_free() */ int resp_array_append_obj(resp_object *destination, resp_object *value); +/* Takes ownership of value */ int resp_array_append_simple(resp_object *destination, const char *str); +/* Copies str: caller may free str after return */ + int resp_array_append_bulk(resp_object *destination, const char *str); +/* Copies str: caller may free str after return */ + int resp_array_append_int(resp_object *destination, long long i); #endif diff --git a/src/domain/cluster/node.c b/src/domain/cluster/node.c @@ -1,270 +0,0 @@ -#include <stdlib.h> -#include <string.h> -#include <stdio.h> -#include <unistd.h> -#include <fcntl.h> -#include <sys/socket.h> -#include <sys/un.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netdb.h> -#include <ctype.h> -#include <stdarg.h> -#include <errno.h> - -#include "rxi/log.h" -#include "domain/cluster/node.h" -#include "common/resp.h" - -#define MAX_HOST_LEN 256 -#define MAX_PORT_LEN 16 -#define MAX_PATH_LEN 256 - -static int parse_url(const char *url, char *host, size_t host_len, int *port, char *unix_path, size_t path_len) { - if (!url) return -1; - - if (strncmp(url, "unix://", 7) == 0) { - const char *p = url + 7; - size_t path_len_calc = strlen(p); - if (path_len_calc >= path_len) path_len_calc = path_len - 1; - strncpy(unix_path, p, path_len_calc); - unix_path[path_len_calc] = '\0'; - return 1; - } - - if (strncmp(url, "tcp://", 6) != 0) { - return -1; - } - - const char *p = url + 6; - const char *colon = strchr(p, ':'); - if (!colon) return -1; - - size_t host_len_calc = colon - p; - if (host_len_calc >= host_len) host_len_calc = host_len - 1; - strncpy(host, p, host_len_calc); - host[host_len_calc] = '\0'; - - *port = atoi(colon + 1); - if (*port <= 0) return -1; - - return 0; -} - -cluster_node_t *cluster_node_create(const char *name, const char *url, int weight, const char *user, const char *secret) { - cluster_node_t *node = calloc(1, sizeof(*node)); - if (!node) return NULL; - - node->name = strdup(name); - node->url = strdup(url); - node->weight = weight > 0 ? weight : 1; - node->user = user ? strdup(user) : NULL; - node->secret = secret ? strdup(secret) : NULL; - node->fd = -1; - node->session_count = 0; - node->available = false; - node->consecutive_failures = 0; - node->last_check = 0; - node->next = NULL; - - char host[MAX_HOST_LEN]; - int port; - char unix_path[MAX_PATH_LEN]; - if (parse_url(url, host, sizeof(host), &port, unix_path, sizeof(unix_path)) < 0) { - log_error("cluster: failed to parse URL %s", url); - cluster_node_free(node); - return NULL; - } - - return node; -} - -void cluster_node_free(cluster_node_t *node) { - if (!node) return; - cluster_node_disconnect(node); - free(node->name); - free(node->url); - free(node->user); - free(node->secret); - free(node); -} - -int cluster_node_connect(cluster_node_t *node) { - if (!node) return -1; - - cluster_node_disconnect(node); - - char host[MAX_HOST_LEN]; - int port; - char unix_path[MAX_PATH_LEN]; - int url_type = parse_url(node->url, host, sizeof(host), &port, unix_path, sizeof(unix_path)); - if (url_type < 0) { - return -1; - } - - int fd; - if (url_type == 1) { - fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (fd < 0) return -1; - - struct sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; - strncpy(addr.sun_path, unix_path, sizeof(addr.sun_path) - 1); - - int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); - if (ret < 0) { - close(fd); - return -1; - } - - node->fd = fd; - return 0; - } - - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - - if (inet_pton(AF_INET, host, &addr.sin_addr) <= 0) { - struct hostent *he = gethostbyname(host); - if (!he) return -1; - memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length); - } - - fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) return -1; - - int flags = fcntl(fd, F_GETFL, 0); - fcntl(fd, F_SETFL, flags | O_NONBLOCK); - - int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); - if (ret < 0 && errno != EINPROGRESS) { - close(fd); - return -1; - } - - node->fd = fd; - return 0; -} - -void cluster_node_disconnect(cluster_node_t *node) { - if (!node || node->fd < 0) return; - close(node->fd); - node->fd = -1; -} - -int cluster_node_ping(cluster_node_t *node) { - if (!node || node->fd < 0) { - if (node && cluster_node_connect(node) != 0) { - return -1; - } - } - - resp_object *resp = cluster_node_send_command(node, "ping"); - if (!resp) return -1; - - int ok = (resp->type == RESPT_SIMPLE && resp->u.s && strcmp(resp->u.s, "PONG") == 0); - resp_free(resp); - - return ok ? 0 : -1; -} - -int cluster_node_get_session_count(cluster_node_t *node) { - if (!node || node->fd < 0) return -1; - - resp_object *resp = cluster_node_send_command(node, "session.count"); - if (!resp) return -1; - - int count = -1; - if (resp->type == RESPT_INT) { - count = (int)resp->u.i; - } - resp_free(resp); - - return count; -} - -resp_object *cluster_node_send_command(cluster_node_t *node, const char *cmd, ...) { - if (!node || node->fd < 0 || !cmd) return NULL; - - char *buf = NULL; - size_t len = 0; - - va_list args; - va_start(args, cmd); - - int argc = 1; - const char *arg = cmd; - while (va_arg(args, const char *) != NULL) { - argc++; - } - va_end(args); - - resp_object **argv = malloc(sizeof(resp_object *) * argc); - if (!argv) return NULL; - - argv[0] = resp_array_init(); - resp_array_append_bulk(argv[0], cmd); - - va_start(args, cmd); - int i = 1; - while ((arg = va_arg(args, const char *)) != NULL) { - argv[i] = resp_array_init(); - resp_array_append_bulk(argv[i], arg); - i++; - } - va_end(args); - - resp_object *cmd_arr = resp_array_init(); - for (i = 0; i < argc; i++) { - resp_array_append_obj(cmd_arr, argv[i]); - } - - resp_encode_array(argc, (const resp_object *const *)argv, &buf, &len); - - for (i = 0; i < argc; i++) { - resp_free(argv[i]); - } - free(argv); - resp_free(cmd_arr); - - if (!buf) return NULL; - - ssize_t written = send(node->fd, buf, len, 0); - free(buf); - - if (written != (ssize_t)len) { - return NULL; - } - - return resp_read(node->fd); -} - -void cluster_node_set_available(cluster_node_t *node, bool available) { - if (!node) return; - node->available = available; - if (!available) { - node->consecutive_failures++; - } else { - node->consecutive_failures = 0; - } -} - -bool cluster_node_select_weighted_lowest(cluster_node_t *node, cluster_node_t *best) { - if (!node || !node->available) return false; - - if (!best) return true; - - double node_ratio = (double)node->session_count / node->weight; - double best_ratio = (double)best->session_count / best->weight; - - if (node_ratio < best_ratio) return true; - if (node_ratio > best_ratio) return false; - - if (node->weight > best->weight) return true; - if (node->weight < best->weight) return false; - - return false; -} diff --git a/src/domain/cluster/node.h b/src/domain/cluster/node.h @@ -1,43 +0,0 @@ -#ifndef UDPHOLE_CLUSTER_NODE_H -#define UDPHOLE_CLUSTER_NODE_H - -#include <stdbool.h> -#include <stdint.h> - -#include "common/resp.h" - -typedef struct cluster_node cluster_node_t; - -struct cluster_node { - char *name; - char *url; - int weight; - char *user; - char *secret; - int fd; - int session_count; - bool available; - int consecutive_failures; - int64_t last_check; - cluster_node_t *next; -}; - -cluster_node_t *cluster_node_create(const char *name, const char *url, int weight, const char *user, const char *secret); - -void cluster_node_free(cluster_node_t *node); - -int cluster_node_connect(cluster_node_t *node); - -void cluster_node_disconnect(cluster_node_t *node); - -resp_object *cluster_node_send_command(cluster_node_t *node, const char *cmd, ...); - -int cluster_node_ping(cluster_node_t *node); - -int cluster_node_get_session_count(cluster_node_t *node); - -void cluster_node_set_available(cluster_node_t *node, bool available); - -bool cluster_node_select_weighted_lowest(cluster_node_t *node, cluster_node_t *best); - -#endif diff --git a/src/interface/api/server.c b/src/interface/api/server.c @@ -487,12 +487,18 @@ PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) resp_object *api_sec = resp_map_get(global_cfg, "udphole"); const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL; + + log_info("api: initial listen config check: %s", listen_str ? listen_str : "(null)"); + if (!listen_str || !listen_str[0]) { - log_info("api: no listen address configured, API server disabled"); - PT_EXIT(pt); + log_info("api: no listen address configured, waiting..."); + PT_WAIT_UNTIL(pt, current_listen || (api_sec = resp_map_get(global_cfg, "udphole"), + (listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL), + listen_str && listen_str[0])); + log_info("api: after wait, listen config: %s", listen_str ? listen_str : "(null)"); } - if (!current_listen) { + if (!current_listen && listen_str && listen_str[0]) { current_listen = strdup(listen_str); if (!current_listen) { PT_EXIT(pt); diff --git a/src/interface/cli/command/cluster.c b/src/interface/cli/command/cluster.c @@ -1,482 +0,0 @@ -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> -#include <fcntl.h> -#include <sys/stat.h> -#include <time.h> - -#include "cofyc/argparse.h" -#include "rxi/log.h" - -#include "infrastructure/config.h" -#include "common/resp.h" -#include "../common.h" -#include "domain/scheduler.h" -#include "domain/config.h" -#include "cluster.h" -#include "interface/api/server.h" -#include "domain/cluster/node.h" - -#define HEALTH_CHECK_INTERVAL 5 -#define MAX_FAILURES 3 - -static cluster_node_t *cluster_nodes = NULL; -static const char *cluster_listen = NULL; - -PT_THREAD(cluster_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); - -typedef struct { - int64_t last_health_check; -} cluster_udata_t; - -static void cluster_config_free(void) { - cluster_node_t *node = cluster_nodes; - while (node) { - cluster_node_t *next = node->next; - cluster_node_free(node); - node = next; - } - cluster_nodes = NULL; -} - -static int cluster_config_parse(void) { - cluster_config_free(); - - if (!global_cfg) return -1; - - resp_object *cluster_sec = resp_map_get(global_cfg, "cluster"); - if (!cluster_sec) { - log_error("cluster: no [cluster] section in config"); - return -1; - } - - const char *nodes_str = resp_map_get_string(cluster_sec, "nodes"); - if (!nodes_str) { - log_error("cluster: no 'nodes' defined in [cluster] section"); - return -1; - } - - cluster_listen = resp_map_get_string(cluster_sec, "listen"); - - char *nodes_copy = strdup(nodes_str); - char *saveptr; - char *token = strtok_r(nodes_copy, ",", &saveptr); - - while (token) { - while (*token == ' ') token++; - char *end = token + strlen(token) - 1; - while (end > token && *end == ' ') *end-- = '\0'; - - char section_name[64]; - snprintf(section_name, sizeof(section_name), "cluster:node:%s", token); - - resp_object *node_sec = resp_map_get(global_cfg, section_name); - if (!node_sec) { - log_error("cluster: no [%s] section found", section_name); - token = strtok_r(NULL, ",", &saveptr); - continue; - } - - const char *url = resp_map_get_string(node_sec, "url"); - const char *weight_str = resp_map_get_string(node_sec, "weight"); - const char *user = resp_map_get_string(node_sec, "user"); - const char *secret = resp_map_get_string(node_sec, "secret"); - - if (!url) { - log_error("cluster: no 'url' for node %s", token); - token = strtok_r(NULL, ",", &saveptr); - continue; - } - - int weight = weight_str ? atoi(weight_str) : 1; - if (weight <= 0) weight = 1; - - cluster_node_t *node = cluster_node_create(token, url, weight, user, secret); - if (!node) { - log_error("cluster: failed to create node %s", token); - token = strtok_r(NULL, ",", &saveptr); - continue; - } - - node->next = cluster_nodes; - cluster_nodes = node; - - log_info("cluster: added node %s (url=%s, weight=%d)", token, url, weight); - - token = strtok_r(NULL, ",", &saveptr); - } - - free(nodes_copy); - return 0; -} - -static void cluster_health_check(void) { - cluster_node_t *node = cluster_nodes; - while (node) { - int result = cluster_node_ping(node); - if (result == 0) { - if (!node->available) { - log_info("cluster: node %s is now available", node->name); - cluster_node_set_available(node, true); - } - } else { - if (node->available || node->consecutive_failures >= MAX_FAILURES) { - log_warn("cluster: node %s is unavailable (failures: %d)", node->name, node->consecutive_failures); - cluster_node_set_available(node, false); - } - } - node = node->next; - } -} - -static cluster_node_t *cluster_select_node(void) { - cluster_node_t *best = NULL; - cluster_node_t *node = cluster_nodes; - while (node) { - if (node->available) { - if (!best || cluster_node_select_weighted_lowest(node, best)) { - best = node; - } - } - node = node->next; - } - return best; -} - -static cluster_node_t *cluster_find_session_node(const char *session_id) { - cluster_node_t *node = cluster_nodes; - while (node) { - if (node->available) { - resp_object *resp = cluster_node_send_command(node, "session.info", session_id, NULL); - if (resp && resp->type != RESPT_ERROR) { - resp_free(resp); - return node; - } - if (resp) resp_free(resp); - } - node = node->next; - } - return NULL; -} - -static resp_object *cluster_handle_command(const char *cmd, resp_object *args) { - if (!cmd || !args) return NULL; - - if (strcmp(cmd, "session.create") == 0) { - cluster_node_t *node = cluster_select_node(); - if (!node) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR no available nodes"); - return err; - } - - const char *session_id = NULL; - const char *idle_expiry = NULL; - - if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) { - session_id = args->u.arr.elem[1].u.s; - } - if (args->u.arr.n > 2 && args->u.arr.elem[2].type == RESPT_BULK) { - idle_expiry = args->u.arr.elem[2].u.s; - } - - resp_object *resp; - if (idle_expiry) { - resp = cluster_node_send_command(node, "session.create", session_id, idle_expiry, NULL); - } else { - resp = cluster_node_send_command(node, "session.create", session_id, NULL); - } - - if (resp && resp->type != RESPT_ERROR) { - node->session_count++; - } - - return resp; - } - - if (strcmp(cmd, "session.list") == 0) { - resp_object *result = resp_array_init(); - cluster_node_t *node = cluster_nodes; - while (node) { - if (node->available) { - resp_object *resp = cluster_node_send_command(node, "session.list", NULL); - if (resp && resp->type == RESPT_ARRAY) { - for (size_t i = 0; i < resp->u.arr.n; i++) { - if (resp->u.arr.elem[i].type == RESPT_BULK) { - resp_array_append_bulk(result, resp->u.arr.elem[i].u.s); - } - } - } - if (resp) resp_free(resp); - } - node = node->next; - } - return result; - } - - if (strcmp(cmd, "session.count") == 0) { - int total = 0; - cluster_node_t *node = cluster_nodes; - while (node) { - if (node->available) { - int count = cluster_node_get_session_count(node); - if (count >= 0) { - total += count; - } - } - node = node->next; - } - resp_object *result = malloc(sizeof(resp_object)); - result->type = RESPT_INT; - result->u.i = total; - return result; - } - - if (strcmp(cmd, "session.info") == 0) { - if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR wrong number of arguments"); - return err; - } - - const char *session_id = args->u.arr.elem[1].u.s; - cluster_node_t *node = cluster_find_session_node(session_id); - if (!node) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR session not found"); - return err; - } - - return cluster_node_send_command(node, "session.info", session_id, NULL); - } - - if (strcmp(cmd, "session.destroy") == 0) { - if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR wrong number of arguments"); - return err; - } - - const char *session_id = args->u.arr.elem[1].u.s; - cluster_node_t *node = cluster_find_session_node(session_id); - if (!node) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR session not found"); - return err; - } - - resp_object *resp = cluster_node_send_command(node, "session.destroy", session_id, NULL); - if (resp && resp->type != RESPT_ERROR) { - node->session_count--; - } - return resp; - } - - if (strcmp(cmd, "session.socket.create.listen") == 0 || - strcmp(cmd, "session.socket.create.connect") == 0 || - strcmp(cmd, "session.socket.destroy") == 0 || - strcmp(cmd, "session.forward.create") == 0 || - strcmp(cmd, "session.forward.destroy") == 0) { - - if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR wrong number of arguments"); - return err; - } - - const char *session_id = args->u.arr.elem[1].u.s; - cluster_node_t *node = cluster_find_session_node(session_id); - if (!node) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR session not found"); - return err; - } - - char **argv = malloc(sizeof(char *) * args->u.arr.n); - for (size_t i = 0; i < args->u.arr.n; i++) { - if (args->u.arr.elem[i].type == RESPT_BULK) { - argv[i] = args->u.arr.elem[i].u.s ? strdup(args->u.arr.elem[i].u.s) : ""; - } else if (args->u.arr.elem[i].type == RESPT_INT) { - char buf[32]; - snprintf(buf, sizeof(buf), "%lld", args->u.arr.elem[i].u.i); - argv[i] = strdup(buf); - } else { - argv[i] = strdup(""); - } - } - - resp_object *resp = cluster_node_send_command(node, cmd, NULL); - - for (size_t i = 0; i < args->u.arr.n; i++) { - free(argv[i]); - } - free(argv); - - return resp; - } - - if (strcmp(cmd, "session.forward.list") == 0) { - if (args->u.arr.n < 2 || args->u.arr.elem[1].type != RESPT_BULK) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR wrong number of arguments"); - return err; - } - - const char *session_id = args->u.arr.elem[1].u.s; - cluster_node_t *node = cluster_find_session_node(session_id); - if (!node) { - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR session not found"); - return err; - } - - return cluster_node_send_command(node, "session.forward.list", session_id, NULL); - } - - if (strcmp(cmd, "system.load") == 0) { - resp_object *result = resp_array_init(); - - resp_array_append_bulk(result, "1min"); - resp_array_append_bulk(result, "0.00"); - resp_array_append_bulk(result, "5min"); - resp_array_append_bulk(result, "0.00"); - resp_array_append_bulk(result, "15min"); - resp_array_append_bulk(result, "0.00"); - - return result; - } - - if (strcmp(cmd, "ping") == 0) { - resp_object *result = resp_array_init(); - resp_array_append_simple(result, "PONG"); - return result; - } - - resp_object *err = resp_array_init(); - resp_array_append_simple(err, "ERR unknown command"); - return err; -} - -static int do_daemonize(void) { - pid_t pid = fork(); - if (pid < 0) { - log_fatal("fork: %m"); - return -1; - } - if (pid > 0) - _exit(0); - if (setsid() < 0) { - log_fatal("setsid: %m"); - _exit(1); - } - pid = fork(); - if (pid < 0) { - log_fatal("fork: %m"); - _exit(1); - } - if (pid > 0) - _exit(0); - if (chdir("/") != 0) {} - int fd; - for (fd = 0; fd < 3; fd++) - (void)close(fd); - fd = open("/dev/null", O_RDWR); - if (fd >= 0) { - dup2(fd, STDIN_FILENO); - dup2(fd, STDOUT_FILENO); - dup2(fd, STDERR_FILENO); - if (fd > 2) - close(fd); - } - return 0; -} - -static void register_domain_commands(void) { - api_register_domain_cmd("session.create", cluster_handle_command); - api_register_domain_cmd("session.list", cluster_handle_command); - api_register_domain_cmd("session.info", cluster_handle_command); - api_register_domain_cmd("session.destroy", cluster_handle_command); - api_register_domain_cmd("session.socket.create.listen", cluster_handle_command); - api_register_domain_cmd("session.socket.create.connect", cluster_handle_command); - api_register_domain_cmd("session.socket.destroy", cluster_handle_command); - api_register_domain_cmd("session.forward.list", cluster_handle_command); - api_register_domain_cmd("session.forward.create", cluster_handle_command); - api_register_domain_cmd("session.forward.destroy", cluster_handle_command); - api_register_domain_cmd("session.count", cluster_handle_command); - api_register_domain_cmd("system.load", cluster_handle_command); - log_info("cluster: registered session.* commands"); -} - -int cli_cmd_cluster(int argc, const char **argv) { - int daemonize_flag = 0; - int no_daemonize_flag = 0; - - struct argparse argparse; - struct argparse_option options[] = { - OPT_HELP(), - OPT_BOOLEAN('d', "daemonize", &daemonize_flag, "run in background", NULL, 0, 0), - OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground", NULL, 0, 0), - OPT_END(), - }; - argparse_init(&argparse, options, (const char *const[]){"udphole cluster", NULL}, ARGPARSE_STOP_AT_NON_OPTION); - argparse_parse(&argparse, argc, argv); - - if (!no_daemonize_flag && daemonize_flag) { - do_daemonize(); - } - - domain_config_init(); - - if (global_cfg) { - resp_object *cfg_sec = resp_map_get(global_cfg, "udphole"); - if (cfg_sec) { - const char *ports_str = resp_map_get_string(cfg_sec, "ports"); - if (ports_str) { - int port_low = 7000, port_high = 7999; - sscanf(ports_str, "%d-%d", &port_low, &port_high); - domain_config_set_ports(port_low, port_high); - } - const char *advertise = resp_map_get_string(cfg_sec, "advertise"); - if (advertise) { - domain_config_set_advertise(advertise); - } - } - } - - if (cluster_config_parse() != 0) { - log_error("cluster: failed to parse cluster config"); - return 1; - } - - register_domain_commands(); - - log_info("udphole: starting cluster daemon"); - - cluster_udata_t *udata = calloc(1, sizeof(cluster_udata_t)); - udata->last_health_check = 0; - - domain_schedmod_pt_create(api_server_pt, NULL); - domain_schedmod_pt_create(cluster_manager_pt, udata); - - log_info("udphole: cluster daemon started"); - - return domain_schedmod_main(); -} - -PT_THREAD(cluster_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { - cluster_udata_t *udata = task->udata; - PT_BEGIN(pt); - - PT_WAIT_UNTIL(pt, cluster_nodes); - - for (;;) { - if (timestamp - udata->last_health_check >= HEALTH_CHECK_INTERVAL * 1000) { - cluster_health_check(); - udata->last_health_check = timestamp; - } - PT_YIELD(pt); - } - - PT_END(pt); -} diff --git a/src/interface/cli/command/cluster.h b/src/interface/cli/command/cluster.h @@ -1,6 +0,0 @@ -#ifndef UDPHOLE_CLI_CLUSTER_H -#define UDPHOLE_CLI_CLUSTER_H - -int cli_cmd_cluster(int argc, const char **argv); - -#endif diff --git a/src/interface/cli/main.c b/src/interface/cli/main.c @@ -14,7 +14,6 @@ extern "C" { #include "interface/cli/common.h" #include "interface/cli/command/list_commands.h" #include "interface/cli/command/daemon.h" -#include "interface/cli/command/cluster.h" #include "infrastructure/config.h" #ifdef __cplusplus @@ -181,12 +180,6 @@ int main(int argc, const char **argv) { cli_cmd_daemon ); - cli_register_command( - "cluster", - "Run the udphole cluster daemon", - cli_cmd_cluster - ); - struct argparse argparse; struct argparse_option options[] = { OPT_HELP(), diff --git a/test/cluster-basic.js b/test/cluster-basic.js @@ -1,84 +0,0 @@ -const path = require('path'); -const { - spawnDaemon, - killDaemon, - killAllDaemons, - connectApi, - apiCommand, - sleep -} = require('./helpers'); - -const NODE1_CONFIG = path.join(__dirname, 'config-cluster-node1.ini'); -const NODE2_CONFIG = path.join(__dirname, 'config-cluster-node2.ini'); -const CLUSTER_CONFIG = path.join(__dirname, 'config-cluster.ini'); - -async function runTest() { - let daemon1 = null; - let daemon2 = null; - let cluster = null; - let apiSock1 = null; - let apiSock2 = null; - let clusterSock = null; - let returnCode = 0; - - console.log('=== Cluster Daemon Test ==='); - console.log('Testing: session distribution across clustered nodes\n'); - - try { - console.log('1. Spawning node 1 daemon...'); - daemon1 = await spawnDaemon(NODE1_CONFIG); - await sleep(1000); - console.log(' Node 1 started on port 19123'); - - console.log('2. Spawning node 2 daemon...'); - daemon2 = await spawnDaemon(NODE2_CONFIG); - await sleep(1000); - console.log(' Node 2 started on port 19124'); - - console.log('3. Connecting to node 1 API...'); - apiSock1 = await connectApi(19123); - console.log(' Connected'); - - console.log('4. Connecting to node 2 API...'); - apiSock2 = await connectApi(19124); - console.log(' Connected'); - - console.log('5. Authenticating with node 1...'); - let resp = await apiCommand(apiSock1, 'auth', 'finwo', 'testsecret'); - console.log(` Auth response: ${resp}`); - if (resp !== 'OK') throw new Error('Auth failed for node 1'); - - console.log('6. Authenticating with node 2...'); - resp = await apiCommand(apiSock2, 'auth', 'finwo', 'testsecret'); - console.log(` Auth response: ${resp}`); - if (resp !== 'OK') throw new Error('Auth failed for node 2'); - - console.log('\n7. Testing individual nodes have 0 sessions...'); - resp = await apiCommand(apiSock1, 'session.count'); - console.log(` Node 1 session.count: ${resp}`); - if (resp !== 0) throw new Error('Expected 0 sessions on node 1'); - - resp = await apiCommand(apiSock2, 'session.count'); - console.log(` Node 2 session.count: ${resp}`); - if (resp !== 0) throw new Error('Expected 0 sessions on node 2'); - - console.log('\n✓ PASS: Basic cluster setup works'); - console.log('\nNote: Full cluster daemon requires cluster config file.'); - console.log('This test verified that node daemons can run independently.'); - - } catch (err) { - console.error(`\n✗ FAIL: ${err.message}`); - returnCode = 1; - } finally { - if (apiSock1) apiSock1.end(); - if (apiSock2) apiSock2.end(); - if (clusterSock) clusterSock.end(); - if (daemon1) await killDaemon(daemon1); - if (daemon2) await killDaemon(daemon2); - if (cluster) await killDaemon(cluster); - await killAllDaemons(); - process.exit(returnCode); - } -} - -runTest(); diff --git a/test/cluster-integration.js b/test/cluster-integration.js @@ -1,107 +0,0 @@ -const path = require('path'); -const { - spawnDaemon, - killDaemon, - killAllDaemons, - connectApi, - apiCommand, - sleep, - findFreePort -} = require('./helpers'); - -const NODE1_CONFIG = path.join(__dirname, 'config-cluster-node1.ini'); -const NODE2_CONFIG = path.join(__dirname, 'config-cluster-node2.ini'); - -const DAEMON_PATH = path.join(__dirname, '..', 'udphole'); -const { spawn } = require('child_process'); - -async function runTest() { - let daemon1 = null; - let daemon2 = null; - let cluster = null; - let apiSock1 = null; - let apiSock2 = null; - let clusterSock = null; - let returnCode = 0; - - console.log('=== Cluster Integration Test ==='); - console.log('Testing: session creation, aggregation, and forwarding\n'); - - try { - console.log('1. Spawning node 1 daemon...'); - daemon1 = await spawnDaemon(NODE1_CONFIG); - await sleep(1000); - console.log(' Node 1 started on port 19123'); - - console.log('2. Spawning node 2 daemon...'); - daemon2 = await spawnDaemon(NODE2_CONFIG); - await sleep(1000); - console.log(' Node 2 started on port 19124'); - - console.log('3. Connecting to node 1 API...'); - apiSock1 = await connectApi(19123); - console.log(' Connected'); - - console.log('4. Connecting to node 2 API...'); - apiSock2 = await connectApi(19124); - console.log(' Connected'); - - console.log('5. Authenticating with nodes...'); - let resp = await apiCommand(apiSock1, 'auth', 'finwo', 'testsecret'); - if (resp !== 'OK') throw new Error('Auth failed for node 1'); - resp = await apiCommand(apiSock2, 'auth', 'finwo', 'testsecret'); - if (resp !== 'OK') throw new Error('Auth failed for node 2'); - console.log(' Auth OK on both nodes'); - - console.log('\n6. Creating sessions directly on nodes...'); - resp = await apiCommand(apiSock1, 'session.create', 'session-node1'); - console.log(` Node 1 session.create: ${JSON.stringify(resp)}`); - if (!Array.isArray(resp) || resp[0] !== 'OK') throw new Error(`Failed to create session on node 1: got ${JSON.stringify(resp)}`); - - resp = await apiCommand(apiSock2, 'session.create', 'session-node2'); - console.log(` Node 2 session.create: ${JSON.stringify(resp)}`); - if (!Array.isArray(resp) || resp[0] !== 'OK') throw new Error('Failed to create session on node 2'); - - console.log('\n7. Verifying session counts on individual nodes...'); - resp = await apiCommand(apiSock1, 'session.count'); - console.log(` Node 1 session.count: ${resp}`); - if (resp !== 1) throw new Error(`Expected 1 session on node 1, got ${resp}`); - - resp = await apiCommand(apiSock2, 'session.count'); - console.log(` Node 2 session.count: ${resp}`); - if (resp !== 1) throw new Error(`Expected 1 session on node 2, got ${resp}`); - - console.log('\n8. Verifying socket creation returns correct advertise address...'); - resp = await apiCommand(apiSock1, 'session.socket.create.listen', 'session-node1', 'socket1'); - console.log(` Node 1 socket create listen: ${JSON.stringify(resp)}`); - if (!Array.isArray(resp) || resp[1] !== '127.0.0.1') { - throw new Error(`Expected advertise address 127.0.0.1 for node 1, got ${JSON.stringify(resp)}`); - } - console.log(' ✓ Contains advertise address 127.0.0.1'); - - resp = await apiCommand(apiSock2, 'session.socket.create.listen', 'session-node2', 'socket2'); - console.log(` Node 2 socket create listen: ${JSON.stringify(resp)}`); - if (!Array.isArray(resp) || resp[1] !== '127.0.0.2') { - throw new Error(`Expected advertise address 127.0.0.2 for node 2, got ${JSON.stringify(resp)}`); - } - console.log(' ✓ Contains advertise address 127.0.0.2'); - - console.log('\n✓ PASS: Cluster integration test passed'); - - } catch (err) { - console.error(`\n✗ FAIL: ${err.message}`); - console.error(err.stack); - returnCode = 1; - } finally { - if (apiSock1) apiSock1.end(); - if (apiSock2) apiSock2.end(); - if (clusterSock) clusterSock.end(); - if (daemon1) await killDaemon(daemon1); - if (daemon2) await killDaemon(daemon2); - if (cluster) await killDaemon(cluster); - await killAllDaemons(); - process.exit(returnCode); - } -} - -runTest(); diff --git a/test/config-cluster-node1.ini b/test/config-cluster-node1.ini @@ -1,9 +0,0 @@ -[udphole] -mode = builtin -ports = 19000-19099 -listen = :19123 -advertise = 127.0.0.1 - -[user:finwo] -permit = * -secret = testsecret diff --git a/test/config-cluster-node2.ini b/test/config-cluster-node2.ini @@ -1,9 +0,0 @@ -[udphole] -mode = builtin -ports = 19100-19199 -listen = :19124 -advertise = 127.0.0.2 - -[user:finwo] -permit = * -secret = testsecret diff --git a/test/config-cluster.ini b/test/config-cluster.ini @@ -1,19 +0,0 @@ -[cluster] -listen = :19125 -nodes = node1,node2 - -[node1] -url = tcp://127.0.0.1:19123 -weight = 1 -user = finwo -secret = testsecret - -[node2] -url = tcp://127.0.0.1:19124 -weight = 1 -user = finwo -secret = testsecret - -[user:finwo] -permit = * -secret = testsecret