commit d9cbcfcf98fb9de5ddcb6ed1aa5560d760bae242
parent 78ff0eda87ead32a12a8b26fd0d2f5129161ef94
Author: Robin Bron <robin.bron@yourhosting.nl>
Date: Mon, 2 Mar 2026 10:15:03 +0100
Radically simplify scheduler
Diffstat:
13 files changed, 373 insertions(+), 501 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -3,3 +3,5 @@
*.log
/udphole
/config.ini
+/.cache/
+/compile_commands.json
diff --git a/Makefile b/Makefile
@@ -108,6 +108,10 @@ $(BIN): $(OBJ)
.PHONY: test
test: $(BIN)
+ @gcc -Wall -O2 -I src -D INI_HANDLER_LINENO=1 test/test_scheduler.c src/common/scheduler.c -o test-scheduler
+ @./test-scheduler
+ @rm -f test-scheduler
+ @sleep 1
@node test/system-commands.js
@sleep 1
@node test/basic-forwarding-tcp.js
diff --git a/src/common/scheduler.c b/src/common/scheduler.c
@@ -0,0 +1,116 @@
+#include <stdlib.h>
+#include <sys/select.h>
+#include <sys/time.h>
+
+#include "common/scheduler.h"
+
+#ifndef NULL
+#define NULL ((void*)0)
+#endif
+
+pt_task_t *pt_first = NULL;
+fd_set g_select_result;
+static fd_set g_want_fds;
+
+int sched_create(pt_task_fn fn, void *udata) {
+ if (!fn) return 1;
+
+ pt_task_t *node = calloc(1, sizeof(pt_task_t));
+ node->next = pt_first;
+ node->func = fn;
+ node->udata = udata;
+ node->is_active = 1;
+ pt_first = node;
+
+ return 0;
+}
+
+int sched_remove(pt_task_t *task) {
+ if (!task) return 1;
+
+ pt_task_t *curr = pt_first;
+ pt_task_t *prev = NULL;
+
+ while (curr) {
+ if (curr == task) {
+ if (prev) {
+ prev->next = curr->next;
+ } else {
+ pt_first = curr->next;
+ }
+ free(curr);
+ return 0;
+ }
+ prev = curr;
+ curr = curr->next;
+ }
+
+ return 1;
+}
+
+int sched_has_data(int *in_fds) {
+ if (!in_fds || in_fds[0] == 0) return -1;
+
+ for (int i = 1; i <= in_fds[0]; i++) {
+ int fd = in_fds[i];
+ if (fd >= 0) {
+ FD_SET(fd, &g_want_fds);
+ }
+ }
+
+ for (int i = 1; i <= in_fds[0]; i++) {
+ int fd = in_fds[i];
+ if (fd >= 0 && FD_ISSET(fd, &g_select_result)) {
+ FD_CLR(fd, &g_select_result);
+ return fd;
+ }
+ }
+
+ return -1;
+}
+
+int sched_main(void) {
+ if (!pt_first) return 0;
+
+ struct timeval tv;
+ int maxfd;
+
+ for(;;) {
+ maxfd = -1;
+ for (int fd = 0; fd < FD_SETSIZE; fd++) {
+ if (FD_ISSET(fd, &g_want_fds)) {
+ if (fd > maxfd) maxfd = fd;
+ }
+ }
+
+ if (maxfd < 0) {
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000;
+ select(0, NULL, NULL, NULL, &tv);
+ } else {
+ tv.tv_sec = 0;
+ tv.tv_usec = 100000;
+ select(maxfd + 1, &g_want_fds, NULL, NULL, &tv);
+ g_select_result = g_want_fds;
+ FD_ZERO(&g_want_fds);
+ }
+
+ struct timeval now;
+ gettimeofday(&now, NULL);
+ int64_t timestamp = (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000;
+
+ pt_task_t *task = pt_first;
+ while (task) {
+ pt_task_t *next = task->next;
+ task->is_active = (task->func(timestamp, task) == SCHED_RUNNING);
+ if (!task->is_active) {
+ sched_remove(task);
+ }
+ task = next;
+ }
+
+ if (!pt_first) break;
+ }
+
+ return 0;
+}
diff --git a/src/common/scheduler.h b/src/common/scheduler.h
@@ -0,0 +1,29 @@
+#ifndef UDPHOLE_SCHEDULER_H
+#define UDPHOLE_SCHEDULER_H
+
+#include <stdint.h>
+#include <sys/select.h>
+
+#define SCHED_RUNNING 0
+#define SCHED_DONE 1
+#define SCHED_ERROR 2
+
+struct pt_task;
+
+typedef int (*pt_task_fn)(int64_t timestamp, struct pt_task *task);
+
+typedef struct pt_task {
+ struct pt_task *next;
+ pt_task_fn func;
+ void *udata;
+ char is_active;
+ int maxfd;
+} pt_task_t;
+
+int sched_create(pt_task_fn fn, void *udata);
+int sched_remove(pt_task_t *task);
+int sched_main(void);
+
+int sched_has_data(int *in_fds);
+
+#endif // UDPHOLE_SCHEDULER_H
diff --git a/src/domain/daemon/session.c b/src/domain/daemon/session.c
@@ -13,8 +13,7 @@
#include <arpa/inet.h>
#include "rxi/log.h"
-#include "domain/protothreads.h"
-#include "domain/scheduler.h"
+#include "common/scheduler.h"
#include "domain/config.h"
#include "common/socket_util.h"
#include "common/resp.h"
@@ -54,7 +53,6 @@ typedef struct session {
int marked_for_deletion;
int *ready_fds;
int *all_fds;
- struct pt pt;
struct pt_task *task;
} session_t;
@@ -376,111 +374,102 @@ static socket_t *find_socket_by_fd(session_t *s, int fd) {
return NULL;
}
-PT_THREAD(session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
+int session_pt(int64_t timestamp, struct pt_task *task) {
+ (void)timestamp;
session_t *s = task->udata;
- PT_BEGIN(pt);
+ if (s->marked_for_deletion) {
+ goto cleanup;
+ }
- char buffer[BUFFER_SIZE];
+ if (!s->sockets || s->sockets_count == 0) {
+ return SCHED_RUNNING;
+ }
- for (;;) {
- if (s->marked_for_deletion) {
- break;
- }
+ s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1));
+ if (!s->all_fds) {
+ return SCHED_RUNNING;
+ }
+ s->all_fds[0] = 0;
- if (!s->sockets || s->sockets_count == 0) {
- PT_YIELD(pt);
- continue;
+ for (size_t j = 0; j < s->sockets_count; j++) {
+ socket_t *sock = s->sockets[j];
+ if (!sock || !sock->fds) continue;
+ for (int i = 1; i <= sock->fds[0]; i++) {
+ s->all_fds[++s->all_fds[0]] = sock->fds[i];
}
+ }
- s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1));
- if (!s->all_fds) {
- PT_YIELD(pt);
- continue;
- }
- s->all_fds[0] = 0;
+ if (s->all_fds[0] == 0) {
+ return SCHED_RUNNING;
+ }
- for (size_t j = 0; j < s->sockets_count; j++) {
- socket_t *sock = s->sockets[j];
- if (!sock || !sock->fds) continue;
- for (int i = 1; i <= sock->fds[0]; i++) {
- s->all_fds[++s->all_fds[0]] = sock->fds[i];
- }
- }
+ int ready_fd = sched_has_data(s->all_fds);
+ if (ready_fd < 0) {
+ return SCHED_RUNNING;
+ }
- if (s->all_fds[0] == 0) {
- PT_YIELD(pt);
- continue;
- }
+ char buffer[BUFFER_SIZE];
+ socket_t *src_sock = find_socket_by_fd(s, ready_fd);
+ if (!src_sock) {
+ return SCHED_RUNNING;
+ }
- PT_WAIT_UNTIL(pt, domain_schedmod_has_data(s->all_fds, &s->ready_fds) > 0);
+ struct sockaddr_storage from_addr;
+ socklen_t from_len = sizeof(from_addr);
+ ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0,
+ (struct sockaddr *)&from_addr, &from_len);
- if (!s->ready_fds || s->ready_fds[0] == 0) {
- PT_YIELD(pt);
- continue;
+ if (n <= 0) {
+ if (errno != EAGAIN && errno != EWOULDBLOCK) {
+ log_warn("udphole: recvfrom error on socket %s: %s",
+ src_sock->socket_id, strerror(errno));
}
+ return SCHED_RUNNING;
+ }
- for (int r = 1; r <= s->ready_fds[0]; r++) {
- int ready_fd = s->ready_fds[r];
+ s->last_activity = time(NULL);
- socket_t *src_sock = find_socket_by_fd(s, ready_fd);
- if (!src_sock) continue;
+ if (src_sock->mode == 0 && !src_sock->learned_valid) {
+ src_sock->learned_addr = from_addr;
+ src_sock->learned_addrlen = from_len;
+ src_sock->learned_valid = 1;
+ log_debug("udphole: socket %s learned remote address", src_sock->socket_id);
+ }
- struct sockaddr_storage from_addr;
- socklen_t from_len = sizeof(from_addr);
- ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0,
- (struct sockaddr *)&from_addr, &from_len);
+ for (size_t i = 0; i < s->forwards_count; i++) {
+ if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) {
+ continue;
+ }
- if (n <= 0) {
- if (errno != EAGAIN && errno != EWOULDBLOCK) {
- log_warn("udphole: recvfrom error on socket %s: %s",
- src_sock->socket_id, strerror(errno));
- }
- continue;
- }
+ socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id);
+ if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue;
- s->last_activity = time(NULL);
+ struct sockaddr *dest_addr = NULL;
+ socklen_t dest_addrlen = 0;
- if (src_sock->mode == 0 && !src_sock->learned_valid) {
- src_sock->learned_addr = from_addr;
- src_sock->learned_addrlen = from_len;
- src_sock->learned_valid = 1;
- log_debug("udphole: socket %s learned remote address", src_sock->socket_id);
- }
+ if (dst_sock->mode == 1) {
+ dest_addr = (struct sockaddr *)&dst_sock->remote_addr;
+ dest_addrlen = dst_sock->remote_addrlen;
+ } else if (dst_sock->learned_valid) {
+ dest_addr = (struct sockaddr *)&dst_sock->learned_addr;
+ dest_addrlen = dst_sock->learned_addrlen;
+ }
- for (size_t i = 0; i < s->forwards_count; i++) {
- if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) {
- continue;
- }
-
- socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id);
- if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue;
-
- struct sockaddr *dest_addr = NULL;
- socklen_t dest_addrlen = 0;
-
- if (dst_sock->mode == 1) {
- dest_addr = (struct sockaddr *)&dst_sock->remote_addr;
- dest_addrlen = dst_sock->remote_addrlen;
- } else if (dst_sock->learned_valid) {
- dest_addr = (struct sockaddr *)&dst_sock->learned_addr;
- dest_addrlen = dst_sock->learned_addrlen;
- }
-
- if (dest_addr && dest_addrlen > 0) {
- int dst_fd = dst_sock->fds[1];
- ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen);
- if (sent < 0) {
- log_warn("udphole: forward failed %s -> %s: %s",
- src_sock->socket_id, dst_sock->socket_id, strerror(errno));
- }
- }
+ if (dest_addr && dest_addrlen > 0) {
+ int dst_fd = dst_sock->fds[1];
+ ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen);
+ if (sent < 0) {
+ log_warn("udphole: forward failed %s -> %s: %s",
+ src_sock->socket_id, dst_sock->socket_id, strerror(errno));
}
}
-
}
- log_debug("udphole: session %s protothread exiting", s->session_id);
+ return SCHED_RUNNING;
+
+cleanup:
+ log_debug("udphole: session %s exiting", s->session_id);
if (s->all_fds) {
free(s->all_fds);
@@ -491,11 +480,11 @@ PT_THREAD(session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
s->ready_fds = NULL;
}
- PT_END(pt);
+ return SCHED_DONE;
}
static void spawn_session_pt(session_t *s) {
- s->task = (struct pt_task *)(intptr_t)domain_schedmod_pt_create(session_pt, s);
+ s->task = (struct pt_task *)(intptr_t)sched_create(session_pt, s);
}
resp_object *domain_session_create(const char *cmd, resp_object *args) {
@@ -978,29 +967,22 @@ resp_object *domain_session_count(const char *cmd, resp_object *args) {
return res;
}
-PT_THREAD(session_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
+int session_manager_pt(int64_t timestamp, struct pt_task *task) {
session_manager_udata_t *udata = task->udata;
- log_trace("session_manager: protothread entry");
- PT_BEGIN(pt);
-
- PT_WAIT_UNTIL(pt, domain_cfg);
-
- log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high);
- for (;;) {
- if (timestamp - udata->last_cleanup >= 1000) {
- cleanup_expired_sessions();
- udata->last_cleanup = timestamp;
- }
+ if (!domain_cfg) {
+ return SCHED_RUNNING;
+ }
- PT_YIELD(pt);
+ if (!udata->initialized) {
+ log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high);
+ udata->initialized = 1;
}
- for (size_t i = 0; i < sessions_count; i++) {
- if (sessions[i]) {
- sessions[i]->marked_for_deletion = 1;
- }
+ if (timestamp - udata->last_cleanup >= 1000) {
+ cleanup_expired_sessions();
+ udata->last_cleanup = timestamp;
}
- PT_END(pt);
+ return SCHED_RUNNING;
}
\ No newline at end of file
diff --git a/src/domain/daemon/session.h b/src/domain/daemon/session.h
@@ -3,14 +3,15 @@
#include <stdint.h>
-#include "domain/scheduler.h"
+#include "common/scheduler.h"
#include "common/resp.h"
typedef struct {
int64_t last_cleanup;
+ int initialized;
} session_manager_udata_t;
-PT_THREAD(session_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task));
+int session_manager_pt(int64_t timestamp, struct pt_task *task);
resp_object *domain_session_create(const char *cmd, resp_object *args);
resp_object *domain_session_list(const char *cmd, resp_object *args);
diff --git a/src/domain/protothreads.h b/src/domain/protothreads.h
@@ -1,99 +0,0 @@
-#ifndef UDPHOLE_PROTOTHREADS_H
-#define UDPHOLE_PROTOTHREADS_H
-
-#include <stddef.h>
-
-#define PT_CONCAT2(s1, s2) s1##s2
-#define PT_CONCAT(s1, s2) PT_CONCAT2(s1, s2)
-
-#define PT_RESUME(pt) \
- do { \
- if ((pt)->lc != NULL) \
- goto *(pt)->lc; \
- } while (0)
-
-#define PT_SET(pt) \
- do { \
- PT_CONCAT(PT_LABEL, __LINE__): \
- (pt)->lc = &&PT_CONCAT(PT_LABEL, __LINE__); \
- } while (0)
-
-struct pt {
- void *lc;
-};
-
-#define PT_WAITING 0
-#define PT_YIELDED 1
-#define PT_EXITED 2
-#define PT_ENDED 3
-
-#define PT_INIT(pt) ((pt)->lc = NULL)
-
-#define PT_THREAD(name_args) char name_args
-
-#define PT_BEGIN(pt) { char PT_YIELD_FLAG = 1; (void)PT_YIELD_FLAG; PT_RESUME((pt))
-
-#define PT_END(pt) PT_INIT(pt); PT_YIELD_FLAG = 0; return PT_ENDED; }
-
-#define PT_WAIT_UNTIL(pt, condition) \
- do { \
- PT_SET(pt); \
- if (!(condition)) \
- return PT_WAITING; \
- } while (0)
-
-#define PT_WAIT_WHILE(pt, cond) PT_WAIT_UNTIL((pt), !(cond))
-
-#define PT_WAIT_THREAD(pt, thread) PT_WAIT_WHILE((pt), PT_SCHEDULE(thread))
-
-#define PT_SPAWN(pt, child, thread) \
- do { \
- PT_INIT((child)); \
- PT_WAIT_THREAD((pt), (thread)); \
- } while (0)
-
-#define PT_RESTART(pt) \
- do { \
- PT_INIT(pt); \
- return PT_WAITING; \
- } while (0)
-
-#define PT_EXIT(pt) \
- do { \
- PT_INIT(pt); \
- return PT_EXITED; \
- } while (0)
-
-#define PT_SCHEDULE(f) ((f) < PT_EXITED)
-
-#define PT_YIELD(pt) \
- do { \
- PT_YIELD_FLAG = 0; \
- PT_SET(pt); \
- if (PT_YIELD_FLAG == 0) \
- return PT_YIELDED; \
- } while (0)
-
-#define PT_YIELD_UNTIL(pt, cond) \
- do { \
- PT_YIELD_FLAG = 0; \
- PT_SET(pt); \
- if ((PT_YIELD_FLAG == 0) || !(cond)) \
- return PT_YIELDED; \
- } while (0)
-
-struct pt_sem {
- unsigned int count;
-};
-
-#define PT_SEM_INIT(s, c) ((s)->count = (c))
-
-#define PT_SEM_WAIT(pt, s) \
- do { \
- PT_WAIT_UNTIL(pt, (s)->count > 0); \
- (s)->count--; \
- } while (0)
-
-#define PT_SEM_SIGNAL(pt, s) ((s)->count++)
-
-#endif // UDPHOLE_PROTOTHREADS_H
-\ No newline at end of file
diff --git a/src/domain/scheduler.c b/src/domain/scheduler.c
@@ -1,138 +0,0 @@
-#include <stdlib.h>
-#include <sys/select.h>
-#include <sys/time.h>
-
-#include "rxi/log.h"
-#include "domain/scheduler.h"
-
-#ifndef NULL
-#define NULL ((void*)0)
-#endif
-
-pt_task_t *pt_first = NULL;
-fd_set g_select_result;
-static fd_set g_want_fds;
-
-int domain_schedmod_pt_create(pt_task_fn fn, void *udata) {
- if (!fn) return 1;
-
- pt_task_t *node = calloc(1, sizeof(pt_task_t));
- node->next = pt_first;
- node->func = fn;
- node->udata = udata;
- PT_INIT(&node->pt);
- node->is_active = 1;
- pt_first = node;
-
- log_trace("scheduler: created task %p (func=%p, udata=%p), pt_first=%p", (void*)node, (void*)fn, udata, (void*)pt_first);
-
- return 0;
-}
-
-int domain_schedmod_pt_remove(pt_task_t *task) {
- if (!task) return 1;
-
- pt_task_t *curr = pt_first;
- pt_task_t *prev = NULL;
-
- while (curr) {
- if (curr == task) {
- if (prev) {
- prev->next = curr->next;
- } else {
- pt_first = curr->next;
- }
- free(curr);
- return 0;
- }
- prev = curr;
- curr = curr->next;
- }
-
- return 1;
-}
-
-int domain_schedmod_has_data(int *in_fds, int **out_fds) {
- if (!in_fds || in_fds[0] == 0) return 0;
- log_trace("domain_schedmod_has_data: in_fds[0]=%d", in_fds[0]);
-
- for (int i = 1; i <= in_fds[0]; i++) {
- int fd = in_fds[i];
- if (fd >= 0) {
- FD_SET(fd, &g_want_fds);
- }
- }
-
- if (*out_fds) free(*out_fds);
- *out_fds = NULL;
-
- int count = 0;
- for (int i = 1; i <= in_fds[0]; i++) {
- if (in_fds[i] >= 0 && FD_ISSET(in_fds[i], &g_select_result)) {
- count++;
- }
- }
-
- if (count == 0) return 0;
-
- *out_fds = malloc(sizeof(int) * (count + 1));
- if (!*out_fds) return 0;
-
- (*out_fds)[0] = count;
- int idx = 1;
- for (int i = 1; i <= in_fds[0]; i++) {
- if (in_fds[i] >= 0 && FD_ISSET(in_fds[i], &g_select_result)) {
- (*out_fds)[idx++] = in_fds[i];
- FD_CLR(in_fds[i], &g_select_result);
- }
- }
-
- return count;
-}
-
-int domain_schedmod_main(void) {
- if (!pt_first) return 0;
-
- struct timeval tv;
- int maxfd = -1;
-
- for(;;) {
- maxfd = -1;
- for (int fd = 0; fd < FD_SETSIZE; fd++) {
- if (FD_ISSET(fd, &g_want_fds)) {
- if (fd > maxfd) maxfd = fd;
- }
- }
-
-
- tv.tv_sec = 0;
- tv.tv_usec = 100000;
- select(maxfd + 1, &g_want_fds, NULL, NULL, &tv);
- log_trace("scheduler: select returned");
-
- struct timeval now;
- gettimeofday(&now, NULL);
- int64_t timestamp = (int64_t)now.tv_sec * 1000 + now.tv_usec / 1000;
- g_select_result = g_want_fds;
-
- FD_ZERO(&g_want_fds);
-
- pt_task_t *task = pt_first;
- while (task) {
- pt_task_t *next = task->next;
- log_trace("scheduler: about to run task %p (func=%p, is_active=%d)", (void*)task, (void*)task->func, task->is_active);
- task->is_active = PT_SCHEDULE(task->func(&task->pt, timestamp, task));
- log_trace("scheduler: task %p (func=%p) returned, is_active=%d, next=%p", (void*)task, (void*)task->func, task->is_active, (void*)next);
- if (!task->is_active) {
- log_trace("scheduler: removing inactive task %p", (void*)task);
- domain_schedmod_pt_remove(task);
- }
- task = next;
- }
- log_trace("scheduler: loop done, pt_first=%p", (void*)pt_first);
-
- if (!pt_first) break;
- }
-
- return 0;
-}
-\ No newline at end of file
diff --git a/src/domain/scheduler.h b/src/domain/scheduler.h
@@ -1,30 +0,0 @@
-#ifndef UDPHOLE_SCHEDULER_H
-#define UDPHOLE_SCHEDULER_H
-
-#include "domain/protothreads.h"
-
-#include <stdint.h>
-#include <sys/select.h>
-
-struct pt_task;
-
-typedef char (*pt_task_fn)(struct pt *pt, int64_t timestamp, struct pt_task *task);
-
-typedef struct pt_task {
- struct pt pt;
- struct pt_task *next;
- pt_task_fn func;
- void *udata;
- char is_active;
- int maxfd;
-} pt_task_t;
-
-int domain_schedmod_pt_create(pt_task_fn fn, void *udata);
-int domain_schedmod_pt_remove(pt_task_t *task);
-int domain_schedmod_main(void);
-
-extern fd_set g_select_result;
-
-int domain_schedmod_has_data(int *in_fds, int **out_fds);
-
-#endif // UDPHOLE_SCHEDULER_H
-\ No newline at end of file
diff --git a/src/interface/api/server.c b/src/interface/api/server.c
@@ -21,15 +21,13 @@
#include "rxi/log.h"
#include "tidwall/hashmap.h"
-#include "domain/protothreads.h"
-#include "domain/scheduler.h"
+#include "common/scheduler.h"
#include "common/socket_util.h"
#include "infrastructure/config.h"
#include "interface/api/server.h"
#include "common/resp.h"
-struct pt_task;
-PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task));
+int api_client_pt(int64_t timestamp, struct pt_task *task);
#define API_MAX_CLIENTS 8
#define READ_BUF_SIZE 4096
@@ -468,40 +466,33 @@ static void handle_accept(int ready_fd) {
}
state->fd = fd;
- domain_schedmod_pt_create(api_client_pt, state);
+ sched_create(api_client_pt, state);
log_trace("api: accepted connection, spawned client pt");
}
-PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
+int api_server_pt(int64_t timestamp, struct pt_task *task) {
+ (void)timestamp;
api_server_udata_t *udata = task->udata;
- log_trace("api_server: protothread entry");
- PT_BEGIN(pt);
if (!udata) {
udata = calloc(1, sizeof(api_server_udata_t));
if (!udata) {
- PT_EXIT(pt);
+ return SCHED_ERROR;
}
task->udata = udata;
}
- resp_object *api_sec = resp_map_get(global_cfg, "udphole");
- const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL;
-
- log_info("api: initial listen config check: %s", listen_str ? listen_str : "(null)");
+ if (udata->server_fds == NULL) {
+ resp_object *api_sec = resp_map_get(global_cfg, "udphole");
+ const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL;
- if (!listen_str || !listen_str[0]) {
- log_info("api: no listen address configured, waiting...");
- PT_WAIT_UNTIL(pt, current_listen || (api_sec = resp_map_get(global_cfg, "udphole"),
- (listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL),
- listen_str && listen_str[0]));
- log_info("api: after wait, listen config: %s", listen_str ? listen_str : "(null)");
- }
+ if (!listen_str || !listen_str[0]) {
+ return SCHED_RUNNING;
+ }
- if (!current_listen && listen_str && listen_str[0]) {
current_listen = strdup(listen_str);
if (!current_listen) {
- PT_EXIT(pt);
+ return SCHED_ERROR;
}
init_builtins();
udata->server_fds = create_listen_socket(current_listen);
@@ -509,124 +500,94 @@ PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task))
log_fatal("api: failed to listen on %s", current_listen);
free(current_listen);
current_listen = NULL;
- exit(1);
+ return SCHED_ERROR;
}
}
- for (;;) {
- if (udata->server_fds && udata->server_fds[0] > 0) {
- PT_WAIT_UNTIL(pt, domain_schedmod_has_data(udata->server_fds, &udata->ready_fds) > 0);
- if (udata->ready_fds && udata->ready_fds[0] > 0) {
- for (int i = 1; i <= udata->ready_fds[0]; i++) {
- handle_accept(udata->ready_fds[i]);
- }
- }
- if (udata->ready_fds) free(udata->ready_fds);
- udata->ready_fds = NULL;
- } else {
- PT_YIELD(pt);
- }
- }
- if (udata->server_fds) {
- for (int i = 1; i <= udata->server_fds[0]; i++) {
- close(udata->server_fds[i]);
+ if (udata->server_fds && udata->server_fds[0] > 0) {
+ int ready_fd = sched_has_data(udata->server_fds);
+ if (ready_fd >= 0) {
+ handle_accept(ready_fd);
}
- free(udata->server_fds);
- }
- free(udata->ready_fds);
- free(udata);
- free(current_listen);
- current_listen = NULL;
- if (cmd_map) {
- hashmap_free(cmd_map);
- cmd_map = NULL;
- }
- if (domain_cmd_map) {
- hashmap_free(domain_cmd_map);
- domain_cmd_map = NULL;
}
- PT_END(pt);
+ return SCHED_RUNNING;
}
-PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
+int api_client_pt(int64_t timestamp, struct pt_task *task) {
+ (void)timestamp;
api_client_t *state = task->udata;
- log_trace("api_client: protothread entry fd=%d", state->fd);
- PT_BEGIN(pt);
-
- state->fds = malloc(sizeof(int) * 2);
if (!state->fds) {
- free(state);
- PT_EXIT(pt);
- }
- state->fds[0] = 1;
- state->fds[1] = state->fd;
-
- for (;;) {
- state->ready_fds = NULL;
- PT_WAIT_UNTIL(pt, domain_schedmod_has_data(state->fds, &state->ready_fds) > 0);
-
- state->ready_fd = -1;
- if (state->ready_fds && state->ready_fds[0] > 0) {
- for (int i = 1; i <= state->ready_fds[0]; i++) {
- if (state->ready_fds[i] == state->fd) {
- state->ready_fd = state->fd;
- break;
- }
- }
+ state->fds = malloc(sizeof(int) * 2);
+ if (!state->fds) {
+ free(state);
+ return SCHED_DONE;
}
- free(state->ready_fds);
- state->ready_fds = NULL;
+ state->fds[0] = 1;
+ state->fds[1] = state->fd;
+ }
- char buf[1];
- ssize_t n = recv(state->fd, buf, 1, MSG_PEEK);
- if (n <= 0) {
- break;
- }
+ int ready_fd = sched_has_data(state->fds);
+ if (ready_fd < 0) {
+ return SCHED_RUNNING;
+ }
- resp_object *cmd = resp_read(state->fd);
- if (!cmd) {
- if (errno == EAGAIN || errno == EWOULDBLOCK) {
- PT_YIELD(pt);
- continue;
- }
- break;
- }
+ if (ready_fd != state->fd) {
+ return SCHED_RUNNING;
+ }
- if (cmd->type != RESPT_ARRAY || cmd->u.arr.n == 0) {
- resp_free(cmd);
- api_write_err(state, "Protocol error");
- client_flush(state);
- continue;
- }
+ char buf[1];
+ ssize_t n = recv(state->fd, buf, 1, MSG_PEEK);
+ if (n <= 0) {
+ goto cleanup;
+ }
- char *args[MAX_ARGS];
- int nargs = 0;
- for (size_t i = 0; i < cmd->u.arr.n && nargs < MAX_ARGS; i++) {
- resp_object *elem = &cmd->u.arr.elem[i];
- if (elem->type == RESPT_BULK && elem->u.s) {
- args[nargs++] = elem->u.s;
- elem->u.s = NULL;
- } else if (elem->type == RESPT_SIMPLE) {
- args[nargs++] = elem->u.s ? elem->u.s : "";
- }
+ resp_object *cmd = resp_read(state->fd);
+ if (!cmd) {
+ if (errno == EAGAIN || errno == EWOULDBLOCK) {
+ return SCHED_RUNNING;
}
+ goto cleanup;
+ }
- if (nargs > 0) {
- dispatch_command(state, args, nargs);
- }
+ if (cmd->type != RESPT_ARRAY || cmd->u.arr.n == 0) {
+ resp_free(cmd);
+ api_write_err(state, "Protocol error");
+ client_flush(state);
+ return SCHED_RUNNING;
+ }
- for (int j = 0; j < nargs; j++) {
- free(args[j]);
+ char *args[MAX_ARGS];
+ int nargs = 0;
+ for (size_t i = 0; i < cmd->u.arr.n && nargs < MAX_ARGS; i++) {
+ resp_object *elem = &cmd->u.arr.elem[i];
+ if (elem->type == RESPT_BULK && elem->u.s) {
+ args[nargs++] = elem->u.s;
+ elem->u.s = NULL;
+ } else if (elem->type == RESPT_SIMPLE) {
+ args[nargs++] = elem->u.s ? elem->u.s : "";
}
- resp_free(cmd);
+ }
- client_flush(state);
+ if (nargs > 0) {
+ dispatch_command(state, args, nargs);
+ }
- if (state->fd < 0) break;
+ for (int j = 0; j < nargs; j++) {
+ free(args[j]);
}
+ resp_free(cmd);
+ client_flush(state);
+
+ if (state->fd < 0) {
+ goto cleanup;
+ }
+
+ return SCHED_RUNNING;
+
+cleanup:
if (state->fd >= 0) {
close(state->fd);
}
@@ -634,6 +595,5 @@ PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task))
free(state->wbuf);
free(state->username);
free(state);
-
- PT_END(pt);
+ return SCHED_DONE;
}
\ No newline at end of file
diff --git a/src/interface/api/server.h b/src/interface/api/server.h
@@ -4,7 +4,7 @@
#include <stdint.h>
#include <stdbool.h>
-#include "domain/scheduler.h"
+#include "common/scheduler.h"
#include "common/resp.h"
struct api_client_state;
@@ -12,7 +12,8 @@ typedef struct api_client_state api_client_t;
typedef resp_object *(*domain_cmd_fn)(const char *cmd, resp_object *args);
-PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task));
+int api_server_pt(int64_t timestamp, struct pt_task *task);
+int api_client_pt(int64_t timestamp, struct pt_task *task);
void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int));
diff --git a/src/interface/cli/command/daemon.c b/src/interface/cli/command/daemon.c
@@ -11,7 +11,7 @@
#include "infrastructure/config.h"
#include "common/resp.h"
#include "../common.h"
-#include "domain/scheduler.h"
+#include "common/scheduler.h"
#include "domain/config.h"
#include "daemon.h"
#include "interface/api/server.h"
@@ -109,10 +109,10 @@ int cli_cmd_daemon(int argc, const char **argv) {
session_manager_udata_t *session_udata = calloc(1, sizeof(session_manager_udata_t));
- domain_schedmod_pt_create(api_server_pt, NULL);
- domain_schedmod_pt_create(session_manager_pt, session_udata);
+ sched_create(api_server_pt, NULL);
+ sched_create(session_manager_pt, session_udata);
log_info("udphole: daemon started");
- return domain_schedmod_main();
+ return sched_main();
}
\ No newline at end of file
diff --git a/test/test_scheduler.c b/test/test_scheduler.c
@@ -0,0 +1,47 @@
+#include <stdio.h>
+#include <stdlib.h>
+#include <sys/time.h>
+
+#include "common/scheduler.h"
+
+typedef struct {
+ int id;
+ int count;
+ int64_t last_run;
+} task_data_t;
+
+int countdown_pt(int64_t timestamp, struct pt_task *task) {
+ task_data_t *data = task->udata;
+
+ if (timestamp - data->last_run < 100) {
+ return SCHED_RUNNING;
+ }
+ data->last_run = timestamp;
+
+ printf("Task %d: %d\n", data->id, data->count);
+ data->count--;
+
+ if (data->count < 0) {
+ printf("Task %d: DONE\n", data->id);
+ return SCHED_DONE;
+ }
+
+ return SCHED_RUNNING;
+}
+
+int main(void) {
+ task_data_t tasks[3] = {
+ { .id = 1, .count = 3, .last_run = 0 },
+ { .id = 2, .count = 3, .last_run = 0 },
+ { .id = 3, .count = 3, .last_run = 0 },
+ };
+
+ for (int i = 0; i < 3; i++) {
+ sched_create(countdown_pt, &tasks[i]);
+ }
+
+ printf("Starting countdown test with 3 parallel tasks...\n");
+ sched_main();
+ printf("Test complete!\n");
+ return 0;
+}