commit 9621d56ec9bd1dac721633dc8ee5c8151f4f39bd
parent d9cbcfcf98fb9de5ddcb6ed1aa5560d760bae242
Author: Robin Bron <robin.bron@yourhosting.nl>
Date: Mon, 2 Mar 2026 15:02:55 +0100
Re-implement clustering
Diffstat:
24 files changed, 1561 insertions(+), 130 deletions(-)
diff --git a/Makefile b/Makefile
@@ -123,6 +123,8 @@ test: $(BIN)
@node test/listen-relearn-unix.js
@sleep 1
@node test/connect-drop-unknown.js
+ @sleep 1
+ @node test/cluster.js
.PHONY: clean
clean:
diff --git a/README.md b/README.md
@@ -127,6 +127,36 @@ session.forward.create nat-traversal client-b client-a
---
+## Running the cluster
+
+The cluster command acts as a proxy to multiple backing udphole daemon nodes.
+
+```bash
+# Foreground
+./udphole cluster
+
+# Background
+./udphole -f /etc/udphole.conf cluster -d
+
+# Force foreground
+./udphole -f /etc/udphole.conf cluster -D
+```
+
+| Option | Short | Description |
+|--------|--------|--------------|
+| `--daemonize` | `-d` | Run in background (double fork, detach from terminal). |
+| `--no-daemonize` | `-D` | Force foreground; overrides `daemonize=1` in config. |
+
+The cluster provides the **exact same API** as the regular daemon but forwards commands to backing nodes:
+
+- `session.create` - Routes to the node with the lowest weight/sessioncount ratio
+- `session.list` / `session.count` - Aggregates results from all nodes
+- `session.info` / `session.destroy` / `socket.*` / `forward.*` - Routes to the node that has the session
+
+The cluster holds no local state - it queries backing nodes on demand and performs healthchecks (every 5 seconds) to track node availability.
+
+---
+
## Configuration
```ini
@@ -147,11 +177,20 @@ permit = ping
| Option | Description |
|--------|-------------|
-| `ports` | Port range for UDP sockets, as `low-high` (e.g. `7000-7999`). Default 7000ā7999. |
+| `ports` | Port range for UDP sockets, as `low-high` (e.g. `7000-7999`). Default 7000ā7999. (daemon only) |
| `listen` | API server listen address. If not set, API server is disabled. |
-| `advertise` | Optional. IP address to advertise in API responses instead of the port number. Useful when behind NAT. |
+| `advertise` | Optional. IP address to advertise in API responses instead of the port number. Useful when behind NAT. (daemon only) |
+| `cluster` | Repeat for each backing node name. Enables cluster mode and specifies node names. |
+
+### `[cluster:<name>]`
+
+| Option | Description |
+|--------|-------------|
+| `address` | Connection string for the backing node (e.g., `tcp://127.0.0.1:19122` or `unix:///path/to/socket`). |
+| `username` | Username for authentication to the backing node. |
+| `password` | Password for authentication to the backing node. |
-### `[user:<name>]`
+### `[api:<name>]`
| Option | Description |
|--------|-------------|
diff --git a/src/common/resp.c b/src/common/resp.c
@@ -396,6 +396,14 @@ resp_object *resp_array_init(void) {
return o;
}
+resp_object *resp_simple_init(const char *value) {
+ resp_object *o = calloc(1, sizeof(resp_object));
+ if (!o) return NULL;
+ o->type = RESPT_SIMPLE;
+ o->u.s = value ? strdup(value) : NULL;
+ return o;
+}
+
int resp_array_append_obj(resp_object *destination, resp_object *value) {
if (!destination || destination->type != RESPT_ARRAY || !value) return -1;
size_t n = destination->u.arr.n;
@@ -408,6 +416,15 @@ int resp_array_append_obj(resp_object *destination, resp_object *value) {
return 0;
}
+resp_object *resp_error_init(const char *value) {
+ resp_object *o = calloc(1, sizeof(resp_object));
+ if (!o) return NULL;
+ o->type = RESPT_ERROR;
+ o->u.s = strdup(value ? value : "");
+ if (!o->u.s) { free(o); return NULL; }
+ return o;
+}
+
int resp_array_append_simple(resp_object *destination, const char *str) {
resp_object *o = calloc(1, sizeof(resp_object));
if (!o) return -1;
@@ -418,6 +435,16 @@ int resp_array_append_simple(resp_object *destination, const char *str) {
return 0;
}
+int resp_array_append_error(resp_object *destination, const char *str) {
+ resp_object *o = calloc(1, sizeof(resp_object));
+ if (!o) return -1;
+ o->type = RESPT_ERROR;
+ o->u.s = strdup(str ? str : "");
+ if (!o->u.s) { free(o); return -1; }
+ if (resp_array_append_obj(destination, o) != 0) { free(o->u.s); free(o); return -1; }
+ return 0;
+}
+
int resp_array_append_bulk(resp_object *destination, const char *str) {
resp_object *o = calloc(1, sizeof(resp_object));
if (!o) return -1;
diff --git a/src/common/resp.h b/src/common/resp.h
@@ -49,6 +49,12 @@ int resp_serialize(const resp_object *o, char **out_buf, size_t *out_len);
resp_object *resp_array_init(void);
/* Returns new array object: caller owns the result, must call resp_free() */
+resp_object *resp_simple_init(const char *value);
+/* Returns new simple string object: caller owns the result, must call resp_free() */
+
+resp_object *resp_error_init(const char *value);
+/* Returns new error object: caller owns the result, must call resp_free() */
+
int resp_array_append_obj(resp_object *destination, resp_object *value);
/* Takes ownership of value */
diff --git a/src/domain/cluster/cluster.c b/src/domain/cluster/cluster.c
@@ -0,0 +1,516 @@
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <math.h>
+
+#include "rxi/log.h"
+#include "common/resp.h"
+#include "common/scheduler.h"
+#include "domain/config.h"
+#include "cluster.h"
+
+cluster_state_t *cluster_state = NULL;
+
+static int is_session_not_found(resp_object *resp) {
+ if (!resp) return 0;
+ if (resp->type == RESPT_ERROR && resp->u.s) {
+ return (strstr(resp->u.s, "session") != NULL && strstr(resp->u.s, "not found") != NULL) ||
+ (strstr(resp->u.s, "not found") != NULL);
+ }
+ return 0;
+}
+
+static resp_object *cluster_forward_to_nodes(const char *cmd_str) {
+ size_t available_count = 0;
+ cluster_node_t **available = cluster_nodes_get_available(cluster_state->nodes, &available_count);
+
+ if (!available || available_count == 0) {
+ free(available);
+ return resp_error_init("ERR no available nodes");
+ }
+
+ for (size_t i = 0; i < available_count; i++) {
+ cluster_node_t *node = available[i];
+
+ resp_object *resp = NULL;
+ cluster_node_send_command(node, cmd_str, &resp);
+
+ if (!resp) {
+ node->available = 0;
+ continue;
+ }
+
+ if (is_session_not_found(resp)) {
+ resp_free(resp);
+ continue;
+ }
+
+ if (resp->type == RESPT_ERROR) {
+ node->available = 0;
+ resp_free(resp);
+ continue;
+ }
+
+ free(available);
+ return resp;
+ }
+
+ free(available);
+ return resp_error_init("ERR session not found");
+}
+
+void cluster_init(void) {
+ if (cluster_state) {
+ cluster_shutdown();
+ }
+
+ cluster_state = calloc(1, sizeof(cluster_state_t));
+ cluster_state->nodes = cluster_nodes_create();
+
+ if (!domain_cfg) return;
+
+ resp_object *cluster_nodes = domain_config_get_cluster_nodes();
+ if (!cluster_nodes || cluster_nodes->type != RESPT_ARRAY) {
+ log_error("cluster: no cluster nodes configured");
+ return;
+ }
+
+ log_info("cluster: found %zu cluster nodes", cluster_nodes->u.arr.n);
+ for (size_t i = 0; i < cluster_nodes->u.arr.n; i++) {
+ resp_object *elem = &cluster_nodes->u.arr.elem[i];
+ if (elem->type != RESPT_BULK || !elem->u.s) continue;
+
+ const char *node_name = elem->u.s;
+
+ char node_section[256];
+ snprintf(node_section, sizeof(node_section), "cluster:%s", node_name);
+ resp_object *node_sec = resp_map_get(domain_cfg, node_section);
+
+ const char *address = node_sec ? resp_map_get_string(node_sec, "address") : NULL;
+ const char *username = node_sec ? resp_map_get_string(node_sec, "username") : NULL;
+ const char *password = node_sec ? resp_map_get_string(node_sec, "password") : NULL;
+
+ if (!address) {
+ log_error("cluster: node '%s' has no address configured", node_name);
+ continue;
+ }
+
+ cluster_node_t *node = calloc(1, sizeof(cluster_node_t));
+ if (cluster_node_init(node, node_name, address, username, password) == 0) {
+ cluster_nodes_add(cluster_state->nodes, node);
+ sched_create(cluster_node_healthcheck_pt, node);
+ log_info("cluster: added node '%s' at %s", node->name, node->address);
+ } else {
+ cluster_node_free(node);
+ free(node);
+ }
+ }
+
+ cluster_state->initialized = 1;
+}
+
+void cluster_reload(void) {
+ cluster_shutdown();
+ cluster_init();
+}
+
+void cluster_shutdown(void) {
+ if (!cluster_state) return;
+ if (cluster_state->nodes) {
+ cluster_nodes_free(cluster_state->nodes);
+ }
+ free(cluster_state);
+ cluster_state = NULL;
+}
+
+static char *serialize_args(resp_object *args) {
+ char *cmd_str = NULL;
+ size_t cmd_len = 0;
+ resp_serialize(args, &cmd_str, &cmd_len);
+ return cmd_str;
+}
+
+resp_object *cluster_session_create(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ char *cmd_str = serialize_args(args);
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ cluster_node_t *selected_node = NULL;
+ double selected_ratio = -1.0;
+ resp_object *resp = NULL;
+
+ for (int attempt = 0; attempt < 10; attempt++) {
+ size_t available_count = 0;
+ cluster_node_t **available = cluster_nodes_get_available(cluster_state->nodes, &available_count);
+
+ if (!available || available_count == 0) {
+ free(available);
+ break;
+ }
+
+ selected_node = NULL;
+ selected_ratio = -1.0;
+
+ for (size_t i = 0; i < available_count; i++) {
+ cluster_node_t *node = available[i];
+
+ resp_object *count_args = resp_array_init();
+ resp_array_append_bulk(count_args, "session.count");
+ char *count_str = serialize_args(count_args);
+ resp_free(count_args);
+
+ size_t node_session_count = 0;
+ if (count_str) {
+ resp_object *count_resp = NULL;
+ if (cluster_node_send_command(node, count_str, &count_resp) == 0 && count_resp && count_resp->type == RESPT_INT) {
+ node_session_count = (size_t)count_resp->u.i;
+ }
+ if (count_resp) resp_free(count_resp);
+ free(count_str);
+ }
+
+ double ratio = (double)node_session_count / (double)node->weight;
+ if (selected_node == NULL || ratio < selected_ratio) {
+ selected_node = node;
+ selected_ratio = ratio;
+ }
+ }
+
+ free(available);
+
+ if (!selected_node) {
+ break;
+ }
+
+ if (cluster_node_send_command(selected_node, cmd_str, &resp) == 0) {
+ free(cmd_str);
+ return resp;
+ }
+
+ log_debug("cluster: session.create failed on node '%s', marking unavailable", selected_node->name);
+ selected_node->available = 0;
+ if (resp) {
+ resp_free(resp);
+ resp = NULL;
+ }
+ }
+
+ free(cmd_str);
+ if (resp) resp_free(resp);
+ return resp_error_init("ERR all nodes failed");
+}
+
+resp_object *cluster_session_list(const char *cmd, resp_object *args) {
+ (void)cmd;
+ (void)args;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ size_t available_count = 0;
+ cluster_node_t **available = cluster_nodes_get_available(cluster_state->nodes, &available_count);
+
+ if (!available || available_count == 0) {
+ free(available);
+ resp_object *res = resp_array_init();
+ return res;
+ }
+
+ resp_object *result = resp_array_init();
+
+ for (size_t i = 0; i < available_count; i++) {
+ cluster_node_t *node = available[i];
+
+ resp_object *cmd_args = resp_array_init();
+ resp_array_append_bulk(cmd_args, "session.list");
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) continue;
+
+ resp_object *resp = NULL;
+ if (cluster_node_send_command(node, cmd_str, &resp) == 0 && resp && resp->type == RESPT_ARRAY) {
+ for (size_t j = 0; j < resp->u.arr.n; j++) {
+ resp_object *elem = &resp->u.arr.elem[j];
+ resp_object *copy = resp_deep_copy(elem);
+ if (copy) resp_array_append_obj(result, copy);
+ }
+ }
+ if (resp) resp_free(resp);
+ free(cmd_str);
+ }
+
+ free(available);
+ return result;
+}
+
+resp_object *cluster_session_info(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ size_t available_count = 0;
+ cluster_node_t **available = cluster_nodes_get_available(cluster_state->nodes, &available_count);
+
+ if (!available || available_count == 0) {
+ free(available);
+ return resp_error_init("ERR no available nodes");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 4; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ free(available);
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ for (size_t i = 0; i < available_count; i++) {
+ cluster_node_t *node = available[i];
+
+ resp_object *resp = NULL;
+ if (cluster_node_send_command(node, cmd_str, &resp) == 0) {
+ if (resp && resp->type != RESPT_ERROR) {
+ free(cmd_str);
+ free(available);
+ return resp;
+ }
+ if (resp && !is_session_not_found(resp)) {
+ free(cmd_str);
+ free(available);
+ return resp;
+ }
+ if (resp) resp_free(resp);
+ }
+ }
+
+ free(cmd_str);
+ free(available);
+ return resp_error_init("ERR session not found");
+}
+
+resp_object *cluster_session_destroy(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 4; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_socket_create_listen(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 8; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_socket_create_connect(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 8; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_socket_destroy(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 8; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_forward_list(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 4; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_forward_create(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 8; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_forward_destroy(const char *cmd, resp_object *args) {
+ (void)cmd;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ resp_object *cmd_args = resp_array_init();
+ for (size_t i = 0; i < args->u.arr.n && i < 8; i++) {
+ resp_array_append_obj(cmd_args, resp_deep_copy(&args->u.arr.elem[i]));
+ }
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) {
+ return resp_error_init("ERR failed to serialize command");
+ }
+
+ return cluster_forward_to_nodes(cmd_str);
+}
+
+resp_object *cluster_session_count(const char *cmd, resp_object *args) {
+ (void)cmd;
+ (void)args;
+ if (!cluster_state || !cluster_state->nodes) {
+ return resp_error_init("ERR cluster not initialized");
+ }
+
+ size_t available_count = 0;
+ cluster_node_t **available = cluster_nodes_get_available(cluster_state->nodes, &available_count);
+
+ if (!available || available_count == 0) {
+ free(available);
+ resp_object *res = malloc(sizeof(resp_object));
+ if (!res) return NULL;
+ res->type = RESPT_INT;
+ res->u.i = 0;
+ return res;
+ }
+
+ size_t total_count = 0;
+
+ for (size_t i = 0; i < available_count; i++) {
+ cluster_node_t *node = available[i];
+
+ resp_object *cmd_args = resp_array_init();
+ resp_array_append_bulk(cmd_args, "session.count");
+ char *cmd_str = serialize_args(cmd_args);
+ resp_free(cmd_args);
+
+ if (!cmd_str) continue;
+
+ resp_object *resp = NULL;
+ if (cluster_node_send_command(node, cmd_str, &resp) == 0 && resp && resp->type == RESPT_INT) {
+ total_count += (size_t)resp->u.i;
+ }
+ if (resp) resp_free(resp);
+ free(cmd_str);
+ }
+
+ free(available);
+
+ resp_object *res = malloc(sizeof(resp_object));
+ if (!res) return NULL;
+ res->type = RESPT_INT;
+ res->u.i = (long long)total_count;
+ return res;
+}
+
+resp_object *cluster_system_load(const char *cmd, resp_object *args) {
+ (void)cmd;
+ (void)args;
+
+ double loadavg[3];
+ if (getloadavg(loadavg, 3) != 3) {
+ return resp_error_init("ERR failed to get load average");
+ }
+
+ resp_object *res = resp_array_init();
+ char buf[64];
+
+ resp_array_append_bulk(res, "1min");
+ snprintf(buf, sizeof(buf), "%.2f", loadavg[0]);
+ resp_array_append_bulk(res, buf);
+
+ resp_array_append_bulk(res, "5min");
+ snprintf(buf, sizeof(buf), "%.2f", loadavg[1]);
+ resp_array_append_bulk(res, buf);
+
+ resp_array_append_bulk(res, "15min");
+ snprintf(buf, sizeof(buf), "%.2f", loadavg[2]);
+ resp_array_append_bulk(res, buf);
+
+ return res;
+}
diff --git a/src/domain/cluster/cluster.h b/src/domain/cluster/cluster.h
@@ -0,0 +1,33 @@
+#ifndef UDPHOLE_CLUSTER_H
+#define UDPHOLE_CLUSTER_H
+
+#include "common/resp.h"
+#include "node.h"
+
+typedef struct {
+ cluster_nodes_t *nodes;
+ int initialized;
+} cluster_state_t;
+
+extern cluster_state_t *cluster_state;
+
+void cluster_init(void);
+
+void cluster_reload(void);
+
+void cluster_shutdown(void);
+
+resp_object *cluster_session_create(const char *cmd, resp_object *args);
+resp_object *cluster_session_list(const char *cmd, resp_object *args);
+resp_object *cluster_session_info(const char *cmd, resp_object *args);
+resp_object *cluster_session_destroy(const char *cmd, resp_object *args);
+resp_object *cluster_socket_create_listen(const char *cmd, resp_object *args);
+resp_object *cluster_socket_create_connect(const char *cmd, resp_object *args);
+resp_object *cluster_socket_destroy(const char *cmd, resp_object *args);
+resp_object *cluster_forward_list(const char *cmd, resp_object *args);
+resp_object *cluster_forward_create(const char *cmd, resp_object *args);
+resp_object *cluster_forward_destroy(const char *cmd, resp_object *args);
+resp_object *cluster_session_count(const char *cmd, resp_object *args);
+resp_object *cluster_system_load(const char *cmd, resp_object *args);
+
+#endif
diff --git a/src/domain/cluster/node.c b/src/domain/cluster/node.c
@@ -0,0 +1,367 @@
+#include <stdlib.h>
+#include <string.h>
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <sys/socket.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include "rxi/log.h"
+#include "common/resp.h"
+#include "common/socket_util.h"
+#include "node.h"
+
+#define QUERY_TIMEOUT_MS 500
+#define HEALTHCHECK_INTERVAL_MS 5000
+
+static int parse_address(const char *address, char **host, int *port, char **unix_path) {
+ if (!address) return -1;
+
+ if (strncmp(address, "tcp://", 6) == 0) {
+ const char *hp = address + 6;
+ const char *colon = strchr(hp, ':');
+ if (!colon) {
+ log_error("cluster: invalid tcp address '%s' (missing port)", address);
+ return -1;
+ }
+
+ size_t host_len = colon - hp;
+ *host = malloc(host_len + 1);
+ if (!*host) return -1;
+ memcpy(*host, hp, host_len);
+ (*host)[host_len] = '\0';
+
+ *port = atoi(colon + 1);
+ if (*port <= 0) {
+ log_error("cluster: invalid port in tcp address '%s'", address);
+ free(*host);
+ *host = NULL;
+ return -1;
+ }
+ *unix_path = NULL;
+ return 0;
+
+ } else if (strncmp(address, "unix://", 7) == 0) {
+ *unix_path = strdup(address + 7);
+ *host = NULL;
+ *port = 0;
+ return 0;
+ }
+
+ log_error("cluster: unknown address scheme in '%s' (expected tcp:// or unix://)", address);
+ return -1;
+}
+
+int cluster_node_init(cluster_node_t *node, const char *name, const char *address, const char *username, const char *password) {
+ memset(node, 0, sizeof(*node));
+
+ node->name = strdup(name);
+ node->address = address ? strdup(address) : NULL;
+ node->username = username ? strdup(username) : NULL;
+ node->password = password ? strdup(password) : NULL;
+
+ if (!node->address) {
+ log_error("cluster: node '%s' has no address configured", name);
+ return -1;
+ }
+
+ if (parse_address(node->address, &node->host, &node->port, &node->unix_path) != 0) {
+ return -1;
+ }
+
+ node->fd = -1;
+ node->available = 0;
+ node->last_ping = 0;
+ node->last_check = 0;
+ node->weight = 1;
+
+ return 0;
+}
+
+void cluster_node_free(cluster_node_t *node) {
+ if (!node) return;
+ cluster_node_disconnect(node);
+ free(node->name);
+ free(node->address);
+ free(node->host);
+ free(node->unix_path);
+ free(node->username);
+ free(node->password);
+}
+
+static int connect_tcp(const char *host, int port) {
+ struct addrinfo hints, *res, *res0;
+ int sockfd, error;
+
+ memset(&hints, 0, sizeof(hints));
+ hints.ai_family = AF_UNSPEC;
+ hints.ai_socktype = SOCK_STREAM;
+ hints.ai_protocol = 0;
+
+ char port_str[16];
+ snprintf(port_str, sizeof(port_str), "%d", port);
+
+ error = getaddrinfo(host, port_str, &hints, &res0);
+ if (error) {
+ log_error("cluster: getaddrinfo(%s:%s): %s", host, port_str, gai_strerror(error));
+ return -1;
+ }
+
+ sockfd = -1;
+ for (res = res0; res; res = res->ai_next) {
+ sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
+ if (sockfd < 0) continue;
+
+ if (connect(sockfd, res->ai_addr, res->ai_addrlen) == 0) break;
+
+ close(sockfd);
+ sockfd = -1;
+ }
+
+ freeaddrinfo(res0);
+
+ if (sockfd >= 0) {
+ int flag = 1;
+ setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
+ }
+
+ return sockfd;
+}
+
+static int connect_unix(const char *path) {
+ struct sockaddr_un addr;
+ int sockfd;
+
+ if (strlen(path) >= sizeof(addr.sun_path)) {
+ log_error("cluster: unix socket path too long: %s", path);
+ return -1;
+ }
+
+ sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
+ if (sockfd < 0) {
+ return -1;
+ }
+
+ memset(&addr, 0, sizeof(addr));
+ addr.sun_family = AF_UNIX;
+ strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
+
+ if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ close(sockfd);
+ return -1;
+ }
+
+ return sockfd;
+}
+
+int cluster_node_connect(cluster_node_t *node) {
+ if (node->fd >= 0) {
+ return 0;
+ }
+
+ if (node->unix_path) {
+ node->fd = connect_unix(node->unix_path);
+ } else {
+ node->fd = connect_tcp(node->host, node->port);
+ }
+
+ if (node->fd < 0) {
+ log_debug("cluster: failed to connect to node '%s' at %s", node->name, node->address);
+ return -1;
+ }
+
+ log_info("cluster: connected to node '%s' at %s", node->name, node->address);
+
+ if (node->username && node->password) {
+ char *cmd_str = NULL;
+ size_t cmd_len = 0;
+ resp_object *cmd = resp_array_init();
+ resp_array_append_bulk(cmd, "auth");
+ resp_array_append_bulk(cmd, node->username);
+ resp_array_append_bulk(cmd, node->password);
+ resp_serialize(cmd, &cmd_str, &cmd_len);
+ resp_free(cmd);
+
+ if (cmd_str) {
+ ssize_t n = send(node->fd, cmd_str, cmd_len, 0);
+ if (n > 0) {
+ resp_object *resp = resp_read(node->fd);
+ if (!resp || resp->type == RESPT_ERROR) {
+ log_warn("cluster: authentication failed for node '%s'", node->name);
+ close(node->fd);
+ node->fd = -1;
+ free(cmd_str);
+ return -1;
+ }
+ resp_free(resp);
+ }
+ free(cmd_str);
+ }
+ }
+
+ return 0;
+}
+
+void cluster_node_disconnect(cluster_node_t *node) {
+ if (node->fd >= 0) {
+ close(node->fd);
+ node->fd = -1;
+ }
+}
+
+static int send_resp_command(int fd, const char *cmd) {
+ size_t cmd_len = strlen(cmd);
+ ssize_t n = send(fd, cmd, cmd_len, 0);
+ return (n == (ssize_t)cmd_len) ? 0 : -1;
+}
+
+int cluster_node_send_command(cluster_node_t *node, const char *cmd, resp_object **out_response) {
+ if (node->fd < 0) {
+ if (cluster_node_connect(node) != 0) {
+ return -1;
+ }
+ }
+
+ if (send_resp_command(node->fd, cmd) != 0) {
+ log_debug("cluster: failed to send command to node '%s'", node->name);
+ cluster_node_disconnect(node);
+ return -1;
+ }
+
+ resp_object *resp = resp_read(node->fd);
+ if (!resp) {
+ log_debug("cluster: no response from node '%s'", node->name);
+ cluster_node_disconnect(node);
+ return -1;
+ }
+
+ *out_response = resp;
+ return 0;
+}
+
+int cluster_node_healthcheck_pt(int64_t timestamp, struct pt_task *task) {
+ cluster_node_t *node = task->udata;
+
+ if (!node) {
+ return SCHED_DONE;
+ }
+
+ if (node->last_check > 0 && timestamp - node->last_check < HEALTHCHECK_INTERVAL_MS) {
+ return SCHED_RUNNING;
+ }
+ node->last_check = timestamp;
+
+ if (node->fd < 0) {
+ if (cluster_node_connect(node) != 0) {
+ node->available = 0;
+ return SCHED_RUNNING;
+ }
+ }
+
+ char *cmd_str = NULL;
+ size_t cmd_len = 0;
+ resp_object *cmd = resp_array_init();
+ resp_array_append_bulk(cmd, "ping");
+ resp_serialize(cmd, &cmd_str, &cmd_len);
+ resp_free(cmd);
+
+ if (!cmd_str) {
+ node->available = 0;
+ cluster_node_disconnect(node);
+ return SCHED_RUNNING;
+ }
+
+ ssize_t n = send(node->fd, cmd_str, cmd_len, 0);
+ free(cmd_str);
+
+ if (n <= 0) {
+ node->available = 0;
+ cluster_node_disconnect(node);
+ return SCHED_RUNNING;
+ }
+
+ resp_object *resp = resp_read(node->fd);
+ if (!resp || (resp->type != RESPT_SIMPLE && resp->type != RESPT_BULK)) {
+ node->available = 0;
+ if (resp) resp_free(resp);
+ cluster_node_disconnect(node);
+ return SCHED_RUNNING;
+ }
+
+ node->available = 1;
+ node->last_ping = timestamp;
+ log_trace("cluster: node '%s' healthcheck OK", node->name);
+ resp_free(resp);
+
+ return SCHED_RUNNING;
+}
+
+cluster_nodes_t *cluster_nodes_create(void) {
+ cluster_nodes_t *cnodes = calloc(1, sizeof(cluster_nodes_t));
+ return cnodes;
+}
+
+void cluster_nodes_free(cluster_nodes_t *cnodes) {
+ if (!cnodes) return;
+ for (size_t i = 0; i < cnodes->nodes_count; i++) {
+ if (cnodes->nodes[i]) {
+ cluster_node_free(cnodes->nodes[i]);
+ free(cnodes->nodes[i]);
+ }
+ }
+ free(cnodes->nodes);
+ free(cnodes);
+}
+
+int cluster_nodes_add(cluster_nodes_t *cnodes, cluster_node_t *node) {
+ cluster_node_t **new_nodes = realloc(cnodes->nodes,
+ sizeof(cluster_node_t *) * (cnodes->nodes_count + 1));
+ if (!new_nodes) return -1;
+ cnodes->nodes = new_nodes;
+ cnodes->nodes[cnodes->nodes_count++] = node;
+ return 0;
+}
+
+cluster_node_t *cluster_nodes_get(cluster_nodes_t *cnodes, const char *name) {
+ for (size_t i = 0; i < cnodes->nodes_count; i++) {
+ if (cnodes->nodes[i] && strcmp(cnodes->nodes[i]->name, name) == 0) {
+ return cnodes->nodes[i];
+ }
+ }
+ return NULL;
+}
+
+cluster_node_t **cluster_nodes_get_available(cluster_nodes_t *cnodes, size_t *out_count) {
+ size_t count = 0;
+ for (size_t i = 0; i < cnodes->nodes_count; i++) {
+ if (cnodes->nodes[i] && cnodes->nodes[i]->available) {
+ count++;
+ }
+ }
+
+ if (count == 0) {
+ *out_count = 0;
+ return NULL;
+ }
+
+ cluster_node_t **available = malloc(sizeof(cluster_node_t *) * count);
+ if (!available) {
+ *out_count = 0;
+ return NULL;
+ }
+
+ count = 0;
+ for (size_t i = 0; i < cnodes->nodes_count; i++) {
+ if (cnodes->nodes[i] && cnodes->nodes[i]->available) {
+ available[count++] = cnodes->nodes[i];
+ }
+ }
+
+ *out_count = count;
+ return available;
+}
diff --git a/src/domain/cluster/node.h b/src/domain/cluster/node.h
@@ -0,0 +1,51 @@
+#ifndef UDPHOLE_CLUSTER_NODE_H
+#define UDPHOLE_CLUSTER_NODE_H
+
+#include <stdint.h>
+#include "common/resp.h"
+#include "common/scheduler.h"
+
+typedef struct cluster_node {
+ char *name;
+ char *address;
+ char *host;
+ int port;
+ char *unix_path;
+ char *username;
+ char *password;
+ int weight;
+
+ int fd;
+ int available;
+ int64_t last_ping;
+ int64_t last_check;
+} cluster_node_t;
+
+typedef struct {
+ cluster_node_t **nodes;
+ size_t nodes_count;
+} cluster_nodes_t;
+
+int cluster_node_init(cluster_node_t *node, const char *name, const char *address, const char *username, const char *password);
+
+void cluster_node_free(cluster_node_t *node);
+
+int cluster_node_connect(cluster_node_t *node);
+
+void cluster_node_disconnect(cluster_node_t *node);
+
+int cluster_node_send_command(cluster_node_t *node, const char *cmd, resp_object **out_response);
+
+int cluster_node_healthcheck_pt(int64_t timestamp, struct pt_task *task);
+
+cluster_nodes_t *cluster_nodes_create(void);
+
+void cluster_nodes_free(cluster_nodes_t *cnodes);
+
+int cluster_nodes_add(cluster_nodes_t *cnodes, cluster_node_t *node);
+
+cluster_node_t *cluster_nodes_get(cluster_nodes_t *cnodes, const char *name);
+
+cluster_node_t **cluster_nodes_get_available(cluster_nodes_t *cnodes, size_t *out_count);
+
+#endif
diff --git a/src/domain/config.c b/src/domain/config.c
@@ -1,40 +1,26 @@
#include <stdlib.h>
#include <string.h>
+#include <stdio.h>
#include "domain/config.h"
-udphole_config_t *domain_cfg = NULL;
+resp_object *domain_cfg = NULL;
void domain_config_init(void) {
- if (domain_cfg) {
- domain_config_free();
- }
- domain_cfg = calloc(1, sizeof(udphole_config_t));
- if (domain_cfg) {
- domain_cfg->port_low = 7000;
- domain_cfg->port_high = 7999;
- domain_cfg->port_cur = 7000;
- }
-}
-
-void domain_config_set_ports(int low, int high) {
- if (!domain_cfg) domain_config_init();
- if (!domain_cfg) return;
- domain_cfg->port_low = low > 0 ? low : 7000;
- domain_cfg->port_high = high > domain_cfg->port_low ? high : domain_cfg->port_low + 999;
- domain_cfg->port_cur = domain_cfg->port_low;
+ if (domain_cfg) return;
+ domain_cfg = resp_array_init();
}
-void domain_config_set_advertise(const char *addr) {
- if (!domain_cfg) domain_config_init();
- if (!domain_cfg) return;
- free(domain_cfg->advertise_addr);
- domain_cfg->advertise_addr = addr ? strdup(addr) : NULL;
+void domain_config_free(void) {
+ if (domain_cfg) {
+ resp_free(domain_cfg);
+ domain_cfg = NULL;
+ }
}
-void domain_config_free(void) {
- if (!domain_cfg) return;
- free(domain_cfg->advertise_addr);
- free(domain_cfg);
- domain_cfg = NULL;
+resp_object *domain_config_get_cluster_nodes(void) {
+ if (!domain_cfg) return NULL;
+ resp_object *udphole_sec = resp_map_get(domain_cfg, "udphole");
+ if (!udphole_sec) return NULL;
+ return resp_map_get(udphole_sec, "cluster");
}
diff --git a/src/domain/config.h b/src/domain/config.h
@@ -2,22 +2,14 @@
#define UDPHOLE_DOMAIN_CONFIG_H
#include <stdint.h>
+#include "common/resp.h"
-typedef struct {
- int port_low;
- int port_high;
- int port_cur;
- char *advertise_addr;
-} udphole_config_t;
-
-extern udphole_config_t *domain_cfg;
+extern resp_object *domain_cfg;
void domain_config_init(void);
-void domain_config_set_ports(int low, int high);
-
-void domain_config_set_advertise(const char *addr);
-
void domain_config_free(void);
+resp_object *domain_config_get_cluster_nodes(void);
+
#endif
diff --git a/src/domain/daemon/session.c b/src/domain/daemon/session.c
@@ -20,6 +20,10 @@
#include "tidwall/hashmap.h"
#include "session.h"
+static resp_object *get_udphole_cfg(void) {
+ return domain_cfg ? resp_map_get(domain_cfg, "udphole") : NULL;
+}
+
#define SESSION_HASH_SIZE 256
#define BUFFER_SIZE 4096
#define DEFAULT_IDLE_EXPIRY 60
@@ -79,12 +83,19 @@ static socket_t *find_socket(session_t *s, const char *socket_id) {
}
static int alloc_port(void) {
- if (!domain_cfg) return 0;
- for (int i = 0; i < domain_cfg->port_high - domain_cfg->port_low; i++) {
- int port = domain_cfg->port_cur + i;
- if (port > domain_cfg->port_high) port = domain_cfg->port_low;
- domain_cfg->port_cur = port + 1;
- if (domain_cfg->port_cur > domain_cfg->port_high) domain_cfg->port_cur = domain_cfg->port_low;
+ resp_object *udphole = get_udphole_cfg();
+ if (!udphole) return 0;
+ const char *ports_str = resp_map_get_string(udphole, "ports");
+ int port_low = 7000, port_high = 7999, port_cur = 7000;
+ if (ports_str) sscanf(ports_str, "%d-%d", &port_low, &port_high);
+ const char *port_cur_str = resp_map_get_string(udphole, "_port_cur");
+ if (port_cur_str) port_cur = atoi(port_cur_str);
+
+ for (int i = 0; i < port_high - port_low; i++) {
+ int port = port_cur + i;
+ if (port > port_high) port = port_low;
+ port_cur = port + 1;
+ if (port_cur > port_high) port_cur = port_low;
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
@@ -520,8 +531,7 @@ resp_object *domain_session_create(const char *cmd, resp_object *args) {
spawn_session_pt(s);
- resp_object *res = resp_array_init();
- resp_array_append_simple(res, "OK");
+ resp_object *res = resp_simple_init("OK");
return res;
}
@@ -562,8 +572,7 @@ resp_object *domain_session_info(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -626,15 +635,13 @@ resp_object *domain_session_destroy(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
destroy_session(s);
- resp_object *res = resp_array_init();
- resp_array_append_simple(res, "OK");
+ resp_object *res = resp_simple_init("OK");
return res;
}
@@ -664,8 +671,7 @@ resp_object *domain_socket_create_listen(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -678,7 +684,9 @@ resp_object *domain_socket_create_listen(const char *cmd, resp_object *args) {
resp_object *res = resp_array_init();
resp_array_append_int(res, sock->local_port);
- resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : "");
+ resp_object *udphole = get_udphole_cfg();
+ const char *advertise = udphole ? resp_map_get_string(udphole, "advertise") : NULL;
+ resp_array_append_bulk(res, advertise ? advertise : "");
return res;
}
@@ -718,8 +726,7 @@ resp_object *domain_socket_create_connect(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -732,7 +739,9 @@ resp_object *domain_socket_create_connect(const char *cmd, resp_object *args) {
resp_object *res = resp_array_init();
resp_array_append_int(res, sock->local_port);
- resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : "");
+ resp_object *udphole = get_udphole_cfg();
+ const char *advertise = udphole ? resp_map_get_string(udphole, "advertise") : NULL;
+ resp_array_append_bulk(res, advertise ? advertise : "");
return res;
}
@@ -762,8 +771,7 @@ resp_object *domain_socket_destroy(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -773,8 +781,7 @@ resp_object *domain_socket_destroy(const char *cmd, resp_object *args) {
return err;
}
- resp_object *res = resp_array_init();
- resp_array_append_simple(res, "OK");
+ resp_object *res = resp_simple_init("OK");
return res;
}
@@ -799,8 +806,7 @@ resp_object *domain_forward_list(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -842,8 +848,7 @@ resp_object *domain_forward_create(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -869,8 +874,7 @@ resp_object *domain_forward_create(const char *cmd, resp_object *args) {
log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id);
- resp_object *res = resp_array_init();
- resp_array_append_simple(res, "OK");
+ resp_object *res = resp_simple_init("OK");
return res;
}
@@ -904,8 +908,7 @@ resp_object *domain_forward_destroy(const char *cmd, resp_object *args) {
session_t *s = find_session(session_id);
if (!s) {
- resp_object *err = resp_array_init();
- resp_array_append_simple(err, "ERR session not found");
+ resp_object *err = resp_error_init("ERR session not found");
return err;
}
@@ -915,8 +918,7 @@ resp_object *domain_forward_destroy(const char *cmd, resp_object *args) {
return err;
}
- resp_object *res = resp_array_init();
- resp_array_append_simple(res, "OK");
+ resp_object *res = resp_simple_init("OK");
return res;
}
@@ -970,12 +972,16 @@ resp_object *domain_session_count(const char *cmd, resp_object *args) {
int session_manager_pt(int64_t timestamp, struct pt_task *task) {
session_manager_udata_t *udata = task->udata;
- if (!domain_cfg) {
+ resp_object *udphole = get_udphole_cfg();
+ if (!udphole) {
return SCHED_RUNNING;
}
if (!udata->initialized) {
- log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high);
+ const char *ports_str = resp_map_get_string(udphole, "ports");
+ int port_low = 7000, port_high = 7999;
+ if (ports_str) sscanf(ports_str, "%d-%d", &port_low, &port_high);
+ log_info("udphole: manager started with port range %d-%d", port_low, port_high);
udata->initialized = 1;
}
diff --git a/src/infrastructure/config.c b/src/infrastructure/config.c
@@ -4,8 +4,8 @@
#include "rxi/log.h"
#include "infrastructure/config.h"
#include "common/resp.h"
+#include "domain/config.h"
-resp_object *global_cfg = NULL;
resp_object *pending_cfg = NULL;
static const char *stored_config_path = NULL;
@@ -19,36 +19,57 @@ static int config_handler(void *user, const char *section, const char *name, con
resp_map_set(cfg, section, sec);
sec = resp_map_get(cfg, section);
}
- resp_array_append_bulk(sec, name);
- resp_array_append_bulk(sec, value);
+
+ if (strcmp(name, "cluster") == 0) {
+ resp_object *arr = resp_map_get(sec, "cluster");
+ if (!arr) {
+ arr = resp_array_init();
+ resp_map_set(sec, "cluster", arr);
+ arr = resp_map_get(sec, "cluster");
+ }
+ if (!arr || arr->type != RESPT_ARRAY) {
+ log_error("config: 'cluster' key already exists as non-array");
+ return 0;
+ }
+ resp_array_append_bulk(arr, value);
+ } else {
+ resp_array_append_bulk(sec, name);
+ resp_array_append_bulk(sec, value);
+ }
return 1;
}
void config_init(void) {
- global_cfg = resp_array_init();
- config_load(global_cfg, config_get_path());
+ if (pending_cfg) resp_free(pending_cfg);
+ pending_cfg = resp_array_init();
+ config_load(NULL, config_get_path());
+ resp_object *old = domain_cfg;
+ domain_cfg = pending_cfg;
+ pending_cfg = NULL;
+ if (old) resp_free(old);
}
int config_load(resp_object *cfg, const char *path) {
- return ini_parse(path, config_handler, cfg);
+ resp_object *load_cfg = cfg;
+ if (!load_cfg) {
+ load_cfg = pending_cfg;
+ }
+ return ini_parse(path, config_handler, load_cfg);
}
void config_pending_init(void) {
+ if (pending_cfg) resp_free(pending_cfg);
pending_cfg = resp_array_init();
}
-void config_swap(void) {
- resp_object *old = global_cfg;
- global_cfg = pending_cfg;
- pending_cfg = old;
- if (old) resp_free(old);
-}
-
int config_reload(void) {
config_pending_init();
- int r = config_load(pending_cfg, config_get_path());
+ int r = config_load(NULL, config_get_path());
if (r < 0) return -1;
- config_swap();
+ resp_object *old = domain_cfg;
+ domain_cfg = pending_cfg;
+ pending_cfg = NULL;
+ if (old) resp_free(old);
return 0;
}
@@ -59,4 +80,4 @@ void config_set_path(const char *path) {
const char *config_get_path(void) {
return stored_config_path;
-}
-\ No newline at end of file
+}
diff --git a/src/infrastructure/config.h b/src/infrastructure/config.h
@@ -3,7 +3,6 @@
#include "common/resp.h"
-extern resp_object *global_cfg;
extern resp_object *pending_cfg;
void config_init(void);
diff --git a/src/interface/api/server.c b/src/interface/api/server.c
@@ -24,6 +24,7 @@
#include "common/scheduler.h"
#include "common/socket_util.h"
#include "infrastructure/config.h"
+#include "domain/config.h"
#include "interface/api/server.h"
#include "common/resp.h"
@@ -169,7 +170,7 @@ static bool user_has_permit(api_client_t *c, const char *cmd) {
char section[128];
const char *uname = (c->username && c->username[0]) ? c->username : "*";
snprintf(section, sizeof(section), "user:%s", uname);
- resp_object *sec = resp_map_get(global_cfg, section);
+ resp_object *sec = resp_map_get(domain_cfg, section);
if (sec && sec->type == RESPT_ARRAY) {
for (size_t i = 0; i < sec->u.arr.n; i += 2) {
if (i + 1 < sec->u.arr.n) {
@@ -190,7 +191,7 @@ static bool user_has_permit(api_client_t *c, const char *cmd) {
}
}
if (strcmp(uname, "*") != 0) {
- resp_object *anon = resp_map_get(global_cfg, "user:*");
+ resp_object *anon = resp_map_get(domain_cfg, "user:*");
if (anon && anon->type == RESPT_ARRAY) {
for (size_t i = 0; i < anon->u.arr.n; i += 2) {
if (i + 1 < anon->u.arr.n) {
@@ -261,7 +262,7 @@ static char cmdAUTH(api_client_t *c, char **args, int nargs) {
const char *pass = args[2];
char section[128];
snprintf(section, sizeof(section), "user:%s", uname);
- resp_object *sec = resp_map_get(global_cfg, section);
+ resp_object *sec = resp_map_get(domain_cfg, section);
const char *secret = sec ? resp_map_get_string(sec, "secret") : NULL;
if (secret && pass && strcmp(secret, pass) == 0) {
free(c->username);
@@ -425,7 +426,7 @@ static void dispatch_command(api_client_t *c, char **args, int nargs) {
static int *create_listen_socket(const char *listen_addr) {
const char *default_port = "6379";
- resp_object *api_sec = resp_map_get(global_cfg, "udphole");
+ resp_object *api_sec = resp_map_get(domain_cfg, "udphole");
if (api_sec) {
const char *cfg_port = resp_map_get_string(api_sec, "port");
if (cfg_port && cfg_port[0]) default_port = cfg_port;
@@ -483,7 +484,7 @@ int api_server_pt(int64_t timestamp, struct pt_task *task) {
}
if (udata->server_fds == NULL) {
- resp_object *api_sec = resp_map_get(global_cfg, "udphole");
+ resp_object *api_sec = resp_map_get(domain_cfg, "udphole");
const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL;
if (!listen_str || !listen_str[0]) {
diff --git a/src/interface/cli/command/cluster.c b/src/interface/cli/command/cluster.c
@@ -0,0 +1,98 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <fcntl.h>
+#include <sys/stat.h>
+
+#include "cofyc/argparse.h"
+#include "rxi/log.h"
+
+#include "infrastructure/config.h"
+#include "common/resp.h"
+#include "../common.h"
+#include "common/scheduler.h"
+#include "domain/config.h"
+#include "domain/cluster/cluster.h"
+#include "interface/api/server.h"
+
+static void register_cluster_commands(void) {
+ api_register_domain_cmd("session.create", cluster_session_create);
+ api_register_domain_cmd("session.list", cluster_session_list);
+ api_register_domain_cmd("session.info", cluster_session_info);
+ api_register_domain_cmd("session.destroy", cluster_session_destroy);
+ api_register_domain_cmd("session.socket.create.listen", cluster_socket_create_listen);
+ api_register_domain_cmd("session.socket.create.connect", cluster_socket_create_connect);
+ api_register_domain_cmd("session.socket.destroy", cluster_socket_destroy);
+ api_register_domain_cmd("session.forward.list", cluster_forward_list);
+ api_register_domain_cmd("session.forward.create", cluster_forward_create);
+ api_register_domain_cmd("session.forward.destroy", cluster_forward_destroy);
+ api_register_domain_cmd("session.count", cluster_session_count);
+ api_register_domain_cmd("system.load", cluster_system_load);
+ log_info("cluster: registered session.* commands");
+}
+
+static int do_daemonize(void) {
+ pid_t pid = fork();
+ if (pid < 0) {
+ log_fatal("fork: %m");
+ return -1;
+ }
+ if (pid > 0)
+ _exit(0);
+ if (setsid() < 0) {
+ log_fatal("setsid: %m");
+ _exit(1);
+ }
+ pid = fork();
+ if (pid < 0) {
+ log_fatal("fork: %m");
+ _exit(1);
+ }
+ if (pid > 0)
+ _exit(0);
+ if (chdir("/") != 0) {}
+ int fd;
+ for (fd = 0; fd < 3; fd++)
+ (void)close(fd);
+ fd = open("/dev/null", O_RDWR);
+ if (fd >= 0) {
+ dup2(fd, STDIN_FILENO);
+ dup2(fd, STDOUT_FILENO);
+ dup2(fd, STDERR_FILENO);
+ if (fd > 2)
+ close(fd);
+ }
+ return 0;
+}
+
+int cli_cmd_cluster(int argc, const char **argv) {
+ int daemonize_flag = 0;
+ int no_daemonize_flag = 0;
+
+ struct argparse argparse;
+ struct argparse_option options[] = {
+ OPT_HELP(),
+ OPT_BOOLEAN('d', "daemonize", &daemonize_flag, "run in background", NULL, 0, 0),
+ OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground", NULL, 0, 0),
+ OPT_END(),
+ };
+ argparse_init(&argparse, options, (const char *const[]){"udphole cluster", NULL}, ARGPARSE_STOP_AT_NON_OPTION);
+ argparse_parse(&argparse, argc, argv);
+
+ if (!no_daemonize_flag && daemonize_flag) {
+ do_daemonize();
+ }
+
+ cluster_init();
+
+ register_cluster_commands();
+
+ log_info("udphole: starting cluster");
+
+ sched_create(api_server_pt, NULL);
+
+ log_info("udphole: cluster started");
+
+ return sched_main();
+}
diff --git a/src/interface/cli/command/cluster.h b/src/interface/cli/command/cluster.h
@@ -0,0 +1,6 @@
+#ifndef UDPHOLE_CLI_CLUSTER_H
+#define UDPHOLE_CLI_CLUSTER_H
+
+int cli_cmd_cluster(int argc, const char **argv);
+
+#endif
diff --git a/src/interface/cli/command/daemon.c b/src/interface/cli/command/daemon.c
@@ -87,22 +87,6 @@ int cli_cmd_daemon(int argc, const char **argv) {
domain_config_init();
- if (global_cfg) {
- resp_object *cfg_sec = resp_map_get(global_cfg, "udphole");
- if (cfg_sec) {
- const char *ports_str = resp_map_get_string(cfg_sec, "ports");
- if (ports_str) {
- int port_low = 7000, port_high = 7999;
- sscanf(ports_str, "%d-%d", &port_low, &port_high);
- domain_config_set_ports(port_low, port_high);
- }
- const char *advertise = resp_map_get_string(cfg_sec, "advertise");
- if (advertise) {
- domain_config_set_advertise(advertise);
- }
- }
- }
-
register_domain_commands();
log_info("udphole: starting daemon");
diff --git a/src/interface/cli/main.c b/src/interface/cli/main.c
@@ -14,7 +14,9 @@ extern "C" {
#include "interface/cli/common.h"
#include "interface/cli/command/list_commands.h"
#include "interface/cli/command/daemon.h"
+#include "interface/cli/command/cluster.h"
#include "infrastructure/config.h"
+#include "domain/cluster/cluster.h"
#ifdef __cplusplus
}
@@ -43,6 +45,7 @@ static void logfile_callback(log_Event *ev) {
log_file = fopen(log_path, "a");
}
config_reload();
+ cluster_reload();
}
if (log_file) {
char buf[64];
@@ -180,6 +183,12 @@ int main(int argc, const char **argv) {
cli_cmd_daemon
);
+ cli_register_command(
+ "cluster",
+ "Run the udphole cluster",
+ cli_cmd_cluster
+ );
+
struct argparse argparse;
struct argparse_option options[] = {
OPT_HELP(),
diff --git a/test/cluster.js b/test/cluster.js
@@ -0,0 +1,236 @@
+const path = require('path');
+const {
+ spawnDaemon,
+ killDaemon,
+ killAllDaemons,
+ connectApi,
+ apiCommand,
+ findFreePort,
+ sleep,
+ createUdpEchoServer,
+ sendUdp,
+ TIMEOUT
+} = require('./helpers');
+
+const CLUSTER_CONFIG_PATH = path.join(__dirname, 'config-cluster.ini');
+const NODE1_CONFIG_PATH = path.join(__dirname, 'config-node1.ini');
+const NODE2_CONFIG_PATH = path.join(__dirname, 'config-node2.ini');
+const CLUSTER_API_PORT = 19121;
+const NODE1_API_PORT = 19122;
+const NODE2_API_PORT = 19123;
+
+async function runTest() {
+ let cluster = null;
+ let node1 = null;
+ let node2 = null;
+ let clusterApi = null;
+ let node1Api = null;
+ let node2Api = null;
+ let returnCode = 0;
+ let resp;
+
+ console.log('=== Cluster Test ===\n');
+
+ await killAllDaemons();
+
+ try {
+ console.log('1. Starting backing daemon node1...');
+ node1 = await spawnDaemon(NODE1_CONFIG_PATH, 'daemon');
+ console.log(` Node1 started (PID: ${node1.pid})`);
+
+ console.log('2. Starting backing daemon node2...');
+ node2 = await spawnDaemon(NODE2_CONFIG_PATH, 'daemon');
+ console.log(` Node2 started (PID: ${node2.pid})`);
+
+ console.log('3. Starting cluster...');
+ cluster = await spawnDaemon(CLUSTER_CONFIG_PATH, 'cluster');
+ console.log(` Cluster started (PID: ${cluster.pid})`);
+
+ await sleep(6000); // Wait for healthcheck to run
+
+ console.log('4. Connecting to cluster API...');
+ clusterApi = await connectApi(CLUSTER_API_PORT);
+ console.log(' Connected to cluster');
+
+ console.log('5. Connecting to node1 API...');
+ node1Api = await connectApi(NODE1_API_PORT);
+ console.log(' Connected to node1');
+ resp = await apiCommand(node1Api, 'auth', 'finwo', 'testsecret');
+ console.log(` Auth response: ${resp}`);
+
+ console.log('6. Connecting to node2 API...');
+ node2Api = await connectApi(NODE2_API_PORT);
+ console.log(' Connected to node2');
+ resp = await apiCommand(node2Api, 'auth', 'finwo', 'testsecret');
+ console.log(` Auth response: ${resp}`);
+
+ console.log('7. Authenticating with cluster...');
+ resp = await apiCommand(clusterApi, 'auth', 'test', 'testsecret');
+ console.log(` Auth response: ${resp}`);
+ if (resp !== 'OK') throw new Error('Cluster authentication failed');
+
+ console.log('8. Testing session.count on cluster (no sessions)...');
+ resp = await apiCommand(clusterApi, 'session.count');
+ console.log(` cluster session.count: ${resp}`);
+ if (typeof resp !== 'number' || resp !== 0) {
+ throw new Error(`Expected 0 sessions, got ${resp}`);
+ }
+
+ console.log('9. Creating session on cluster...');
+ resp = await apiCommand(clusterApi, 'session.create', 'test-session-1', '60');
+ console.log(` session.create: ${resp}`);
+ if (resp !== 'OK') throw new Error('Failed to create session via cluster');
+
+ console.log('10. Verifying session created on one of the nodes...');
+ let node1Count = await apiCommand(node1Api, 'session.count');
+ let node2Count = await apiCommand(node2Api, 'session.count');
+ console.log(` Node1 session.count: ${node1Count}, Node2 session.count: ${node2Count}`);
+ let totalCount = node1Count + node2Count;
+ if (totalCount !== 1) {
+ throw new Error(`Expected 1 total session across nodes, got ${totalCount}`);
+ }
+
+ console.log('11. Testing session.list aggregation...');
+ resp = await apiCommand(clusterApi, 'session.list');
+ console.log(` cluster session.list: ${JSON.stringify(resp)}`);
+ if (!Array.isArray(resp) || resp.length !== 1) {
+ throw new Error(`Expected 1 session in list, got ${resp.length}`);
+ }
+
+ console.log('12. Testing session.count aggregation...');
+ resp = await apiCommand(clusterApi, 'session.count');
+ console.log(` cluster session.count: ${resp}`);
+ if (resp !== totalCount) {
+ throw new Error(`Expected ${totalCount} sessions, got ${resp}`);
+ }
+
+ console.log('13. Creating another session (should go to different node)...');
+ resp = await apiCommand(clusterApi, 'session.create', 'test-session-2', '60');
+ console.log(` session.create: ${resp}`);
+ if (resp !== 'OK') throw new Error('Failed to create second session');
+
+ console.log('14. Verifying sessions distributed...');
+ node1Count = await apiCommand(node1Api, 'session.count');
+ node2Count = await apiCommand(node2Api, 'session.count');
+ console.log(` Node1: ${node1Count}, Node2: ${node2Count}`);
+ if (node1Count + node2Count !== 2) {
+ throw new Error('Expected 2 total sessions');
+ }
+
+ console.log('15. Testing session.info routing...');
+ resp = await apiCommand(clusterApi, 'session.info', 'test-session-1');
+ console.log(` session.info: ${JSON.stringify(resp).substring(0, 80)}...`);
+ if (!Array.isArray(resp)) {
+ throw new Error('Expected array response for session.info');
+ }
+
+ console.log('16. Testing session.destroy...');
+ resp = await apiCommand(clusterApi, 'session.destroy', 'test-session-1');
+ console.log(` session.destroy: ${resp}`);
+ if (resp !== 'OK') throw new Error('Failed to destroy session');
+
+ console.log('17. Verifying session destroyed...');
+ resp = await apiCommand(clusterApi, 'session.count');
+ console.log(` cluster session.count after destroy: ${resp}`);
+ if (resp !== 1) {
+ throw new Error(`Expected 1 session after destroy, got ${resp}`);
+ }
+
+ console.log('18. Testing socket creation on existing session...');
+ resp = await apiCommand(clusterApi, 'session.socket.create.listen', 'test-session-2', 'socket1');
+ console.log(` socket.create.listen: ${JSON.stringify(resp)}`);
+ if (!Array.isArray(resp) || resp.length < 1) {
+ throw new Error('Expected port in response');
+ }
+
+ console.log('18b. Testing advertise address and UDP forwarding...');
+ const listenPort = resp[0];
+ const advertiseAddr = resp[1];
+ console.log(` Listen port: ${listenPort}, advertise addr: ${advertiseAddr}`);
+ if (advertiseAddr !== '127.0.0.2' && advertiseAddr !== '127.0.0.3') {
+ throw new Error(`Expected advertise address 127.0.0.2 or 127.0.0.3, got ${advertiseAddr}`);
+ }
+
+ console.log(' Starting echo server...');
+ const echoServer = await createUdpEchoServer();
+ console.log(` Echo server on port: ${echoServer.port}`);
+
+ console.log(' Creating connect socket to echo server...');
+ resp = await apiCommand(clusterApi, 'session.socket.create.connect', 'test-session-2', 'relay', '127.0.0.1', echoServer.port);
+ console.log(` socket.create.connect: ${JSON.stringify(resp)}`);
+
+ console.log(' Creating forward: socket1 -> relay...');
+ resp = await apiCommand(clusterApi, 'session.forward.create', 'test-session-2', 'socket1', 'relay');
+ console.log(` forward.create: ${resp}`);
+
+ console.log(' Sending UDP packet to listen socket...');
+ await sendUdp(listenPort, advertiseAddr, 'hello');
+ console.log(' Sent "hello"');
+
+ console.log(' Waiting for echo response...');
+ const messages = echoServer.getMessages();
+ const start = Date.now();
+ while (messages.length === 0 && Date.now() - start < TIMEOUT) {
+ await new Promise(r => setTimeout(r, 50));
+ }
+ if (messages.length === 0) {
+ throw new Error('Timeout: no message received by echo server');
+ }
+ const msg = messages[0];
+ console.log(` Received: "${msg.data}" from ${msg.rinfo.address}:${msg.rinfo.port}`);
+ if (msg.data !== 'hello') {
+ throw new Error(`Expected "hello", got "${msg.data}"`);
+ }
+ echoServer.socket.close();
+
+ console.log('19. Testing node failure handling...');
+ console.log(' Killing node1...');
+ await killDaemon(node1);
+ node1Api.end();
+ node1Api = null;
+ await sleep(6000); // Wait for healthcheck to detect failure
+
+ console.log(' Creating session while node1 is down...');
+ resp = await apiCommand(clusterApi, 'session.create', 'test-session-3', '60');
+ console.log(` session.create: ${resp}`);
+ if (resp !== 'OK') throw new Error('Failed to create session while node1 down');
+
+ console.log(' Verifying session via cluster session.list...');
+ resp = await apiCommand(clusterApi, 'session.list');
+ console.log(' cluster session.list:', resp);
+ if (!Array.isArray(resp) || resp.length < 1) {
+ throw new Error('Expected at least 1 session in cluster');
+ }
+
+ console.log(' Restarting node1...');
+ node1 = await spawnDaemon(NODE1_CONFIG_PATH, 'daemon');
+ console.log(` Node1 restarted (PID: ${node1.pid})`);
+ await sleep(1000);
+ node1Api = await connectApi(NODE1_API_PORT);
+ await apiCommand(node1Api, 'auth', 'finwo', 'testsecret');
+ await sleep(6000); // Wait for healthcheck to detect recovery
+
+ console.log(' Creating another session after node1 recovery...');
+ resp = await apiCommand(clusterApi, 'session.create', 'test-session-4', '60');
+ console.log(` session.create: ${resp}`);
+ if (resp !== 'OK') throw new Error('Failed to create session after node1 recovery');
+
+ console.log('\nā PASS: All cluster tests passed');
+
+ } catch (err) {
+ console.error(`\nā FAIL: ${err.message}`);
+ console.error(err.stack);
+ returnCode = 1;
+ } finally {
+ if (clusterApi) clusterApi.end();
+ if (node1Api) node1Api.end();
+ if (node2Api) node2Api.end();
+ if (cluster) await killDaemon(cluster);
+ if (node1) await killDaemon(node1);
+ if (node2) await killDaemon(node2);
+ await killAllDaemons();
+ process.exit(returnCode);
+ }
+}
+
+runTest();
diff --git a/test/config-cluster.ini b/test/config-cluster.ini
@@ -0,0 +1,18 @@
+[udphole]
+listen = :19121
+cluster = node1
+cluster = node2
+
+[cluster:node1]
+address = tcp://127.0.0.1:19122
+username = nodeuser
+password = nodepass
+
+[cluster:node2]
+address = tcp://127.0.0.1:19123
+username = nodeuser
+password = nodepass
+
+[user:test]
+permit = *
+secret = testsecret
diff --git a/test/config-node1.ini b/test/config-node1.ini
@@ -0,0 +1,13 @@
+[udphole]
+mode = builtin
+ports = 9000-9099
+listen = :19122
+advertise = 127.0.0.2
+
+[user:finwo]
+permit = *
+secret = testsecret
+
+[user:nodeuser]
+permit = *
+secret = nodepass
diff --git a/test/config-node2.ini b/test/config-node2.ini
@@ -0,0 +1,13 @@
+[udphole]
+mode = builtin
+ports = 9100-9199
+listen = :19123
+advertise = 127.0.0.3
+
+[user:finwo]
+permit = *
+secret = testsecret
+
+[user:nodeuser]
+permit = *
+secret = nodepass
diff --git a/test/config-tcp.ini b/test/config-tcp.ini
@@ -6,3 +6,7 @@ listen = :9123
[user:finwo]
permit = *
secret = testsecret
+
+[user:nodeuser]
+permit = *
+secret = nodepass
diff --git a/test/helpers.js b/test/helpers.js
@@ -3,7 +3,7 @@ const net = require('net');
const dgram = require('dgram');
const path = require('path');
-const DAEMON_PATH = path.join(__dirname, '..', 'udphole');
+const DAEMON_PATH = path.resolve(__dirname, '..', 'udphole');
const TIMEOUT = 2000;
function sleep(ms) {
@@ -22,10 +22,22 @@ function findFreePort() {
});
}
-function spawnDaemon(configPath) {
+function killAllDaemons() {
+ return new Promise((resolve) => {
+ const { execSync } = require('child_process');
+ try { execSync('pkill -9 udphole 2>/dev/null', { stdio: 'ignore' }); } catch(e) {}
+ sleep(1000).then(resolve);
+ });
+}
+
+function spawnDaemon(configPath, command) {
return new Promise(async (resolve, reject) => {
await sleep(500);
- const daemon = spawn(DAEMON_PATH, ['-f', configPath, 'daemon', '-D'], {
+ const args = ['-f', configPath];
+ args.push(command || 'daemon');
+ args.push('-D');
+
+ const daemon = spawn(DAEMON_PATH, args, {
stdio: ['ignore', 'pipe', 'pipe', 'pipe']
});
@@ -34,10 +46,11 @@ function spawnDaemon(configPath) {
reject(new Error(`Daemon start timeout. Output: ${output}`));
}, TIMEOUT);
+ const started = command === 'cluster' ? 'cluster' : 'daemon started';
daemon.stderr.on('data', (data) => {
process.stderr.write(data.toString());
output += data.toString();
- if (output.includes('daemon started')) {
+ if (output.includes(started)) {
clearTimeout(startTimeout);
sleep(200).then(() => resolve(daemon));
}
@@ -50,14 +63,6 @@ function spawnDaemon(configPath) {
});
}
-function killAllDaemons() {
- return new Promise((resolve) => {
- const { execSync } = require('child_process');
- try { execSync('pkill -9 udphole 2>/dev/null', { stdio: 'ignore' }); } catch(e) {}
- sleep(1000).then(resolve);
- });
-}
-
function killDaemon(daemon) {
return new Promise((resolve) => {
if (!daemon || daemon.killed) {