udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

commit f599de3088433d4ce5a8837f782cafc80753941b
parent 21f64febb8db055bbf3da4e2c0019510d6e9dfe8
Author: Robin Bron <robin.bron@yourhosting.nl>
Date:   Sun,  1 Mar 2026 05:14:01 +0100

Working & tested

Diffstat:
M.gitignore | 1+
MMakefile | 9+++++----
AREADME.md | 191+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mconfig.ini.example | 9++++++++-
Dsrc/ApiModule/server.c | 644-------------------------------------------------------------------------------
Asrc/AppModule/api/server.c | 646+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Rsrc/ApiModule/server.h -> src/AppModule/api/server.h | 0
Msrc/AppModule/command/daemon.c | 24+++++++++++-------------
Asrc/AppModule/rtp/server.c | 831+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/AppModule/rtp/server.h | 11+++++++++++
Dsrc/AppModule/udphole.c | 876-------------------------------------------------------------------------------
Dsrc/AppModule/udphole.h | 15---------------
Msrc/RespModule/resp.c | 22+++++++++++++++++++++-
Msrc/RespModule/resp.h | 3+++
Msrc/SchedulerModule/scheduler.c | 28++++++++++++++++++----------
Atest/basic-forwarding.js | 99+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atest/config.ini | 8++++++++
Atest/helpers.js | 226+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atest/listen-relearn.js | 148+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
19 files changed, 2227 insertions(+), 1564 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -1,4 +1,5 @@ /lib/ *.o +*.log /udphole /config.ini diff --git a/Makefile b/Makefile @@ -78,7 +78,7 @@ override CPPFLAGS+=$(INCLUDES) override CPPFLAGS+=$(CFLAGS) .PHONY: default -default: $(BIN) $(BIN).1 +default: $(BIN) # Stddoc: extract /// documentation from source to markdown for manpage STDDOC ?= stddoc @@ -128,9 +128,10 @@ test_digest_auth: src/test/test_digest_auth.o src/common/digest_auth.o src/commo $(CC) $^ $(CFLAGS) -o $@ .PHONY: test -test: $(TEST_BINS) - @for t in $(TEST_BINS); do echo "--- $$t ---"; ./$$t || exit 1; done - @echo "All tests passed." +test: + @node test/basic-forwarding.js + @sleep 2 + @node test/listen-relearn.js .PHONY: clean clean: diff --git a/README.md b/README.md @@ -0,0 +1,191 @@ +# UDPHOLE + +A standalone UDP wormhole proxy: forward UDP packets between sockets using a simple API + +--- + +## Building + +Requirements: [dep](https://github.com/finwo/dep), C compiler. + +```bash +make +``` + +The binary is `udphole`. + +--- + +## Global options + +These options apply to all commands and must appear **before** the command name. + +| Option | Short | Description | +|--------|-------|-------------| +| `--config <path>` | `-f` | Config file path. If omitted, the following are tried in order: `$HOME/.config/udphole.conf`, `$HOME/.udphole.conf`, `/etc/udphole/udphole.conf`, `/etc/udphole.conf`. | +| `--verbosity <level>` | `-v` | Log verbosity: fatal, error, warn, info, debug, trace (default: info). | +| `--log <path>` | | Also write log to file (SIGHUP reopens for logrotate). | + +--- + +## Running the daemon + +The main entry point is the **daemon** command. + +```bash +# Foreground (default, config auto-detected) +./udphole daemon + +# Explicit config file +./udphole -f /etc/udphole.conf daemon + +# Background (daemonize) +./udphole -f /etc/udphole.conf daemon -d + +# Force foreground even if config has daemonize=1 +./udphole -f /etc/udphole.conf daemon -D +``` + +| Option | Short | Description | +|--------|--------|--------------| +| `--daemonize` | `-d` | Run in background (double fork, detach from terminal). | +| `--no-daemonize` | `-D` | Force foreground; overrides `daemonize=1` in config. | + +Daemonize behaviour: + +- By default the daemon runs in the **foreground**. +- It goes to the **background** only if `daemonize=1` is set in `[udphole]` **or** you pass `-d`/`--daemonize`. +- `-D`/`--no-daemonize` always forces foreground. + +After starting, the daemon loads config, starts the UDP proxy manager, binds the API socket, and handles session/socket/forward management via RESP2 protocol. Logging goes to stderr (and optionally to a file if you use global `--log`). + +--- + +## How it works + +UDP hole is a simple UDP packet forwarder. It creates "sessions" that contain "sockets" and "forwards": + +1. **Session**: A container for sockets and forwards. Has an idle timeout. +2. **Socket**: A UDP socket that can either: + - **Listen**: Binds to a port and learns the remote address from the first packet received (NAT traversal) + - **Connect**: Binds to a port and connects to a fixed remote address +3. **Forward**: Routes packets from a source socket to a destination socket + +### Use cases + +- **NAT traversal**: A socket in listen mode learns the remote address from the first incoming packet, enabling symmetric NAT traversal +- **Fixed forwarding**: Connect sockets to fixed IP:port for simple relay +- **Session management**: Idle sessions expire automatically, useful for temporary forwards + +--- + +## Example: simple UDP echo + +```bash +# Create a session with 60 second idle timeout +session.create mysession 60 + +# Create a listen socket (will learn remote from first packet) +session.socket.create.listen mysession socket1 + +# Get the port assigned to socket1 (response: [port, advertise_addr]) +# The port will be in range 7000-7999 + +# From a remote client, send UDP packet to that port +# The socket now "knows" the remote address + +# Create another socket to forward to +session.socket.create.connect mysession socket2 8.8.8.8 53 + +# Create forward: packets from socket1 -> socket2 +session.forward.create mysession socket1 socket2 + +# Now packets received on socket1 are forwarded to 8.8.8.8:53 +# And responses are sent back to the original sender +``` + +--- + +## Example: symmetric NAT traversal + +```bash +# Create session +session.create nat-traversal 300 + +# Client A: create listen socket, gets port 7012 +session.socket.create.listen nat-traversal client-a + +# Client B: connect to client A's port +session.socket.create.connect nat-traversal client-b <client-a-ip> 7012 + +# Client B now receives packets from client A +session.forward.create nat-traversal client-a client-b + +# Bidirectional +session.forward.create nat-traversal client-b client-a +``` + +--- + +## Configuration + +```ini +[udphole] +mode = builtin +ports = 7000-7999 +listen = :12345 + +[api] +listen = :12345 +``` + +### `[udphole]` + +| Option | Description | +|--------|-------------| +| `mode` | Currently only `builtin` is supported. | +| `ports` | Port range for UDP sockets, as `low-high` (e.g. `7000-7999`). Default 7000–7999. | +| `listen` | API server listen address. If not set, API server is disabled. | +| `advertise` | Optional. IP address to advertise in API responses instead of the port number. Useful when behind NAT. | + +--- + +## API commands + +The API uses the RESP2 (Redis) protocol. Connect with `redis-cli` or any Redis client library. + +| Command | Response | +|---------|----------| +| `auth username password` | `+OK` or `-ERR invalid credentials` | +| `ping` | `+PONG` | +| `quit` | `+OK`, closes connection | +| `command` | List of commands accessible to the current user | + +### Session commands + +| Command | Response | +|---------|----------| +| `session.create <id> [idle_expiry]` | `+OK` - creates session, optional idle expiry in seconds (default 60) | +| `session.list` | Array of session IDs | +| `session.info <id>` | Map with: session_id, created, last_activity, idle_expiry, sockets, forwards, fd_count, marked_for_deletion | +| `session.destroy <id>` | `+OK` - destroys session and all its sockets/forwards | + +### Socket commands + +| Command | Response | +|---------|----------| +| `session.socket.create.listen <session_id> <socket_id>` | Array: `[port, advertise_addr]` | +| `session.socket.create.connect <session_id> <socket_id> <ip> <port>` | Array: `[port, advertise_addr]` | +| `session.socket.destroy <session_id> <socket_id>` | `+OK` | + +### Forward commands + +| Command | Response | +|---------|----------| +| `session.forward.list <session_id>` | Array of `[src_socket_id, dst_socket_id]` pairs | +| `session.forward.create <session_id> <src_socket_id> <dst_socket_id>` | `+OK` | +| `session.forward.destroy <session_id> <src_socket_id> <dst_socket_id>` | `+OK` | + +--- + +*UDP hole is extracted from the UPBX project as a standalone UDP proxy daemon.* diff --git a/config.ini.example b/config.ini.example @@ -1,4 +1,11 @@ [udphole] mode = builtin ports = 7000-7999 -listen = :12344 +listen = :12345 + +[user:admin] +secret = adminpass +permit = * + +[user:*] +permit = ping diff --git a/src/ApiModule/server.c b/src/ApiModule/server.c @@ -1,644 +0,0 @@ -/* - * Generic RESP2 API server: TCP listener, connection management, RESP2 - * parsing/writing, command hashmap, authentication with per-user permit - * checking, and built-in commands (auth, ping, quit, command). - * - * Runs as a protothread in the main select() loop. - */ - -#include <stdlib.h> -#include <string.h> -#include <stdio.h> -#include <ctype.h> -#include <unistd.h> -#include <errno.h> -#include <fcntl.h> -#include <sys/socket.h> -#include <sys/types.h> -#include <netinet/in.h> -#include <arpa/inet.h> -#include <netdb.h> - -#include "rxi/log.h" -#include "tidwall/hashmap.h" -#include "SchedulerModule/protothreads.h" -#include "SchedulerModule/scheduler.h" -#include "common/socket_util.h" -#include "config.h" -#include "server.h" - -struct pt_task; -PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); - -/* Constants */ - -#define API_MAX_CLIENTS 8 -#define READ_BUF_SIZE 4096 -#define WRITE_BUF_INIT 4096 -#define MAX_ARGS 32 - -/* Per-client connection state (stored in pt udata) */ - -struct api_client_state { - int fd; - int *fds; - char *username; - char rbuf[READ_BUF_SIZE]; - size_t rlen; - char *wbuf; - size_t wlen; - size_t wcap; -}; - -typedef struct api_client_state api_client_t; - -/* Server state (stored in pt udata) */ - -struct api_server_state { - int *fds; - char *current_listen; - time_t next_listen_check; -}; - -typedef struct api_server_state api_server_t; - -/* Command entry */ - -typedef struct { - const char *name; - char (*func)(api_client_t *c, char **args, int nargs); -} api_cmd_entry; - -/* Module state */ - -static struct hashmap *cmd_map = NULL; - -/* Write helpers */ - -bool api_write_raw(api_client_t *c, const void *data, size_t len) { - if (c->fd < 0) return false; - if (c->wlen + len > c->wcap) { - size_t need = c->wlen + len; - size_t ncap = c->wcap ? c->wcap : WRITE_BUF_INIT; - while (ncap < need) ncap *= 2; - char *nb = realloc(c->wbuf, ncap); - if (!nb) return false; - c->wbuf = nb; - c->wcap = ncap; - } - memcpy(c->wbuf + c->wlen, data, len); - c->wlen += len; - return true; -} - -bool api_write_cstr(api_client_t *c, const char *s) { - return api_write_raw(c, s, strlen(s)); -} - -bool api_write_ok(api_client_t *c) { - return api_write_cstr(c, "+OK\r\n"); -} - -bool api_write_err(api_client_t *c, const char *msg) { - if (!api_write_cstr(c, "-ERR ")) return false; - if (!api_write_cstr(c, msg)) return false; - return api_write_cstr(c, "\r\n"); -} - -bool api_write_nil(api_client_t *c) { - return api_write_cstr(c, "$-1\r\n"); -} - -bool api_write_int(api_client_t *c, int value) { - char buf[32]; - snprintf(buf, sizeof(buf), ":%d\r\n", value); - return api_write_cstr(c, buf); -} - -bool api_write_array(api_client_t *c, size_t nitems) { - char buf[32]; - snprintf(buf, sizeof(buf), "*%zu\r\n", nitems); - return api_write_cstr(c, buf); -} - -bool api_write_bulk_cstr(api_client_t *c, const char *s) { - if (!s) return api_write_nil(c); - size_t len = strlen(s); - char prefix[32]; - snprintf(prefix, sizeof(prefix), "$%zu\r\n", len); - if (!api_write_cstr(c, prefix)) return false; - if (!api_write_raw(c, s, len)) return false; - return api_write_cstr(c, "\r\n"); -} - -bool api_write_bulk_int(api_client_t *c, int val) { - char buf[32]; - snprintf(buf, sizeof(buf), "%d", val); - return api_write_bulk_cstr(c, buf); -} - -/* Client lifecycle */ - -static void client_close(api_client_t *c) { - if (c->fd >= 0) { - close(c->fd); - c->fd = -1; - } - free(c->wbuf); - c->wbuf = NULL; - c->wlen = c->wcap = 0; - c->rlen = 0; - free(c->username); - c->username = NULL; -} - -static void client_flush(api_client_t *c) { - if (c->fd < 0 || c->wlen == 0) return; - ssize_t n = send(c->fd, c->wbuf, c->wlen, 0); - if (n > 0) { - if ((size_t)n < c->wlen) - memmove(c->wbuf, c->wbuf + n, c->wlen - (size_t)n); - c->wlen -= (size_t)n; - } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - client_close(c); - } -} - -/* RESP2 inline (telnet) parser */ - -static int parse_inline(const char *line, size_t len, char **args, int max_args) { - int nargs = 0; - const char *p = line; - const char *end = line + len; - while (p < end && nargs < max_args) { - while (p < end && (*p == ' ' || *p == '\t')) p++; - if (p >= end) break; - const char *start; - const char *tok_end; - if (*p == '"' || *p == '\'') { - char quote = *p++; - start = p; - while (p < end && *p != quote) p++; - tok_end = p; - if (p < end) p++; - } else { - start = p; - while (p < end && *p != ' ' && *p != '\t') p++; - tok_end = p; - } - size_t tlen = (size_t)(tok_end - start); - char *arg = malloc(tlen + 1); - if (!arg) return -1; - memcpy(arg, start, tlen); - arg[tlen] = '\0'; - args[nargs++] = arg; - } - return nargs; -} - -/* RESP2 multibulk parser */ - -static int parse_resp_command(api_client_t *c, char **args, int max_args, int *nargs) { - *nargs = 0; - if (c->rlen == 0) return 0; - - if (c->rbuf[0] != '*') { - char *nl = memchr(c->rbuf, '\n', c->rlen); - if (!nl) return 0; - size_t line_len = (size_t)(nl - c->rbuf); - size_t trim = line_len; - if (trim > 0 && c->rbuf[trim - 1] == '\r') trim--; - int n = parse_inline(c->rbuf, trim, args, max_args); - if (n < 0) return -1; - *nargs = n; - size_t consumed = line_len + 1; - c->rlen -= consumed; - if (c->rlen > 0) memmove(c->rbuf, c->rbuf + consumed, c->rlen); - return n > 0 ? 1 : 0; - } - - size_t pos = 0; - char *nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); - if (!nl) return 0; - int count = atoi(c->rbuf + 1); - if (count <= 0 || count > max_args) return -1; - pos = (size_t)(nl - c->rbuf) + 1; - - for (int i = 0; i < count; i++) { - if (pos >= c->rlen) return 0; - if (c->rbuf[pos] != '$') return -1; - nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); - if (!nl) return 0; - int blen = atoi(c->rbuf + pos + 1); - if (blen < 0) return -1; - size_t hdr_end = (size_t)(nl - c->rbuf) + 1; - if (hdr_end + (size_t)blen + 2 > c->rlen) return 0; - char *arg = malloc((size_t)blen + 1); - if (!arg) return -1; - memcpy(arg, c->rbuf + hdr_end, (size_t)blen); - arg[blen] = '\0'; - args[i] = arg; - pos = hdr_end + (size_t)blen + 2; - } - - *nargs = count; - c->rlen -= pos; - if (c->rlen > 0) memmove(c->rbuf, c->rbuf + pos, c->rlen); - return 1; -} - -/* Permission checking */ - -/* Check if a command name matches a single permit pattern. - * Supports: "*" matches everything, "foo.*" matches "foo.anything", - * exact match otherwise. */ -static bool permit_matches(const char *pattern, const char *cmd) { - size_t plen = strlen(pattern); - if (plen == 1 && pattern[0] == '*') - return true; - if (plen >= 2 && pattern[plen - 1] == '*') { - return strncasecmp(pattern, cmd, plen - 1) == 0; - } - return strcasecmp(pattern, cmd) == 0; -} - -/* Check if client has permission for a command. Uses live config: api:<username> and api:* permit keys. */ -static bool user_has_permit(api_client_t *c, const char *cmd) { - char section[128]; - const char *uname = (c->username && c->username[0]) ? c->username : "*"; - snprintf(section, sizeof(section), "api:%s", uname); - resp_object *sec = resp_map_get(global_cfg, section); - if (sec && sec->type == RESPT_ARRAY) { - for (size_t i = 0; i < sec->u.arr.n; i += 2) { - if (i + 1 < sec->u.arr.n) { - resp_object *key = &sec->u.arr.elem[i]; - resp_object *val = &sec->u.arr.elem[i + 1]; - if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { - if (val->type == RESPT_ARRAY) { - for (size_t j = 0; j < val->u.arr.n; j++) { - resp_object *p = &val->u.arr.elem[j]; - if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) - return true; - } - } - } - } - } - } - if (strcmp(uname, "*") != 0) { - resp_object *anon = resp_map_get(global_cfg, "api:*"); - if (anon && anon->type == RESPT_ARRAY) { - for (size_t i = 0; i < anon->u.arr.n; i += 2) { - resp_object *key = &anon->u.arr.elem[i]; - resp_object *val = &anon->u.arr.elem[i + 1]; - if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { - if (val->type == RESPT_ARRAY) { - for (size_t j = 0; j < val->u.arr.n; j++) { - resp_object *p = &val->u.arr.elem[j]; - if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) - return true; - } - } - } - } - } - } - return false; -} - -/* Hashmap callbacks */ - -static uint64_t cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) { - const api_cmd_entry *cmd = item; - return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1); -} - -static int cmd_compare(const void *a, const void *b, void *udata) { - (void)udata; - const api_cmd_entry *ca = a; - const api_cmd_entry *cb = b; - return strcasecmp(ca->name, cb->name); -} - -/* Public: register a command */ - -void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)) { - if (!cmd_map) - cmd_map = hashmap_new(sizeof(api_cmd_entry), 0, 0, 0, cmd_hash, cmd_compare, NULL, NULL); - hashmap_set(cmd_map, &(api_cmd_entry){ .name = name, .func = func }); - log_trace("api: registered command '%s'", name); -} - -static char cmdAUTH(api_client_t *c, char **args, int nargs) { - if (nargs != 3) { - api_write_err(c, "wrong number of arguments for 'auth' command (AUTH username password)"); - return 1; - } - const char *uname = args[1]; - const char *pass = args[2]; - char section[128]; - snprintf(section, sizeof(section), "api:%s", uname); - resp_object *sec = resp_map_get(global_cfg, section); - const char *secret = sec ? resp_map_get_string(sec, "secret") : NULL; - if (secret && pass && strcmp(secret, pass) == 0) { - free(c->username); - c->username = strdup(uname); - if (c->username) { - log_debug("api: client authenticated as '%s'", uname); - return api_write_ok(c) ? 1 : 0; - } - } - return api_write_err(c, "invalid credentials") ? 1 : 0; -} - -static char cmdPING(api_client_t *c, char **args, int nargs) { - (void)args; - if (nargs == 1) - return api_write_cstr(c, "+PONG\r\n") ? 1 : 0; - if (nargs == 2) - return api_write_bulk_cstr(c, args[1]) ? 1 : 0; - return api_write_err(c, "wrong number of arguments for 'ping' command") ? 1 : 0; -} - -static char cmdQUIT(api_client_t *c, char **args, int nargs) { - (void)args; (void)nargs; - api_write_ok(c); - return 0; -} - -static char cmdCOMMAND(api_client_t *c, char **args, int nargs) { - (void)args; (void)nargs; - if (!cmd_map) - return api_write_array(c, 0) ? 1 : 0; - - size_t count = 0; - size_t iter = 0; - void *item; - while (hashmap_iter(cmd_map, &iter, &item)) { - const api_cmd_entry *e = item; - if (user_has_permit(c, e->name)) - count++; - } - - if (!api_write_array(c, count)) return 0; - - iter = 0; - while (hashmap_iter(cmd_map, &iter, &item)) { - const api_cmd_entry *e = item; - if (user_has_permit(c, e->name)) { - if (!api_write_bulk_cstr(c, e->name)) return 0; - } - } - return 1; -} - -/* Command dispatch */ - -static void init_builtins(void) { - api_register_cmd("auth", cmdAUTH); - api_register_cmd("ping", cmdPING); - api_register_cmd("quit", cmdQUIT); - api_register_cmd("command", cmdCOMMAND); -} - -/* Check if a command is a built-in that bypasses auth/permit checks */ -static bool is_builtin(const char *name) { - return (strcasecmp(name, "auth") == 0 || - strcasecmp(name, "ping") == 0 || - strcasecmp(name, "quit") == 0 || - strcasecmp(name, "command") == 0); -} - -static void dispatch_command(api_client_t *c, char **args, int nargs) { - if (nargs <= 0) return; - - for (char *p = args[0]; *p; p++) *p = (char)tolower((unsigned char)*p); - - const api_cmd_entry *cmd = hashmap_get(cmd_map, &(api_cmd_entry){ .name = args[0] }); - if (!cmd) { - api_write_err(c, "unknown command"); - return; - } - - if (!is_builtin(args[0])) { - if (!user_has_permit(c, args[0])) { - api_write_err(c, "no permission"); - return; - } - } - - char result = cmd->func(c, args, nargs); - if (!result) { - client_flush(c); - client_close(c); - } -} - -/* TCP listener */ - -static int *create_listen_socket(const char *listen_addr) { - const char *default_port = "6379"; - resp_object *api_sec = resp_map_get(global_cfg, "udphole"); - if (api_sec) { - const char *cfg_port = resp_map_get_string(api_sec, "port"); - if (cfg_port && cfg_port[0]) default_port = cfg_port; - } - int *fds = tcp_listen(listen_addr, NULL, default_port); - if (!fds) { - return NULL; - } - log_info("api: listening on %s", listen_addr); - return fds; -} - -static void handle_accept(int ready_fd) { - struct sockaddr_storage addr; - socklen_t addrlen = sizeof(addr); - int fd = accept(ready_fd, (struct sockaddr *)&addr, &addrlen); - if (fd < 0) return; - set_socket_nonblocking(fd, 1); - - api_client_t *state = calloc(1, sizeof(*state)); - if (!state) { - const char *msg = "-ERR out of memory\r\n"; - send(fd, msg, strlen(msg), 0); - close(fd); - return; - } - state->fd = fd; - - schedmod_pt_create(api_client_pt, state); - log_trace("api: accepted connection, spawned client pt"); -} - -PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { - api_server_t *s = task->udata; - time_t loop_timestamp = 0; - log_trace("api_server: protothread entry"); - PT_BEGIN(pt); - - if (!s) { - s = calloc(1, sizeof(*s)); - if (!s) { - PT_EXIT(pt); - } - task->udata = s; - } - - if (!s->current_listen) { - resp_object *api_sec = resp_map_get(global_cfg, "udphole"); - const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL; - if (!listen_str || !listen_str[0]) { - log_info("api: no listen address configured, API server disabled"); - free(s); - PT_EXIT(pt); - } - - s->current_listen = strdup(listen_str); - if (!s->current_listen) { - free(s); - PT_EXIT(pt); - } - - init_builtins(); - s->fds = create_listen_socket(s->current_listen); - if (!s->fds) { - free(s->current_listen); - s->current_listen = NULL; - free(s); - PT_EXIT(pt); - } - } - - for (;;) { - loop_timestamp = (time_t)(timestamp / 1000); - - if (loop_timestamp >= s->next_listen_check) { - s->next_listen_check = loop_timestamp + 60; - resp_object *api_sec = resp_map_get(global_cfg, "udphole"); - const char *new_str = api_sec ? resp_map_get_string(api_sec, "listen") : ""; - int rebind = (s->current_listen && (!new_str[0] || strcmp(s->current_listen, new_str) != 0)) || - (!s->current_listen && new_str[0]); - if (rebind) { - if (s->fds) { - for (int i = 1; i <= s->fds[0]; i++) { - close(s->fds[i]); - } - } - free(s->current_listen); - s->current_listen = (new_str[0]) ? strdup(new_str) : NULL; - if (s->current_listen) { - int *new_fds = create_listen_socket(s->current_listen); - if (new_fds) { - s->fds = realloc(s->fds, sizeof(int) * (new_fds[0] + 1)); - s->fds[0] = new_fds[0]; - for (int i = 1; i <= new_fds[0]; i++) { - s->fds[i] = new_fds[i]; - } - free(new_fds); - } else { - free(s->current_listen); - s->current_listen = NULL; - } - } - } - } - - if (s->fds && s->fds[0] > 0) { - int *ready_fds = NULL; - PT_WAIT_UNTIL(pt, schedmod_has_data(s->fds, &ready_fds) > 0); - if (ready_fds && ready_fds[0] > 0) { - for (int i = 1; i <= ready_fds[0]; i++) { - handle_accept(ready_fds[i]); - } - } - free(ready_fds); - } else { - PT_YIELD(pt); - } - } - if (s->fds) { - for (int i = 1; i <= s->fds[0]; i++) { - close(s->fds[i]); - } - free(s->fds); - } - free(s->current_listen); - free(s); - if (cmd_map) { - hashmap_free(cmd_map); - cmd_map = NULL; - } - - PT_END(pt); -} - -PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { - (void)timestamp; - api_client_t *state = task->udata; - - log_trace("api_client: protothread entry fd=%d", state->fd); - PT_BEGIN(pt); - - state->fds = malloc(sizeof(int) * 2); - if (!state->fds) { - free(state); - PT_EXIT(pt); - } - state->fds[0] = 1; - state->fds[1] = state->fd; - - for (;;) { - int *ready_fds = NULL; - PT_WAIT_UNTIL(pt, schedmod_has_data(state->fds, &ready_fds) > 0); - - int ready_fd = -1; - if (ready_fds && ready_fds[0] > 0) { - for (int i = 1; i <= ready_fds[0]; i++) { - if (ready_fds[i] == state->fd) { - ready_fd = state->fd; - break; - } - } - } - free(ready_fds); - - if (ready_fd != state->fd) continue; - - size_t space = sizeof(state->rbuf) - state->rlen; - if (space == 0) { - break; - } - ssize_t n = recv(state->fd, state->rbuf + state->rlen, space, 0); - if (n <= 0) { - if (n == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) - break; - } - state->rlen += (size_t)n; - - char *args[MAX_ARGS]; - int nargs; - int rc = 0; - while (state->fd >= 0 && (rc = parse_resp_command(state, args, MAX_ARGS, &nargs)) > 0) { - dispatch_command(state, args, nargs); - for (int j = 0; j < nargs; j++) free(args[j]); - } - if (rc < 0) { - api_write_err(state, "Protocol error"); - } - - client_flush(state); - - if (state->fd < 0) break; - } - - if (state->fd >= 0) { - close(state->fd); - } - free(state->fds); - free(state->wbuf); - free(state->username); - free(state); - - PT_END(pt); -} diff --git a/src/AppModule/api/server.c b/src/AppModule/api/server.c @@ -0,0 +1,646 @@ +/* + * Generic RESP2 API server: TCP listener, connection management, RESP2 + * parsing/writing, command hashmap, authentication with per-user permit + * checking, and built-in commands (auth, ping, quit, command). + * + * Runs as a protothread in the main select() loop. + */ + +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <ctype.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> + +#include "rxi/log.h" +#include "tidwall/hashmap.h" +#include "SchedulerModule/protothreads.h" +#include "SchedulerModule/scheduler.h" +#include "common/socket_util.h" +#include "config.h" +#include "server.h" +#include "RespModule/resp.h" + +struct pt_task; +PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); + +/* Constants */ + +#define API_MAX_CLIENTS 8 +#define READ_BUF_SIZE 4096 +#define WRITE_BUF_INIT 4096 +#define MAX_ARGS 32 + +/* Per-client connection state (stored in pt udata) */ + +struct api_client_state { + int fd; + int *fds; + int *ready_fds; + int ready_fd; + char *username; + char rbuf[READ_BUF_SIZE]; + size_t rlen; + char *wbuf; + size_t wlen; + size_t wcap; +}; + +typedef struct api_client_state api_client_t; + +/* Command entry */ + +typedef struct { + const char *name; + char (*func)(api_client_t *c, char **args, int nargs); +} api_cmd_entry; + +/* Module state */ + +static char *current_listen = NULL; +static struct hashmap *cmd_map = NULL; + +typedef struct { + int *server_fds; + int *ready_fds; +} api_server_udata_t; + +/* Write helpers */ + +bool api_write_raw(api_client_t *c, const void *data, size_t len) { + if (c->fd < 0) return false; + if (c->wlen + len > c->wcap) { + size_t need = c->wlen + len; + size_t ncap = c->wcap ? c->wcap : WRITE_BUF_INIT; + while (ncap < need) ncap *= 2; + char *nb = realloc(c->wbuf, ncap); + if (!nb) return false; + c->wbuf = nb; + c->wcap = ncap; + } + memcpy(c->wbuf + c->wlen, data, len); + c->wlen += len; + return true; +} + +bool api_write_cstr(api_client_t *c, const char *s) { + return api_write_raw(c, s, strlen(s)); +} + +bool api_write_ok(api_client_t *c) { + return api_write_cstr(c, "+OK\r\n"); +} + +bool api_write_err(api_client_t *c, const char *msg) { + if (!api_write_cstr(c, "-ERR ")) return false; + if (!api_write_cstr(c, msg)) return false; + return api_write_cstr(c, "\r\n"); +} + +bool api_write_nil(api_client_t *c) { + return api_write_cstr(c, "$-1\r\n"); +} + +bool api_write_int(api_client_t *c, int value) { + char buf[32]; + snprintf(buf, sizeof(buf), ":%d\r\n", value); + return api_write_cstr(c, buf); +} + +bool api_write_array(api_client_t *c, size_t nitems) { + char buf[32]; + snprintf(buf, sizeof(buf), "*%zu\r\n", nitems); + return api_write_cstr(c, buf); +} + +bool api_write_bulk_cstr(api_client_t *c, const char *s) { + if (!s) return api_write_nil(c); + size_t len = strlen(s); + char prefix[32]; + snprintf(prefix, sizeof(prefix), "$%zu\r\n", len); + if (!api_write_cstr(c, prefix)) return false; + if (!api_write_raw(c, s, len)) return false; + return api_write_cstr(c, "\r\n"); +} + +bool api_write_bulk_int(api_client_t *c, int val) { + char buf[32]; + snprintf(buf, sizeof(buf), "%d", val); + return api_write_bulk_cstr(c, buf); +} + +/* Client lifecycle */ + +static void client_close(api_client_t *c) { + if (c->fd >= 0) { + close(c->fd); + c->fd = -1; + } + free(c->wbuf); + c->wbuf = NULL; + c->wlen = c->wcap = 0; + c->rlen = 0; + free(c->username); + c->username = NULL; +} + +static void client_flush(api_client_t *c) { + if (c->fd < 0 || c->wlen == 0) return; + ssize_t n = send(c->fd, c->wbuf, c->wlen, 0); + if (n > 0) { + if ((size_t)n < c->wlen) + memmove(c->wbuf, c->wbuf + n, c->wlen - (size_t)n); + c->wlen -= (size_t)n; + } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + client_close(c); + } +} + +/* RESP2 inline (telnet) parser */ + +static int parse_inline(const char *line, size_t len, char **args, int max_args) { + int nargs = 0; + const char *p = line; + const char *end = line + len; + while (p < end && nargs < max_args) { + while (p < end && (*p == ' ' || *p == '\t')) p++; + if (p >= end) break; + const char *start; + const char *tok_end; + if (*p == '"' || *p == '\'') { + char quote = *p++; + start = p; + while (p < end && *p != quote) p++; + tok_end = p; + if (p < end) p++; + } else { + start = p; + while (p < end && *p != ' ' && *p != '\t') p++; + tok_end = p; + } + size_t tlen = (size_t)(tok_end - start); + char *arg = malloc(tlen + 1); + if (!arg) return -1; + memcpy(arg, start, tlen); + arg[tlen] = '\0'; + args[nargs++] = arg; + } + return nargs; +} + +/* RESP2 multibulk parser */ + +static int parse_resp_command(api_client_t *c, char **args, int max_args, int *nargs) { + *nargs = 0; + if (c->rlen == 0) return 0; + + if (c->rbuf[0] != '*') { + char *nl = memchr(c->rbuf, '\n', c->rlen); + if (!nl) return 0; + size_t line_len = (size_t)(nl - c->rbuf); + size_t trim = line_len; + if (trim > 0 && c->rbuf[trim - 1] == '\r') trim--; + int n = parse_inline(c->rbuf, trim, args, max_args); + if (n < 0) return -1; + *nargs = n; + size_t consumed = line_len + 1; + c->rlen -= consumed; + if (c->rlen > 0) memmove(c->rbuf, c->rbuf + consumed, c->rlen); + return n > 0 ? 1 : 0; + } + + size_t pos = 0; + char *nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); + if (!nl) return 0; + int count = atoi(c->rbuf + 1); + if (count <= 0 || count > max_args) return -1; + pos = (size_t)(nl - c->rbuf) + 1; + + for (int i = 0; i < count; i++) { + if (pos >= c->rlen) return 0; + if (c->rbuf[pos] != '$') return -1; + nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); + if (!nl) return 0; + int blen = atoi(c->rbuf + pos + 1); + if (blen < 0) return -1; + size_t hdr_end = (size_t)(nl - c->rbuf) + 1; + if (hdr_end + (size_t)blen + 2 > c->rlen) return 0; + char *arg = malloc((size_t)blen + 1); + if (!arg) return -1; + memcpy(arg, c->rbuf + hdr_end, (size_t)blen); + arg[blen] = '\0'; + args[i] = arg; + pos = hdr_end + (size_t)blen + 2; + } + + *nargs = count; + c->rlen -= pos; + if (c->rlen > 0) memmove(c->rbuf, c->rbuf + pos, c->rlen); + return 1; +} + +/* Permission checking */ + +/* Check if a command name matches a single permit pattern. + * Supports: "*" matches everything, "foo.*" matches "foo.anything", + * exact match otherwise. */ +static bool permit_matches(const char *pattern, const char *cmd) { + size_t plen = strlen(pattern); + if (plen == 1 && pattern[0] == '*') + return true; + if (plen >= 2 && pattern[plen - 1] == '*') { + return strncasecmp(pattern, cmd, plen - 1) == 0; + } + return strcasecmp(pattern, cmd) == 0; +} + +/* Check if client has permission for a command. Uses live config: api:<username> and api:* permit keys. */ +static bool user_has_permit(api_client_t *c, const char *cmd) { + char section[128]; + const char *uname = (c->username && c->username[0]) ? c->username : "*"; + snprintf(section, sizeof(section), "user:%s", uname); + resp_object *sec = resp_map_get(global_cfg, section); + if (sec && sec->type == RESPT_ARRAY) { + for (size_t i = 0; i < sec->u.arr.n; i += 2) { + if (i + 1 < sec->u.arr.n) { + resp_object *key = &sec->u.arr.elem[i]; + resp_object *val = &sec->u.arr.elem[i + 1]; + if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { + if (val->type == RESPT_ARRAY) { + for (size_t j = 0; j < val->u.arr.n; j++) { + resp_object *p = &val->u.arr.elem[j]; + if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) + return true; + } + } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) { + return true; + } + } + } + } + } + if (strcmp(uname, "*") != 0) { + resp_object *anon = resp_map_get(global_cfg, "user:*"); + if (anon && anon->type == RESPT_ARRAY) { + for (size_t i = 0; i < anon->u.arr.n; i += 2) { + if (i + 1 < anon->u.arr.n) { + resp_object *key = &anon->u.arr.elem[i]; + resp_object *val = &anon->u.arr.elem[i + 1]; + if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { + if (val->type == RESPT_ARRAY) { + for (size_t j = 0; j < val->u.arr.n; j++) { + resp_object *p = &val->u.arr.elem[j]; + if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) + return true; + } + } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) { + return true; + } + } + } + } + } + } + return false; +} + +/* Hashmap callbacks */ + +static uint64_t cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) { + const api_cmd_entry *cmd = item; + return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1); +} + +static int cmd_compare(const void *a, const void *b, void *udata) { + (void)udata; + const api_cmd_entry *ca = a; + const api_cmd_entry *cb = b; + return strcasecmp(ca->name, cb->name); +} + +/* Public: register a command */ + +void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)) { + if (!cmd_map) + cmd_map = hashmap_new(sizeof(api_cmd_entry), 0, 0, 0, cmd_hash, cmd_compare, NULL, NULL); + hashmap_set(cmd_map, &(api_cmd_entry){ .name = name, .func = func }); + log_trace("api: registered command '%s'", name); +} + +static char cmdAUTH(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + api_write_err(c, "wrong number of arguments for 'auth' command (AUTH username password)"); + return 1; + } + const char *uname = args[1]; + const char *pass = args[2]; + char section[128]; + snprintf(section, sizeof(section), "user:%s", uname); + resp_object *sec = resp_map_get(global_cfg, section); + const char *secret = sec ? resp_map_get_string(sec, "secret") : NULL; + if (secret && pass && strcmp(secret, pass) == 0) { + free(c->username); + c->username = strdup(uname); + if (c->username) { + log_debug("api: client authenticated as '%s'", uname); + return api_write_ok(c) ? 1 : 0; + } + } + return api_write_err(c, "invalid credentials") ? 1 : 0; +} + +static char cmdPING(api_client_t *c, char **args, int nargs) { + (void)args; + if (nargs == 1) + return api_write_cstr(c, "+PONG\r\n") ? 1 : 0; + if (nargs == 2) + return api_write_bulk_cstr(c, args[1]) ? 1 : 0; + return api_write_err(c, "wrong number of arguments for 'ping' command") ? 1 : 0; +} + +static char cmdQUIT(api_client_t *c, char **args, int nargs) { + (void)args; (void)nargs; + api_write_ok(c); + return 0; +} + +static bool is_builtin(const char *name); + +static char cmdCOMMAND(api_client_t *c, char **args, int nargs) { + (void)args; + if (!cmd_map) + return api_write_array(c, 0) ? 1 : 0; + + resp_object *result = resp_array_init(); + if (!result) return 0; + + size_t iter = 0; + void *item; + while (hashmap_iter(cmd_map, &iter, &item)) { + const api_cmd_entry *e = item; + if (!is_builtin(e->name) && !user_has_permit(c, e->name)) + continue; + + resp_array_append_bulk(result, e->name); + resp_object *meta = resp_array_init(); + if (!meta) { resp_free(result); return 0; } + resp_array_append_bulk(meta, "summary"); + resp_array_append_bulk(meta, "UDP hole proxy command"); + resp_array_append_obj(result, meta); + } + + char *out_buf = NULL; + size_t out_len = 0; + if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) { + resp_free(result); + return 0; + } + resp_free(result); + + api_write_raw(c, out_buf, out_len); + free(out_buf); + return 1; +} + +/* Command dispatch */ + +static void init_builtins(void) { + api_register_cmd("auth", cmdAUTH); + api_register_cmd("ping", cmdPING); + api_register_cmd("quit", cmdQUIT); + api_register_cmd("command", cmdCOMMAND); +} + +/* Check if a command is a built-in that bypasses auth/permit checks */ +static bool is_builtin(const char *name) { + return (strcasecmp(name, "auth") == 0 || + strcasecmp(name, "ping") == 0 || + strcasecmp(name, "quit") == 0 || + strcasecmp(name, "command") == 0); +} + +static void dispatch_command(api_client_t *c, char **args, int nargs) { + if (nargs <= 0) return; + + for (char *p = args[0]; *p; p++) *p = (char)tolower((unsigned char)*p); + + const api_cmd_entry *cmd = hashmap_get(cmd_map, &(api_cmd_entry){ .name = args[0] }); + if (!cmd) { + api_write_err(c, "unknown command"); + return; + } + + if (!is_builtin(args[0])) { + if (!user_has_permit(c, args[0])) { + api_write_err(c, "no permission"); + return; + } + } + + char result = cmd->func(c, args, nargs); + if (!result) { + client_flush(c); + client_close(c); + } +} + +/* TCP listener */ + +static int *create_listen_socket(const char *listen_addr) { + const char *default_port = "6379"; + resp_object *api_sec = resp_map_get(global_cfg, "udphole"); + if (api_sec) { + const char *cfg_port = resp_map_get_string(api_sec, "port"); + if (cfg_port && cfg_port[0]) default_port = cfg_port; + } + int *fds = tcp_listen(listen_addr, NULL, default_port); + if (!fds) { + return NULL; + } + log_info("api: listening on %s", listen_addr); + return fds; +} + +static void handle_accept(int ready_fd) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + int fd = accept(ready_fd, (struct sockaddr *)&addr, &addrlen); + if (fd < 0) return; + set_socket_nonblocking(fd, 1); + + api_client_t *state = calloc(1, sizeof(*state)); + if (!state) { + const char *msg = "-ERR out of memory\r\n"; + send(fd, msg, strlen(msg), 0); + close(fd); + return; + } + state->fd = fd; + + schedmod_pt_create(api_client_pt, state); + log_trace("api: accepted connection, spawned client pt"); +} + +PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + api_server_udata_t *udata = task->udata; + log_trace("api_server: protothread entry"); + PT_BEGIN(pt); + + if (!udata) { + udata = calloc(1, sizeof(api_server_udata_t)); + if (!udata) { + PT_EXIT(pt); + } + task->udata = udata; + } + + resp_object *api_sec = resp_map_get(global_cfg, "udphole"); + const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL; + if (!listen_str || !listen_str[0]) { + log_info("api: no listen address configured, API server disabled"); + PT_EXIT(pt); + } + + if (!current_listen) { + current_listen = strdup(listen_str); + if (!current_listen) { + PT_EXIT(pt); + } + init_builtins(); + udata->server_fds = create_listen_socket(current_listen); + if (!udata->server_fds) { + free(current_listen); + current_listen = NULL; + PT_EXIT(pt); + } + } + + for (;;) { + (void)timestamp; + if (udata->server_fds && udata->server_fds[0] > 0) { + PT_WAIT_UNTIL(pt, schedmod_has_data(udata->server_fds, &udata->ready_fds) > 0); + if (udata->ready_fds && udata->ready_fds[0] > 0) { + for (int i = 1; i <= udata->ready_fds[0]; i++) { + handle_accept(udata->ready_fds[i]); + } + } + if (udata->ready_fds) free(udata->ready_fds); + udata->ready_fds = NULL; + } else { + PT_YIELD(pt); + } + } + if (udata->server_fds) { + for (int i = 1; i <= udata->server_fds[0]; i++) { + close(udata->server_fds[i]); + } + free(udata->server_fds); + } + free(udata->ready_fds); + free(udata); + free(current_listen); + current_listen = NULL; + if (cmd_map) { + hashmap_free(cmd_map); + cmd_map = NULL; + } + + PT_END(pt); +} + +PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + (void)timestamp; + api_client_t *state = task->udata; + + log_trace("api_client: protothread entry fd=%d", state->fd); + PT_BEGIN(pt); + + state->fds = malloc(sizeof(int) * 2); + if (!state->fds) { + free(state); + PT_EXIT(pt); + } + state->fds[0] = 1; + state->fds[1] = state->fd; + + for (;;) { + state->ready_fds = NULL; + PT_WAIT_UNTIL(pt, schedmod_has_data(state->fds, &state->ready_fds) > 0); + + state->ready_fd = -1; + if (state->ready_fds && state->ready_fds[0] > 0) { + for (int i = 1; i <= state->ready_fds[0]; i++) { + if (state->ready_fds[i] == state->fd) { + state->ready_fd = state->fd; + break; + } + } + } + free(state->ready_fds); + state->ready_fds = NULL; + + char buf[1]; + ssize_t n = recv(state->fd, buf, 1, MSG_PEEK); + if (n <= 0) { + break; + } + + resp_object *cmd = resp_read(state->fd); + if (!cmd) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + PT_YIELD(pt); + continue; + } + break; + } + + if (cmd->type != RESPT_ARRAY || cmd->u.arr.n == 0) { + resp_free(cmd); + api_write_err(state, "Protocol error"); + client_flush(state); + continue; + } + + char *args[MAX_ARGS]; + int nargs = 0; + for (size_t i = 0; i < cmd->u.arr.n && nargs < MAX_ARGS; i++) { + resp_object *elem = &cmd->u.arr.elem[i]; + if (elem->type == RESPT_BULK && elem->u.s) { + args[nargs++] = elem->u.s; + elem->u.s = NULL; + } else if (elem->type == RESPT_SIMPLE) { + args[nargs++] = elem->u.s ? elem->u.s : ""; + } + } + + if (nargs > 0) { + dispatch_command(state, args, nargs); + } + + for (int j = 0; j < nargs; j++) { + free(args[j]); + } + resp_free(cmd); + + client_flush(state); + + if (state->fd < 0) break; + } + + if (state->fd >= 0) { + close(state->fd); + } + free(state->fds); + free(state->wbuf); + free(state->username); + free(state); + + PT_END(pt); +} diff --git a/src/ApiModule/server.h b/src/AppModule/api/server.h diff --git a/src/AppModule/command/daemon.c b/src/AppModule/command/daemon.c @@ -12,13 +12,8 @@ #include "CliModule/common.h" #include "SchedulerModule/scheduler.h" #include "AppModule/command/daemon.h" -#include "AppModule/udphole.h" -#include "ApiModule/server.h" - -static const char *const daemon_usages[] = { - "udphole daemon [options]", - NULL, -}; +#include "AppModule/api/server.h" +#include "AppModule/rtp/server.h" static int do_daemonize(void) { pid_t pid = fork(); @@ -39,8 +34,7 @@ static int do_daemonize(void) { } if (pid > 0) _exit(0); - if (chdir("/") != 0) { - } + if (chdir("/") != 0) {} int fd; for (fd = 0; fd < 3; fd++) (void)close(fd); @@ -63,18 +57,22 @@ int appmodule_cmd_daemon(int argc, const char **argv) { struct argparse_option options[] = { OPT_HELP(), OPT_BOOLEAN('d', "daemonize", &daemonize_flag, "run in background", NULL, 0, 0), - OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground (overrides config daemonize=1)", NULL, 0, 0), + OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground", NULL, 0, 0), OPT_END(), }; - argparse_init(&argparse, options, daemon_usages, ARGPARSE_STOP_AT_NON_OPTION); - argc = argparse_parse(&argparse, argc, argv); + argparse_init(&argparse, options, (const char *const[]){"udphole daemon", NULL}, ARGPARSE_STOP_AT_NON_OPTION); + argparse_parse(&argparse, argc, argv); - if ((!no_daemonize_flag) && (daemonize_flag || 0)) { + if (!no_daemonize_flag && (daemonize_flag || 0)) { do_daemonize(); } + log_info("udphole: starting daemon"); + schedmod_pt_create(api_server_pt, NULL); schedmod_pt_create(udphole_manager_pt, NULL); + log_info("udphole: daemon started"); + return schedmod_main(); } diff --git a/src/AppModule/rtp/server.c b/src/AppModule/rtp/server.c @@ -0,0 +1,831 @@ +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <stdarg.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <time.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/stat.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "rxi/log.h" +#include "SchedulerModule/protothreads.h" +#include "SchedulerModule/scheduler.h" +#include "common/socket_util.h" +#include "config.h" +#include "tidwall/hashmap.h" +#include "server.h" +#include "AppModule/api/server.h" + +#define RTP_SESSION_HASH_SIZE 256 +#define RTP_BUFFER_SIZE 4096 +#define DEFAULT_IDLE_EXPIRY 60 + +typedef struct rtp_socket { + char *socket_id; + int *fds; + int local_port; + int mode; + struct sockaddr_storage remote_addr; + socklen_t remote_addrlen; + int learned_valid; + struct sockaddr_storage learned_addr; + socklen_t learned_addrlen; +} rtp_socket_t; + +typedef struct rtp_forward { + char *src_socket_id; + char *dst_socket_id; +} rtp_forward_t; + +typedef struct rtp_session { + char *session_id; + time_t idle_expiry; + time_t created; + time_t last_activity; + rtp_socket_t **sockets; + size_t sockets_count; + rtp_forward_t *forwards; + size_t forwards_count; + int marked_for_deletion; + int *ready_fds; + int *all_fds; + struct pt pt; + struct pt_task *task; +} rtp_session_t; + +static rtp_session_t **sessions = NULL; +static size_t sessions_count = 0; +static char *advertise_addr = NULL; +static int port_low = 7000; +static int port_high = 7999; +static int port_cur = 7000; +static int running = 0; + +static rtp_session_t *find_session(const char *session_id) { + for (size_t i = 0; i < sessions_count; i++) { + if (strcmp(sessions[i]->session_id, session_id) == 0) { + return sessions[i]; + } + } + return NULL; +} + +static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) { + const rtp_socket_t *s = item; + return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1); +} + +static int socket_compare(const void *a, const void *b, void *udata) { + (void)udata; + const rtp_socket_t *sa = a; + const rtp_socket_t *sb = b; + return strcmp(sa->socket_id, sb->socket_id); +} + +static rtp_socket_t *find_socket(rtp_session_t *s, const char *socket_id) { + if (!s || !s->sockets || !socket_id) return NULL; + for (size_t i = 0; i < s->sockets_count; i++) { + if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) { + return s->sockets[i]; + } + } + return NULL; +} + +static int alloc_port(void) { + for (int i = 0; i < port_high - port_low; i++) { + int port = port_cur + i; + if (port > port_high) port = port_low; + port_cur = port + 1; + if (port_cur > port_high) port_cur = port_low; + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + int udp_fd = socket(AF_INET, SOCK_DGRAM, 0); + if (udp_fd < 0) continue; + int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0); + close(udp_fd); + if (!ok) continue; + + return port; + } + return 0; +} + +static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) { + memset(addr, 0, sizeof(*addr)); + + struct sockaddr_in *addr4 = (struct sockaddr_in *)addr; + if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) { + addr4->sin_family = AF_INET; + addr4->sin_port = htons(port); + *addrlen = sizeof(*addr4); + return 0; + } + + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr; + if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) { + addr6->sin6_family = AF_INET6; + addr6->sin6_port = htons(port); + *addrlen = sizeof(*addr6); + return 0; + } + + return -1; +} + +static void close_socket(rtp_socket_t *sock) { + if (!sock || !sock->fds) return; + for (int i = 1; i <= sock->fds[0]; i++) { + if (sock->fds[i] >= 0) { + close(sock->fds[i]); + } + } + free(sock->fds); + sock->fds = NULL; +} + +static void free_socket(rtp_socket_t *sock) { + if (!sock) return; + close_socket(sock); + free(sock->socket_id); + free(sock); +} + +static void destroy_session(rtp_session_t *s) { + if (!s) return; + s->marked_for_deletion = 1; + for (size_t i = 0; i < sessions_count; i++) { + if (sessions[i] == s) { + sessions[i] = NULL; + break; + } + } +} + +static rtp_session_t *create_session(const char *session_id, int idle_expiry) { + const rtp_session_t *cs = find_session(session_id); + if (cs) return (rtp_session_t *)cs; + + rtp_session_t *s = calloc(1, sizeof(*s)); + if (!s) return NULL; + + s->session_id = strdup(session_id); + s->created = time(NULL); + s->last_activity = s->created; + s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY; + + sessions = realloc(sessions, sizeof(rtp_session_t *) * (sessions_count + 1)); + sessions[sessions_count++] = s; + + return s; +} + +static void cleanup_expired_sessions(void) { + if (!sessions) return; + time_t now = time(NULL); + + for (size_t i = 0; i < sessions_count; i++) { + rtp_session_t *s = sessions[i]; + if (!s) continue; + if (now - s->last_activity > s->idle_expiry) { + log_debug("udphole: session %s expired (idle %ld > expiry %ld)", + s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry); + destroy_session(s); + } + } +} + +static int add_forward(rtp_session_t *s, const char *src_id, const char *dst_id) { + for (size_t i = 0; i < s->forwards_count; i++) { + if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && + strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { + return 0; + } + } + + rtp_forward_t *new_forwards = realloc(s->forwards, sizeof(rtp_forward_t) * (s->forwards_count + 1)); + if (!new_forwards) return -1; + s->forwards = new_forwards; + + s->forwards[s->forwards_count].src_socket_id = strdup(src_id); + s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id); + s->forwards_count++; + + return 0; +} + +static int remove_forward(rtp_session_t *s, const char *src_id, const char *dst_id) { + for (size_t i = 0; i < s->forwards_count; i++) { + if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && + strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { + free(s->forwards[i].src_socket_id); + free(s->forwards[i].dst_socket_id); + for (size_t j = i; j < s->forwards_count - 1; j++) { + s->forwards[j] = s->forwards[j + 1]; + } + s->forwards_count--; + return 0; + } + } + return -1; +} + +static rtp_socket_t *create_listen_socket(rtp_session_t *sess, const char *socket_id) { + rtp_socket_t *existing = find_socket(sess, socket_id); + if (existing) return existing; + + int port = alloc_port(); + if (!port) { + log_error("udphole: no ports available"); + return NULL; + } + + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", port); + int *fds = udp_recv(port_str, NULL, NULL); + if (!fds || fds[0] == 0) { + log_error("udphole: failed to create UDP socket on port %d", port); + free(fds); + return NULL; + } + + rtp_socket_t *sock = calloc(1, sizeof(*sock)); + if (!sock) { + free(fds); + return NULL; + } + + sock->socket_id = strdup(socket_id); + sock->fds = fds; + sock->local_port = port; + sock->mode = 0; + sock->learned_valid = 0; + + sess->sockets = realloc(sess->sockets, sizeof(rtp_socket_t *) * (sess->sockets_count + 1)); + sess->sockets[sess->sockets_count++] = sock; + + log_debug("udphole: created listen socket %s in session %s on port %d", + socket_id, sess->session_id, port); + return sock; +} + +static rtp_socket_t *create_connect_socket(rtp_session_t *sess, const char *socket_id, + const char *ip, int port) { + rtp_socket_t *existing = find_socket(sess, socket_id); + if (existing) return existing; + + int local_port = alloc_port(); + if (!local_port) { + log_error("udphole: no ports available"); + return NULL; + } + + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", local_port); + int *fds = udp_recv(port_str, NULL, NULL); + if (!fds || fds[0] == 0) { + log_error("udphole: failed to create UDP socket on port %d", local_port); + free(fds); + return NULL; + } + + struct sockaddr_storage remote_addr; + socklen_t remote_addrlen; + if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) { + log_error("udphole: invalid remote address %s:%d", ip, port); + free(fds); + return NULL; + } + + rtp_socket_t *sock = calloc(1, sizeof(*sock)); + if (!sock) { + free(fds); + return NULL; + } + + sock->socket_id = strdup(socket_id); + sock->fds = fds; + sock->local_port = local_port; + sock->mode = 1; + sock->remote_addr = remote_addr; + sock->remote_addrlen = remote_addrlen; + sock->learned_valid = 0; + + sess->sockets = realloc(sess->sockets, sizeof(rtp_socket_t *) * (sess->sockets_count + 1)); + sess->sockets[sess->sockets_count++] = sock; + + log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", + socket_id, sess->session_id, local_port, ip, port); + return sock; +} + +static int destroy_socket(rtp_session_t *sess, const char *socket_id) { + rtp_socket_t *sock = find_socket(sess, socket_id); + if (!sock) return -1; + + for (size_t i = 0; i < sess->sockets_count; i++) { + if (sess->sockets[i] == sock) { + sess->sockets[i] = NULL; + break; + } + } + free_socket(sock); + + for (size_t i = 0; i < sess->forwards_count; ) { + if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 || + strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) { + free(sess->forwards[i].src_socket_id); + free(sess->forwards[i].dst_socket_id); + for (size_t j = i; j < sess->forwards_count - 1; j++) { + sess->forwards[j] = sess->forwards[j + 1]; + } + sess->forwards_count--; + } else { + i++; + } + } + + return 0; +} + +static rtp_socket_t *find_socket_by_fd(rtp_session_t *s, int fd) { + if (!s || !s->sockets) return NULL; + for (size_t j = 0; j < s->sockets_count; j++) { + rtp_socket_t *sock = s->sockets[j]; + if (!sock || !sock->fds) continue; + for (int i = 1; i <= sock->fds[0]; i++) { + if (sock->fds[i] == fd) { + return sock; + } + } + } + return NULL; +} + +PT_THREAD(rtp_session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + rtp_session_t *s = task->udata; + + (void)timestamp; + PT_BEGIN(pt); + + char buffer[RTP_BUFFER_SIZE]; + + for (;;) { + if (s->marked_for_deletion) { + break; + } + + if (!s->sockets || s->sockets_count == 0) { + PT_YIELD(pt); + continue; + } + + s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1)); + if (!s->all_fds) { + PT_YIELD(pt); + continue; + } + s->all_fds[0] = 0; + + for (size_t j = 0; j < s->sockets_count; j++) { + rtp_socket_t *sock = s->sockets[j]; + if (!sock || !sock->fds) continue; + for (int i = 1; i <= sock->fds[0]; i++) { + s->all_fds[++s->all_fds[0]] = sock->fds[i]; + } + } + + if (s->all_fds[0] == 0) { + PT_YIELD(pt); + continue; + } + + PT_WAIT_UNTIL(pt, schedmod_has_data(s->all_fds, &s->ready_fds) > 0); + + if (!s->ready_fds || s->ready_fds[0] == 0) { + PT_YIELD(pt); + continue; + } + + for (int r = 1; r <= s->ready_fds[0]; r++) { + int ready_fd = s->ready_fds[r]; + + rtp_socket_t *src_sock = find_socket_by_fd(s, ready_fd); + if (!src_sock) continue; + + struct sockaddr_storage from_addr; + socklen_t from_len = sizeof(from_addr); + ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, + (struct sockaddr *)&from_addr, &from_len); + + if (n <= 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + log_warn("udphole: recvfrom error on socket %s: %s", + src_sock->socket_id, strerror(errno)); + } + continue; + } + + s->last_activity = time(NULL); + + if (src_sock->mode == 0 && !src_sock->learned_valid) { + src_sock->learned_addr = from_addr; + src_sock->learned_addrlen = from_len; + src_sock->learned_valid = 1; + log_debug("udphole: socket %s learned remote address", src_sock->socket_id); + } + + for (size_t i = 0; i < s->forwards_count; i++) { + if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) { + continue; + } + + rtp_socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id); + if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue; + + struct sockaddr *dest_addr = NULL; + socklen_t dest_addrlen = 0; + + if (dst_sock->mode == 1) { + dest_addr = (struct sockaddr *)&dst_sock->remote_addr; + dest_addrlen = dst_sock->remote_addrlen; + } else if (dst_sock->learned_valid) { + dest_addr = (struct sockaddr *)&dst_sock->learned_addr; + dest_addrlen = dst_sock->learned_addrlen; + } + + if (dest_addr && dest_addrlen > 0) { + int dst_fd = dst_sock->fds[1]; + ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen); + if (sent < 0) { + log_warn("udphole: forward failed %s -> %s: %s", + src_sock->socket_id, dst_sock->socket_id, strerror(errno)); + } + } + } + } + + } + + log_debug("udphole: session %s protothread exiting", s->session_id); + + if (s->all_fds) { + free(s->all_fds); + s->all_fds = NULL; + } + if (s->ready_fds) { + free(s->ready_fds); + s->ready_fds = NULL; + } + + PT_END(pt); +} + +static void spawn_session_pt(rtp_session_t *s) { + s->task = (struct pt_task *)(intptr_t)schedmod_pt_create(rtp_session_pt, s); +} + +static char cmd_session_create(api_client_t *c, char **args, int nargs) { + if (nargs < 2) { + return api_write_err(c, "wrong number of arguments for 'session.create'") ? 1 : 0; + } + + const char *session_id = args[1]; + int idle_expiry = 0; + if (nargs >= 3 && args[2] && args[2][0] != '\0') { + idle_expiry = atoi(args[2]); + } + + rtp_session_t *s = create_session(session_id, idle_expiry); + if (!s) { + return api_write_err(c, "failed to create session") ? 1 : 0; + } + + spawn_session_pt(s); + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_session_list(api_client_t *c, char **args, int nargs) { + (void)args; + if (nargs != 1) { + return api_write_err(c, "wrong number of arguments for 'session.list'") ? 1 : 0; + } + + if (!sessions) { + return api_write_array(c, 0) ? 1 : 0; + } + + if (!api_write_array(c, sessions_count)) return 0; + + for (size_t i = 0; i < sessions_count; i++) { + rtp_session_t *s = sessions[i]; + if (!s) continue; + if (!api_write_bulk_cstr(c, s->session_id)) return 0; + } + + return 1; +} + +static char cmd_session_info(api_client_t *c, char **args, int nargs) { + if (nargs != 2) { + return api_write_err(c, "wrong number of arguments for 'session.info'") ? 1 : 0; + } + + const char *session_id = args[1]; + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + size_t num_items = 8; + if (!api_write_array(c, num_items)) return 0; + + if (!api_write_bulk_cstr(c, "session_id")) return 0; + if (!api_write_bulk_cstr(c, s->session_id)) return 0; + + if (!api_write_bulk_cstr(c, "created")) return 0; + if (!api_write_bulk_int(c, (int)s->created)) return 0; + + if (!api_write_bulk_cstr(c, "last_activity")) return 0; + if (!api_write_bulk_int(c, (int)s->last_activity)) return 0; + + if (!api_write_bulk_cstr(c, "idle_expiry")) return 0; + if (!api_write_bulk_int(c, (int)s->idle_expiry)) return 0; + + if (!api_write_bulk_cstr(c, "sockets")) return 0; + if (!api_write_array(c, s->sockets_count)) return 0; + for (size_t i = 0; i < s->sockets_count; i++) { + rtp_socket_t *sock = s->sockets[i]; + if (!sock) continue; + if (!api_write_bulk_cstr(c, sock->socket_id)) return 0; + } + + if (!api_write_bulk_cstr(c, "forwards")) return 0; + if (!api_write_array(c, s->forwards_count * 2)) return 0; + for (size_t i = 0; i < s->forwards_count; i++) { + if (!api_write_bulk_cstr(c, s->forwards[i].src_socket_id)) return 0; + if (!api_write_bulk_cstr(c, s->forwards[i].dst_socket_id)) return 0; + } + + if (!api_write_bulk_cstr(c, "marked_for_deletion")) return 0; + if (!api_write_int(c, s->marked_for_deletion)) return 0; + + return 1; +} + +static char cmd_session_destroy(api_client_t *c, char **args, int nargs) { + if (nargs != 2) { + return api_write_err(c, "wrong number of arguments for 'session.destroy'") ? 1 : 0; + } + + const char *session_id = args[1]; + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + destroy_session(s); + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_socket_create_listen(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + return api_write_err(c, "wrong number of arguments for 'session.socket.create.listen'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *socket_id = args[2]; + + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + rtp_socket_t *sock = create_listen_socket(s, socket_id); + if (!sock) { + return api_write_err(c, "failed to create socket") ? 1 : 0; + } + + if (!api_write_array(c, 2)) return 0; + if (!api_write_bulk_int(c, sock->local_port)) return 0; + if (!api_write_bulk_cstr(c, advertise_addr)) return 0; + + return 1; +} + +static char cmd_socket_create_connect(api_client_t *c, char **args, int nargs) { + if (nargs != 5) { + return api_write_err(c, "wrong number of arguments for 'session.socket.create.connect'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *socket_id = args[2]; + const char *ip = args[3]; + int port = atoi(args[4]); + + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + rtp_socket_t *sock = create_connect_socket(s, socket_id, ip, port); + if (!sock) { + return api_write_err(c, "failed to create socket") ? 1 : 0; + } + + if (!api_write_array(c, 2)) return 0; + if (!api_write_bulk_int(c, sock->local_port)) return 0; + if (!api_write_bulk_cstr(c, advertise_addr)) return 0; + + return 1; +} + +static char cmd_socket_destroy(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + return api_write_err(c, "wrong number of arguments for 'session.socket.destroy'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *socket_id = args[2]; + + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + if (destroy_socket(s, socket_id) != 0) { + return api_write_err(c, "socket not found") ? 1 : 0; + } + + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_forward_list(api_client_t *c, char **args, int nargs) { + if (nargs != 2) { + return api_write_err(c, "wrong number of arguments for 'session.forward.list'") ? 1 : 0; + } + + const char *session_id = args[1]; + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + if (!api_write_array(c, s->forwards_count * 2)) return 0; + for (size_t i = 0; i < s->forwards_count; i++) { + if (!api_write_bulk_cstr(c, s->forwards[i].src_socket_id)) return 0; + if (!api_write_bulk_cstr(c, s->forwards[i].dst_socket_id)) return 0; + } + + return 1; +} + +static char cmd_forward_create(api_client_t *c, char **args, int nargs) { + if (nargs != 4) { + return api_write_err(c, "wrong number of arguments for 'session.forward.create'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *src_socket_id = args[2]; + const char *dst_socket_id = args[3]; + + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + rtp_socket_t *src = find_socket(s, src_socket_id); + if (!src) { + return api_write_err(c, "source socket not found") ? 1 : 0; + } + + rtp_socket_t *dst = find_socket(s, dst_socket_id); + if (!dst) { + return api_write_err(c, "destination socket not found") ? 1 : 0; + } + + if (add_forward(s, src_socket_id, dst_socket_id) != 0) { + return api_write_err(c, "failed to add forward") ? 1 : 0; + } + + log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id); + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_forward_destroy(api_client_t *c, char **args, int nargs) { + if (nargs != 4) { + return api_write_err(c, "wrong number of arguments for 'session.forward.destroy'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *src_socket_id = args[2]; + const char *dst_socket_id = args[3]; + + rtp_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + if (remove_forward(s, src_socket_id, dst_socket_id) != 0) { + return api_write_err(c, "forward not found") ? 1 : 0; + } + + return api_write_ok(c) ? 1 : 0; +} + +static void register_rtp_commands(void) { + api_register_cmd("session.create", cmd_session_create); + api_register_cmd("session.list", cmd_session_list); + api_register_cmd("session.info", cmd_session_info); + api_register_cmd("session.destroy", cmd_session_destroy); + api_register_cmd("session.socket.create.listen", cmd_socket_create_listen); + api_register_cmd("session.socket.create.connect", cmd_socket_create_connect); + api_register_cmd("session.socket.destroy", cmd_socket_destroy); + api_register_cmd("session.forward.list", cmd_forward_list); + api_register_cmd("session.forward.create", cmd_forward_create); + api_register_cmd("session.forward.destroy", cmd_forward_destroy); + log_info("udphole: registered session.* commands"); +} + +PT_THREAD(udphole_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + (void)timestamp; + log_trace("udphole_manager: protothread entry"); + PT_BEGIN(pt); + + PT_WAIT_UNTIL(pt, global_cfg); + + resp_object *rtp_sec = resp_map_get(global_cfg, "udphole"); + if (!rtp_sec) { + log_info("udphole: no [udphole] section in config, not starting"); + PT_EXIT(pt); + } + + const char *mode = resp_map_get_string(rtp_sec, "mode"); + if (!mode || strcmp(mode, "builtin") != 0) { + log_info("udphole: mode is '%s', not starting builtin server", mode ? mode : "(null)"); + PT_EXIT(pt); + } + + const char *ports_str = resp_map_get_string(rtp_sec, "ports"); + if (ports_str) { + sscanf(ports_str, "%d-%d", &port_low, &port_high); + if (port_low <= 0) port_low = 7000; + if (port_high <= port_low) port_high = port_low + 999; + } + port_cur = port_low; + + const char *advertise_cfg = resp_map_get_string(rtp_sec, "advertise"); + if (advertise_cfg) { + advertise_addr = strdup(advertise_cfg); + } + + register_rtp_commands(); + running = 1; + log_info("udphole: manager started with port range %d-%d", port_low, port_high); + + int64_t last_cleanup = 0; + + for (;;) { + if (global_cfg) { + resp_object *new_rtp_sec = resp_map_get(global_cfg, "udphole"); + if (new_rtp_sec) { + const char *new_mode = resp_map_get_string(new_rtp_sec, "mode"); + if (new_mode && strcmp(new_mode, "builtin") != 0) { + log_info("udphole: mode changed to '%s', shutting down", new_mode); + break; + } + } + } + + int64_t now = (int64_t)(time(NULL)); + if (now - last_cleanup >= 1) { + cleanup_expired_sessions(); + last_cleanup = now; + } + + PT_YIELD(pt); + } + + running = 0; + + for (size_t i = 0; i < sessions_count; i++) { + if (sessions[i]) { + sessions[i]->marked_for_deletion = 1; + } + } + + free(advertise_addr); + advertise_addr = NULL; + + PT_END(pt); +} diff --git a/src/AppModule/rtp/server.h b/src/AppModule/rtp/server.h @@ -0,0 +1,10 @@ +#ifndef __APPMODULE_RTP_SERVER_H__ +#define __APPMODULE_RTP_SERVER_H__ + +#include <stdint.h> + +#include "SchedulerModule/scheduler.h" + +PT_THREAD(udphole_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); + +#endif // __APPMODULE_RTP_SERVER_H__ +\ No newline at end of file diff --git a/src/AppModule/udphole.c b/src/AppModule/udphole.c @@ -1,876 +0,0 @@ -#include <stdlib.h> -#include <string.h> -#include <stdio.h> -#include <stdarg.h> -#include <unistd.h> -#include <errno.h> -#include <fcntl.h> -#include <time.h> -#include <sys/socket.h> -#include <sys/un.h> -#include <sys/stat.h> -#include <netinet/in.h> -#include <arpa/inet.h> - -#include "rxi/log.h" -#include "SchedulerModule/protothreads.h" -#include "SchedulerModule/scheduler.h" -#include "common/socket_util.h" -#include "config.h" -#include "tidwall/hashmap.h" -#include "AppModule/udphole.h" -#include "ApiModule/server.h" - -#define UDPHOLE_SESSION_HASH_SIZE 256 -#define UDPHOLE_BUFFER_SIZE 4096 -#define DEFAULT_IDLE_EXPIRY 60 - -typedef struct udphole_socket { - char *socket_id; - int fd; - int local_port; - int mode; - struct sockaddr_storage remote_addr; - socklen_t remote_addrlen; - int learned_valid; - struct sockaddr_storage learned_addr; - socklen_t learned_addrlen; -} udphole_socket_t; - -typedef struct udphole_forward { - char *src_socket_id; - char *dst_socket_id; -} udphole_forward_t; - -typedef struct udphole_session { - char *session_id; - time_t idle_expiry; - time_t created; - time_t last_activity; - struct hashmap *sockets; - udphole_forward_t *forwards; - size_t forwards_count; - int *fds; - int fd_count; - int marked_for_deletion; - int *ready_fds; - struct pt pt; - struct pt_task *task; -} udphole_session_t; - -static struct hashmap *sessions = NULL; -static char *advertise_addr = NULL; -static int port_low = 7000; -static int port_high = 7999; -static int port_cur = 7000; -static int running = 0; - -static uint64_t session_hash(const void *item, uint64_t seed0, uint64_t seed1) { - const udphole_session_t *s = item; - return hashmap_sip(s->session_id, strlen(s->session_id), seed0, seed1); -} - -static int session_compare(const void *a, const void *b, void *udata) { - (void)udata; - const udphole_session_t *sa = a; - const udphole_session_t *sb = b; - return strcmp(sa->session_id, sb->session_id); -} - -static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) { - const udphole_socket_t *s = item; - return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1); -} - -static int socket_compare(const void *a, const void *b, void *udata) { - (void)udata; - const udphole_socket_t *sa = a; - const udphole_socket_t *sb = b; - return strcmp(sa->socket_id, sb->socket_id); -} - -static udphole_session_t *find_session(const char *session_id) { - if (!sessions || !session_id) return NULL; - udphole_session_t key = { .session_id = (char *)session_id }; - return (udphole_session_t *)hashmap_get(sessions, &key); -} - -static udphole_socket_t *find_socket(udphole_session_t *s, const char *socket_id) { - if (!s || !s->sockets || !socket_id) return NULL; - udphole_socket_t key = { .socket_id = (char *)socket_id }; - return (udphole_socket_t *)hashmap_get(s->sockets, &key); -} - -static int alloc_port(void) { - for (int i = 0; i < port_high - port_low; i++) { - int port = port_cur + i; - if (port > port_high) port = port_low; - port_cur = port + 1; - if (port_cur > port_high) port_cur = port_low; - - struct sockaddr_in addr; - memset(&addr, 0, sizeof(addr)); - addr.sin_family = AF_INET; - addr.sin_addr.s_addr = INADDR_ANY; - addr.sin_port = htons(port); - - int udp_fd = socket(AF_INET, SOCK_DGRAM, 0); - if (udp_fd < 0) continue; - int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0); - close(udp_fd); - if (!ok) continue; - - return port; - } - return 0; -} - -static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) { - memset(addr, 0, sizeof(*addr)); - - struct sockaddr_in *addr4 = (struct sockaddr_in *)addr; - if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) { - addr4->sin_family = AF_INET; - addr4->sin_port = htons(port); - *addrlen = sizeof(*addr4); - return 0; - } - - struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr; - if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) { - addr6->sin6_family = AF_INET6; - addr6->sin6_port = htons(port); - *addrlen = sizeof(*addr6); - return 0; - } - - return -1; -} - -static void close_socket(udphole_socket_t *sock) { - if (!sock) return; - if (sock->fd >= 0) { - close(sock->fd); - sock->fd = -1; - } -} - -static void free_socket(udphole_socket_t *sock) { - if (!sock) return; - close_socket(sock); - free(sock->socket_id); - free(sock); -} - -static void destroy_session(udphole_session_t *s); - -static void session_remove_fds(udphole_session_t *s) { - if (!s) return; - if (s->fds) { - free(s->fds); - s->fds = NULL; - } - s->fd_count = 0; -} - -static void session_update_fds(udphole_session_t *s) { - session_remove_fds(s); - if (!s || !s->sockets) return; - - int count = (int)hashmap_count(s->sockets); - if (count == 0) return; - - s->fds = malloc(sizeof(int) * (count + 1)); - if (!s->fds) return; - s->fds[0] = 0; - - size_t iter = 0; - void *item; - while (hashmap_iter(s->sockets, &iter, &item)) { - udphole_socket_t *sock = item; - if (sock->fd >= 0) { - s->fds[++s->fds[0]] = sock->fd; - } - } -} - -static void destroy_session(udphole_session_t *s) { - if (!s) return; - s->marked_for_deletion = 1; - if (sessions) { - hashmap_delete(sessions, s); - } -} - -static udphole_session_t *create_session(const char *session_id, int idle_expiry) { - const udphole_session_t *cs = find_session(session_id); - if (cs) return (udphole_session_t *)cs; - - udphole_session_t *s = calloc(1, sizeof(*s)); - if (!s) return NULL; - - s->session_id = strdup(session_id); - s->created = time(NULL); - s->last_activity = s->created; - s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY; - - s->sockets = hashmap_new(sizeof(udphole_socket_t), 0, 0, 0, socket_hash, socket_compare, NULL, NULL); - - if (!sessions) { - sessions = hashmap_new(sizeof(udphole_session_t), 0, 0, 0, session_hash, session_compare, NULL, NULL); - } - hashmap_set(sessions, s); - - log_debug("udphole: created session %s with idle_expiry %ld", session_id, (long)s->idle_expiry); - return s; -} - -static void cleanup_expired_sessions(void) { - if (!sessions) return; - time_t now = time(NULL); - - size_t iter = 0; - void *item; - while (hashmap_iter(sessions, &iter, &item)) { - udphole_session_t *s = item; - if (now - s->last_activity > s->idle_expiry) { - log_debug("udphole: session %s expired (idle %ld > expiry %ld)", - s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry); - destroy_session(s); - } - } -} - -static int add_forward(udphole_session_t *s, const char *src_id, const char *dst_id) { - for (size_t i = 0; i < s->forwards_count; i++) { - if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && - strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { - return 0; - } - } - - udphole_forward_t *new_forwards = realloc(s->forwards, sizeof(udphole_forward_t) * (s->forwards_count + 1)); - if (!new_forwards) return -1; - s->forwards = new_forwards; - - s->forwards[s->forwards_count].src_socket_id = strdup(src_id); - s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id); - s->forwards_count++; - - return 0; -} - -static int remove_forward(udphole_session_t *s, const char *src_id, const char *dst_id) { - for (size_t i = 0; i < s->forwards_count; i++) { - if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && - strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { - free(s->forwards[i].src_socket_id); - free(s->forwards[i].dst_socket_id); - for (size_t j = i; j < s->forwards_count - 1; j++) { - s->forwards[j] = s->forwards[j + 1]; - } - s->forwards_count--; - return 0; - } - } - return -1; -} - -static udphole_socket_t *create_listen_socket(udphole_session_t *sess, const char *socket_id) { - udphole_socket_t *existing = find_socket(sess, socket_id); - if (existing) return existing; - - int port = alloc_port(); - if (!port) { - log_error("udphole: no ports available"); - return NULL; - } - - char port_str[16]; - snprintf(port_str, sizeof(port_str), "%d", port); - int *fds = udp_recv(port_str, NULL, NULL); - if (!fds || fds[0] == 0) { - log_error("udphole: failed to create UDP socket on port %d", port); - free(fds); - return NULL; - } - - udphole_socket_t *sock = calloc(1, sizeof(*sock)); - if (!sock) { - free(fds); - return NULL; - } - - sock->socket_id = strdup(socket_id); - sock->fd = fds[1]; - sock->local_port = port; - sock->mode = 0; - sock->learned_valid = 0; - free(fds); - - hashmap_set(sess->sockets, sock); - session_update_fds(sess); - - log_debug("udphole: created listen socket %s in session %s on port %d", - socket_id, sess->session_id, port); - return sock; -} - -static udphole_socket_t *create_connect_socket(udphole_session_t *sess, const char *socket_id, - const char *ip, int port) { - udphole_socket_t *existing = find_socket(sess, socket_id); - if (existing) return existing; - - int local_port = alloc_port(); - if (!local_port) { - log_error("udphole: no ports available"); - return NULL; - } - - char port_str[16]; - snprintf(port_str, sizeof(port_str), "%d", local_port); - int *fds = udp_recv(port_str, NULL, NULL); - if (!fds || fds[0] == 0) { - log_error("udphole: failed to create UDP socket on port %d", local_port); - free(fds); - return NULL; - } - - struct sockaddr_storage remote_addr; - socklen_t remote_addrlen; - if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) { - log_error("udphole: invalid remote address %s:%d", ip, port); - free(fds); - return NULL; - } - - udphole_socket_t *sock = calloc(1, sizeof(*sock)); - if (!sock) { - free(fds); - return NULL; - } - - sock->socket_id = strdup(socket_id); - sock->fd = fds[1]; - sock->local_port = local_port; - sock->mode = 1; - sock->remote_addr = remote_addr; - sock->remote_addrlen = remote_addrlen; - sock->learned_valid = 0; - free(fds); - - hashmap_set(sess->sockets, sock); - session_update_fds(sess); - - log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", - socket_id, sess->session_id, local_port, ip, port); - return sock; -} - -static int destroy_socket(udphole_session_t *sess, const char *socket_id) { - udphole_socket_t *sock = find_socket(sess, socket_id); - if (!sock) return -1; - - hashmap_delete(sess->sockets, sock); - free_socket(sock); - session_update_fds(sess); - - for (size_t i = 0; i < sess->forwards_count; ) { - if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 || - strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) { - free(sess->forwards[i].src_socket_id); - free(sess->forwards[i].dst_socket_id); - for (size_t j = i; j < sess->forwards_count - 1; j++) { - sess->forwards[j] = sess->forwards[j + 1]; - } - sess->forwards_count--; - } else { - i++; - } - } - - return 0; -} - -PT_THREAD(udphole_session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { - udphole_session_t *s = task->udata; - - (void)timestamp; - log_trace("udphole_session: protothread entry session=%s", s->session_id); - PT_BEGIN(pt); - - char buffer[UDPHOLE_BUFFER_SIZE]; - - for (;;) { - if (s->marked_for_deletion) { - break; - } - - if (!s->fds || s->fd_count == 0) { - PT_YIELD(pt); - continue; - } - - PT_WAIT_UNTIL(pt, schedmod_has_data(s->fds, &s->ready_fds) > 0); - - if (!s->ready_fds || s->ready_fds[0] == 0) { - PT_YIELD(pt); - continue; - } - - for (int r = 1; r <= s->ready_fds[0]; r++) { - int ready_fd = s->ready_fds[r]; - - udphole_socket_t *src_sock = NULL; - size_t iter = 0; - void *item; - while (hashmap_iter(s->sockets, &iter, &item)) { - udphole_socket_t *sock = item; - if (sock->fd == ready_fd) { - src_sock = sock; - break; - } - } - - if (!src_sock) continue; - - struct sockaddr_storage from_addr; - socklen_t from_len = sizeof(from_addr); - ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, - (struct sockaddr *)&from_addr, &from_len); - - if (n <= 0) { - if (errno != EAGAIN && errno != EWOULDBLOCK) { - log_warn("udphole: recvfrom error on socket %s: %s", - src_sock->socket_id, strerror(errno)); - } - continue; - } - - s->last_activity = time(NULL); - - if (src_sock->mode == 0 && !src_sock->learned_valid) { - src_sock->learned_addr = from_addr; - src_sock->learned_addrlen = from_len; - src_sock->learned_valid = 1; - log_debug("udphole: socket %s learned remote address", src_sock->socket_id); - } - - for (size_t i = 0; i < s->forwards_count; i++) { - if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) { - continue; - } - - udphole_socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id); - if (!dst_sock || dst_sock->fd < 0) continue; - - struct sockaddr *dest_addr = NULL; - socklen_t dest_addrlen = 0; - - if (dst_sock->mode == 1) { - dest_addr = (struct sockaddr *)&dst_sock->remote_addr; - dest_addrlen = dst_sock->remote_addrlen; - } else if (dst_sock->learned_valid) { - dest_addr = (struct sockaddr *)&dst_sock->learned_addr; - dest_addrlen = dst_sock->learned_addrlen; - } - - if (dest_addr && dest_addrlen > 0) { - ssize_t sent = sendto(dst_sock->fd, buffer, n, 0, dest_addr, dest_addrlen); - if (sent < 0) { - log_warn("udphole: forward failed %s -> %s: %s", - src_sock->socket_id, dst_sock->socket_id, strerror(errno)); - } - } - } - } - } - - log_debug("udphole: session %s protothread exiting", s->session_id); - - if (s->ready_fds) { - free(s->ready_fds); - s->ready_fds = NULL; - } - - if (s->fds) { - free(s->fds); - s->fds = NULL; - } - - if (s->sockets) { - size_t iter = 0; - void *item; - while (hashmap_iter(s->sockets, &iter, &item)) { - udphole_socket_t *sock = item; - free_socket(sock); - } - hashmap_free(s->sockets); - s->sockets = NULL; - } - - if (s->forwards) { - for (size_t i = 0; i < s->forwards_count; i++) { - free(s->forwards[i].src_socket_id); - free(s->forwards[i].dst_socket_id); - } - free(s->forwards); - s->forwards = NULL; - s->forwards_count = 0; - } - - free(s->session_id); - free(s); - - PT_END(pt); -} - -static void spawn_session_pt(udphole_session_t *s) { - s->task = (struct pt_task *)(intptr_t)schedmod_pt_create(udphole_session_pt, s); -} - -static char cmd_session_create(api_client_t *c, char **args, int nargs) { - if (nargs != 3) { - return api_write_err(c, "wrong number of arguments for 'session.create'") ? 1 : 0; - } - - const char *session_id = args[1]; - int idle_expiry = 0; - if (nargs >= 4 && args[2] && args[2][0] != '\0') { - idle_expiry = atoi(args[2]); - } - - udphole_session_t *s = create_session(session_id, idle_expiry); - if (!s) { - return api_write_err(c, "failed to create session") ? 1 : 0; - } - - spawn_session_pt(s); - return api_write_ok(c) ? 1 : 0; -} - -static char cmd_session_list(api_client_t *c, char **args, int nargs) { - (void)args; - if (nargs != 1) { - return api_write_err(c, "wrong number of arguments for 'session.list'") ? 1 : 0; - } - - if (!sessions) { - return api_write_array(c, 0) ? 1 : 0; - } - - size_t count = hashmap_count(sessions); - if (!api_write_array(c, count)) return 0; - - size_t iter = 0; - void *item; - while (hashmap_iter(sessions, &iter, &item)) { - udphole_session_t *s = item; - if (!api_write_bulk_cstr(c, s->session_id)) return 0; - } - - return 1; -} - -static char cmd_session_info(api_client_t *c, char **args, int nargs) { - if (nargs != 2) { - return api_write_err(c, "wrong number of arguments for 'session.info'") ? 1 : 0; - } - - const char *session_id = args[1]; - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - size_t num_items = 8; - if (!api_write_array(c, num_items)) return 0; - - if (!api_write_bulk_cstr(c, "session_id")) return 0; - if (!api_write_bulk_cstr(c, s->session_id)) return 0; - - if (!api_write_bulk_cstr(c, "created")) return 0; - if (!api_write_bulk_int(c, (int)s->created)) return 0; - - if (!api_write_bulk_cstr(c, "last_activity")) return 0; - if (!api_write_bulk_int(c, (int)s->last_activity)) return 0; - - if (!api_write_bulk_cstr(c, "idle_expiry")) return 0; - if (!api_write_bulk_int(c, (int)s->idle_expiry)) return 0; - - if (!api_write_bulk_cstr(c, "sockets")) return 0; - size_t socket_count = s->sockets ? hashmap_count(s->sockets) : 0; - if (!api_write_array(c, socket_count)) return 0; - if (s->sockets) { - size_t iter = 0; - void *item; - while (hashmap_iter(s->sockets, &iter, &item)) { - udphole_socket_t *sock = item; - if (!api_write_bulk_cstr(c, sock->socket_id)) return 0; - } - } - - if (!api_write_bulk_cstr(c, "forwards")) return 0; - if (!api_write_array(c, s->forwards_count * 2)) return 0; - for (size_t i = 0; i < s->forwards_count; i++) { - if (!api_write_bulk_cstr(c, s->forwards[i].src_socket_id)) return 0; - if (!api_write_bulk_cstr(c, s->forwards[i].dst_socket_id)) return 0; - } - - if (!api_write_bulk_cstr(c, "fd_count")) return 0; - if (!api_write_int(c, s->fd_count)) return 0; - - if (!api_write_bulk_cstr(c, "marked_for_deletion")) return 0; - if (!api_write_int(c, s->marked_for_deletion)) return 0; - - return 1; -} - -static char cmd_session_destroy(api_client_t *c, char **args, int nargs) { - if (nargs != 2) { - return api_write_err(c, "wrong number of arguments for 'session.destroy'") ? 1 : 0; - } - - const char *session_id = args[1]; - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - destroy_session(s); - return api_write_ok(c) ? 1 : 0; -} - -static char cmd_socket_create_listen(api_client_t *c, char **args, int nargs) { - if (nargs != 3) { - return api_write_err(c, "wrong number of arguments for 'session.socket.create.listen'") ? 1 : 0; - } - - const char *session_id = args[1]; - const char *socket_id = args[2]; - - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - udphole_socket_t *sock = create_listen_socket(s, socket_id); - if (!sock) { - return api_write_err(c, "failed to create socket") ? 1 : 0; - } - - if (!api_write_array(c, 2)) return 0; - if (!api_write_bulk_int(c, sock->local_port)) return 0; - if (!api_write_bulk_cstr(c, advertise_addr)) return 0; - - return 1; -} - -static char cmd_socket_create_connect(api_client_t *c, char **args, int nargs) { - if (nargs != 5) { - return api_write_err(c, "wrong number of arguments for 'session.socket.create.connect'") ? 1 : 0; - } - - const char *session_id = args[1]; - const char *socket_id = args[2]; - const char *ip = args[3]; - int port = atoi(args[4]); - - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - udphole_socket_t *sock = create_connect_socket(s, socket_id, ip, port); - if (!sock) { - return api_write_err(c, "failed to create socket") ? 1 : 0; - } - - if (!api_write_array(c, 2)) return 0; - if (!api_write_bulk_int(c, sock->local_port)) return 0; - if (!api_write_bulk_cstr(c, advertise_addr)) return 0; - - return 1; -} - -static char cmd_socket_destroy(api_client_t *c, char **args, int nargs) { - if (nargs != 3) { - return api_write_err(c, "wrong number of arguments for 'session.socket.destroy'") ? 1 : 0; - } - - const char *session_id = args[1]; - const char *socket_id = args[2]; - - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - if (destroy_socket(s, socket_id) != 0) { - return api_write_err(c, "socket not found") ? 1 : 0; - } - - return api_write_ok(c) ? 1 : 0; -} - -static char cmd_forward_list(api_client_t *c, char **args, int nargs) { - if (nargs != 2) { - return api_write_err(c, "wrong number of arguments for 'session.forward.list'") ? 1 : 0; - } - - const char *session_id = args[1]; - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - if (!api_write_array(c, s->forwards_count * 2)) return 0; - for (size_t i = 0; i < s->forwards_count; i++) { - if (!api_write_bulk_cstr(c, s->forwards[i].src_socket_id)) return 0; - if (!api_write_bulk_cstr(c, s->forwards[i].dst_socket_id)) return 0; - } - - return 1; -} - -static char cmd_forward_create(api_client_t *c, char **args, int nargs) { - if (nargs != 4) { - return api_write_err(c, "wrong number of arguments for 'session.forward.create'") ? 1 : 0; - } - - const char *session_id = args[1]; - const char *src_socket_id = args[2]; - const char *dst_socket_id = args[3]; - - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - udphole_socket_t *src = find_socket(s, src_socket_id); - if (!src) { - return api_write_err(c, "source socket not found") ? 1 : 0; - } - - udphole_socket_t *dst = find_socket(s, dst_socket_id); - if (!dst) { - return api_write_err(c, "destination socket not found") ? 1 : 0; - } - - if (add_forward(s, src_socket_id, dst_socket_id) != 0) { - return api_write_err(c, "failed to add forward") ? 1 : 0; - } - - return api_write_ok(c) ? 1 : 0; -} - -static char cmd_forward_destroy(api_client_t *c, char **args, int nargs) { - if (nargs != 4) { - return api_write_err(c, "wrong number of arguments for 'session.forward.destroy'") ? 1 : 0; - } - - const char *session_id = args[1]; - const char *src_socket_id = args[2]; - const char *dst_socket_id = args[3]; - - udphole_session_t *s = find_session(session_id); - if (!s) { - return api_write_err(c, "session not found") ? 1 : 0; - } - - if (remove_forward(s, src_socket_id, dst_socket_id) != 0) { - return api_write_err(c, "forward not found") ? 1 : 0; - } - - return api_write_ok(c) ? 1 : 0; -} - -static void register_udphole_commands(void) { - api_register_cmd("session.create", cmd_session_create); - api_register_cmd("session.list", cmd_session_list); - api_register_cmd("session.info", cmd_session_info); - api_register_cmd("session.destroy", cmd_session_destroy); - api_register_cmd("session.socket.create.listen", cmd_socket_create_listen); - api_register_cmd("session.socket.create.connect", cmd_socket_create_connect); - api_register_cmd("session.socket.destroy", cmd_socket_destroy); - api_register_cmd("session.forward.list", cmd_forward_list); - api_register_cmd("session.forward.create", cmd_forward_create); - api_register_cmd("session.forward.destroy", cmd_forward_destroy); - log_info("udphole: registered session.* commands"); -} - -PT_THREAD(udphole_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { - (void)timestamp; - log_trace("udphole_manager: protothread entry"); - PT_BEGIN(pt); - - PT_WAIT_UNTIL(pt, global_cfg); - - resp_object *udphole_sec = resp_map_get(global_cfg, "udphole"); - if (!udphole_sec) { - log_info("udphole: no [udphole] section in config, not starting"); - PT_EXIT(pt); - } - - const char *mode = resp_map_get_string(udphole_sec, "mode"); - if (!mode || strcmp(mode, "builtin") != 0) { - log_info("udphole: mode is '%s', not starting builtin server", mode ? mode : "(null)"); - PT_EXIT(pt); - } - - const char *ports_str = resp_map_get_string(udphole_sec, "ports"); - if (ports_str) { - sscanf(ports_str, "%d-%d", &port_low, &port_high); - if (port_low <= 0) port_low = 7000; - if (port_high <= port_low) port_high = port_low + 999; - } - port_cur = port_low; - - const char *advertise_cfg = resp_map_get_string(udphole_sec, "advertise"); - if (advertise_cfg) { - advertise_addr = strdup(advertise_cfg); - } - - register_udphole_commands(); - running = 1; - log_info("udphole: manager started with port range %d-%d", port_low, port_high); - - int64_t last_cleanup = 0; - - for (;;) { - if (global_cfg) { - resp_object *new_udphole_sec = resp_map_get(global_cfg, "udphole"); - if (new_udphole_sec) { - const char *new_mode = resp_map_get_string(new_udphole_sec, "mode"); - if (new_mode && strcmp(new_mode, "builtin") != 0) { - log_info("udphole: mode changed to '%s', shutting down", new_mode); - break; - } - } - } - - int64_t now = (int64_t)(time(NULL)); - if (now - last_cleanup >= 1) { - cleanup_expired_sessions(); - last_cleanup = now; - } - - PT_YIELD(pt); - } - - running = 0; - - if (sessions) { - size_t iter = 0; - void *item; - while (hashmap_iter(sessions, &iter, &item)) { - udphole_session_t *s = item; - s->marked_for_deletion = 1; - } - } - - free(advertise_addr); - advertise_addr = NULL; - - PT_END(pt); -} diff --git a/src/AppModule/udphole.h b/src/AppModule/udphole.h @@ -1,15 +0,0 @@ -#ifndef __APPMODULE_UDPHOLE_H__ -#define __APPMODULE_UDPHOLE_H__ - -#include <stddef.h> -#include <stdint.h> -#include <sys/time.h> - -#include "SchedulerModule/protothreads.h" - -struct pt; -struct pt_task; - -PT_THREAD(udphole_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); - -#endif diff --git a/src/RespModule/resp.c b/src/RespModule/resp.c @@ -5,6 +5,7 @@ #include <string.h> #include <stdio.h> #include <unistd.h> +#include <errno.h> #include "rxi/log.h" #include "RespModule/resp.h" @@ -16,8 +17,12 @@ static void resp_free_internal(resp_object *o); static int resp_read_byte(int fd) { unsigned char c; - if (read(fd, &c, 1) != 1) + ssize_t n = read(fd, &c, 1); + if (n != 1) { + if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) + return -2; return -1; + } return (int)c; } @@ -40,6 +45,7 @@ static int resp_read_line(int fd, char *buf, size_t buf_size) { resp_object *resp_read(int fd) { int type_c = resp_read_byte(fd); if (type_c < 0) return NULL; + if (type_c == -2) return NULL; // EAGAIN - no data (non-blocking) resp_object *o = calloc(1, sizeof(resp_object)); if (!o) return NULL; char line[LINE_BUF]; @@ -265,6 +271,20 @@ int resp_encode_array(int argc, const resp_object *const *argv, char **out_buf, return 0; } +int resp_serialize(const resp_object *o, char **out_buf, size_t *out_len) { + size_t cap = 64; + size_t len = 0; + char *buf = malloc(cap); + if (!buf) return -1; + if (resp_append_object(&buf, &cap, &len, o) != 0) { + free(buf); + return -1; + } + *out_buf = buf; + *out_len = len; + return 0; +} + resp_object *resp_array_init(void) { resp_object *o = calloc(1, sizeof(resp_object)); if (!o) return NULL; diff --git a/src/RespModule/resp.h b/src/RespModule/resp.h @@ -44,6 +44,9 @@ resp_object *resp_read(int fd); /* Encode array of argc objects. Caller must free(*out_buf). Returns 0 on success. */ int resp_encode_array(int argc, const resp_object *const *argv, char **out_buf, size_t *out_len); +/* Serialize single object. Caller must free(*out_buf). Returns 0 on success. */ +int resp_serialize(const resp_object *o, char **out_buf, size_t *out_len); + /* Allocate an empty RESPT_ARRAY. Caller must resp_free. Returns NULL on alloc failure. */ resp_object *resp_array_init(void); diff --git a/src/SchedulerModule/scheduler.c b/src/SchedulerModule/scheduler.c @@ -23,6 +23,8 @@ int schedmod_pt_create(pt_task_fn fn, void *udata) { PT_INIT(&node->pt); node->is_active = 1; pt_first = node; + + log_trace("scheduler: created task %p (func=%p, udata=%p), pt_first=%p", (void*)node, (void*)fn, udata, (void*)pt_first); return 0; } @@ -52,6 +54,7 @@ int schedmod_pt_remove(pt_task_t *task) { int schedmod_has_data(int *in_fds, int **out_fds) { if (!in_fds || in_fds[0] == 0) return 0; + log_trace("schedmod_has_data: in_fds[0]=%d", in_fds[0]); for (int i = 1; i <= in_fds[0]; i++) { int fd = in_fds[i]; @@ -80,6 +83,7 @@ int schedmod_has_data(int *in_fds, int **out_fds) { for (int i = 1; i <= in_fds[0]; i++) { if (in_fds[i] >= 0 && FD_ISSET(in_fds[i], &g_select_result)) { (*out_fds)[idx++] = in_fds[i]; + FD_CLR(in_fds[i], &g_select_result); } } @@ -100,9 +104,11 @@ int schedmod_main() { } } + tv.tv_sec = 0; tv.tv_usec = 100000; select(maxfd + 1, &g_want_fds, NULL, NULL, &tv); + log_trace("scheduler: select returned"); struct timeval now; gettimeofday(&now, NULL); @@ -111,17 +117,19 @@ int schedmod_main() { FD_ZERO(&g_want_fds); - pt_task_t *task = pt_first; - while (task) { - pt_task_t *next = task->next; - log_trace("scheduler: running task %p", (void*)task->func); - task->is_active = PT_SCHEDULE(task->func(&task->pt, timestamp, task)); - log_trace("scheduler: task %p returned, is_active=%d", (void*)task->func, task->is_active); - if (!task->is_active) { - schedmod_pt_remove(task); - } - task = next; + pt_task_t *task = pt_first; + while (task) { + pt_task_t *next = task->next; + log_trace("scheduler: about to run task %p (func=%p, is_active=%d)", (void*)task, (void*)task->func, task->is_active); + task->is_active = PT_SCHEDULE(task->func(&task->pt, timestamp, task)); + log_trace("scheduler: task %p (func=%p) returned, is_active=%d, next=%p", (void*)task, (void*)task->func, task->is_active, (void*)next); + if (!task->is_active) { + log_trace("scheduler: removing inactive task %p", (void*)task); + schedmod_pt_remove(task); } + task = next; + } + log_trace("scheduler: loop done, pt_first=%p", (void*)pt_first); if (!pt_first) break; } diff --git a/test/basic-forwarding.js b/test/basic-forwarding.js @@ -0,0 +1,98 @@ +const path = require('path'); +const { + spawnDaemon, + killDaemon, + connectApi, + apiCommand, + createUdpEchoServer, + sendUdp, + TIMEOUT +} = require('./helpers'); + +const CONFIG_PATH = path.join(__dirname, 'config.ini'); +const API_PORT = 9123; + +async function runTest() { + let daemon = null; + let apiSock = null; + let echoServer = null; + + console.log('=== Basic Forwarding Test ==='); + console.log('Testing: UDP packets are forwarded from listen socket to connect socket\n'); + + try { + console.log('1. Spawning daemon...'); + daemon = await spawnDaemon(CONFIG_PATH); + console.log(` Daemon started (PID: ${daemon.pid})`); + + console.log('2. Connecting to API...'); + apiSock = await connectApi(API_PORT); + console.log(' Connected to API'); + + console.log('3. Authenticating...'); + let resp = await apiCommand(apiSock, 'auth', 'finwo', 'testsecret'); + console.log(` Auth response: ${resp}`); + if (resp !== 'OK') throw new Error('Authentication failed'); + + console.log('4. Creating session...'); + resp = await apiCommand(apiSock, 'session.create', 'test-session', '60'); + console.log(` Session create: ${resp}`); + + console.log('5. Creating listen socket...'); + resp = await apiCommand(apiSock, 'session.socket.create.listen', 'test-session', 'client-a'); + const listenPort = resp[0]; + console.log(` Listen socket port: ${listenPort}`); + + console.log('6. Starting echo server...'); + echoServer = await createUdpEchoServer(); + console.log(` Echo server on port: ${echoServer.port}`); + + console.log('7. Creating connect socket to echo server...'); + resp = await apiCommand(apiSock, 'session.socket.create.connect', 'test-session', 'relay', '127.0.0.1', echoServer.port); + console.log(` Connect socket: ${resp}`); + + console.log('8. Creating forward: client-a -> relay...'); + resp = await apiCommand(apiSock, 'session.forward.create', 'test-session', 'client-a', 'relay'); + console.log(` Forward create: ${resp}`); + + console.log(' Waiting for session to initialize...'); + await new Promise(r => setTimeout(r, 100)); + + console.log('9. Sending UDP packet to listen socket...'); + await sendUdp(listenPort, '127.0.0.1', 'hello'); + console.log(' Sent "hello"'); + + console.log('10. Waiting for echo response...'); + const messages = echoServer.getMessages(); + const start = Date.now(); + while (messages.length === 0 && Date.now() - start < TIMEOUT) { + await new Promise(r => setTimeout(r, 50)); + } + + if (messages.length === 0) { + throw new Error('Timeout: no message received by echo server'); + } + + const msg = messages[0]; + console.log(` Received: "${msg.data}" from ${msg.rinfo.address}:${msg.rinfo.port}`); + + if (msg.data === 'hello') { + console.log('\n✓ PASS: UDP forwarding works correctly'); + console.log(' Packet was forwarded from listen socket to connect socket'); + console.log(' and echoed back successfully.'); + process.exit(0); + } else { + throw new Error(`Expected "hello", got "${msg.data}"`); + } + + } catch (err) { + console.error(`\n✗ FAIL: ${err.message}`); + process.exit(1); + } finally { + if (echoServer) echoServer.socket.close(); + if (apiSock) apiSock.end(); + if (daemon) await killDaemon(daemon); + } +} + +runTest(); +\ No newline at end of file diff --git a/test/config.ini b/test/config.ini @@ -0,0 +1,8 @@ +[udphole] +mode = builtin +ports = 9000-9099 +listen = :9123 + +[user:finwo] +permit = * +secret = testsecret diff --git a/test/helpers.js b/test/helpers.js @@ -0,0 +1,225 @@ +const { spawn } = require('child_process'); +const net = require('net'); +const dgram = require('dgram'); +const path = require('path'); + +const DAEMON_PATH = path.join(__dirname, '..', 'udphole'); +const TIMEOUT = 2000; + +function sleep(ms) { + return new Promise(resolve => setTimeout(resolve, ms)); +} + +function findFreePort() { + return new Promise((resolve, reject) => { + const server = net.createServer(); + server.unref(); + server.on('error', reject); + server.listen(0, () => { + const addr = server.address(); + server.close(() => resolve(addr.port)); + }); + }); +} + +function spawnDaemon(configPath) { + return new Promise((resolve, reject) => { + const { execSync } = require('child_process'); + try { execSync('pkill -9 udphole 2>/dev/null', { stdio: 'ignore' }); } catch(e) {} + + const daemon = spawn(DAEMON_PATH, ['-f', configPath, 'daemon', '-D'], { + stdio: ['ignore', 'pipe', 'pipe', 'pipe'] + }); + + + + let output = ''; + const startTimeout = setTimeout(() => { + reject(new Error(`Daemon start timeout. Output: ${output}`)); + }, TIMEOUT); + + daemon.stderr.on('data', (data) => { + process.stderr.write(data.toString()); + output += data.toString(); + if (output.includes('daemon started')) { + clearTimeout(startTimeout); + sleep(200).then(() => resolve(daemon)); + } + }); + + daemon.on('error', (err) => { + clearTimeout(startTimeout); + reject(err); + }); + }); +} + +function killDaemon(daemon) { + return new Promise((resolve) => { + if (!daemon || daemon.killed) { + resolve(); + return; + } + daemon.once('exit', resolve); + daemon.kill('SIGTERM'); + setTimeout(() => { + if (!daemon.killed) daemon.kill('SIGKILL'); + resolve(); + }, 1000); + }); +} + +function connectApi(port) { + return new Promise((resolve, reject) => { + const sock = net.createConnection({ port, host: '127.0.0.1', noDelay: true }); + sock.setEncoding('utf8'); + + const timeout = setTimeout(() => { + sock.destroy(); + reject(new Error('Connection timeout')); + }, TIMEOUT); + + sock.on('connect', () => { + clearTimeout(timeout); + resolve(sock); + }); + + sock.on('error', reject); + }); +} + +function encodeResp(...args) { + const n = args.length; + let cmd = `*${n}\r\n`; + for (const arg of args) { + const s = String(arg); + cmd += `$${s.length}\r\n${s}\r\n`; + } + return cmd; +} + +function apiCommand(sock, ...args) { + return new Promise((resolve, reject) => { + const cmd = encodeResp(...args); + + let response = ''; + const timeout = setTimeout(() => { + sock.destroy(); + reject(new Error('API command timeout')); + }, TIMEOUT); + + sock.once('data', (data) => { + response += data; + clearTimeout(timeout); + resolve(parseResp(response)); + }); + + sock.write(cmd); + }); +} + +function parseResp(data) { + data = data.trim(); + if (data.startsWith('+')) return data.substring(1).trim(); + if (data.startsWith('-')) throw new Error(data.substring(1).trim()); + if (data.startsWith(':')) return parseInt(data.substring(1), 10); + if (data.startsWith('*')) { + const count = parseInt(data.substring(1), 10); + if (count === 0) return []; + const lines = data.split('\r\n'); + const result = []; + let i = 1; + for (let j = 0; j < count && i < lines.length; j++) { + if (lines[i].startsWith('$')) { + i++; + if (i < lines.length) result.push(lines[i]); + } else if (lines[i].startsWith(':')) { + result.push(parseInt(lines[i].substring(1), 10)); + } else if (lines[i].startsWith('+')) { + result.push(lines[i].substring(1)); + } + i++; + } + return result; + } + if (data.startsWith('$')) { + const len = parseInt(data.substring(1), 10); + if (len === -1) return null; + const idx = data.indexOf('\r\n'); + if (idx >= 0) return data.substring(idx + 2); + return ''; + } + return data; +} + +function createUdpEchoServer() { + return new Promise(async (resolve, reject) => { + const server = dgram.createSocket('udp4'); + const messages = []; + + server.on('message', (msg, rinfo) => { + messages.push({ data: msg.toString(), rinfo }); + server.send(msg, rinfo.port, rinfo.address); + }); + + server.on('error', reject); + + const port = await findFreePort(); + server.bind(port, '127.0.0.1', () => { + resolve({ + port, + socket: server, + getMessages: () => messages, + clearMessages: () => { messages.length = 0; } + }); + }); + }); +} + +function sendUdp(port, host, message) { + return new Promise((resolve, reject) => { + const sock = dgram.createSocket('udp4'); + const buf = Buffer.from(message); + sock.send(buf, 0, buf.length, port, host, (err) => { + sock.close(); + if (err) reject(err); + else resolve(); + }); + }); +} + +function recvUdp(port, timeout) { + return new Promise((resolve, reject) => { + const sock = dgram.createSocket('udp4'); + sock.bind(port, '127.0.0.1'); + + const timer = setTimeout(() => { + sock.close(); + reject(new Error('Receive timeout')); + }, timeout || TIMEOUT); + + sock.on('message', (msg, rinfo) => { + clearTimeout(timer); + sock.close(); + resolve({ data: msg.toString(), rinfo }); + }); + + sock.on('error', (err) => { + clearTimeout(timer); + reject(err); + }); + }); +} + +module.exports = { + sleep, + findFreePort, + spawnDaemon, + killDaemon, + connectApi, + apiCommand, + createUdpEchoServer, + sendUdp, + recvUdp, + TIMEOUT +}; +\ No newline at end of file diff --git a/test/listen-relearn.js b/test/listen-relearn.js @@ -0,0 +1,147 @@ +const path = require('path'); +const dgram = require('dgram'); +const { + spawnDaemon, + killDaemon, + connectApi, + apiCommand, + createUdpEchoServer, + TIMEOUT +} = require('./helpers'); + +const CONFIG_PATH = path.join(__dirname, 'config.ini'); +const API_PORT = 9123; + +function sendUdpFromPort(srcPort, dstPort, message) { + return new Promise((resolve, reject) => { + const sock = dgram.createSocket('udp4'); + sock.bind(srcPort, '127.0.0.1', () => { + const buf = Buffer.from(message); + sock.send(buf, 0, buf.length, dstPort, '127.0.0.1', (err) => { + if (err) { + sock.close(); + reject(err); + } else { + setTimeout(() => { + sock.close(); + resolve(); + }, 50); + } + }); + }); + sock.on('error', reject); + }); +} + +async function runTest() { + let daemon = null; + let apiSock = null; + let echoServer = null; + + console.log('=== Listen Socket Re-learn Test ==='); + console.log('Testing: listen socket re-learns remote address when different client sends\n'); + + try { + console.log('1. Spawning daemon...'); + daemon = await spawnDaemon(CONFIG_PATH); + console.log(` Daemon started (PID: ${daemon.pid})`); + + console.log('2. Connecting to API...'); + apiSock = await connectApi(API_PORT); + console.log(' Connected to API'); + + console.log('3. Authenticating...'); + let resp = await apiCommand(apiSock, 'auth', 'finwo', 'testsecret'); + console.log(` Auth response: ${resp}`); + if (resp !== 'OK') throw new Error('Authentication failed'); + + console.log('4. Creating session...'); + resp = await apiCommand(apiSock, 'session.create', 'test-relearn', '60'); + console.log(` Session create: ${resp}`); + + console.log('5. Creating listen socket...'); + resp = await apiCommand(apiSock, 'session.socket.create.listen', 'test-relearn', 'listener'); + const listenPort = resp[0]; + console.log(` Listen socket port: ${listenPort}`); + + console.log('6. Starting echo server...'); + echoServer = await createUdpEchoServer(); + console.log(` Echo server on port: ${echoServer.port}`); + + console.log('7. Creating connect socket to echo server...'); + resp = await apiCommand(apiSock, 'session.socket.create.connect', 'test-relearn', 'relay', '127.0.0.1', echoServer.port); + console.log(` Connect socket: ${resp}`); + + console.log('8. Creating forward: listener -> relay...'); + resp = await apiCommand(apiSock, 'session.forward.create', 'test-relearn', 'listener', 'relay'); + console.log(` Forward create: ${resp}`); + + console.log('9. First client sending "from-A" from port 50001...'); + await sendUdpFromPort(50001, listenPort, 'from-A'); + console.log(' Sent "from-A"'); + + let messages = echoServer.getMessages(); + let start = Date.now(); + while (messages.length === 0 && Date.now() - start < TIMEOUT) { + await new Promise(r => setTimeout(r, 50)); + messages = echoServer.getMessages(); + } + + if (messages.length === 0) { + throw new Error('Timeout: no message received from first client'); + } + + const msgA = messages[0]; + console.log(` Received: "${msgA.data}" from ${msgA.rinfo.address}:${msgA.rinfo.port}`); + + if (msgA.data !== 'from-A') { + throw new Error(`Expected "from-A", got "${msgA.data}"`); + } + console.log(' ✓ First client connection established, listen socket learned address'); + + echoServer.clearMessages(); + + console.log('10. Second client sending "from-B" from port 50002...'); + await sendUdpFromPort(50002, listenPort, 'from-B'); + console.log(' Sent "from-B"'); + + messages = echoServer.getMessages(); + start = Date.now(); + while (messages.length === 0 && Date.now() - start < TIMEOUT) { + await new Promise(r => setTimeout(r, 50)); + messages = echoServer.getMessages(); + } + + if (messages.length === 0) { + throw new Error('Timeout: no message received from second client'); + } + + const msgB = messages[0]; + console.log(` Received: "${msgB.data}" from ${msgB.rinfo.address}:${msgB.rinfo.port}`); + + if (msgB.data === 'from-B') { + console.log('\n✓ PASS: Listen socket correctly re-learned new remote address'); + console.log(' Second client (from port 50002) was able to communicate'); + console.log(' through the same listen socket after the first client.'); + process.exit(0); + } else if (msgB.data === 'from-A') { + console.log('\n✗ FAIL: Listen socket did NOT re-learn new remote address'); + console.log(' The second client\'s packet was sent back to the first client'); + console.log(' instead of the second client. This is the bug to fix.'); + console.log(` Expected to receive from port 50002, but responses went to port ${msgA.rinfo.port}`); + process.exit(1); + } else { + throw new Error(`Unexpected message: "${msgB.data}"`); + } + + } catch (err) { + console.error(`\n✗ FAIL: ${err.message}`); + process.exit(1); + } finally { + if (echoServer) echoServer.socket.close(); + if (apiSock) apiSock.end(); + if (daemon) await killDaemon(daemon); + } +} + +runTest(); +\ No newline at end of file