udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

commit 21f64febb8db055bbf3da4e2c0019510d6e9dfe8
Author: Robin Bron <robin.bron@yourhosting.nl>
Date:   Sat, 28 Feb 2026 23:06:16 +0100

Extract from upbx

Diffstat:
A.gitignore | 4++++
ALICENSE.md | 39+++++++++++++++++++++++++++++++++++++++
AMakefile | 141+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aconfig.ini.example | 4++++
Apackage.ini | 10++++++++++
Asrc/ApiModule/server.c | 644+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/ApiModule/server.h | 23+++++++++++++++++++++++
Asrc/AppModule/command/daemon.c | 80+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/AppModule/command/daemon.h | 6++++++
Asrc/AppModule/setup.c | 11+++++++++++
Asrc/AppModule/setup.h | 6++++++
Asrc/AppModule/udphole.c | 876+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/AppModule/udphole.h | 15+++++++++++++++
Asrc/CliModule/command/list_commands.c | 43+++++++++++++++++++++++++++++++++++++++++++
Asrc/CliModule/command/list_commands.h | 6++++++
Asrc/CliModule/common.c | 79+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/CliModule/common.h | 41+++++++++++++++++++++++++++++++++++++++++
Asrc/CliModule/execute_command.c | 16++++++++++++++++
Asrc/CliModule/execute_command.h | 6++++++
Asrc/CliModule/register_command.c | 18++++++++++++++++++
Asrc/CliModule/register_command.h | 6++++++
Asrc/CliModule/setup.c | 11+++++++++++
Asrc/CliModule/setup.h | 6++++++
Asrc/RespModule/resp.c | 315+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/RespModule/resp.h | 58++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/SchedulerModule/protothreads.h | 409+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/SchedulerModule/scheduler.c | 130+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/SchedulerModule/scheduler.h | 30++++++++++++++++++++++++++++++
Asrc/common/digest_auth.c | 54++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/common/digest_auth.h | 27+++++++++++++++++++++++++++
Asrc/common/hexdump.c | 35+++++++++++++++++++++++++++++++++++
Asrc/common/hexdump.h | 12++++++++++++
Asrc/common/md5.c | 147+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/common/md5.h | 20++++++++++++++++++++
Asrc/common/socket_util.c | 358+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/common/socket_util.h | 22++++++++++++++++++++++
Asrc/common/util/hex.c | 31+++++++++++++++++++++++++++++++
Asrc/common/util/hex.h | 12++++++++++++
Asrc/config.c | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Asrc/config.h | 15+++++++++++++++
Asrc/main.c | 278+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
41 files changed, 4106 insertions(+), 0 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -0,0 +1,4 @@ +/lib/ +*.o +/udphole +/config.ini diff --git a/LICENSE.md b/LICENSE.md @@ -0,0 +1,39 @@ +Copyright (c) 2026 finwo + +<!-- paragraph --> +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to use, copy, +modify, and distribute the Software, subject to the following conditions: + +<!-- list:start --> + 1. Redistributions of source code must retain the above copyright notice, this + list of conditions, and the following disclaimer. + + 2. Redistributions in binary form, or any public offering of the Software + (including hosted or managed services), must reproduce the above copyright + notice, this list of conditions, and the following disclaimer in the + documentation and/or other materials provided. + + 3. Any redistribution or public offering of the Software must clearly attribute + the Software to the original copyright holder, reference this License, and + include a link to the official project repository or website. + + 4. The Software may not be renamed, rebranded, or marketed in a manner that + implies it is an independent or proprietary product. Derivative works must + clearly state that they are based on the Software. + + 5. Modifications to copies of the Software must carry prominent notices stating + that changes were made, the nature of the modifications, and the date of the + modifications. +<!-- list:end --> + +<!-- paragraph --> +Any violation of these conditions terminates the permissions granted herein. + +<!-- paragraph --> +THIS SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE COPYRIGHT +HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/Makefile b/Makefile @@ -0,0 +1,141 @@ +lc = $(subst A,a,$(subst B,b,$(subst C,c,$(subst D,d,$(subst E,e,$(subst F,f,$(subst G,g,$(subst H,h,$(subst I,i,$(subst J,j,$(subst K,k,$(subst L,l,$(subst M,m,$(subst N,n,$(subst O,o,$(subst P,p,$(subst Q,q,$(subst R,r,$(subst S,s,$(subst T,t,$(subst U,u,$(subst V,v,$(subst W,w,$(subst X,x,$(subst Y,y,$(subst Z,z,$1)))))))))))))))))))))))))) + +LIBS:= +SRC:= + +# UNAME_MACHINE=$(call lc,$(shell uname -m)) +# UNAME_SYSTEM=$(call lc,$(shell uname -s)) + +BIN?=udphole +VERSION?=0.1.0 + +CC:=gcc +CPP:=g++ + +FIND=$(shell which gfind find | head -1) +SRC+=$(shell $(FIND) src/ -type f -name '*.c') +# Exclude standalone test programs from main binary +SRC:=$(filter-out $(wildcard src/test/test_*.c),$(SRC)) + +INCLUDES:= + +override CFLAGS?=-Wall -O2 +override CFLAGS+=-I src -D INI_HANDLER_LINENO=1 -D'UDPHOLE_VERSION_STR="$(VERSION)"' +override LDFLAGS?= + +override LDFLAGS+=-lresolv + +override CPPFLAGS?= + +ifeq ($(OS),Windows_NT) + # CFLAGS += -D WIN32 + override CPPFLAGS+=-lstdc++ + override CPPFLAGS+= + ifeq ($(PROCESSOR_ARCHITEW6432),AMD64) + # CFLAGS += -D AMD64 + else + ifeq ($(PROCESSOR_ARCHITECTURE),AMD64) + # CFLAGS += -D AMD64 + endif + ifeq ($(PROCESSOR_ARCHITECTURE),x86) + # CFLAGS += -D IA32 + endif + endif +else + UNAME_S := $(shell uname -s) + ifeq ($(UNAME_S),Linux) + # CFLAGS += -D LINUX + override CPPFLAGS+=-lstdc++ + # override CFLAGS+=$(shell pkg-config --cflags glib-2.0) + # override LDFLAGS+=$(shell pkg-config --libs glib-2.0) + override CFLAGS+=-D _GNU_SOURCE + endif + ifeq ($(UNAME_S),Darwin) + # CFLAGS += -D OSX + override CPPFLAGS+=-std=c++14 + override CFLAGS+=-D _BSD_SOURCE + endif + UNAME_P := $(shell uname -p) + ifeq ($(UNAME_P),x86_64) + # CFLAGS += -D AMD64 + endif + ifneq ($(filter %86,$(UNAME_P)),) + # CFLAGS += -D IA32 + endif + ifneq ($(filter arm%,$(UNAME_P)),) + # CFLAGS += -D ARM + endif + # TODO: flags for riscv +endif + +include lib/.dep/config.mk + +OBJ:=$(SRC:.c=.o) +OBJ:=$(OBJ:.cc=.o) + +override CFLAGS+=$(INCLUDES) +override CPPFLAGS+=$(INCLUDES) +override CPPFLAGS+=$(CFLAGS) + +.PHONY: default +default: $(BIN) $(BIN).1 + +# Stddoc: extract /// documentation from source to markdown for manpage +STDDOC ?= stddoc +doc/cli_doc.md: $(SRC) + @mkdir -p doc + cat $(SRC) | $(STDDOC) > doc/cli_doc.md + +# Manpage: template + stddoc fragment (markdown -> man) + envsubst for VERSION +doc/cli_doc.man: doc/cli_doc.md + pandoc doc/cli_doc.md -f markdown -t man -o doc/cli_doc.man + +doc/license.man: LICENSE.md + @mkdir -p doc + pandoc LICENSE.md -f markdown -t man --standalone=false -o doc/license.man + +$(BIN).1: doc/upbx.1.in doc/cli_doc.man doc/license.man + VERSION=$(VERSION) envsubst '$$VERSION' < doc/upbx.1.in | sed '/__COMMANDS_MAN__/r doc/cli_doc.man' | sed '/__COMMANDS_MAN__/d' | sed '/__LICENSE_MAN__/r doc/license.man' | sed '/__LICENSE_MAN__/d' > $(BIN).1 + +# .cc.o: +# $(CPP) $< $(CPPFLAGS) -c -o $@ + +.c.o: + ${CC} $< ${CFLAGS} -c -o $@ + +$(BIN): $(OBJ) + ${CC} ${OBJ} ${CFLAGS} ${LDFLAGS} -o $@ + +# ---- Test targets (finwo/assert) ---- +TEST_BINS:=test_md5 test_sdp_parse test_sip_parse test_config test_resp test_digest_auth + +test_md5: src/test/test_md5.o src/common/md5.o + $(CC) $^ $(CFLAGS) -o $@ + +test_sdp_parse: src/test/test_sdp_parse.o src/AppModule/util/sdp_parse.o + $(CC) $^ $(CFLAGS) -o $@ + +test_sip_parse: src/test/test_sip_parse.o src/AppModule/sip_parse.o src/AppModule/pbx/registration.o src/config.o src/RespModule/resp.o src/PluginModule/plugin.o lib/benhoyt/inih/ini.o lib/cofyc/argparse/argparse.o lib/rxi/log/src/log.o src/common/socket_util.o + $(CC) $^ $(CFLAGS) -o $@ + +test_config: src/test/test_config.o src/config.o src/RespModule/resp.o lib/benhoyt/inih/ini.o lib/rxi/log/src/log.o + $(CC) $^ $(CFLAGS) -o $@ + +test_resp: src/test/test_resp.o src/RespModule/resp.o + $(CC) $^ $(CFLAGS) -o $@ + +test_digest_auth: src/test/test_digest_auth.o src/common/digest_auth.o src/common/md5.o + $(CC) $^ $(CFLAGS) -o $@ + +.PHONY: test +test: $(TEST_BINS) + @for t in $(TEST_BINS); do echo "--- $$t ---"; ./$$t || exit 1; done + @echo "All tests passed." + +.PHONY: clean +clean: + rm -rf $(BIN) $(BIN).1 $(TEST_BINS) + rm -rf $(OBJ) + rm -rf $(patsubst %,src/test/%.o,$(TEST_BINS:test_%=test_%)) + rm -rf src/test/*.o + rm -rf doc/cli_doc.md doc/cli_doc.man doc/license.man diff --git a/config.ini.example b/config.ini.example @@ -0,0 +1,4 @@ +[udphole] +mode = builtin +ports = 7000-7999 +listen = :12344 diff --git a/package.ini b/package.ini @@ -0,0 +1,10 @@ +[dependencies] +benhoyt/inih=master +cofyc/argparse=master +finwo/assert=main +graphitemaster/incbin=main +rxi/log=master +tidwall/hashmap=master + +[package] +name=finwo/udphole diff --git a/src/ApiModule/server.c b/src/ApiModule/server.c @@ -0,0 +1,644 @@ +/* + * Generic RESP2 API server: TCP listener, connection management, RESP2 + * parsing/writing, command hashmap, authentication with per-user permit + * checking, and built-in commands (auth, ping, quit, command). + * + * Runs as a protothread in the main select() loop. + */ + +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <ctype.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <sys/socket.h> +#include <sys/types.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> + +#include "rxi/log.h" +#include "tidwall/hashmap.h" +#include "SchedulerModule/protothreads.h" +#include "SchedulerModule/scheduler.h" +#include "common/socket_util.h" +#include "config.h" +#include "server.h" + +struct pt_task; +PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); + +/* Constants */ + +#define API_MAX_CLIENTS 8 +#define READ_BUF_SIZE 4096 +#define WRITE_BUF_INIT 4096 +#define MAX_ARGS 32 + +/* Per-client connection state (stored in pt udata) */ + +struct api_client_state { + int fd; + int *fds; + char *username; + char rbuf[READ_BUF_SIZE]; + size_t rlen; + char *wbuf; + size_t wlen; + size_t wcap; +}; + +typedef struct api_client_state api_client_t; + +/* Server state (stored in pt udata) */ + +struct api_server_state { + int *fds; + char *current_listen; + time_t next_listen_check; +}; + +typedef struct api_server_state api_server_t; + +/* Command entry */ + +typedef struct { + const char *name; + char (*func)(api_client_t *c, char **args, int nargs); +} api_cmd_entry; + +/* Module state */ + +static struct hashmap *cmd_map = NULL; + +/* Write helpers */ + +bool api_write_raw(api_client_t *c, const void *data, size_t len) { + if (c->fd < 0) return false; + if (c->wlen + len > c->wcap) { + size_t need = c->wlen + len; + size_t ncap = c->wcap ? c->wcap : WRITE_BUF_INIT; + while (ncap < need) ncap *= 2; + char *nb = realloc(c->wbuf, ncap); + if (!nb) return false; + c->wbuf = nb; + c->wcap = ncap; + } + memcpy(c->wbuf + c->wlen, data, len); + c->wlen += len; + return true; +} + +bool api_write_cstr(api_client_t *c, const char *s) { + return api_write_raw(c, s, strlen(s)); +} + +bool api_write_ok(api_client_t *c) { + return api_write_cstr(c, "+OK\r\n"); +} + +bool api_write_err(api_client_t *c, const char *msg) { + if (!api_write_cstr(c, "-ERR ")) return false; + if (!api_write_cstr(c, msg)) return false; + return api_write_cstr(c, "\r\n"); +} + +bool api_write_nil(api_client_t *c) { + return api_write_cstr(c, "$-1\r\n"); +} + +bool api_write_int(api_client_t *c, int value) { + char buf[32]; + snprintf(buf, sizeof(buf), ":%d\r\n", value); + return api_write_cstr(c, buf); +} + +bool api_write_array(api_client_t *c, size_t nitems) { + char buf[32]; + snprintf(buf, sizeof(buf), "*%zu\r\n", nitems); + return api_write_cstr(c, buf); +} + +bool api_write_bulk_cstr(api_client_t *c, const char *s) { + if (!s) return api_write_nil(c); + size_t len = strlen(s); + char prefix[32]; + snprintf(prefix, sizeof(prefix), "$%zu\r\n", len); + if (!api_write_cstr(c, prefix)) return false; + if (!api_write_raw(c, s, len)) return false; + return api_write_cstr(c, "\r\n"); +} + +bool api_write_bulk_int(api_client_t *c, int val) { + char buf[32]; + snprintf(buf, sizeof(buf), "%d", val); + return api_write_bulk_cstr(c, buf); +} + +/* Client lifecycle */ + +static void client_close(api_client_t *c) { + if (c->fd >= 0) { + close(c->fd); + c->fd = -1; + } + free(c->wbuf); + c->wbuf = NULL; + c->wlen = c->wcap = 0; + c->rlen = 0; + free(c->username); + c->username = NULL; +} + +static void client_flush(api_client_t *c) { + if (c->fd < 0 || c->wlen == 0) return; + ssize_t n = send(c->fd, c->wbuf, c->wlen, 0); + if (n > 0) { + if ((size_t)n < c->wlen) + memmove(c->wbuf, c->wbuf + n, c->wlen - (size_t)n); + c->wlen -= (size_t)n; + } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + client_close(c); + } +} + +/* RESP2 inline (telnet) parser */ + +static int parse_inline(const char *line, size_t len, char **args, int max_args) { + int nargs = 0; + const char *p = line; + const char *end = line + len; + while (p < end && nargs < max_args) { + while (p < end && (*p == ' ' || *p == '\t')) p++; + if (p >= end) break; + const char *start; + const char *tok_end; + if (*p == '"' || *p == '\'') { + char quote = *p++; + start = p; + while (p < end && *p != quote) p++; + tok_end = p; + if (p < end) p++; + } else { + start = p; + while (p < end && *p != ' ' && *p != '\t') p++; + tok_end = p; + } + size_t tlen = (size_t)(tok_end - start); + char *arg = malloc(tlen + 1); + if (!arg) return -1; + memcpy(arg, start, tlen); + arg[tlen] = '\0'; + args[nargs++] = arg; + } + return nargs; +} + +/* RESP2 multibulk parser */ + +static int parse_resp_command(api_client_t *c, char **args, int max_args, int *nargs) { + *nargs = 0; + if (c->rlen == 0) return 0; + + if (c->rbuf[0] != '*') { + char *nl = memchr(c->rbuf, '\n', c->rlen); + if (!nl) return 0; + size_t line_len = (size_t)(nl - c->rbuf); + size_t trim = line_len; + if (trim > 0 && c->rbuf[trim - 1] == '\r') trim--; + int n = parse_inline(c->rbuf, trim, args, max_args); + if (n < 0) return -1; + *nargs = n; + size_t consumed = line_len + 1; + c->rlen -= consumed; + if (c->rlen > 0) memmove(c->rbuf, c->rbuf + consumed, c->rlen); + return n > 0 ? 1 : 0; + } + + size_t pos = 0; + char *nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); + if (!nl) return 0; + int count = atoi(c->rbuf + 1); + if (count <= 0 || count > max_args) return -1; + pos = (size_t)(nl - c->rbuf) + 1; + + for (int i = 0; i < count; i++) { + if (pos >= c->rlen) return 0; + if (c->rbuf[pos] != '$') return -1; + nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); + if (!nl) return 0; + int blen = atoi(c->rbuf + pos + 1); + if (blen < 0) return -1; + size_t hdr_end = (size_t)(nl - c->rbuf) + 1; + if (hdr_end + (size_t)blen + 2 > c->rlen) return 0; + char *arg = malloc((size_t)blen + 1); + if (!arg) return -1; + memcpy(arg, c->rbuf + hdr_end, (size_t)blen); + arg[blen] = '\0'; + args[i] = arg; + pos = hdr_end + (size_t)blen + 2; + } + + *nargs = count; + c->rlen -= pos; + if (c->rlen > 0) memmove(c->rbuf, c->rbuf + pos, c->rlen); + return 1; +} + +/* Permission checking */ + +/* Check if a command name matches a single permit pattern. + * Supports: "*" matches everything, "foo.*" matches "foo.anything", + * exact match otherwise. */ +static bool permit_matches(const char *pattern, const char *cmd) { + size_t plen = strlen(pattern); + if (plen == 1 && pattern[0] == '*') + return true; + if (plen >= 2 && pattern[plen - 1] == '*') { + return strncasecmp(pattern, cmd, plen - 1) == 0; + } + return strcasecmp(pattern, cmd) == 0; +} + +/* Check if client has permission for a command. Uses live config: api:<username> and api:* permit keys. */ +static bool user_has_permit(api_client_t *c, const char *cmd) { + char section[128]; + const char *uname = (c->username && c->username[0]) ? c->username : "*"; + snprintf(section, sizeof(section), "api:%s", uname); + resp_object *sec = resp_map_get(global_cfg, section); + if (sec && sec->type == RESPT_ARRAY) { + for (size_t i = 0; i < sec->u.arr.n; i += 2) { + if (i + 1 < sec->u.arr.n) { + resp_object *key = &sec->u.arr.elem[i]; + resp_object *val = &sec->u.arr.elem[i + 1]; + if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { + if (val->type == RESPT_ARRAY) { + for (size_t j = 0; j < val->u.arr.n; j++) { + resp_object *p = &val->u.arr.elem[j]; + if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) + return true; + } + } + } + } + } + } + if (strcmp(uname, "*") != 0) { + resp_object *anon = resp_map_get(global_cfg, "api:*"); + if (anon && anon->type == RESPT_ARRAY) { + for (size_t i = 0; i < anon->u.arr.n; i += 2) { + resp_object *key = &anon->u.arr.elem[i]; + resp_object *val = &anon->u.arr.elem[i + 1]; + if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { + if (val->type == RESPT_ARRAY) { + for (size_t j = 0; j < val->u.arr.n; j++) { + resp_object *p = &val->u.arr.elem[j]; + if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) + return true; + } + } + } + } + } + } + return false; +} + +/* Hashmap callbacks */ + +static uint64_t cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) { + const api_cmd_entry *cmd = item; + return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1); +} + +static int cmd_compare(const void *a, const void *b, void *udata) { + (void)udata; + const api_cmd_entry *ca = a; + const api_cmd_entry *cb = b; + return strcasecmp(ca->name, cb->name); +} + +/* Public: register a command */ + +void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)) { + if (!cmd_map) + cmd_map = hashmap_new(sizeof(api_cmd_entry), 0, 0, 0, cmd_hash, cmd_compare, NULL, NULL); + hashmap_set(cmd_map, &(api_cmd_entry){ .name = name, .func = func }); + log_trace("api: registered command '%s'", name); +} + +static char cmdAUTH(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + api_write_err(c, "wrong number of arguments for 'auth' command (AUTH username password)"); + return 1; + } + const char *uname = args[1]; + const char *pass = args[2]; + char section[128]; + snprintf(section, sizeof(section), "api:%s", uname); + resp_object *sec = resp_map_get(global_cfg, section); + const char *secret = sec ? resp_map_get_string(sec, "secret") : NULL; + if (secret && pass && strcmp(secret, pass) == 0) { + free(c->username); + c->username = strdup(uname); + if (c->username) { + log_debug("api: client authenticated as '%s'", uname); + return api_write_ok(c) ? 1 : 0; + } + } + return api_write_err(c, "invalid credentials") ? 1 : 0; +} + +static char cmdPING(api_client_t *c, char **args, int nargs) { + (void)args; + if (nargs == 1) + return api_write_cstr(c, "+PONG\r\n") ? 1 : 0; + if (nargs == 2) + return api_write_bulk_cstr(c, args[1]) ? 1 : 0; + return api_write_err(c, "wrong number of arguments for 'ping' command") ? 1 : 0; +} + +static char cmdQUIT(api_client_t *c, char **args, int nargs) { + (void)args; (void)nargs; + api_write_ok(c); + return 0; +} + +static char cmdCOMMAND(api_client_t *c, char **args, int nargs) { + (void)args; (void)nargs; + if (!cmd_map) + return api_write_array(c, 0) ? 1 : 0; + + size_t count = 0; + size_t iter = 0; + void *item; + while (hashmap_iter(cmd_map, &iter, &item)) { + const api_cmd_entry *e = item; + if (user_has_permit(c, e->name)) + count++; + } + + if (!api_write_array(c, count)) return 0; + + iter = 0; + while (hashmap_iter(cmd_map, &iter, &item)) { + const api_cmd_entry *e = item; + if (user_has_permit(c, e->name)) { + if (!api_write_bulk_cstr(c, e->name)) return 0; + } + } + return 1; +} + +/* Command dispatch */ + +static void init_builtins(void) { + api_register_cmd("auth", cmdAUTH); + api_register_cmd("ping", cmdPING); + api_register_cmd("quit", cmdQUIT); + api_register_cmd("command", cmdCOMMAND); +} + +/* Check if a command is a built-in that bypasses auth/permit checks */ +static bool is_builtin(const char *name) { + return (strcasecmp(name, "auth") == 0 || + strcasecmp(name, "ping") == 0 || + strcasecmp(name, "quit") == 0 || + strcasecmp(name, "command") == 0); +} + +static void dispatch_command(api_client_t *c, char **args, int nargs) { + if (nargs <= 0) return; + + for (char *p = args[0]; *p; p++) *p = (char)tolower((unsigned char)*p); + + const api_cmd_entry *cmd = hashmap_get(cmd_map, &(api_cmd_entry){ .name = args[0] }); + if (!cmd) { + api_write_err(c, "unknown command"); + return; + } + + if (!is_builtin(args[0])) { + if (!user_has_permit(c, args[0])) { + api_write_err(c, "no permission"); + return; + } + } + + char result = cmd->func(c, args, nargs); + if (!result) { + client_flush(c); + client_close(c); + } +} + +/* TCP listener */ + +static int *create_listen_socket(const char *listen_addr) { + const char *default_port = "6379"; + resp_object *api_sec = resp_map_get(global_cfg, "udphole"); + if (api_sec) { + const char *cfg_port = resp_map_get_string(api_sec, "port"); + if (cfg_port && cfg_port[0]) default_port = cfg_port; + } + int *fds = tcp_listen(listen_addr, NULL, default_port); + if (!fds) { + return NULL; + } + log_info("api: listening on %s", listen_addr); + return fds; +} + +static void handle_accept(int ready_fd) { + struct sockaddr_storage addr; + socklen_t addrlen = sizeof(addr); + int fd = accept(ready_fd, (struct sockaddr *)&addr, &addrlen); + if (fd < 0) return; + set_socket_nonblocking(fd, 1); + + api_client_t *state = calloc(1, sizeof(*state)); + if (!state) { + const char *msg = "-ERR out of memory\r\n"; + send(fd, msg, strlen(msg), 0); + close(fd); + return; + } + state->fd = fd; + + schedmod_pt_create(api_client_pt, state); + log_trace("api: accepted connection, spawned client pt"); +} + +PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + api_server_t *s = task->udata; + time_t loop_timestamp = 0; + log_trace("api_server: protothread entry"); + PT_BEGIN(pt); + + if (!s) { + s = calloc(1, sizeof(*s)); + if (!s) { + PT_EXIT(pt); + } + task->udata = s; + } + + if (!s->current_listen) { + resp_object *api_sec = resp_map_get(global_cfg, "udphole"); + const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL; + if (!listen_str || !listen_str[0]) { + log_info("api: no listen address configured, API server disabled"); + free(s); + PT_EXIT(pt); + } + + s->current_listen = strdup(listen_str); + if (!s->current_listen) { + free(s); + PT_EXIT(pt); + } + + init_builtins(); + s->fds = create_listen_socket(s->current_listen); + if (!s->fds) { + free(s->current_listen); + s->current_listen = NULL; + free(s); + PT_EXIT(pt); + } + } + + for (;;) { + loop_timestamp = (time_t)(timestamp / 1000); + + if (loop_timestamp >= s->next_listen_check) { + s->next_listen_check = loop_timestamp + 60; + resp_object *api_sec = resp_map_get(global_cfg, "udphole"); + const char *new_str = api_sec ? resp_map_get_string(api_sec, "listen") : ""; + int rebind = (s->current_listen && (!new_str[0] || strcmp(s->current_listen, new_str) != 0)) || + (!s->current_listen && new_str[0]); + if (rebind) { + if (s->fds) { + for (int i = 1; i <= s->fds[0]; i++) { + close(s->fds[i]); + } + } + free(s->current_listen); + s->current_listen = (new_str[0]) ? strdup(new_str) : NULL; + if (s->current_listen) { + int *new_fds = create_listen_socket(s->current_listen); + if (new_fds) { + s->fds = realloc(s->fds, sizeof(int) * (new_fds[0] + 1)); + s->fds[0] = new_fds[0]; + for (int i = 1; i <= new_fds[0]; i++) { + s->fds[i] = new_fds[i]; + } + free(new_fds); + } else { + free(s->current_listen); + s->current_listen = NULL; + } + } + } + } + + if (s->fds && s->fds[0] > 0) { + int *ready_fds = NULL; + PT_WAIT_UNTIL(pt, schedmod_has_data(s->fds, &ready_fds) > 0); + if (ready_fds && ready_fds[0] > 0) { + for (int i = 1; i <= ready_fds[0]; i++) { + handle_accept(ready_fds[i]); + } + } + free(ready_fds); + } else { + PT_YIELD(pt); + } + } + if (s->fds) { + for (int i = 1; i <= s->fds[0]; i++) { + close(s->fds[i]); + } + free(s->fds); + } + free(s->current_listen); + free(s); + if (cmd_map) { + hashmap_free(cmd_map); + cmd_map = NULL; + } + + PT_END(pt); +} + +PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + (void)timestamp; + api_client_t *state = task->udata; + + log_trace("api_client: protothread entry fd=%d", state->fd); + PT_BEGIN(pt); + + state->fds = malloc(sizeof(int) * 2); + if (!state->fds) { + free(state); + PT_EXIT(pt); + } + state->fds[0] = 1; + state->fds[1] = state->fd; + + for (;;) { + int *ready_fds = NULL; + PT_WAIT_UNTIL(pt, schedmod_has_data(state->fds, &ready_fds) > 0); + + int ready_fd = -1; + if (ready_fds && ready_fds[0] > 0) { + for (int i = 1; i <= ready_fds[0]; i++) { + if (ready_fds[i] == state->fd) { + ready_fd = state->fd; + break; + } + } + } + free(ready_fds); + + if (ready_fd != state->fd) continue; + + size_t space = sizeof(state->rbuf) - state->rlen; + if (space == 0) { + break; + } + ssize_t n = recv(state->fd, state->rbuf + state->rlen, space, 0); + if (n <= 0) { + if (n == 0 || (errno != EAGAIN && errno != EWOULDBLOCK)) + break; + } + state->rlen += (size_t)n; + + char *args[MAX_ARGS]; + int nargs; + int rc = 0; + while (state->fd >= 0 && (rc = parse_resp_command(state, args, MAX_ARGS, &nargs)) > 0) { + dispatch_command(state, args, nargs); + for (int j = 0; j < nargs; j++) free(args[j]); + } + if (rc < 0) { + api_write_err(state, "Protocol error"); + } + + client_flush(state); + + if (state->fd < 0) break; + } + + if (state->fd >= 0) { + close(state->fd); + } + free(state->fds); + free(state->wbuf); + free(state->username); + free(state); + + PT_END(pt); +} diff --git a/src/ApiModule/server.h b/src/ApiModule/server.h @@ -0,0 +1,23 @@ +#ifndef __APPMODULE_API_SERVER_H__ +#define __APPMODULE_API_SERVER_H__ + +#include <stdint.h> +#include <stdbool.h> + +#include "SchedulerModule/scheduler.h" + +struct api_client_state; +typedef struct api_client_state api_client_t; + +PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); + +void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)); + +bool api_write_ok(api_client_t *c); +bool api_write_err(api_client_t *c, const char *msg); +bool api_write_array(api_client_t *c, size_t nitems); +bool api_write_bulk_cstr(api_client_t *c, const char *s); +bool api_write_bulk_int(api_client_t *c, int val); +bool api_write_int(api_client_t *c, int val); + +#endif // __APPMODULE_API_SERVER_H__ diff --git a/src/AppModule/command/daemon.c b/src/AppModule/command/daemon.c @@ -0,0 +1,80 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <fcntl.h> +#include <sys/stat.h> + +#include "cofyc/argparse.h" +#include "rxi/log.h" + +#include "config.h" +#include "CliModule/common.h" +#include "SchedulerModule/scheduler.h" +#include "AppModule/command/daemon.h" +#include "AppModule/udphole.h" +#include "ApiModule/server.h" + +static const char *const daemon_usages[] = { + "udphole daemon [options]", + NULL, +}; + +static int do_daemonize(void) { + pid_t pid = fork(); + if (pid < 0) { + log_fatal("fork: %m"); + return -1; + } + if (pid > 0) + _exit(0); + if (setsid() < 0) { + log_fatal("setsid: %m"); + _exit(1); + } + pid = fork(); + if (pid < 0) { + log_fatal("fork: %m"); + _exit(1); + } + if (pid > 0) + _exit(0); + if (chdir("/") != 0) { + } + int fd; + for (fd = 0; fd < 3; fd++) + (void)close(fd); + fd = open("/dev/null", O_RDWR); + if (fd >= 0) { + dup2(fd, STDIN_FILENO); + dup2(fd, STDOUT_FILENO); + dup2(fd, STDERR_FILENO); + if (fd > 2) + close(fd); + } + return 0; +} + +int appmodule_cmd_daemon(int argc, const char **argv) { + int daemonize_flag = 0; + int no_daemonize_flag = 0; + + struct argparse argparse; + struct argparse_option options[] = { + OPT_HELP(), + OPT_BOOLEAN('d', "daemonize", &daemonize_flag, "run in background", NULL, 0, 0), + OPT_BOOLEAN('D', "no-daemonize", &no_daemonize_flag, "force foreground (overrides config daemonize=1)", NULL, 0, 0), + OPT_END(), + }; + argparse_init(&argparse, options, daemon_usages, ARGPARSE_STOP_AT_NON_OPTION); + argc = argparse_parse(&argparse, argc, argv); + + if ((!no_daemonize_flag) && (daemonize_flag || 0)) { + do_daemonize(); + } + + schedmod_pt_create(api_server_pt, NULL); + schedmod_pt_create(udphole_manager_pt, NULL); + + return schedmod_main(); +} diff --git a/src/AppModule/command/daemon.h b/src/AppModule/command/daemon.h @@ -0,0 +1,6 @@ +#ifndef __APPMODULE_COMMAND_DAEMON_H__ +#define __APPMODULE_COMMAND_DAEMON_H__ + +int appmodule_cmd_daemon(int argc, const char **argv); + +#endif diff --git a/src/AppModule/setup.c b/src/AppModule/setup.c @@ -0,0 +1,11 @@ +#include "AppModule/setup.h" +#include "AppModule/command/daemon.h" +#include "CliModule/register_command.h" + +void appmodule_setup(void) { + climodule_register_command( + "daemon", + "Run the udphole daemon", + appmodule_cmd_daemon + ); +} diff --git a/src/AppModule/setup.h b/src/AppModule/setup.h @@ -0,0 +1,6 @@ +#ifndef __APPMODULE_SETUP_H__ +#define __APPMODULE_SETUP_H__ + +void appmodule_setup(void); + +#endif diff --git a/src/AppModule/udphole.c b/src/AppModule/udphole.c @@ -0,0 +1,876 @@ +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <stdarg.h> +#include <unistd.h> +#include <errno.h> +#include <fcntl.h> +#include <time.h> +#include <sys/socket.h> +#include <sys/un.h> +#include <sys/stat.h> +#include <netinet/in.h> +#include <arpa/inet.h> + +#include "rxi/log.h" +#include "SchedulerModule/protothreads.h" +#include "SchedulerModule/scheduler.h" +#include "common/socket_util.h" +#include "config.h" +#include "tidwall/hashmap.h" +#include "AppModule/udphole.h" +#include "ApiModule/server.h" + +#define UDPHOLE_SESSION_HASH_SIZE 256 +#define UDPHOLE_BUFFER_SIZE 4096 +#define DEFAULT_IDLE_EXPIRY 60 + +typedef struct udphole_socket { + char *socket_id; + int fd; + int local_port; + int mode; + struct sockaddr_storage remote_addr; + socklen_t remote_addrlen; + int learned_valid; + struct sockaddr_storage learned_addr; + socklen_t learned_addrlen; +} udphole_socket_t; + +typedef struct udphole_forward { + char *src_socket_id; + char *dst_socket_id; +} udphole_forward_t; + +typedef struct udphole_session { + char *session_id; + time_t idle_expiry; + time_t created; + time_t last_activity; + struct hashmap *sockets; + udphole_forward_t *forwards; + size_t forwards_count; + int *fds; + int fd_count; + int marked_for_deletion; + int *ready_fds; + struct pt pt; + struct pt_task *task; +} udphole_session_t; + +static struct hashmap *sessions = NULL; +static char *advertise_addr = NULL; +static int port_low = 7000; +static int port_high = 7999; +static int port_cur = 7000; +static int running = 0; + +static uint64_t session_hash(const void *item, uint64_t seed0, uint64_t seed1) { + const udphole_session_t *s = item; + return hashmap_sip(s->session_id, strlen(s->session_id), seed0, seed1); +} + +static int session_compare(const void *a, const void *b, void *udata) { + (void)udata; + const udphole_session_t *sa = a; + const udphole_session_t *sb = b; + return strcmp(sa->session_id, sb->session_id); +} + +static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) { + const udphole_socket_t *s = item; + return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1); +} + +static int socket_compare(const void *a, const void *b, void *udata) { + (void)udata; + const udphole_socket_t *sa = a; + const udphole_socket_t *sb = b; + return strcmp(sa->socket_id, sb->socket_id); +} + +static udphole_session_t *find_session(const char *session_id) { + if (!sessions || !session_id) return NULL; + udphole_session_t key = { .session_id = (char *)session_id }; + return (udphole_session_t *)hashmap_get(sessions, &key); +} + +static udphole_socket_t *find_socket(udphole_session_t *s, const char *socket_id) { + if (!s || !s->sockets || !socket_id) return NULL; + udphole_socket_t key = { .socket_id = (char *)socket_id }; + return (udphole_socket_t *)hashmap_get(s->sockets, &key); +} + +static int alloc_port(void) { + for (int i = 0; i < port_high - port_low; i++) { + int port = port_cur + i; + if (port > port_high) port = port_low; + port_cur = port + 1; + if (port_cur > port_high) port_cur = port_low; + + struct sockaddr_in addr; + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = INADDR_ANY; + addr.sin_port = htons(port); + + int udp_fd = socket(AF_INET, SOCK_DGRAM, 0); + if (udp_fd < 0) continue; + int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0); + close(udp_fd); + if (!ok) continue; + + return port; + } + return 0; +} + +static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) { + memset(addr, 0, sizeof(*addr)); + + struct sockaddr_in *addr4 = (struct sockaddr_in *)addr; + if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) { + addr4->sin_family = AF_INET; + addr4->sin_port = htons(port); + *addrlen = sizeof(*addr4); + return 0; + } + + struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr; + if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) { + addr6->sin6_family = AF_INET6; + addr6->sin6_port = htons(port); + *addrlen = sizeof(*addr6); + return 0; + } + + return -1; +} + +static void close_socket(udphole_socket_t *sock) { + if (!sock) return; + if (sock->fd >= 0) { + close(sock->fd); + sock->fd = -1; + } +} + +static void free_socket(udphole_socket_t *sock) { + if (!sock) return; + close_socket(sock); + free(sock->socket_id); + free(sock); +} + +static void destroy_session(udphole_session_t *s); + +static void session_remove_fds(udphole_session_t *s) { + if (!s) return; + if (s->fds) { + free(s->fds); + s->fds = NULL; + } + s->fd_count = 0; +} + +static void session_update_fds(udphole_session_t *s) { + session_remove_fds(s); + if (!s || !s->sockets) return; + + int count = (int)hashmap_count(s->sockets); + if (count == 0) return; + + s->fds = malloc(sizeof(int) * (count + 1)); + if (!s->fds) return; + s->fds[0] = 0; + + size_t iter = 0; + void *item; + while (hashmap_iter(s->sockets, &iter, &item)) { + udphole_socket_t *sock = item; + if (sock->fd >= 0) { + s->fds[++s->fds[0]] = sock->fd; + } + } +} + +static void destroy_session(udphole_session_t *s) { + if (!s) return; + s->marked_for_deletion = 1; + if (sessions) { + hashmap_delete(sessions, s); + } +} + +static udphole_session_t *create_session(const char *session_id, int idle_expiry) { + const udphole_session_t *cs = find_session(session_id); + if (cs) return (udphole_session_t *)cs; + + udphole_session_t *s = calloc(1, sizeof(*s)); + if (!s) return NULL; + + s->session_id = strdup(session_id); + s->created = time(NULL); + s->last_activity = s->created; + s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY; + + s->sockets = hashmap_new(sizeof(udphole_socket_t), 0, 0, 0, socket_hash, socket_compare, NULL, NULL); + + if (!sessions) { + sessions = hashmap_new(sizeof(udphole_session_t), 0, 0, 0, session_hash, session_compare, NULL, NULL); + } + hashmap_set(sessions, s); + + log_debug("udphole: created session %s with idle_expiry %ld", session_id, (long)s->idle_expiry); + return s; +} + +static void cleanup_expired_sessions(void) { + if (!sessions) return; + time_t now = time(NULL); + + size_t iter = 0; + void *item; + while (hashmap_iter(sessions, &iter, &item)) { + udphole_session_t *s = item; + if (now - s->last_activity > s->idle_expiry) { + log_debug("udphole: session %s expired (idle %ld > expiry %ld)", + s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry); + destroy_session(s); + } + } +} + +static int add_forward(udphole_session_t *s, const char *src_id, const char *dst_id) { + for (size_t i = 0; i < s->forwards_count; i++) { + if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && + strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { + return 0; + } + } + + udphole_forward_t *new_forwards = realloc(s->forwards, sizeof(udphole_forward_t) * (s->forwards_count + 1)); + if (!new_forwards) return -1; + s->forwards = new_forwards; + + s->forwards[s->forwards_count].src_socket_id = strdup(src_id); + s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id); + s->forwards_count++; + + return 0; +} + +static int remove_forward(udphole_session_t *s, const char *src_id, const char *dst_id) { + for (size_t i = 0; i < s->forwards_count; i++) { + if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && + strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { + free(s->forwards[i].src_socket_id); + free(s->forwards[i].dst_socket_id); + for (size_t j = i; j < s->forwards_count - 1; j++) { + s->forwards[j] = s->forwards[j + 1]; + } + s->forwards_count--; + return 0; + } + } + return -1; +} + +static udphole_socket_t *create_listen_socket(udphole_session_t *sess, const char *socket_id) { + udphole_socket_t *existing = find_socket(sess, socket_id); + if (existing) return existing; + + int port = alloc_port(); + if (!port) { + log_error("udphole: no ports available"); + return NULL; + } + + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", port); + int *fds = udp_recv(port_str, NULL, NULL); + if (!fds || fds[0] == 0) { + log_error("udphole: failed to create UDP socket on port %d", port); + free(fds); + return NULL; + } + + udphole_socket_t *sock = calloc(1, sizeof(*sock)); + if (!sock) { + free(fds); + return NULL; + } + + sock->socket_id = strdup(socket_id); + sock->fd = fds[1]; + sock->local_port = port; + sock->mode = 0; + sock->learned_valid = 0; + free(fds); + + hashmap_set(sess->sockets, sock); + session_update_fds(sess); + + log_debug("udphole: created listen socket %s in session %s on port %d", + socket_id, sess->session_id, port); + return sock; +} + +static udphole_socket_t *create_connect_socket(udphole_session_t *sess, const char *socket_id, + const char *ip, int port) { + udphole_socket_t *existing = find_socket(sess, socket_id); + if (existing) return existing; + + int local_port = alloc_port(); + if (!local_port) { + log_error("udphole: no ports available"); + return NULL; + } + + char port_str[16]; + snprintf(port_str, sizeof(port_str), "%d", local_port); + int *fds = udp_recv(port_str, NULL, NULL); + if (!fds || fds[0] == 0) { + log_error("udphole: failed to create UDP socket on port %d", local_port); + free(fds); + return NULL; + } + + struct sockaddr_storage remote_addr; + socklen_t remote_addrlen; + if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) { + log_error("udphole: invalid remote address %s:%d", ip, port); + free(fds); + return NULL; + } + + udphole_socket_t *sock = calloc(1, sizeof(*sock)); + if (!sock) { + free(fds); + return NULL; + } + + sock->socket_id = strdup(socket_id); + sock->fd = fds[1]; + sock->local_port = local_port; + sock->mode = 1; + sock->remote_addr = remote_addr; + sock->remote_addrlen = remote_addrlen; + sock->learned_valid = 0; + free(fds); + + hashmap_set(sess->sockets, sock); + session_update_fds(sess); + + log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", + socket_id, sess->session_id, local_port, ip, port); + return sock; +} + +static int destroy_socket(udphole_session_t *sess, const char *socket_id) { + udphole_socket_t *sock = find_socket(sess, socket_id); + if (!sock) return -1; + + hashmap_delete(sess->sockets, sock); + free_socket(sock); + session_update_fds(sess); + + for (size_t i = 0; i < sess->forwards_count; ) { + if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 || + strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) { + free(sess->forwards[i].src_socket_id); + free(sess->forwards[i].dst_socket_id); + for (size_t j = i; j < sess->forwards_count - 1; j++) { + sess->forwards[j] = sess->forwards[j + 1]; + } + sess->forwards_count--; + } else { + i++; + } + } + + return 0; +} + +PT_THREAD(udphole_session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + udphole_session_t *s = task->udata; + + (void)timestamp; + log_trace("udphole_session: protothread entry session=%s", s->session_id); + PT_BEGIN(pt); + + char buffer[UDPHOLE_BUFFER_SIZE]; + + for (;;) { + if (s->marked_for_deletion) { + break; + } + + if (!s->fds || s->fd_count == 0) { + PT_YIELD(pt); + continue; + } + + PT_WAIT_UNTIL(pt, schedmod_has_data(s->fds, &s->ready_fds) > 0); + + if (!s->ready_fds || s->ready_fds[0] == 0) { + PT_YIELD(pt); + continue; + } + + for (int r = 1; r <= s->ready_fds[0]; r++) { + int ready_fd = s->ready_fds[r]; + + udphole_socket_t *src_sock = NULL; + size_t iter = 0; + void *item; + while (hashmap_iter(s->sockets, &iter, &item)) { + udphole_socket_t *sock = item; + if (sock->fd == ready_fd) { + src_sock = sock; + break; + } + } + + if (!src_sock) continue; + + struct sockaddr_storage from_addr; + socklen_t from_len = sizeof(from_addr); + ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, + (struct sockaddr *)&from_addr, &from_len); + + if (n <= 0) { + if (errno != EAGAIN && errno != EWOULDBLOCK) { + log_warn("udphole: recvfrom error on socket %s: %s", + src_sock->socket_id, strerror(errno)); + } + continue; + } + + s->last_activity = time(NULL); + + if (src_sock->mode == 0 && !src_sock->learned_valid) { + src_sock->learned_addr = from_addr; + src_sock->learned_addrlen = from_len; + src_sock->learned_valid = 1; + log_debug("udphole: socket %s learned remote address", src_sock->socket_id); + } + + for (size_t i = 0; i < s->forwards_count; i++) { + if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) { + continue; + } + + udphole_socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id); + if (!dst_sock || dst_sock->fd < 0) continue; + + struct sockaddr *dest_addr = NULL; + socklen_t dest_addrlen = 0; + + if (dst_sock->mode == 1) { + dest_addr = (struct sockaddr *)&dst_sock->remote_addr; + dest_addrlen = dst_sock->remote_addrlen; + } else if (dst_sock->learned_valid) { + dest_addr = (struct sockaddr *)&dst_sock->learned_addr; + dest_addrlen = dst_sock->learned_addrlen; + } + + if (dest_addr && dest_addrlen > 0) { + ssize_t sent = sendto(dst_sock->fd, buffer, n, 0, dest_addr, dest_addrlen); + if (sent < 0) { + log_warn("udphole: forward failed %s -> %s: %s", + src_sock->socket_id, dst_sock->socket_id, strerror(errno)); + } + } + } + } + } + + log_debug("udphole: session %s protothread exiting", s->session_id); + + if (s->ready_fds) { + free(s->ready_fds); + s->ready_fds = NULL; + } + + if (s->fds) { + free(s->fds); + s->fds = NULL; + } + + if (s->sockets) { + size_t iter = 0; + void *item; + while (hashmap_iter(s->sockets, &iter, &item)) { + udphole_socket_t *sock = item; + free_socket(sock); + } + hashmap_free(s->sockets); + s->sockets = NULL; + } + + if (s->forwards) { + for (size_t i = 0; i < s->forwards_count; i++) { + free(s->forwards[i].src_socket_id); + free(s->forwards[i].dst_socket_id); + } + free(s->forwards); + s->forwards = NULL; + s->forwards_count = 0; + } + + free(s->session_id); + free(s); + + PT_END(pt); +} + +static void spawn_session_pt(udphole_session_t *s) { + s->task = (struct pt_task *)(intptr_t)schedmod_pt_create(udphole_session_pt, s); +} + +static char cmd_session_create(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + return api_write_err(c, "wrong number of arguments for 'session.create'") ? 1 : 0; + } + + const char *session_id = args[1]; + int idle_expiry = 0; + if (nargs >= 4 && args[2] && args[2][0] != '\0') { + idle_expiry = atoi(args[2]); + } + + udphole_session_t *s = create_session(session_id, idle_expiry); + if (!s) { + return api_write_err(c, "failed to create session") ? 1 : 0; + } + + spawn_session_pt(s); + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_session_list(api_client_t *c, char **args, int nargs) { + (void)args; + if (nargs != 1) { + return api_write_err(c, "wrong number of arguments for 'session.list'") ? 1 : 0; + } + + if (!sessions) { + return api_write_array(c, 0) ? 1 : 0; + } + + size_t count = hashmap_count(sessions); + if (!api_write_array(c, count)) return 0; + + size_t iter = 0; + void *item; + while (hashmap_iter(sessions, &iter, &item)) { + udphole_session_t *s = item; + if (!api_write_bulk_cstr(c, s->session_id)) return 0; + } + + return 1; +} + +static char cmd_session_info(api_client_t *c, char **args, int nargs) { + if (nargs != 2) { + return api_write_err(c, "wrong number of arguments for 'session.info'") ? 1 : 0; + } + + const char *session_id = args[1]; + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + size_t num_items = 8; + if (!api_write_array(c, num_items)) return 0; + + if (!api_write_bulk_cstr(c, "session_id")) return 0; + if (!api_write_bulk_cstr(c, s->session_id)) return 0; + + if (!api_write_bulk_cstr(c, "created")) return 0; + if (!api_write_bulk_int(c, (int)s->created)) return 0; + + if (!api_write_bulk_cstr(c, "last_activity")) return 0; + if (!api_write_bulk_int(c, (int)s->last_activity)) return 0; + + if (!api_write_bulk_cstr(c, "idle_expiry")) return 0; + if (!api_write_bulk_int(c, (int)s->idle_expiry)) return 0; + + if (!api_write_bulk_cstr(c, "sockets")) return 0; + size_t socket_count = s->sockets ? hashmap_count(s->sockets) : 0; + if (!api_write_array(c, socket_count)) return 0; + if (s->sockets) { + size_t iter = 0; + void *item; + while (hashmap_iter(s->sockets, &iter, &item)) { + udphole_socket_t *sock = item; + if (!api_write_bulk_cstr(c, sock->socket_id)) return 0; + } + } + + if (!api_write_bulk_cstr(c, "forwards")) return 0; + if (!api_write_array(c, s->forwards_count * 2)) return 0; + for (size_t i = 0; i < s->forwards_count; i++) { + if (!api_write_bulk_cstr(c, s->forwards[i].src_socket_id)) return 0; + if (!api_write_bulk_cstr(c, s->forwards[i].dst_socket_id)) return 0; + } + + if (!api_write_bulk_cstr(c, "fd_count")) return 0; + if (!api_write_int(c, s->fd_count)) return 0; + + if (!api_write_bulk_cstr(c, "marked_for_deletion")) return 0; + if (!api_write_int(c, s->marked_for_deletion)) return 0; + + return 1; +} + +static char cmd_session_destroy(api_client_t *c, char **args, int nargs) { + if (nargs != 2) { + return api_write_err(c, "wrong number of arguments for 'session.destroy'") ? 1 : 0; + } + + const char *session_id = args[1]; + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + destroy_session(s); + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_socket_create_listen(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + return api_write_err(c, "wrong number of arguments for 'session.socket.create.listen'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *socket_id = args[2]; + + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + udphole_socket_t *sock = create_listen_socket(s, socket_id); + if (!sock) { + return api_write_err(c, "failed to create socket") ? 1 : 0; + } + + if (!api_write_array(c, 2)) return 0; + if (!api_write_bulk_int(c, sock->local_port)) return 0; + if (!api_write_bulk_cstr(c, advertise_addr)) return 0; + + return 1; +} + +static char cmd_socket_create_connect(api_client_t *c, char **args, int nargs) { + if (nargs != 5) { + return api_write_err(c, "wrong number of arguments for 'session.socket.create.connect'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *socket_id = args[2]; + const char *ip = args[3]; + int port = atoi(args[4]); + + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + udphole_socket_t *sock = create_connect_socket(s, socket_id, ip, port); + if (!sock) { + return api_write_err(c, "failed to create socket") ? 1 : 0; + } + + if (!api_write_array(c, 2)) return 0; + if (!api_write_bulk_int(c, sock->local_port)) return 0; + if (!api_write_bulk_cstr(c, advertise_addr)) return 0; + + return 1; +} + +static char cmd_socket_destroy(api_client_t *c, char **args, int nargs) { + if (nargs != 3) { + return api_write_err(c, "wrong number of arguments for 'session.socket.destroy'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *socket_id = args[2]; + + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + if (destroy_socket(s, socket_id) != 0) { + return api_write_err(c, "socket not found") ? 1 : 0; + } + + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_forward_list(api_client_t *c, char **args, int nargs) { + if (nargs != 2) { + return api_write_err(c, "wrong number of arguments for 'session.forward.list'") ? 1 : 0; + } + + const char *session_id = args[1]; + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + if (!api_write_array(c, s->forwards_count * 2)) return 0; + for (size_t i = 0; i < s->forwards_count; i++) { + if (!api_write_bulk_cstr(c, s->forwards[i].src_socket_id)) return 0; + if (!api_write_bulk_cstr(c, s->forwards[i].dst_socket_id)) return 0; + } + + return 1; +} + +static char cmd_forward_create(api_client_t *c, char **args, int nargs) { + if (nargs != 4) { + return api_write_err(c, "wrong number of arguments for 'session.forward.create'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *src_socket_id = args[2]; + const char *dst_socket_id = args[3]; + + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + udphole_socket_t *src = find_socket(s, src_socket_id); + if (!src) { + return api_write_err(c, "source socket not found") ? 1 : 0; + } + + udphole_socket_t *dst = find_socket(s, dst_socket_id); + if (!dst) { + return api_write_err(c, "destination socket not found") ? 1 : 0; + } + + if (add_forward(s, src_socket_id, dst_socket_id) != 0) { + return api_write_err(c, "failed to add forward") ? 1 : 0; + } + + return api_write_ok(c) ? 1 : 0; +} + +static char cmd_forward_destroy(api_client_t *c, char **args, int nargs) { + if (nargs != 4) { + return api_write_err(c, "wrong number of arguments for 'session.forward.destroy'") ? 1 : 0; + } + + const char *session_id = args[1]; + const char *src_socket_id = args[2]; + const char *dst_socket_id = args[3]; + + udphole_session_t *s = find_session(session_id); + if (!s) { + return api_write_err(c, "session not found") ? 1 : 0; + } + + if (remove_forward(s, src_socket_id, dst_socket_id) != 0) { + return api_write_err(c, "forward not found") ? 1 : 0; + } + + return api_write_ok(c) ? 1 : 0; +} + +static void register_udphole_commands(void) { + api_register_cmd("session.create", cmd_session_create); + api_register_cmd("session.list", cmd_session_list); + api_register_cmd("session.info", cmd_session_info); + api_register_cmd("session.destroy", cmd_session_destroy); + api_register_cmd("session.socket.create.listen", cmd_socket_create_listen); + api_register_cmd("session.socket.create.connect", cmd_socket_create_connect); + api_register_cmd("session.socket.destroy", cmd_socket_destroy); + api_register_cmd("session.forward.list", cmd_forward_list); + api_register_cmd("session.forward.create", cmd_forward_create); + api_register_cmd("session.forward.destroy", cmd_forward_destroy); + log_info("udphole: registered session.* commands"); +} + +PT_THREAD(udphole_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { + (void)timestamp; + log_trace("udphole_manager: protothread entry"); + PT_BEGIN(pt); + + PT_WAIT_UNTIL(pt, global_cfg); + + resp_object *udphole_sec = resp_map_get(global_cfg, "udphole"); + if (!udphole_sec) { + log_info("udphole: no [udphole] section in config, not starting"); + PT_EXIT(pt); + } + + const char *mode = resp_map_get_string(udphole_sec, "mode"); + if (!mode || strcmp(mode, "builtin") != 0) { + log_info("udphole: mode is '%s', not starting builtin server", mode ? mode : "(null)"); + PT_EXIT(pt); + } + + const char *ports_str = resp_map_get_string(udphole_sec, "ports"); + if (ports_str) { + sscanf(ports_str, "%d-%d", &port_low, &port_high); + if (port_low <= 0) port_low = 7000; + if (port_high <= port_low) port_high = port_low + 999; + } + port_cur = port_low; + + const char *advertise_cfg = resp_map_get_string(udphole_sec, "advertise"); + if (advertise_cfg) { + advertise_addr = strdup(advertise_cfg); + } + + register_udphole_commands(); + running = 1; + log_info("udphole: manager started with port range %d-%d", port_low, port_high); + + int64_t last_cleanup = 0; + + for (;;) { + if (global_cfg) { + resp_object *new_udphole_sec = resp_map_get(global_cfg, "udphole"); + if (new_udphole_sec) { + const char *new_mode = resp_map_get_string(new_udphole_sec, "mode"); + if (new_mode && strcmp(new_mode, "builtin") != 0) { + log_info("udphole: mode changed to '%s', shutting down", new_mode); + break; + } + } + } + + int64_t now = (int64_t)(time(NULL)); + if (now - last_cleanup >= 1) { + cleanup_expired_sessions(); + last_cleanup = now; + } + + PT_YIELD(pt); + } + + running = 0; + + if (sessions) { + size_t iter = 0; + void *item; + while (hashmap_iter(sessions, &iter, &item)) { + udphole_session_t *s = item; + s->marked_for_deletion = 1; + } + } + + free(advertise_addr); + advertise_addr = NULL; + + PT_END(pt); +} diff --git a/src/AppModule/udphole.h b/src/AppModule/udphole.h @@ -0,0 +1,15 @@ +#ifndef __APPMODULE_UDPHOLE_H__ +#define __APPMODULE_UDPHOLE_H__ + +#include <stddef.h> +#include <stdint.h> +#include <sys/time.h> + +#include "SchedulerModule/protothreads.h" + +struct pt; +struct pt_task; + +PT_THREAD(udphole_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); + +#endif diff --git a/src/CliModule/command/list_commands.c b/src/CliModule/command/list_commands.c @@ -0,0 +1,43 @@ +/// <!-- path: src/CliModule/command/list_commands.c --> +/// # LIST-COMMANDS +/// **list-commands** is the command that lists all available CLI commands and their short descriptions. It has no subcommands or options. +/// +/// **Synopsis** +/// +/// **upbx** [global options] **list-commands** +/// +/// **Description** +/// +/// Prints a two-column layout (command name, description). Use it to discover **daemon**, **extension**, **trunk**, **api-user**, **completion**, and any other registered commands. +/// +#include <stdio.h> +#include <string.h> + +#include "../common.h" +#include "list_commands.h" + +int climodule_cmd_list_commands(int argc, const char **argv) { + int len; + int name_longest = 0; + struct climodule_command *cmd = climodule_commands; + while(cmd) { + len = (int)strlen(cmd->cmd); + if (len > name_longest) { name_longest = len; } + cmd = cmd->next; + } + + int width = cli_get_output_width(80); + int left_col = name_longest + 3; /* " " + name + " " */ + + printf("\n"); + printf("Available commands:\n"); + cmd = climodule_commands; + while (cmd) { + printf("\n %*s ", name_longest, cmd->cmd); + cli_print_wrapped(stdout, cmd->desc, width, left_col); + cmd = cmd->next; + } + printf("\n\n"); + + return 0; +} diff --git a/src/CliModule/command/list_commands.h b/src/CliModule/command/list_commands.h @@ -0,0 +1,6 @@ +#ifndef __CLIMODULE_LIST_COMMANDS_H__ +#define __CLIMODULE_LIST_COMMANDS_H__ + +int climodule_cmd_list_commands(int, const char **); + +#endif // __CLIMODULE_LIST_COMMANDS_H__ diff --git a/src/CliModule/common.c b/src/CliModule/common.c @@ -0,0 +1,79 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <unistd.h> +#include <sys/ioctl.h> + +#include "CliModule/common.h" + +#ifndef NULL +#define NULL (void*)0 +#endif + +struct climodule_command *climodule_commands = NULL; + +const char *cli_find_arg(int argc, const char **argv, const char *name) { + for (int i = 0; i < argc - 1; i++) + if (strcmp(argv[i], name) == 0) + return argv[i + 1]; + return NULL; +} + +size_t cli_collect_positional(int argc, const char **argv, int start, + const char **out, size_t max_out) { + size_t n = 0; + for (int i = start; i < argc && n < max_out; i++) { + if (argv[i][0] == '-' && argv[i][1] == '-' && argv[i][2] != '\0') { + i++; /* skip value of --flag */ + continue; + } + out[n++] = argv[i]; + } + return n; +} + +const char *cli_resolve_default_config(void) { + static char buf[1024]; + const char *home = getenv("HOME"); + if (home) { + snprintf(buf, sizeof(buf), "%s/.config/udphole.conf", home); + if (access(buf, R_OK) == 0) return buf; + snprintf(buf, sizeof(buf), "%s/.udphole.conf", home); + if (access(buf, R_OK) == 0) return buf; + } + if (access("/etc/udphole/udphole.conf", R_OK) == 0) return "/etc/udphole/udphole.conf"; + if (access("/etc/udphole.conf", R_OK) == 0) return "/etc/udphole.conf"; + return NULL; +} + +int cli_get_output_width(int default_width) { + if (!isatty(STDOUT_FILENO)) + return default_width; + struct winsize w; + if (ioctl(STDOUT_FILENO, TIOCGWINSZ, &w) < 0 || w.ws_col <= 0) + return default_width; + return (int)w.ws_col; +} + +void cli_print_wrapped(FILE *out, const char *text, int width, int left_col_width) { + if (!text || width <= left_col_width) + return; + char *copy = strdup(text); + if (!copy) + return; + int len = left_col_width; + char *tok = strtok(copy, " "); + while (tok) { + int toklen = (int)strlen(tok); + if (len + 1 + toklen > width) { + fprintf(out, "\n%*s", left_col_width, ""); + len = left_col_width; + } + if (len > left_col_width) + fputc(' ', out); + fputs(tok, out); + len += (len > left_col_width ? 1 : 0) + toklen; + tok = strtok(NULL, " "); + } + free(copy); +} diff --git a/src/CliModule/common.h b/src/CliModule/common.h @@ -0,0 +1,41 @@ +#ifndef __CLIMODULE_COMMON_H__ +#define __CLIMODULE_COMMON_H__ + +#include <stddef.h> +#include <stdio.h> + +struct climodule_command { + void *next; + const char *cmd; + const char *desc; + int (*fn)(int, const char **); +}; + +extern struct climodule_command *climodule_commands; + +/* Find the value of a named argument (e.g. "--name" "value"). Returns NULL if not found. */ +const char *cli_find_arg(int argc, const char **argv, const char *name); + +/* Collect positional arguments (those not part of --flag value pairs). + * Scans argv[start..argc), skipping --flag value pairs. Returns count, fills out[]. */ +size_t cli_collect_positional(int argc, const char **argv, int start, + const char **out, size_t max_out); + +/* Set/get the global config file path (resolved from -f or default locations). */ +void cli_set_config_path(const char *path); +const char *cli_config_path(void); + +/* Resolve default config path by checking standard locations: + * $HOME/.config/upbx.conf, $HOME/.upbx.conf, /etc/upbx/upbx.conf, /etc/upbx.conf + * Returns pointer to a static buffer, or NULL if no config found. */ +const char *cli_resolve_default_config(void); + +/* Get terminal width for stdout (TIOCGWINSZ when stdout is a tty). Returns default_width if not a tty or ioctl fails. */ +int cli_get_output_width(int default_width); + +/* Word-wrap text to stream. Does not mutate text (works on a copy). + * width: total line width. + * left_col_width: on first line we assume this many columns are already used (e.g. command name); continuation lines are indented by this many spaces. Use 0 for full-width (e.g. paragraphs). */ +void cli_print_wrapped(FILE *out, const char *text, int width, int left_col_width); + +#endif // __CLIMODULE_COMMON_H__ diff --git a/src/CliModule/execute_command.c b/src/CliModule/execute_command.c @@ -0,0 +1,16 @@ +#include <stdio.h> +#include <string.h> + +#include "common.h" + +int climodule_execute_command(int argc, const char **argv) { + struct climodule_command *cmd = climodule_commands; + + while(cmd) { + if (!strcmp(cmd->cmd, argv[0])) return cmd->fn(argc, argv); + cmd = cmd->next; + } + + fprintf(stderr, "Unknown command: %s\n", argv[0]); + return 1; +} diff --git a/src/CliModule/execute_command.h b/src/CliModule/execute_command.h @@ -0,0 +1,6 @@ +#ifndef __CLIMODULE_EXECUTE_COMMAND_H__ +#define __CLIMODULE_EXECUTE_COMMAND_H__ + +int climodule_execute_command(int, const char **); + +#endif // __CLIMODULE_EXECUTE_COMMAND_H__ diff --git a/src/CliModule/register_command.c b/src/CliModule/register_command.c @@ -0,0 +1,18 @@ +#include <stdlib.h> +#include <string.h> + +#include "common.h" +#include "register_command.h" + +void climodule_register_command( + const char *name, + const char *description, + int (*fn)(int, const char **) +) { + struct climodule_command *cmd = malloc(sizeof(struct climodule_command)); + cmd->next = climodule_commands; + cmd->cmd = strdup(name); + cmd->desc = strdup(description); + cmd->fn = fn; + climodule_commands = cmd; +} diff --git a/src/CliModule/register_command.h b/src/CliModule/register_command.h @@ -0,0 +1,6 @@ +#ifndef __CLIMODULE_REGISTER_COMMAND_H__ +#define __CLIMODULE_REGISTER_COMMAND_H__ + +void climodule_register_command(const char *name, const char *description, int (*fn)(int, const char **)); + +#endif // __CLIMODULE_REGISTER_COMMAND_H__ diff --git a/src/CliModule/setup.c b/src/CliModule/setup.c @@ -0,0 +1,11 @@ +#include "setup.h" +#include "command/list_commands.h" +#include "register_command.h" + +void climodule_setup() { + climodule_register_command( + "list-commands", + "Displays known commands and their descriptions", + climodule_cmd_list_commands + ); +} diff --git a/src/CliModule/setup.h b/src/CliModule/setup.h @@ -0,0 +1,6 @@ +#ifndef __CLIMODULE_SETUP_H__ +#define __CLIMODULE_SETUP_H__ + +void climodule_setup(); + +#endif // __CLIMODULE_SETUP_H__ diff --git a/src/RespModule/resp.c b/src/RespModule/resp.c @@ -0,0 +1,315 @@ +/* + * RESP encoding and decoding implementation. + */ +#include <stdlib.h> +#include <string.h> +#include <stdio.h> +#include <unistd.h> + +#include "rxi/log.h" +#include "RespModule/resp.h" + +#define MAX_BULK_LEN (256 * 1024) +#define LINE_BUF 4096 + +static void resp_free_internal(resp_object *o); + +static int resp_read_byte(int fd) { + unsigned char c; + if (read(fd, &c, 1) != 1) + return -1; + return (int)c; +} + +static int resp_read_line(int fd, char *buf, size_t buf_size) { + size_t i = 0; + int prev = -1; + while (i + 1 < buf_size) { + int b = resp_read_byte(fd); + if (b < 0) return -1; + if (prev == '\r' && b == '\n') { + buf[i - 1] = '\0'; + return 0; + } + prev = b; + buf[i++] = (char)b; + } + return -1; +} + +resp_object *resp_read(int fd) { + int type_c = resp_read_byte(fd); + if (type_c < 0) return NULL; + resp_object *o = calloc(1, sizeof(resp_object)); + if (!o) return NULL; + char line[LINE_BUF]; + switch ((char)type_c) { + case '+': + o->type = RESPT_SIMPLE; + if (resp_read_line(fd, line, sizeof(line)) != 0) { free(o); return NULL; } + o->u.s = strdup(line); + break; + case '-': + o->type = RESPT_ERROR; + if (resp_read_line(fd, line, sizeof(line)) != 0) { free(o); return NULL; } + o->u.s = strdup(line); + break; + case ':': { + if (resp_read_line(fd, line, sizeof(line)) != 0) { free(o); return NULL; } + o->type = RESPT_INT; + o->u.i = (long long)strtoll(line, NULL, 10); + break; + } + case '$': { + if (resp_read_line(fd, line, sizeof(line)) != 0) { free(o); return NULL; } + long len = strtol(line, NULL, 10); + if (len < 0 || len > (long)MAX_BULK_LEN) { free(o); return NULL; } + o->type = RESPT_BULK; + if (len == 0) { + o->u.s = strdup(""); + if (resp_read_line(fd, line, sizeof(line)) != 0) { free(o->u.s); free(o); return NULL; } + } else { + o->u.s = malloc((size_t)len + 1); + if (!o->u.s) { free(o); return NULL; } + if (read(fd, o->u.s, (size_t)len) != (ssize_t)len) { free(o->u.s); free(o); return NULL; } + o->u.s[len] = '\0'; + if (resp_read_byte(fd) != '\r' || resp_read_byte(fd) != '\n') { free(o->u.s); free(o); return NULL; } + } + break; + } + case '*': { + if (resp_read_line(fd, line, sizeof(line)) != 0) { free(o); return NULL; } + long n = strtol(line, NULL, 10); + if (n < 0 || n > 65536) { free(o); return NULL; } + o->type = RESPT_ARRAY; + o->u.arr.n = (size_t)n; + o->u.arr.elem = n ? calloc((size_t)n, sizeof(resp_object)) : NULL; + if (n && !o->u.arr.elem) { free(o); return NULL; } + for (size_t i = 0; i < (size_t)n; i++) { + resp_object *sub = resp_read(fd); + if (!sub) { + for (size_t j = 0; j < i; j++) resp_free_internal(&o->u.arr.elem[j]); + free(o->u.arr.elem); + free(o); + return NULL; + } + o->u.arr.elem[i] = *sub; + free(sub); + } + break; + } + default: + free(o); + return NULL; + } + return o; +} + +static void resp_free_internal(resp_object *o) { + if (!o) return; + if (o->type == RESPT_SIMPLE || o->type == RESPT_ERROR || o->type == RESPT_BULK) { + free(o->u.s); + } else if (o->type == RESPT_ARRAY) { + for (size_t i = 0; i < o->u.arr.n; i++) + resp_free_internal(&o->u.arr.elem[i]); + free(o->u.arr.elem); + } +} + +void resp_free(resp_object *o) { + resp_free_internal(o); + free(o); +} + +resp_object *resp_deep_copy(const resp_object *o) { + if (!o) return NULL; + resp_object *c = (resp_object *)calloc(1, sizeof(resp_object)); + if (!c) return NULL; + c->type = o->type; + if (o->type == RESPT_SIMPLE || o->type == RESPT_ERROR || o->type == RESPT_BULK) { + c->u.s = o->u.s ? strdup(o->u.s) : NULL; + if (o->u.s && !c->u.s) { free(c); return NULL; } + return c; + } + if (o->type == RESPT_INT) { + c->u.i = o->u.i; + return c; + } + if (o->type == RESPT_ARRAY) { + c->u.arr.n = o->u.arr.n; + c->u.arr.elem = o->u.arr.n ? (resp_object *)calloc(o->u.arr.n, sizeof(resp_object)) : NULL; + if (o->u.arr.n && !c->u.arr.elem) { free(c); return NULL; } + for (size_t i = 0; i < o->u.arr.n; i++) { + resp_object *sub = resp_deep_copy(&o->u.arr.elem[i]); + if (!sub) { + for (size_t j = 0; j < i; j++) resp_free_internal(&c->u.arr.elem[j]); + free(c->u.arr.elem); + free(c); + return NULL; + } + c->u.arr.elem[i] = *sub; + free(sub); + } + return c; + } + free(c); + return NULL; +} + +resp_object *resp_map_get(const resp_object *o, const char *key) { + if (!o || !key || o->type != RESPT_ARRAY) return NULL; + size_t n = o->u.arr.n; + if (n & 1) return NULL; + for (size_t i = 0; i < n; i += 2) { + const resp_object *k = &o->u.arr.elem[i]; + const char *s = (k->type == RESPT_BULK || k->type == RESPT_SIMPLE) ? k->u.s : NULL; + if (s && strcmp(s, key) == 0 && i + 1 < n) + return (resp_object *)&o->u.arr.elem[i + 1]; + } + return NULL; +} + +const char *resp_map_get_string(const resp_object *o, const char *key) { + resp_object *val = resp_map_get(o, key); + if (!val) return NULL; + if (val->type == RESPT_BULK || val->type == RESPT_SIMPLE) + return val->u.s; + return NULL; +} + +void resp_map_set(resp_object *o, const char *key, resp_object *value) { + if (!o || !key || o->type != RESPT_ARRAY) return; + for (size_t i = 0; i + 1 < o->u.arr.n; i += 2) { + const resp_object *k = &o->u.arr.elem[i]; + const char *s = (k->type == RESPT_BULK || k->type == RESPT_SIMPLE) ? k->u.s : NULL; + if (s && strcmp(s, key) == 0 && i + 1 < o->u.arr.n) { + resp_free(&o->u.arr.elem[i + 1]); + o->u.arr.elem[i + 1] = *value; + free(value); + return; + } + } + resp_array_append_bulk(o, key); + resp_array_append_obj(o, value); +} + +/* Append one RESP-encoded object to buf; realloc as needed. Returns 0 on success. */ +static int resp_append_object(char **buf, size_t *cap, size_t *len, const resp_object *o) { + if (!o) return -1; + size_t need = *len + 256; + if (o->type == RESPT_BULK || o->type == RESPT_SIMPLE || o->type == RESPT_ERROR) { + size_t slen = o->u.s ? strlen(o->u.s) : 0; + need = *len + 32 + slen + 2; + } else if (o->type == RESPT_ARRAY) { + need = *len + 32; + for (size_t i = 0; i < o->u.arr.n; i++) + need += 64; + } + if (need > *cap) { + size_t newcap = need + 4096; + char *n = realloc(*buf, newcap); + if (!n) return -1; + *buf = n; + *cap = newcap; + } + switch (o->type) { + case RESPT_SIMPLE: { + const char *s = o->u.s ? o->u.s : ""; + *len += (size_t)snprintf(*buf + *len, *cap - *len, "+%s\r\n", s); + break; + } + case RESPT_ERROR: { + const char *s = o->u.s ? o->u.s : ""; + *len += (size_t)snprintf(*buf + *len, *cap - *len, "-%s\r\n", s); + break; + } + case RESPT_INT: + *len += (size_t)snprintf(*buf + *len, *cap - *len, ":%lld\r\n", (long long)o->u.i); + break; + case RESPT_BULK: { + const char *s = o->u.s ? o->u.s : ""; + size_t slen = strlen(s); + *len += (size_t)snprintf(*buf + *len, *cap - *len, "$%zu\r\n%s\r\n", slen, s); + break; + } + case RESPT_ARRAY: { + size_t n = o->u.arr.n; + *len += (size_t)snprintf(*buf + *len, *cap - *len, "*%zu\r\n", n); + for (size_t i = 0; i < n; i++) { + if (resp_append_object(buf, cap, len, &o->u.arr.elem[i]) != 0) + return -1; + } + break; + } + default: + return -1; + } + return 0; +} + +int resp_encode_array(int argc, const resp_object *const *argv, char **out_buf, size_t *out_len) { + size_t cap = 64; + size_t len = 0; + char *buf = malloc(cap); + if (!buf) return -1; + len += (size_t)snprintf(buf + len, cap - len, "*%d\r\n", argc); + if (len >= cap) { free(buf); return -1; } + for (int i = 0; i < argc; i++) { + if (resp_append_object(&buf, &cap, &len, argv[i]) != 0) { + free(buf); + return -1; + } + } + *out_buf = buf; + *out_len = len; + return 0; +} + +resp_object *resp_array_init(void) { + resp_object *o = calloc(1, sizeof(resp_object)); + if (!o) return NULL; + o->type = RESPT_ARRAY; + o->u.arr.n = 0; + o->u.arr.elem = NULL; + return o; +} + +int resp_array_append_obj(resp_object *destination, resp_object *value) { + if (!destination || destination->type != RESPT_ARRAY || !value) return -1; + size_t n = destination->u.arr.n; + resp_object *new_elem = realloc(destination->u.arr.elem, (n + 1) * sizeof(resp_object)); + if (!new_elem) return -1; + destination->u.arr.elem = new_elem; + destination->u.arr.elem[n] = *value; + destination->u.arr.n++; + free(value); + return 0; +} + +int resp_array_append_simple(resp_object *destination, const char *str) { + resp_object *o = calloc(1, sizeof(resp_object)); + if (!o) return -1; + o->type = RESPT_SIMPLE; + o->u.s = strdup(str ? str : ""); + if (!o->u.s) { free(o); return -1; } + if (resp_array_append_obj(destination, o) != 0) { free(o->u.s); free(o); return -1; } + return 0; +} + +int resp_array_append_bulk(resp_object *destination, const char *str) { + resp_object *o = calloc(1, sizeof(resp_object)); + if (!o) return -1; + o->type = RESPT_BULK; + o->u.s = strdup(str ? str : ""); + if (!o->u.s) { free(o); return -1; } + if (resp_array_append_obj(destination, o) != 0) { free(o->u.s); free(o); return -1; } + return 0; +} + +int resp_array_append_int(resp_object *destination, long long i) { + resp_object *o = malloc(sizeof(resp_object)); + if (!o) return -1; + o->type = RESPT_INT; + o->u.i = i; + return resp_array_append_obj(destination, o); +} diff --git a/src/RespModule/resp.h b/src/RespModule/resp.h @@ -0,0 +1,58 @@ +/* + * RESP (Redis Serialization Protocol) encoding and decoding. + * Types and helpers for building/parsing RESP values. + */ +#ifndef RESPMODULE_RESP_H +#define RESPMODULE_RESP_H + +#include <stddef.h> + +#define RESPT_SIMPLE 0 +#define RESPT_ERROR 1 +#define RESPT_BULK 2 +#define RESPT_INT 3 +#define RESPT_ARRAY 4 + +/* RESP value. Call resp_free when done. */ +typedef struct resp_object resp_object; +struct resp_object { + int type; /* RESPT_* */ + union { + char *s; + long long i; + struct { resp_object *elem; size_t n; } arr; + } u; +}; + +void resp_free(resp_object *o); + +/* Deep-copy resp_object. Caller must resp_free result. Returns NULL on alloc failure. */ +resp_object *resp_deep_copy(const resp_object *o); + +/* Map helper: array as map (even-length: key, value, ...). Returns value element or NULL. Valid until resp freed. */ +resp_object *resp_map_get(const resp_object *o, const char *key); + +/* Map + key → string value (BULK/SIMPLE), or NULL. Valid until resp freed. */ +const char *resp_map_get_string(const resp_object *o, const char *key); + +/* Map helper: set key to value in array-as-map. Takes ownership of value. */ +void resp_map_set(resp_object *map, const char *key, resp_object *value); + +/* Decode one RESP value from fd. Caller must resp_free result. Returns NULL on error. */ +resp_object *resp_read(int fd); + +/* Encode array of argc objects. Caller must free(*out_buf). Returns 0 on success. */ +int resp_encode_array(int argc, const resp_object *const *argv, char **out_buf, size_t *out_len); + +/* Allocate an empty RESPT_ARRAY. Caller must resp_free. Returns NULL on alloc failure. */ +resp_object *resp_array_init(void); + +/* Append value to array (destination must be RESPT_ARRAY). Takes ownership of value; no clone. Returns 0 on success. */ +int resp_array_append_obj(resp_object *destination, resp_object *value); + +/* Append string/integer to array (clone string for simple/bulk). Return 0 on success. */ +int resp_array_append_simple(resp_object *destination, const char *str); +int resp_array_append_bulk(resp_object *destination, const char *str); +int resp_array_append_int(resp_object *destination, long long i); + +#endif diff --git a/src/SchedulerModule/protothreads.h b/src/SchedulerModule/protothreads.h @@ -0,0 +1,409 @@ +/* + * Protothreads: cooperative multithreading for C (single header). + * + * Combined from: + * - pt.h (protothreads core) + * - pt-sem.h (counting semaphores) + * - lc-addrlabels.h (local continuations, GCC "labels as values") + * + * Original copyright (c) 2004-2006, Swedish Institute of Computer Science. + * All rights reserved. See Contiki and protothreads licensing terms. + * Author: Adam Dunkels <adam@sics.se> + * + * This implementation uses GCC's "labels as values" extension for + * local continuations. No separate lc.h / lc-addrlabels.h is required; + * continuation state is stored in struct pt and used only by PT_* macros. + */ + +#ifndef __PT_H__ +#define __PT_H__ + +#include <stddef.h> + +/* ========================================================================== + * Internal: unique labels for continuation points (GCC addrlabels). + * Not for direct use; merged into PT_BEGIN / PT_WAIT_UNTIL / PT_YIELD etc. + * ========================================================================== */ +#define PT_CONCAT2(s1, s2) s1##s2 +#define PT_CONCAT(s1, s2) PT_CONCAT2(s1, s2) + +/** Resume continuation for \a pt if set; otherwise fall through. */ + +#define PT_RESUME(pt) \ + do { \ + if ((pt)->lc != NULL) \ + goto *(pt)->lc; \ + } while (0) + +/** Set continuation for \a pt at current line. */ +#define PT_SET(pt) \ + do { \ + PT_CONCAT(PT_LABEL, __LINE__): \ + (pt)->lc = &&PT_CONCAT(PT_LABEL, __LINE__); \ + } while (0) + +/* ========================================================================== + * Protothread control structure + * ========================================================================== */ + +/** Protothread state. Only the PT_* macros should touch \a lc. */ +struct pt { + void *lc; /**< Continuation (GCC label pointer); NULL = start. */ +}; + +/** Return values from a protothread function. */ +#define PT_WAITING 0 /**< Blocked waiting. */ +#define PT_YIELDED 1 /**< Yielded voluntarily. */ +#define PT_EXITED 2 /**< Exited (PT_EXIT). */ +#define PT_ENDED 3 /**< Reached PT_END. */ + +/** + * \name Initialization + * @{ + */ + +/** + * Initialize a protothread. + * Must be called before the first PT_SCHEDULE of this protothread. + * \param pt Pointer to the protothread control structure. + */ +#define PT_INIT(pt) ((pt)->lc = NULL) + +/** @} */ + +/** + * \name Declaration and definition + * @{ + */ + +/** + * Declare a protothread function. + * Use as the function return type, e.g. PT_THREAD(worker(struct pt *pt)). + * \param name_args Function name and parameter list. + */ +#define PT_THREAD(name_args) char name_args + +/** + * Start of a protothread body. + * Place at the top of the function; statements above are run every schedule. + * Opens a block; PT_END closes it. PT_YIELD_FLAG is in scope between them. + * \param pt Pointer to the protothread control structure. + */ +#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; (void)PT_YIELD_FLAG; PT_RESUME((pt)) + +/** + * End of a protothread. + * Closes the block opened by PT_BEGIN and returns PT_ENDED. + * \param pt Pointer to the protothread control structure. + */ +#define PT_END(pt) PT_INIT(pt); PT_YIELD_FLAG = 0; return PT_ENDED; } + +/** @} */ + +/** + * \name Blocked wait + * @{ + */ + +/** + * Block until \a condition is true. + * \param pt Pointer to the protothread control structure. + * \param condition Expression; protothread resumes when it is non-zero. + */ +#define PT_WAIT_UNTIL(pt, condition) \ + do { \ + PT_SET(pt); \ + if (!(condition)) \ + return PT_WAITING; \ + } while (0) + +/** + * Block while \a cond is true (i.e. until !(cond)). + * \param pt Pointer to the protothread control structure. + * \param cond Expression. + */ +#define PT_WAIT_WHILE(pt, cond) PT_WAIT_UNTIL((pt), !(cond)) + +/** @} */ + +/** + * \name Hierarchical protothreads + * @{ + */ + +/** + * Block until child protothread \a thread completes. + * The child must be initialized with PT_INIT before use. + * \param pt Pointer to the parent protothread control structure. + * \param thread Child protothread call, e.g. child_pt(&pt_child). + */ +#define PT_WAIT_THREAD(pt, thread) PT_WAIT_WHILE((pt), PT_SCHEDULE(thread)) + +/** + * Spawn a child protothread and wait until it exits. + * Initializes the child and waits; use within a protothread only. + * \param pt Pointer to the parent protothread control structure. + * \param child Pointer to the child's struct pt. + * \param thread Child protothread call, e.g. child_fn(child). + */ +#define PT_SPAWN(pt, child, thread) \ + do { \ + PT_INIT((child)); \ + PT_WAIT_THREAD((pt), (thread)); \ + } while (0) + +/** @} */ + +/** + * \name Exiting and restarting + * @{ + */ + +/** + * Restart the protothread from PT_BEGIN. + * \param pt Pointer to the protothread control structure. + */ +#define PT_RESTART(pt) \ + do { \ + PT_INIT(pt); \ + return PT_WAITING; \ + } while (0) + +/** + * Exit the protothread. + * If it was spawned, the parent becomes unblocked. + * \param pt Pointer to the protothread control structure. + */ +#define PT_EXIT(pt) \ + do { \ + PT_INIT(pt); \ + return PT_EXITED; \ + } while (0) + +/** @} */ + +/** + * \name Scheduling + * @{ + */ + +/** + * Schedule a protothread. + * \param f Call to the protothread function, e.g. my_pt(&pt_state). + * \return Non-zero if still running, 0 if exited or ended. + */ +#define PT_SCHEDULE(f) ((f) < PT_EXITED) + +/** @} */ + +/** + * \name Yielding + * @{ + */ + +/** + * Yield once, then continue. + * \param pt Pointer to the protothread control structure. + */ +#define PT_YIELD(pt) \ + do { \ + PT_YIELD_FLAG = 0; \ + PT_SET(pt); \ + if (PT_YIELD_FLAG == 0) \ + return PT_YIELDED; \ + } while (0) + +/** + * Yield until \a cond is true. + * \param pt Pointer to the protothread control structure. + * \param cond Expression; protothread continues when it is non-zero. + */ +#define PT_YIELD_UNTIL(pt, cond) \ + do { \ + PT_YIELD_FLAG = 0; \ + PT_SET(pt); \ + if ((PT_YIELD_FLAG == 0) || !(cond)) \ + return PT_YIELDED; \ + } while (0) + +/** @} */ + +/* =========================================================================== + * Counting semaphores (pt-sem) + * =========================================================================== */ + +/** + * Counting semaphore for use with protothreads. + * Wait blocks while count is 0; signal increments count. + */ +struct pt_sem { + unsigned int count; +}; + +/** + * Initialize a semaphore. + * \param s Pointer to the semaphore. + * \param c Initial count (unsigned). + */ +#define PT_SEM_INIT(s, c) ((s)->count = (c)) + +/** + * Wait for the semaphore (decrement when > 0). + * Blocks the protothread while count is 0. + * \param pt Pointer to the protothread control structure. + * \param s Pointer to the semaphore. + */ +#define PT_SEM_WAIT(pt, s) \ + do { \ + PT_WAIT_UNTIL(pt, (s)->count > 0); \ + (s)->count--; \ + } while (0) + +/** + * Signal the semaphore (increment count). + * \param pt Pointer to the protothread control structure (unused; for API consistency). + * \param s Pointer to the semaphore. + */ +#define PT_SEM_SIGNAL(pt, s) ((s)->count++) + +/* =========================================================================== + * EXAMPLES (for reference; block not compiled when header is included) + * =========================================================================== + * + * Example 1: Minimal protothread + * + * #include "pt.h" + * + * static struct pt pt_worker; + * + * static PT_THREAD(worker(struct pt *pt)) + * { + * PT_BEGIN(pt); + * while (1) { + * do_something(); + * PT_WAIT_UNTIL(pt, data_ready()); + * process(data); + * } + * PT_END(pt); + * } + * + * int main(void) + * { + * PT_INIT(&pt_worker); + * while (PT_SCHEDULE(worker(&pt_worker))) + * ; + * return 0; + * } + * + * Example 2: Producer-consumer with semaphores (bounded buffer) + * + * #include "pt.h" + * + * #define NUM_ITEMS 32 + * #define BUFSIZE 8 + * + * static struct pt_sem mutex, full, empty; + * static struct pt pt_producer, pt_consumer, pt_driver; + * + * static PT_THREAD(producer(struct pt *pt)) + * { + * static unsigned int produced; + * + * PT_BEGIN(pt); + * for (produced = 0; produced < NUM_ITEMS; produced++) { + * PT_SEM_WAIT(pt, &full); + * PT_SEM_WAIT(pt, &mutex); + * add_to_buffer(produce_item()); + * PT_SEM_SIGNAL(pt, &mutex); + * PT_SEM_SIGNAL(pt, &empty); + * } + * PT_END(pt); + * } + * + * static PT_THREAD(consumer(struct pt *pt)) + * { + * static unsigned int consumed; + * + * PT_BEGIN(pt); + * for (consumed = 0; consumed < NUM_ITEMS; consumed++) { + * PT_SEM_WAIT(pt, &empty); + * PT_SEM_WAIT(pt, &mutex); + * consume_item(get_from_buffer()); + * PT_SEM_SIGNAL(pt, &mutex); + * PT_SEM_SIGNAL(pt, &full); + * } + * PT_END(pt); + * } + * + * static PT_THREAD(driver_thread(struct pt *pt)) + * { + * PT_BEGIN(pt); + * + * PT_SEM_INIT(&empty, 0); + * PT_SEM_INIT(&full, BUFSIZE); + * PT_SEM_INIT(&mutex, 1); + * + * PT_INIT(&pt_producer); + * PT_INIT(&pt_consumer); + * + * PT_WAIT_THREAD(pt, producer(&pt_producer) & consumer(&pt_consumer)); + * + * PT_END(pt); + * } + * + * int main(void) + * { + * PT_INIT(&pt_driver); + * while (PT_SCHEDULE(driver_thread(&pt_driver))) + * ; + * return 0; + * } + * + * Note: producer(&pt_producer) & consumer(&pt_consumer) runs both threads + * each schedule round and blocks until both have exited (return value + * >= PT_EXITED). Implement add_to_buffer, get_from_buffer, produce_item, + * consume_item, and the buffer as needed. + * + * Example 3: Spawning a child protothread + * + * static struct pt pt_parent, pt_child; + * + * static PT_THREAD(child(struct pt *pt)) + * { + * PT_BEGIN(pt); + * do_work(); + * PT_END(pt); + * } + * + * static PT_THREAD(parent(struct pt *pt)) + * { + * PT_BEGIN(pt); + * PT_SPAWN(pt, &pt_child, child(&pt_child)); + * PT_END(pt); + * } + * + * int main(void) + * { + * PT_INIT(&pt_parent); + * while (PT_SCHEDULE(parent(&pt_parent))) + * ; + * return 0; + * } + * + * Example 4: Yielding + * + * static PT_THREAD(periodic(struct pt *pt)) + * { + * PT_BEGIN(pt); + * for (;;) { + * do_work(); + * PT_YIELD(pt); + * } + * PT_END(pt); + * } + * + * PT_YIELD(pt) runs one step then yields; the scheduler can run other + * threads. PT_YIELD_UNTIL(pt, cond) yields until cond is true. + */ + +#endif /* __PT_H__ */ diff --git a/src/SchedulerModule/scheduler.c b/src/SchedulerModule/scheduler.c @@ -0,0 +1,130 @@ +#include <stdlib.h> +#include <sys/select.h> +#include <sys/time.h> + +#include "rxi/log.h" +#include "scheduler.h" + +#ifndef NULL +#define NULL ((void*)0) +#endif + +pt_task_t *pt_first = NULL; +fd_set g_select_result; +static fd_set g_want_fds; + +int schedmod_pt_create(pt_task_fn fn, void *udata) { + if (!fn) return 1; + + pt_task_t *node = calloc(1, sizeof(pt_task_t)); + node->next = pt_first; + node->func = fn; + node->udata = udata; + PT_INIT(&node->pt); + node->is_active = 1; + pt_first = node; + + return 0; +} + +int schedmod_pt_remove(pt_task_t *task) { + if (!task) return 1; + + pt_task_t *curr = pt_first; + pt_task_t *prev = NULL; + + while (curr) { + if (curr == task) { + if (prev) { + prev->next = curr->next; + } else { + pt_first = curr->next; + } + free(curr); + return 0; + } + prev = curr; + curr = curr->next; + } + + return 1; +} + +int schedmod_has_data(int *in_fds, int **out_fds) { + if (!in_fds || in_fds[0] == 0) return 0; + + for (int i = 1; i <= in_fds[0]; i++) { + int fd = in_fds[i]; + if (fd >= 0) { + FD_SET(fd, &g_want_fds); + } + } + + if (*out_fds) free(*out_fds); + *out_fds = NULL; + + int count = 0; + for (int i = 1; i <= in_fds[0]; i++) { + if (in_fds[i] >= 0 && FD_ISSET(in_fds[i], &g_select_result)) { + count++; + } + } + + if (count == 0) return 0; + + *out_fds = malloc(sizeof(int) * (count + 1)); + if (!*out_fds) return 0; + + (*out_fds)[0] = count; + int idx = 1; + for (int i = 1; i <= in_fds[0]; i++) { + if (in_fds[i] >= 0 && FD_ISSET(in_fds[i], &g_select_result)) { + (*out_fds)[idx++] = in_fds[i]; + } + } + + return count; +} + +int schedmod_main() { + if (!pt_first) return 0; + + struct timeval tv; + int maxfd = -1; + + for(;;) { + maxfd = -1; + for (int fd = 0; fd < FD_SETSIZE; fd++) { + if (FD_ISSET(fd, &g_want_fds)) { + if (fd > maxfd) maxfd = fd; + } + } + + tv.tv_sec = 0; + tv.tv_usec = 100000; + select(maxfd + 1, &g_want_fds, NULL, NULL, &tv); + + struct timeval now; + gettimeofday(&now, NULL); + int64_t timestamp = (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000; + g_select_result = g_want_fds; + + FD_ZERO(&g_want_fds); + + pt_task_t *task = pt_first; + while (task) { + pt_task_t *next = task->next; + log_trace("scheduler: running task %p", (void*)task->func); + task->is_active = PT_SCHEDULE(task->func(&task->pt, timestamp, task)); + log_trace("scheduler: task %p returned, is_active=%d", (void*)task->func, task->is_active); + if (!task->is_active) { + schedmod_pt_remove(task); + } + task = next; + } + + if (!pt_first) break; + } + + return 0; +} diff --git a/src/SchedulerModule/scheduler.h b/src/SchedulerModule/scheduler.h @@ -0,0 +1,30 @@ +#ifndef __SCHEDULERMODULE_SCHEDULER_H__ +#define __SCHEDULERMODULE_SCHEDULER_H__ + +#include "protothreads.h" + +#include <stdint.h> +#include <sys/select.h> + +struct pt_task; + +typedef char (*pt_task_fn)(struct pt *pt, int64_t timestamp, struct pt_task *task); + +typedef struct pt_task { + struct pt pt; + struct pt_task *next; + pt_task_fn func; + void *udata; + char is_active; + int maxfd; +} pt_task_t; + +int schedmod_pt_create(pt_task_fn fn, void *udata); +int schedmod_pt_remove(pt_task_t *task); +int schedmod_main(); + +extern fd_set g_select_result; + +int schedmod_has_data(int *in_fds, int **out_fds); + +#endif // __SCHEDULERMODULE_SCHEDULER_H__ diff --git a/src/common/digest_auth.c b/src/common/digest_auth.c @@ -0,0 +1,54 @@ +/* + * Digest authentication helpers (RFC 2069). + * Simplified digest auth: response = MD5(HA1:nonce:HA2) + */ +#include "common/digest_auth.h" +#include "common/md5.h" +#include <string.h> + +void cvt_hex(const unsigned char *bin, HASHHEX hex) { + for (int i = 0; i < DIGEST_HASHLEN; i++) { + unsigned char j = (bin[i] >> 4) & 0xf; + hex[i * 2] = (char)(j <= 9 ? j + '0' : j + 'a' - 10); + j = bin[i] & 0xf; + hex[i * 2 + 1] = (char)(j <= 9 ? j + '0' : j + 'a' - 10); + } + hex[DIGEST_HASHHEXLEN] = '\0'; +} + +void digest_calc_ha1(const char *user, const char *realm, const char *password, HASHHEX out) { + MD5_CTX ctx; + HASH ha1; + MD5_Init(&ctx); + if (user) MD5_Update(&ctx, (const unsigned char *)user, strlen(user)); + MD5_Update(&ctx, (const unsigned char *)":", 1); + if (realm) MD5_Update(&ctx, (const unsigned char *)realm, strlen(realm)); + MD5_Update(&ctx, (const unsigned char *)":", 1); + if (password) MD5_Update(&ctx, (const unsigned char *)password, strlen(password)); + MD5_Final(ha1, &ctx); + cvt_hex(ha1, out); +} + +void digest_calc_ha2(const char *method, const char *uri, HASHHEX out) { + MD5_CTX ctx; + HASH ha2; + MD5_Init(&ctx); + if (method) MD5_Update(&ctx, (const unsigned char *)method, strlen(method)); + MD5_Update(&ctx, (const unsigned char *)":", 1); + if (uri) MD5_Update(&ctx, (const unsigned char *)uri, strlen(uri)); + MD5_Final(ha2, &ctx); + cvt_hex(ha2, out); +} + +void digest_calc_response(HASHHEX ha1, const char *nonce, HASHHEX ha2, HASHHEX out) { + MD5_CTX ctx; + HASH resphash; + MD5_Init(&ctx); + MD5_Update(&ctx, (const unsigned char *)ha1, DIGEST_HASHHEXLEN); + MD5_Update(&ctx, (const unsigned char *)":", 1); + if (nonce) MD5_Update(&ctx, (const unsigned char *)nonce, strlen(nonce)); + MD5_Update(&ctx, (const unsigned char *)":", 1); + MD5_Update(&ctx, (const unsigned char *)ha2, DIGEST_HASHHEXLEN); + MD5_Final(resphash, &ctx); + cvt_hex(resphash, out); +} diff --git a/src/common/digest_auth.h b/src/common/digest_auth.h @@ -0,0 +1,27 @@ +/* + * Digest authentication helpers (RFC 2069). + * Simplified digest auth: response = MD5(HA1:nonce:HA2) + * Shared between SIP server (extension auth) and trunk registration (client auth). + */ +#ifndef UPBX_DIGEST_AUTH_H +#define UPBX_DIGEST_AUTH_H + +#define DIGEST_HASHLEN 16 +#define DIGEST_HASHHEXLEN 32 + +typedef unsigned char HASH[DIGEST_HASHLEN]; +typedef unsigned char HASHHEX[DIGEST_HASHHEXLEN + 1]; + +/* Convert raw 16-byte hash to 32-char lowercase hex string (NUL-terminated). */ +void cvt_hex(const unsigned char *bin, HASHHEX hex); + +/* HA1 = MD5(username:realm:password) */ +void digest_calc_ha1(const char *user, const char *realm, const char *password, HASHHEX out); + +/* HA2 = MD5(method:uri) */ +void digest_calc_ha2(const char *method, const char *uri, HASHHEX out); + +/* response = MD5(HA1:nonce:HA2) */ +void digest_calc_response(HASHHEX ha1, const char *nonce, HASHHEX ha2, HASHHEX out); + +#endif diff --git a/src/common/hexdump.c b/src/common/hexdump.c @@ -0,0 +1,35 @@ +/* + * Canonical hexdump at trace level; shared by sip_server and trunk_reg. + */ +#include <stddef.h> +#include <stdio.h> + +#include "rxi/log.h" + +void log_hexdump_trace(const char *buf, size_t len) { + log_trace("%s", __func__); + char line[96]; + const size_t row = 16; + for (size_t off = 0; off < len; off += row) { + size_t n = row; + if (off + n > len) + n = len - off; + size_t o = 0; + o += (size_t)snprintf(line + o, sizeof(line) - o, "%08zx ", off); + for (size_t i = 0; i < row; i++) { + if (i < n) + o += (size_t)snprintf(line + o, sizeof(line) - o, "%02x ", (unsigned char)buf[off + i]); + else + o += (size_t)snprintf(line + o, sizeof(line) - o, " "); + if (i == 7) + o += (size_t)snprintf(line + o, sizeof(line) - o, " "); + } + o += (size_t)snprintf(line + o, sizeof(line) - o, " |"); + for (size_t i = 0; i < n; i++) { + unsigned char c = (unsigned char)buf[off + i]; + line[o++] = (c >= 0x20 && c < 0x7f) ? (char)c : '.'; + } + o += (size_t)snprintf(line + o, sizeof(line) - o, "|"); + log_trace("%s", line); + } +} diff --git a/src/common/hexdump.h b/src/common/hexdump.h @@ -0,0 +1,12 @@ +/* + * Canonical hexdump utility for debugging network buffers. + */ +#ifndef UPBX_HEXDUMP_H +#define UPBX_HEXDUMP_H + +#include <stddef.h> + +/* Log buf[0..len) as canonical hexdump at trace level. */ +void log_hexdump_trace(const char *buf, size_t len); + +#endif diff --git a/src/common/md5.c b/src/common/md5.c @@ -0,0 +1,147 @@ +/* + * MD5 (RFC 1321) -- reference implementation in C. + * Public domain. No external dependencies. + */ +#include "common/md5.h" +#include <string.h> +#include <stdint.h> + +#define F(x, y, z) U32(((x) & (y)) | ((~(x)) & (z))) +#define G(x, y, z) U32(((x) & (z)) | ((y) & (~(z)))) +#define H(x, y, z) U32((x) ^ (y) ^ (z)) +#define I(x, y, z) U32((y) ^ ((x) | (~(z)))) + +#define U32(x) ((x) & 0xffffffffu) +#define ROTL32(v, n) (U32(((v) << (n)) | ((v) >> (32 - (n))))) + +#define STEP(f, a, b, c, d, x, t, s) do { \ + (a) = U32((a) + f((b), (c), (d)) + (x) + (t)); \ + (a) = ROTL32((a), (s)); \ + (a) = U32((a) + (b)); \ +} while (0) + +static const unsigned int T[64] = { + 0xd76aa478, 0xe8c7b756, 0x242070db, 0xc1bdceee, 0xf57c0faf, 0x4787c62a, 0xa8304613, 0xfd469501, + 0x698098d8, 0x8b44f7af, 0xffff5bb1, 0x895cd7be, 0x6b901122, 0xfd987193, 0xa679438e, 0x49b40821, + 0xf61e2562, 0xc040b340, 0x265e5a51, 0xe9b6c7aa, 0xd62f105d, 0x02441453, 0xd8a1e681, 0xe7d3fbc8, + 0x21e1cde6, 0xc33707d6, 0xf4d50d87, 0x455a14ed, 0xa9e3e905, 0xfcefa3f8, 0x676f02d9, 0x8d2a4c8a, + 0xfffa3942, 0x8771f681, 0x6d9d6122, 0xfde5380c, 0xa4beea44, 0x4bdecfa9, 0xf6bb4b60, 0xbebfbc70, + 0x289b7ec6, 0xeaa127fa, 0xd4ef3085, 0x04881d05, 0xd9d4d039, 0xe6db99e5, 0x1fa27cf8, 0xc4ac5665, + 0xf4292244, 0x432aff97, 0xab9423a7, 0xfc93a039, 0x655b59c3, 0x8f0ccc92, 0xffeff47d, 0x85845dd1, + 0x6fa87e4f, 0xfe2ce6e0, 0xa3014314, 0x4e0811a1, 0xf7537e82, 0xbd3af235, 0x2ad7d2bb, 0xeb86d391, +}; + +static void md5_transform(unsigned int state[4], const unsigned char block[64]) { + uint32_t a = (uint32_t)state[0], b = (uint32_t)state[1], c = (uint32_t)state[2], d = (uint32_t)state[3]; + uint32_t X[16]; + int i; + + for (i = 0; i < 16; i++) + X[i] = (uint32_t)block[i*4] | ((uint32_t)block[i*4+1] << 8) | + ((uint32_t)block[i*4+2] << 16) | ((uint32_t)block[i*4+3] << 24); + + /* Round 1 */ + STEP(F, a, b, c, d, X[ 0], T[ 0], 7); STEP(F, d, a, b, c, X[ 1], T[ 1], 12); + STEP(F, c, d, a, b, X[ 2], T[ 2], 17); STEP(F, b, c, d, a, X[ 3], T[ 3], 22); + STEP(F, a, b, c, d, X[ 4], T[ 4], 7); STEP(F, d, a, b, c, X[ 5], T[ 5], 12); + STEP(F, c, d, a, b, X[ 6], T[ 6], 17); STEP(F, b, c, d, a, X[ 7], T[ 7], 22); + STEP(F, a, b, c, d, X[ 8], T[ 8], 7); STEP(F, d, a, b, c, X[ 9], T[ 9], 12); + STEP(F, c, d, a, b, X[10], T[10], 17); STEP(F, b, c, d, a, X[11], T[11], 22); + STEP(F, a, b, c, d, X[12], T[12], 7); STEP(F, d, a, b, c, X[13], T[13], 12); + STEP(F, c, d, a, b, X[14], T[14], 17); STEP(F, b, c, d, a, X[15], T[15], 22); + /* Round 2 */ + STEP(G, a, b, c, d, X[ 1], T[16], 5); STEP(G, d, a, b, c, X[ 6], T[17], 9); + STEP(G, c, d, a, b, X[11], T[18], 14); STEP(G, b, c, d, a, X[ 0], T[19], 20); + STEP(G, a, b, c, d, X[ 5], T[20], 5); STEP(G, d, a, b, c, X[10], T[21], 9); + STEP(G, c, d, a, b, X[15], T[22], 14); STEP(G, b, c, d, a, X[ 4], T[23], 20); + STEP(G, a, b, c, d, X[ 9], T[24], 5); STEP(G, d, a, b, c, X[14], T[25], 9); + STEP(G, c, d, a, b, X[ 3], T[26], 14); STEP(G, b, c, d, a, X[ 8], T[27], 20); + STEP(G, a, b, c, d, X[13], T[28], 5); STEP(G, d, a, b, c, X[ 2], T[29], 9); + STEP(G, c, d, a, b, X[ 7], T[30], 14); STEP(G, b, c, d, a, X[12], T[31], 20); + /* Round 3 */ + STEP(H, a, b, c, d, X[ 5], T[32], 4); STEP(H, d, a, b, c, X[ 8], T[33], 11); + STEP(H, c, d, a, b, X[11], T[34], 16); STEP(H, b, c, d, a, X[14], T[35], 23); + STEP(H, a, b, c, d, X[ 1], T[36], 4); STEP(H, d, a, b, c, X[ 4], T[37], 11); + STEP(H, c, d, a, b, X[ 7], T[38], 16); STEP(H, b, c, d, a, X[10], T[39], 23); + STEP(H, a, b, c, d, X[13], T[40], 4); STEP(H, d, a, b, c, X[ 0], T[41], 11); + STEP(H, c, d, a, b, X[ 3], T[42], 16); STEP(H, b, c, d, a, X[ 6], T[43], 23); + STEP(H, a, b, c, d, X[ 9], T[44], 4); STEP(H, d, a, b, c, X[12], T[45], 11); + STEP(H, c, d, a, b, X[15], T[46], 16); STEP(H, b, c, d, a, X[ 2], T[47], 23); + /* Round 4 */ + STEP(I, a, b, c, d, X[ 0], T[48], 6); STEP(I, d, a, b, c, X[ 7], T[49], 10); + STEP(I, c, d, a, b, X[14], T[50], 15); STEP(I, b, c, d, a, X[ 5], T[51], 21); + STEP(I, a, b, c, d, X[12], T[52], 6); STEP(I, d, a, b, c, X[ 3], T[53], 10); + STEP(I, c, d, a, b, X[10], T[54], 15); STEP(I, b, c, d, a, X[ 1], T[55], 21); + STEP(I, a, b, c, d, X[ 8], T[56], 6); STEP(I, d, a, b, c, X[15], T[57], 10); + STEP(I, c, d, a, b, X[ 6], T[58], 15); STEP(I, b, c, d, a, X[13], T[59], 21); + STEP(I, a, b, c, d, X[ 4], T[60], 6); STEP(I, d, a, b, c, X[11], T[61], 10); + STEP(I, c, d, a, b, X[ 2], T[62], 15); STEP(I, b, c, d, a, X[ 9], T[63], 21); + + state[0] = (unsigned int)(state[0] + a); state[1] = (unsigned int)(state[1] + b); + state[2] = (unsigned int)(state[2] + c); state[3] = (unsigned int)(state[3] + d); +} + +void MD5_Init(MD5_CTX *ctx) { + ctx->state[0] = 0x67452301u; + ctx->state[1] = 0xefcdab89u; + ctx->state[2] = 0x98badcfeu; + ctx->state[3] = 0x10325476u; + ctx->count[0] = ctx->count[1] = 0; +} + +void MD5_Update(MD5_CTX *ctx, const void *data, size_t len) { + const unsigned char *p = (const unsigned char *)data; + unsigned int i, n; + + i = (unsigned int)((ctx->count[0] >> 3) & 63u); + ctx->count[0] += (unsigned int)(len << 3); + if (ctx->count[0] < (len << 3)) + ctx->count[1]++; + ctx->count[1] += (unsigned int)(len >> 29); + + n = 64u - i; + if (len >= n) { + memcpy(ctx->buf + i, p, n); + md5_transform(ctx->state, ctx->buf); + for (; n + 63 < len; n += 64) + md5_transform(ctx->state, p + n); + i = 0; + p += n; + len -= n; + } + if (len) + memcpy(ctx->buf + i, p, len); +} + +void MD5_Final(unsigned char digest[16], MD5_CTX *ctx) { + static const unsigned char padding[64] = { + 0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + }; + unsigned char bits[8]; + unsigned int i, index, padLen; + + /* Encode count into bits (little-endian, low word first) */ + bits[0] = (unsigned char)(ctx->count[0] ); + bits[1] = (unsigned char)(ctx->count[0] >> 8); + bits[2] = (unsigned char)(ctx->count[0] >> 16); + bits[3] = (unsigned char)(ctx->count[0] >> 24); + bits[4] = (unsigned char)(ctx->count[1] ); + bits[5] = (unsigned char)(ctx->count[1] >> 8); + bits[6] = (unsigned char)(ctx->count[1] >> 16); + bits[7] = (unsigned char)(ctx->count[1] >> 24); + + index = (unsigned int)((ctx->count[0] >> 3) & 63u); + padLen = (index < 56) ? (56 - index) : (120 - index); + MD5_Update(ctx, padding, padLen); + MD5_Update(ctx, bits, 8); + + for (i = 0; i < 4; i++) { + digest[i*4 ] = (unsigned char)(ctx->state[i] ); + digest[i*4+1] = (unsigned char)(ctx->state[i] >> 8); + digest[i*4+2] = (unsigned char)(ctx->state[i] >> 16); + digest[i*4+3] = (unsigned char)(ctx->state[i] >> 24); + } +} diff --git a/src/common/md5.h b/src/common/md5.h @@ -0,0 +1,20 @@ +/* + * Minimal MD5 (RFC 1321) for Digest auth. No external crypto library. + * API compatible with OpenSSL MD5_* for drop-in use. + */ +#ifndef UPBX_MD5_H +#define UPBX_MD5_H + +#include <stddef.h> + +typedef struct { + unsigned int state[4]; + unsigned int count[2]; + unsigned char buf[64]; +} MD5_CTX; + +void MD5_Init(MD5_CTX *ctx); +void MD5_Update(MD5_CTX *ctx, const void *data, size_t len); +void MD5_Final(unsigned char digest[16], MD5_CTX *ctx); + +#endif diff --git a/src/common/socket_util.c b/src/common/socket_util.c @@ -0,0 +1,358 @@ +#include "common/socket_util.h" +#include "rxi/log.h" +#include <fcntl.h> +#include <unistd.h> +#include <string.h> +#include <stdio.h> +#include <stdlib.h> +#include <sys/socket.h> +#include <netinet/in.h> +#include <arpa/inet.h> +#include <netdb.h> +#include <errno.h> +#include <sys/un.h> +#include <sys/stat.h> + +int set_socket_nonblocking(int fd, int nonblock) { + int flags = fcntl(fd, F_GETFL, 0); + if (flags < 0) return -1; + if (nonblock) + flags |= O_NONBLOCK; + else + flags &= ~O_NONBLOCK; + return fcntl(fd, F_SETFL, flags) == 0 ? 0 : -1; +} + +int *tcp_listen(const char *addr, const char *default_host, const char *default_port) { + char host[256] = ""; + char port[32] = ""; + + if (default_host && default_host[0]) { + snprintf(host, sizeof(host), "%s", default_host); + } + if (default_port && default_port[0]) { + snprintf(port, sizeof(port), "%s", default_port); + } + + if (!addr || !addr[0]) { + if (!host[0] || !port[0]) { + log_error("tcp_listen: empty address and no defaults"); + return NULL; + } + } else if (addr[0] == '[') { + const char *close_bracket = strchr(addr, ']'); + if (!close_bracket) { + log_error("tcp_listen: invalid IPv6 format: missing ']'"); + return NULL; + } + size_t hlen = (size_t)(close_bracket - addr - 1); + if (hlen > 0) { + if (hlen >= sizeof(host)) hlen = sizeof(host) - 1; + memcpy(host, addr + 1, hlen); + host[hlen] = '\0'; + } + const char *colon = close_bracket + 1; + if (*colon == ':') { + snprintf(port, sizeof(port), "%s", colon + 1); + } else if (*colon != '\0') { + log_error("tcp_listen: invalid IPv6 format: expected ':' after ']'"); + return NULL; + } + } else { + int leading_colon = (addr[0] == ':'); + const char *p = leading_colon ? addr + 1 : addr; + int is_port_only = 1; + for (const char *q = p; *q; q++) { + if (*q < '0' || *q > '9') { is_port_only = 0; break; } + } + + const char *colon = strrchr(addr, ':'); + if (leading_colon && is_port_only) { + snprintf(port, sizeof(port), "%s", p); + } else if (is_port_only) { + if (default_host && default_host[0]) { + snprintf(host, sizeof(host), "%s", default_host); + } + snprintf(port, sizeof(port), "%s", p); + } else if (colon) { + size_t hlen = (size_t)(colon - addr); + if (hlen > 0) { + if (hlen >= sizeof(host)) hlen = sizeof(host) - 1; + memcpy(host, addr, hlen); + host[hlen] = '\0'; + } + snprintf(port, sizeof(port), "%s", colon + 1); + } else { + snprintf(host, sizeof(host), "%s", addr); + } + } + + if (!port[0]) { + log_error("tcp_listen: no port specified"); + return NULL; + } + + struct addrinfo hints, *res = NULL; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_flags = AI_PASSIVE; + if (getaddrinfo(host[0] ? host : NULL, port, &hints, &res) != 0 || !res) { + log_error("tcp_listen: getaddrinfo failed for %s:%s", host, port); + return NULL; + } + + int *fds = malloc(sizeof(int) * 3); + if (!fds) { + freeaddrinfo(res); + return NULL; + } + fds[0] = 0; + + int listen_all = (host[0] == '\0'); + struct addrinfo *p; + + for (p = res; p; p = p->ai_next) { + if (p->ai_family == AF_INET) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) continue; + int opt = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + if (bind(fd, p->ai_addr, p->ai_addrlen) == 0 && listen(fd, 8) == 0) { + set_socket_nonblocking(fd, 1); + fds[++fds[0]] = fd; + } else { + close(fd); + } + } else if (p->ai_family == AF_INET6 && listen_all) { + int fd = socket(AF_INET6, SOCK_STREAM, 0); + if (fd < 0) continue; + int opt = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &opt, sizeof(opt)); + if (bind(fd, p->ai_addr, p->ai_addrlen) == 0 && listen(fd, 8) == 0) { + set_socket_nonblocking(fd, 1); + fds[++fds[0]] = fd; + } else { + close(fd); + } + } + } + + freeaddrinfo(res); + + if (fds[0] == 0) { + log_error("tcp_listen: failed to bind to %s:%s", host, port); + free(fds); + return NULL; + } + + return fds; +} + +int *udp_recv(const char *addr, const char *default_host, const char *default_port) { + char host[256] = ""; + char port[32] = ""; + + if (default_host && default_host[0]) { + snprintf(host, sizeof(host), "%s", default_host); + } + if (default_port && default_port[0]) { + snprintf(port, sizeof(port), "%s", default_port); + } + + if (!addr || !addr[0]) { + if (!host[0] || !port[0]) { + log_error("udp_recv: empty address and no defaults"); + return NULL; + } + } else if (addr[0] == '[') { + const char *close_bracket = strchr(addr, ']'); + if (!close_bracket) { + log_error("udp_recv: invalid IPv6 format: missing ']'"); + return NULL; + } + size_t hlen = (size_t)(close_bracket - addr - 1); + if (hlen > 0) { + if (hlen >= sizeof(host)) hlen = sizeof(host) - 1; + memcpy(host, addr + 1, hlen); + host[hlen] = '\0'; + } + const char *colon = close_bracket + 1; + if (*colon == ':') { + snprintf(port, sizeof(port), "%s", colon + 1); + } else if (*colon != '\0') { + log_error("udp_recv: invalid IPv6 format: expected ':' after ']'"); + return NULL; + } + } else { + int leading_colon = (addr[0] == ':'); + const char *p = leading_colon ? addr + 1 : addr; + int is_port_only = 1; + for (const char *q = p; *q; q++) { + if (*q < '0' || *q > '9') { is_port_only = 0; break; } + } + + const char *colon = strrchr(addr, ':'); + if (leading_colon && is_port_only) { + snprintf(port, sizeof(port), "%s", p); + } else if (is_port_only) { + if (default_host && default_host[0]) { + snprintf(host, sizeof(host), "%s", default_host); + } + snprintf(port, sizeof(port), "%s", p); + } else if (colon) { + size_t hlen = (size_t)(colon - addr); + if (hlen > 0) { + if (hlen >= sizeof(host)) hlen = sizeof(host) - 1; + memcpy(host, addr, hlen); + host[hlen] = '\0'; + } + snprintf(port, sizeof(port), "%s", colon + 1); + } else { + snprintf(host, sizeof(host), "%s", addr); + } + } + + if (!port[0]) { + log_error("udp_recv: no port specified"); + return NULL; + } + + struct addrinfo hints, *res = NULL; + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_DGRAM; + hints.ai_flags = AI_PASSIVE; + if (getaddrinfo(host[0] ? host : NULL, port, &hints, &res) != 0 || !res) { + log_error("udp_recv: getaddrinfo failed for %s:%s", host, port); + return NULL; + } + + int *fds = malloc(sizeof(int) * 3); + if (!fds) { + freeaddrinfo(res); + return NULL; + } + fds[0] = 0; + + int listen_all = (host[0] == '\0'); + struct addrinfo *p; + + for (p = res; p; p = p->ai_next) { + if (p->ai_family == AF_INET) { + int fd = socket(AF_INET, SOCK_DGRAM, 0); + if (fd < 0) continue; + int opt = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + if (bind(fd, p->ai_addr, p->ai_addrlen) == 0) { + set_socket_nonblocking(fd, 1); + fds[++fds[0]] = fd; + } else { + close(fd); + } + } else if (p->ai_family == AF_INET6 && listen_all) { + int fd = socket(AF_INET6, SOCK_DGRAM, 0); + if (fd < 0) continue; + int opt = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); + setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &opt, sizeof(opt)); + if (bind(fd, p->ai_addr, p->ai_addrlen) == 0) { + set_socket_nonblocking(fd, 1); + fds[++fds[0]] = fd; + } else { + close(fd); + } + } + } + + freeaddrinfo(res); + + if (fds[0] == 0) { + log_error("udp_recv: failed to bind to %s:%s", host, port); + free(fds); + return NULL; + } + + return fds; +} + +int *unix_listen(const char *path, int sock_type) { + if (!path || !path[0]) { + log_error("unix_listen: empty path"); + return NULL; + } + + char *path_copy = strdup(path); + if (!path_copy) { + return NULL; + } + + char *dir = strdup(path); + if (!dir) { + free(path_copy); + return NULL; + } + + char *last_slash = strrchr(dir, '/'); + if (last_slash && last_slash != dir) { + *last_slash = '\0'; + if (strlen(dir) > 0) { + mkdir(dir, 0755); + } + } else if (!last_slash) { + dir[0] = '.'; + dir[1] = '\0'; + } + free(dir); + + unlink(path_copy); + + int *fds = malloc(sizeof(int) * 2); + if (!fds) { + free(path_copy); + return NULL; + } + fds[0] = 0; + + int fd = socket(AF_UNIX, sock_type, 0); + if (fd < 0) { + log_error("unix_listen: socket failed: %s", strerror(errno)); + free(path_copy); + free(fds); + return NULL; + } + + struct sockaddr_un addr; + memset(&addr, 0, sizeof(addr)); + addr.sun_family = AF_UNIX; + strncpy(addr.sun_path, path_copy, sizeof(addr.sun_path) - 1); + + if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + log_error("unix_listen: bind failed: %s", strerror(errno)); + close(fd); + free(path_copy); + free(fds); + return NULL; + } + + if (sock_type == SOCK_DGRAM) { + chmod(path_copy, 0777); + } else if (sock_type == SOCK_STREAM) { + if (listen(fd, 8) < 0) { + log_error("unix_listen: listen failed: %s", strerror(errno)); + close(fd); + unlink(path_copy); + free(path_copy); + free(fds); + return NULL; + } + } + + set_socket_nonblocking(fd, 1); + + fds[++fds[0]] = fd; + + free(path_copy); + return fds; +} diff --git a/src/common/socket_util.h b/src/common/socket_util.h @@ -0,0 +1,22 @@ +#ifndef UPBX_SOCKET_UTIL_H +#define UPBX_SOCKET_UTIL_H + +/* Set socket blocking (0) or non-blocking (1). Returns 0 on success, -1 on error. */ +int set_socket_nonblocking(int fd, int nonblock); + +/* Create listening sockets. Supports dual-stack when host is empty. + * addr can be "port", "host:port", or "[ipv6]:port". Missing host uses default_host, + * missing port uses default_port. + * Returns int array: index 0 = count, index 1+ = socket fds. Caller must free. + * On error returns NULL. */ +int *tcp_listen(const char *addr, const char *default_host, const char *default_port); + +/* Create UDP receiving sockets. Same semantics as tcp_listen(). */ +int *udp_recv(const char *addr, const char *default_host, const char *default_port); + +/* Create Unix domain socket. path is the socket path, sock_type is SOCK_DGRAM or SOCK_STREAM. + * Returns int array: index 0 = count, index 1 = socket fd. Caller must free. + * On error returns NULL. */ +int *unix_listen(const char *path, int sock_type); + +#endif diff --git a/src/common/util/hex.c b/src/common/util/hex.c @@ -0,0 +1,31 @@ +#include "common/util/hex.h" + +void hex_bytes_to_str(const unsigned char *bytes, size_t len, char *out) { + static const char hex[] = "0123456789abcdef"; + for (size_t i = 0; i < len; i++) { + out[i * 2] = hex[(bytes[i] >> 4) & 0x0f]; + out[i * 2 + 1] = hex[bytes[i] & 0x0f]; + } + out[len * 2] = '\0'; +} + +int hex_char_to_val(char c) { + if (c >= '0' && c <= '9') return c - '0'; + if (c >= 'A' && c <= 'F') return c - 'A' + 10; + if (c >= 'a' && c <= 'f') return c - 'a' + 10; + return -1; +} + +int hex_str_to_bytes(const char *hex, size_t hex_len, unsigned char *out, size_t out_size) { + if (hex_len % 2 != 0) return -1; + size_t bytes = hex_len / 2; + if (bytes > out_size) return -1; + + for (size_t i = 0; i < bytes; i++) { + int hi = hex_char_to_val(hex[i * 2]); + int lo = hex_char_to_val(hex[i * 2 + 1]); + if (hi < 0 || lo < 0) return -1; + out[i] = (unsigned char)((hi << 4) | lo); + } + return 0; +} diff --git a/src/common/util/hex.h b/src/common/util/hex.h @@ -0,0 +1,12 @@ +#ifndef UPBX_COMMON_UTIL_HEX_H +#define UPBX_COMMON_UTIL_HEX_H + +#include <stddef.h> + +void hex_bytes_to_str(const unsigned char *bytes, size_t len, char *out); + +int hex_char_to_val(char c); + +int hex_str_to_bytes(const char *hex, size_t hex_len, unsigned char *out, size_t out_size); + +#endif diff --git a/src/config.c b/src/config.c @@ -0,0 +1,62 @@ +#include <stdlib.h> +#include <string.h> +#include "benhoyt/inih.h" +#include "rxi/log.h" +#include "config.h" +#include "RespModule/resp.h" + +resp_object *global_cfg = NULL; +resp_object *pending_cfg = NULL; + +static const char *stored_config_path = NULL; + +static int config_handler(void *user, const char *section, const char *name, const char *value, int lineno) { + (void)lineno; + resp_object *cfg = (resp_object *)user; + resp_object *sec = resp_map_get(cfg, section); + if (!sec || sec->type != RESPT_ARRAY) { + sec = resp_array_init(); + resp_map_set(cfg, section, sec); + sec = resp_map_get(cfg, section); + } + resp_array_append_bulk(sec, name); + resp_array_append_bulk(sec, value); + return 1; +} + +void config_init(void) { + global_cfg = resp_array_init(); + config_load(global_cfg, config_get_path()); +} + +int config_load(resp_object *cfg, const char *path) { + return ini_parse(path, config_handler, cfg); +} + +void config_pending_init(void) { + pending_cfg = resp_array_init(); +} + +void config_swap(void) { + resp_object *old = global_cfg; + global_cfg = pending_cfg; + pending_cfg = old; + if (old) resp_free(old); +} + +int config_reload(void) { + config_pending_init(); + int r = config_load(pending_cfg, config_get_path()); + if (r < 0) return -1; + config_swap(); + return 0; +} + +void config_set_path(const char *path) { + if (stored_config_path) free((void*)stored_config_path); + stored_config_path = path ? strdup(path) : NULL; +} + +const char *config_get_path(void) { + return stored_config_path; +} diff --git a/src/config.h b/src/config.h @@ -0,0 +1,15 @@ +#ifndef UPBX_CONFIG_H +#define UPBX_CONFIG_H + +#include "RespModule/resp.h" + +extern resp_object *global_cfg; +extern resp_object *pending_cfg; + +void config_init(void); +int config_load(resp_object *cfg, const char *path); +int config_reload(void); +void config_set_path(const char *path); +const char *config_get_path(void); + +#endif diff --git a/src/main.c b/src/main.c @@ -0,0 +1,278 @@ +/// <!-- path: src/main.c --> +#ifdef __cplusplus +extern "C" { +#endif + +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <strings.h> +#include <signal.h> + +#include "cofyc/argparse.h" +#include "rxi/log.h" + +#include "CliModule/setup.h" +#include "CliModule/execute_command.h" +#include "CliModule/common.h" +#include "config.h" + +#ifdef __cplusplus +} +#endif + +extern void appmodule_setup(void); + +#define INCBIN_SILENCE_BITCODE_WARNING +#include "graphitemaster/incbin.h" +INCTXT(License, "LICENSE.md"); + +/// # GLOBAL OPTIONS +/// Global options apply to every **udphole** run. They must appear **before** the command name; everything after the command is passed to that command. +/// +/// **Synopsis** +/// +/// **udphole** [global options] &lt;command&gt; [command options and arguments] +/// **udphole** `-h` | `--help` +/// **udphole** `--license` +/// **udphole** `list-commands` +/// +/// **Options** +/// +/// `-h`, `--help` +/// Print the usage strings and options for this layer to stderr and exit. Shows the usages and the global options; does not list commands. +/// +/// `-f`, `--config` &lt;path&gt; +/// Config file path. If omitted, the following are tried in order: **$HOME/.config/udphole.conf**, **$HOME/.udphole.conf**, **/etc/udphole/udphole.conf**, **/etc/udphole.conf**. Used by all commands that read or write config (daemon). +/// +/// `-v`, `--verbosity` &lt;level&gt; +/// Log verbosity: **fatal**, **error**, **warn**, **info**, **debug**, **trace**. Default: **info**. Applies to the daemon and any command that uses the logging subsystem. +/// +/// `--log` &lt;path&gt; +/// In addition to stderr, append logs to the given file. Send SIGHUP to the process to reopen the file (e.g. after logrotate). Useful when running the daemon. +/// +/// `--license` +/// Print the full license text to stdout (paragraphs and list, wrapped to terminal width) and exit with 0. No config is loaded and no command is run. +/// +/// **Top-level commands** +/// +/// - [**daemon**](#daemon) — Run the UDP hole server. +/// - [**list-commands**](#list-commands) — List available commands and short descriptions. +/// +static const char *const usages[] = { + "udphole [global] command [local]", + "udphole list-commands", + "udphole --license", + NULL, +}; + +/* Log file state for --log and SIGHUP re-open */ +static FILE *log_file; +static char *log_path; +static volatile sig_atomic_t sighup_received; + +static void logfile_callback(log_Event *ev) { + if (sighup_received) { + sighup_received = 0; + if (log_path && log_file) { + fclose(log_file); + log_file = fopen(log_path, "a"); + } + config_reload(); + } + if (log_file) { + char buf[64]; + buf[strftime(buf, sizeof(buf), "%Y-%m-%d %H:%M:%S", ev->time)] = '\0'; + fprintf(log_file, "%s %-5s %s:%d: ", buf, log_level_string(ev->level), ev->file, ev->line); + vfprintf(log_file, ev->fmt, ev->ap); + fprintf(log_file, "\n"); + fflush(log_file); + } +} + +static void sighup_handler(int sig) { + (void)sig; + sighup_received = 1; +} + +#define MARKER_PARAGRAPH "<!-- paragraph -->" +#define MARKER_LIST_START "<!-- list:start -->" +#define MARKER_LIST_END "<!-- list:end -->" + +static void skip_whitespace(const char **p) { + while (**p == ' ' || **p == '\t' || **p == '\n' || **p == '\r') (*p)++; +} + +static void print_license_paragraph(const char *start, const char *end, int width) { + if (start >= end) return; + while (start < end && (*start == ' ' || *start == '\n' || *start == '\r')) start++; + while (end > start && (end[-1] == ' ' || end[-1] == '\n' || end[-1] == '\r')) end--; + if (start >= end) return; + static char buf[4096]; + size_t n = (size_t)(end - start); + if (n >= sizeof(buf)) n = sizeof(buf) - 1; + memcpy(buf, start, n); + buf[n] = '\0'; + for (size_t i = 0; i < n; i++) + if (buf[i] == '\n') buf[i] = ' '; + cli_print_wrapped(stdout, buf, width, 0); + fputc('\n', stdout); + fputc('\n', stdout); +} + +static void print_license_list(const char *start, const char *end, int width) { + const int left_col = 5; + const char *p = start; + while (p < end) { + skip_whitespace(&p); + if (p >= end) break; + if (*p < '0' || *p > '9') { p++; continue; } + const char *num_start = p; + while (p < end && *p >= '0' && *p <= '9') p++; + if (p >= end || *p != '.') continue; + p++; + skip_whitespace(&p); + const char *text_start = p; + const char *item_end = p; + while (item_end < end) { + const char *next = item_end; + while (next < end && *next != '\n') next++; + if (next < end) next++; + if (next >= end) { item_end = end; break; } + skip_whitespace(&next); + if (next < end && *next >= '0' && *next <= '9') { + const char *q = next; + while (q < end && *q >= '0' && *q <= '9') q++; + if (q < end && *q == '.') break; + } + item_end = next; + } + while (item_end > text_start && (item_end[-1] == ' ' || item_end[-1] == '\n' || item_end[-1] == '\r')) item_end--; + int num = atoi(num_start); + fprintf(stdout, " %2d. ", num); + static char buf[1024]; + size_t n = (size_t)(item_end - text_start); + if (n >= sizeof(buf)) n = sizeof(buf) - 1; + memcpy(buf, text_start, n); + buf[n] = '\0'; + for (size_t i = 0; i < n; i++) + if (buf[i] == '\n' || buf[i] == '\r') buf[i] = ' '; + cli_print_wrapped(stdout, buf, width, left_col); + fputc('\n', stdout); + p = item_end; + } + fputc('\n', stdout); +} + +static void cli_print_license(FILE *out, const char *text, int width) { + const char *p = text; + (void)out; + while (*p) { + const char *q = strstr(p, "<!-- "); + if (!q) { + print_license_paragraph(p, p + strlen(p), width); + break; + } + if (q > p) + print_license_paragraph(p, q, width); + q += 5; + if (strncmp(q, "paragraph -->", 13) == 0) { + q += 13; + skip_whitespace(&q); + const char *r = strstr(q, "<!-- "); + if (!r) r = q + strlen(q); + print_license_paragraph(q, r, width); + p = r; + continue; + } + if (strncmp(q, "list:start -->", 14) == 0) { + q += 14; + skip_whitespace(&q); + const char *r = strstr(q, "<!-- list:end -->"); + if (r) + print_license_list(q, r, width); + p = r ? r + 17 : q + strlen(q); + continue; + } + p = q; + } +} + +int main(int argc, const char **argv) { + const char *loglevel = "info"; + const char *logfile_path = NULL; + const char *config_path = NULL; + static int license_flag = 0; + + climodule_setup(); + appmodule_setup(); + + struct argparse argparse; + struct argparse_option options[] = { + OPT_HELP(), + OPT_STRING('f', "config", &config_path, "config file path (default: auto-detect)", NULL, 0, 0), + OPT_STRING('v', "verbosity", &loglevel, "log verbosity: fatal,error,warn,info,debug,trace (default: info)", NULL, 0, 0), + OPT_STRING(0, "log", &logfile_path, "also write log to file (SIGHUP reopens for logrotate)", NULL, 0, 0), + OPT_BOOLEAN(0, "license", &license_flag, "print license and exit", NULL, 0, 0), + OPT_END(), + }; + argparse_init(&argparse, options, usages, ARGPARSE_STOP_AT_NON_OPTION); + argc = argparse_parse(&argparse, argc, argv); + + if (license_flag) { + cli_print_license(stdout, (const char *)gLicenseData, cli_get_output_width(120)); + return 0; + } + + if (argc < 1) { + argparse_usage(&argparse); + return 1; + } + + /* Resolve config path: explicit -f, then default locations */ + if (!config_path || !config_path[0]) + config_path = cli_resolve_default_config(); + config_set_path(config_path); + config_init(); + + int level = LOG_INFO; + if (0) { + (void)0; + } else if (!strcasecmp(loglevel, "trace")) { + level = LOG_TRACE; + } else if (!strcasecmp(loglevel, "debug")) { + level = LOG_DEBUG; + } else if (!strcasecmp(loglevel, "info")) { + level = LOG_INFO; + } else if (!strcasecmp(loglevel, "warn")) { + level = LOG_WARN; + } else if (!strcasecmp(loglevel, "error")) { + level = LOG_ERROR; + } else if (!strcasecmp(loglevel, "fatal")) { + level = LOG_FATAL; + } else { + fprintf(stderr, "Unknown log level: %s\n", loglevel); + return 1; + } + log_set_level(level); + setvbuf(stderr, NULL, _IOLBF, 0); + + log_file = NULL; + log_path = NULL; + sighup_received = 0; + + if (logfile_path && logfile_path[0]) { + log_path = strdup(logfile_path); + log_file = fopen(log_path, "a"); + if (log_file) { + log_add_callback(logfile_callback, log_path, level); + signal(SIGHUP, sighup_handler); + } else { + fprintf(stderr, "Could not open log file: %s\n", logfile_path); + free(log_path); + log_path = NULL; + } + } + + return climodule_execute_command(argc, argv); +}