udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

session.c (28160B)


      1 #include <stdlib.h>
      2 #include <string.h>
      3 #include <stdio.h>
      4 #include <stdarg.h>
      5 #include <unistd.h>
      6 #include <errno.h>
      7 #include <fcntl.h>
      8 #include <time.h>
      9 #include <sys/socket.h>
     10 #include <sys/un.h>
     11 #include <sys/stat.h>
     12 #include <netinet/in.h>
     13 #include <arpa/inet.h>
     14 
     15 #include "rxi/log.h"
     16 #include "domain/protothreads.h"
     17 #include "domain/scheduler.h"
     18 #include "domain/config.h"
     19 #include "common/socket_util.h"
     20 #include "common/resp.h"
     21 #include "tidwall/hashmap.h"
     22 #include "session.h"
     23 
     24 #define SESSION_HASH_SIZE 256
     25 #define BUFFER_SIZE 4096
     26 #define DEFAULT_IDLE_EXPIRY 60
     27 
/*
 * One UDP endpoint owned by a session.
 */
typedef struct socket {
  /* Heap-allocated identifier; lookups within a session compare against it. */
  char *socket_id;
  /* Descriptor list: fds[0] is the number of descriptors, fds[1..fds[0]] are the descriptors. */
  int *fds;
  /* Local UDP port the descriptors were opened on. */
  int local_port;
  /* 0 for sockets made by create_listen_socket(), 1 for create_connect_socket(). */
  int mode;
  /* Fixed peer address; filled in for mode 1 sockets only. */
  struct sockaddr_storage remote_addr;
  socklen_t remote_addrlen;
  /* Non-zero once learned_addr holds the sender of the first datagram received (mode 0). */
  int learned_valid;
  struct sockaddr_storage learned_addr;
  socklen_t learned_addrlen;
} socket_t;
     39 
/*
 * One forwarding rule: datagrams received on the socket named src_socket_id
 * are sent out through the socket named dst_socket_id. Both strings are
 * heap-allocated copies owned by the rule.
 */
typedef struct forward {
  char *src_socket_id;
  char *dst_socket_id;
} forward_t;
     44 
/*
 * A forwarding session: a set of sockets plus the forwarding rules between them.
 */
typedef struct session {
  /* Heap-allocated identifier used by find_session(). */
  char *session_id;
  /* Maximum idle time; compared against (now - last_activity) using time() values. */
  time_t idle_expiry;
  /* time() at creation. */
  time_t created;
  /* time() of the most recent datagram received on any socket of the session. */
  time_t last_activity;
  /* Array of sockets_count slots; a slot is NULL after its socket was destroyed. */
  socket_t **sockets;
  size_t sockets_count;
  /* Array of forwards_count forwarding rules. */
  forward_t *forwards;
  size_t forwards_count;
  /* Set by destroy_session(); session_pt leaves its loop when it sees it. */
  int marked_for_deletion;
  /* Filled in through domain_schedmod_has_data(); [0] is the count of ready descriptors. */
  int *ready_fds;
  /* Scratch list of every descriptor in the session, rebuilt by session_pt; [0] is the count. */
  int *all_fds;
  /* NOTE(review): presumably protothread state; not referenced in the code visible here. */
  struct pt pt;
  /* Handle returned by the scheduler when the session protothread was created. */
  struct pt_task *task;
} session_t;
     60 
/* Table of all live sessions; grown by create_session(), compacted by destroy_session(). */
static session_t **sessions = NULL;
/* Number of valid entries in sessions. */
static size_t sessions_count = 0;
/* NOTE(review): not referenced anywhere in the code visible here. */
static int running = 0;
     64 
     65 static session_t *find_session(const char *session_id) {
     66   for (size_t i = 0; i < sessions_count; i++) {
     67     if (strcmp(sessions[i]->session_id, session_id) == 0) {
     68       return sessions[i];
     69     }
     70   }
     71   return NULL;
     72 }
     73 
     74 static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) {
     75   const socket_t *s = item;
     76   return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1);
     77 }
     78 
     79 static int socket_compare(const void *a, const void *b, void *udata) {
     80   (void)udata;
     81   const socket_t *sa = a;
     82   const socket_t *sb = b;
     83   return strcmp(sa->socket_id, sb->socket_id);
     84 }
     85 
     86 static socket_t *find_socket(session_t *s, const char *socket_id) {
     87   if (!s || !s->sockets || !socket_id) return NULL;
     88   for (size_t i = 0; i < s->sockets_count; i++) {
     89     if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) {
     90       return s->sockets[i];
     91     }
     92   }
     93   return NULL;
     94 }
     95 
     96 static int alloc_port(void) {
     97   if (!domain_cfg) return 0;
     98   for (int i = 0; i < domain_cfg->port_high - domain_cfg->port_low; i++) {
     99     int port = domain_cfg->port_cur + i;
    100     if (port > domain_cfg->port_high) port = domain_cfg->port_low;
    101     domain_cfg->port_cur = port + 1;
    102     if (domain_cfg->port_cur > domain_cfg->port_high) domain_cfg->port_cur = domain_cfg->port_low;
    103 
    104     struct sockaddr_in addr;
    105     memset(&addr, 0, sizeof(addr));
    106     addr.sin_family = AF_INET;
    107     addr.sin_addr.s_addr = INADDR_ANY;
    108     addr.sin_port = htons(port);
    109 
    110     int udp_fd = socket(AF_INET, SOCK_DGRAM, 0);
    111     if (udp_fd < 0) continue;
    112     int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0);
    113     close(udp_fd);
    114     if (!ok) continue;
    115 
    116     return port;
    117   }
    118   return 0;
    119 }
    120 
    121 static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) {
    122   memset(addr, 0, sizeof(*addr));
    123 
    124   struct sockaddr_in *addr4 = (struct sockaddr_in *)addr;
    125   if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) {
    126     addr4->sin_family = AF_INET;
    127     addr4->sin_port = htons(port);
    128     *addrlen = sizeof(*addr4);
    129     return 0;
    130   }
    131 
    132   struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr;
    133   if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) {
    134     addr6->sin6_family = AF_INET6;
    135     addr6->sin6_port = htons(port);
    136     *addrlen = sizeof(*addr6);
    137     return 0;
    138   }
    139 
    140   return -1;
    141 }
    142 
    143 static void close_socket(socket_t *sock) {
    144   if (!sock || !sock->fds) return;
    145   for (int i = 1; i <= sock->fds[0]; i++) {
    146     if (sock->fds[i] >= 0) {
    147       close(sock->fds[i]);
    148     }
    149   }
    150   free(sock->fds);
    151   sock->fds = NULL;
    152 }
    153 
    154 static void free_socket(socket_t *sock) {
    155   if (!sock) return;
    156   close_socket(sock);
    157   free(sock->socket_id);
    158   free(sock);
    159 }
    160 
    161 static void destroy_session(session_t *s) {
    162   if (!s) return;
    163   s->marked_for_deletion = 1;
    164 
    165   for (size_t i = 0; i < s->sockets_count; i++) {
    166     if (s->sockets[i]) {
    167       free_socket(s->sockets[i]);
    168     }
    169   }
    170   free(s->sockets);
    171 
    172   for (size_t i = 0; i < s->forwards_count; i++) {
    173     free(s->forwards[i].src_socket_id);
    174     free(s->forwards[i].dst_socket_id);
    175   }
    176   free(s->forwards);
    177 
    178   free(s->session_id);
    179   free(s);
    180 
    181   for (size_t i = 0; i < sessions_count; i++) {
    182     if (sessions[i] == s) {
    183       for (size_t j = i; j < sessions_count - 1; j++) {
    184         sessions[j] = sessions[j + 1];
    185       }
    186       sessions_count--;
    187       break;
    188     }
    189   }
    190 }
    191 
    192 static session_t *create_session(const char *session_id, int idle_expiry) {
    193   const session_t *cs = find_session(session_id);
    194   if (cs) return (session_t *)cs;
    195 
    196   session_t *s = calloc(1, sizeof(*s));
    197   if (!s) return NULL;
    198 
    199   s->session_id = strdup(session_id);
    200   s->created = time(NULL);
    201   s->last_activity = s->created;
    202   s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY;
    203 
    204   sessions = realloc(sessions, sizeof(session_t *) * (sessions_count + 1));
    205   sessions[sessions_count++] = s;
    206 
    207   return s;
    208 }
    209 
    210 static void cleanup_expired_sessions(void) {
    211   if (!sessions) return;
    212   time_t now = time(NULL);
    213 
    214   for (size_t i = 0; i < sessions_count; i++) {
    215     session_t *s = sessions[i];
    216     if (!s) continue;
    217     if (now - s->last_activity > s->idle_expiry) {
    218       log_debug("udphole: session %s expired (idle %ld > expiry %ld)",
    219                 s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry);
    220       destroy_session(s);
    221     }
    222   }
    223 }
    224 
    225 static int add_forward(session_t *s, const char *src_id, const char *dst_id) {
    226   for (size_t i = 0; i < s->forwards_count; i++) {
    227     if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 &&
    228         strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) {
    229       return 0;
    230     }
    231   }
    232 
    233   forward_t *new_forwards = realloc(s->forwards, sizeof(forward_t) * (s->forwards_count + 1));
    234   if (!new_forwards) return -1;
    235   s->forwards = new_forwards;
    236 
    237   s->forwards[s->forwards_count].src_socket_id = strdup(src_id);
    238   s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id);
    239   s->forwards_count++;
    240 
    241   return 0;
    242 }
    243 
    244 static int remove_forward(session_t *s, const char *src_id, const char *dst_id) {
    245   for (size_t i = 0; i < s->forwards_count; i++) {
    246     if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 &&
    247         strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) {
    248       free(s->forwards[i].src_socket_id);
    249       free(s->forwards[i].dst_socket_id);
    250       for (size_t j = i; j < s->forwards_count - 1; j++) {
    251         s->forwards[j] = s->forwards[j + 1];
    252       }
    253       s->forwards_count--;
    254       return 0;
    255     }
    256   }
    257   return -1;
    258 }
    259 
    260 static socket_t *create_listen_socket(session_t *sess, const char *socket_id) {
    261   socket_t *existing = find_socket(sess, socket_id);
    262   if (existing) return existing;
    263 
    264   int port = alloc_port();
    265   if (!port) {
    266     log_error("udphole: no ports available");
    267     return NULL;
    268   }
    269 
    270   char port_str[16];
    271   snprintf(port_str, sizeof(port_str), "%d", port);
    272   int *fds = udp_recv(port_str, NULL, NULL);
    273   if (!fds || fds[0] == 0) {
    274     log_error("udphole: failed to create UDP socket on port %d", port);
    275     free(fds);
    276     return NULL;
    277   }
    278 
    279   socket_t *sock = calloc(1, sizeof(*sock));
    280   if (!sock) {
    281     free(fds);
    282     return NULL;
    283   }
    284 
    285   sock->socket_id = strdup(socket_id);
    286   sock->fds = fds;
    287   sock->local_port = port;
    288   sock->mode = 0;
    289   sock->learned_valid = 0;
    290 
    291   sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1));
    292   sess->sockets[sess->sockets_count++] = sock;
    293 
    294   log_debug("udphole: created listen socket %s in session %s on port %d",
    295             socket_id, sess->session_id, port);
    296   return sock;
    297 }
    298 
    299 static socket_t *create_connect_socket(session_t *sess, const char *socket_id,
    300                                        const char *ip, int port) {
    301   socket_t *existing = find_socket(sess, socket_id);
    302   if (existing) return existing;
    303 
    304   int local_port = alloc_port();
    305   if (!local_port) {
    306     log_error("udphole: no ports available");
    307     return NULL;
    308   }
    309 
    310   char port_str[16];
    311   snprintf(port_str, sizeof(port_str), "%d", local_port);
    312   int *fds = udp_recv(port_str, NULL, NULL);
    313   if (!fds || fds[0] == 0) {
    314     log_error("udphole: failed to create UDP socket on port %d", local_port);
    315     free(fds);
    316     return NULL;
    317   }
    318 
    319   struct sockaddr_storage remote_addr;
    320   socklen_t remote_addrlen;
    321   if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) {
    322     log_error("udphole: invalid remote address %s:%d", ip, port);
    323     free(fds);
    324     return NULL;
    325   }
    326 
    327   socket_t *sock = calloc(1, sizeof(*sock));
    328   if (!sock) {
    329     free(fds);
    330     return NULL;
    331   }
    332 
    333   sock->socket_id = strdup(socket_id);
    334   sock->fds = fds;
    335   sock->local_port = local_port;
    336   sock->mode = 1;
    337   sock->remote_addr = remote_addr;
    338   sock->remote_addrlen = remote_addrlen;
    339   sock->learned_valid = 0;
    340 
    341   sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1));
    342   sess->sockets[sess->sockets_count++] = sock;
    343 
    344   log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d",
    345             socket_id, sess->session_id, local_port, ip, port);
    346   return sock;
    347 }
    348 
    349 static int destroy_socket(session_t *sess, const char *socket_id) {
    350   socket_t *sock = find_socket(sess, socket_id);
    351   if (!sock) return -1;
    352 
    353   for (size_t i = 0; i < sess->sockets_count; i++) {
    354     if (sess->sockets[i] == sock) {
    355       sess->sockets[i] = NULL;
    356       break;
    357     }
    358   }
    359   free_socket(sock);
    360 
    361   for (size_t i = 0; i < sess->forwards_count; ) {
    362     if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 ||
    363         strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) {
    364       free(sess->forwards[i].src_socket_id);
    365       free(sess->forwards[i].dst_socket_id);
    366       for (size_t j = i; j < sess->forwards_count - 1; j++) {
    367         sess->forwards[j] = sess->forwards[j + 1];
    368       }
    369       sess->forwards_count--;
    370     } else {
    371       i++;
    372     }
    373   }
    374 
    375   return 0;
    376 }
    377 
    378 static socket_t *find_socket_by_fd(session_t *s, int fd) {
    379   if (!s || !s->sockets) return NULL;
    380   for (size_t j = 0; j < s->sockets_count; j++) {
    381     socket_t *sock = s->sockets[j];
    382     if (!sock || !sock->fds) continue;
    383     for (int i = 1; i <= sock->fds[0]; i++) {
    384       if (sock->fds[i] == fd) {
    385         return sock;
    386       }
    387     }
    388   }
    389   return NULL;
    390 }
    391 
    392 PT_THREAD(session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
    393   session_t *s = task->udata;
    394 
    395   (void)timestamp;
    396   PT_BEGIN(pt);
    397 
    398   char buffer[BUFFER_SIZE];
    399 
    400   for (;;) {
    401     if (s->marked_for_deletion) {
    402       break;
    403     }
    404 
    405     if (!s->sockets || s->sockets_count == 0) {
    406       PT_YIELD(pt);
    407       continue;
    408     }
    409 
    410     s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1));
    411     if (!s->all_fds) {
    412       PT_YIELD(pt);
    413       continue;
    414     }
    415     s->all_fds[0] = 0;
    416 
    417     for (size_t j = 0; j < s->sockets_count; j++) {
    418       socket_t *sock = s->sockets[j];
    419       if (!sock || !sock->fds) continue;
    420       for (int i = 1; i <= sock->fds[0]; i++) {
    421         s->all_fds[++s->all_fds[0]] = sock->fds[i];
    422       }
    423     }
    424 
    425     if (s->all_fds[0] == 0) {
    426       PT_YIELD(pt);
    427       continue;
    428     }
    429 
    430     PT_WAIT_UNTIL(pt, domain_schedmod_has_data(s->all_fds, &s->ready_fds) > 0);
    431 
    432     if (!s->ready_fds || s->ready_fds[0] == 0) {
    433       PT_YIELD(pt);
    434       continue;
    435     }
    436 
    437     for (int r = 1; r <= s->ready_fds[0]; r++) {
    438       int ready_fd = s->ready_fds[r];
    439 
    440       socket_t *src_sock = find_socket_by_fd(s, ready_fd);
    441       if (!src_sock) continue;
    442 
    443       struct sockaddr_storage from_addr;
    444       socklen_t from_len = sizeof(from_addr);
    445       ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0,
    446                            (struct sockaddr *)&from_addr, &from_len);
    447 
    448       if (n <= 0) {
    449         if (errno != EAGAIN && errno != EWOULDBLOCK) {
    450           log_warn("udphole: recvfrom error on socket %s: %s",
    451                    src_sock->socket_id, strerror(errno));
    452         }
    453         continue;
    454       }
    455 
    456       s->last_activity = time(NULL);
    457 
    458       if (src_sock->mode == 0 && !src_sock->learned_valid) {
    459         src_sock->learned_addr = from_addr;
    460         src_sock->learned_addrlen = from_len;
    461         src_sock->learned_valid = 1;
    462         log_debug("udphole: socket %s learned remote address", src_sock->socket_id);
    463       }
    464 
    465       for (size_t i = 0; i < s->forwards_count; i++) {
    466         if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) {
    467           continue;
    468         }
    469 
    470         socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id);
    471         if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue;
    472 
    473         struct sockaddr *dest_addr = NULL;
    474         socklen_t dest_addrlen = 0;
    475 
    476         if (dst_sock->mode == 1) {
    477           dest_addr = (struct sockaddr *)&dst_sock->remote_addr;
    478           dest_addrlen = dst_sock->remote_addrlen;
    479         } else if (dst_sock->learned_valid) {
    480           dest_addr = (struct sockaddr *)&dst_sock->learned_addr;
    481           dest_addrlen = dst_sock->learned_addrlen;
    482         }
    483 
    484         if (dest_addr && dest_addrlen > 0) {
    485           int dst_fd = dst_sock->fds[1];
    486           ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen);
    487           if (sent < 0) {
    488             log_warn("udphole: forward failed %s -> %s: %s",
    489                      src_sock->socket_id, dst_sock->socket_id, strerror(errno));
    490           }
    491         }
    492       }
    493     }
    494 
    495   }
    496 
    497   log_debug("udphole: session %s protothread exiting", s->session_id);
    498 
    499   if (s->all_fds) {
    500     free(s->all_fds);
    501     s->all_fds = NULL;
    502   }
    503   if (s->ready_fds) {
    504     free(s->ready_fds);
    505     s->ready_fds = NULL;
    506   }
    507 
    508   PT_END(pt);
    509 }
    510 
    511 static void spawn_session_pt(session_t *s) {
    512   s->task = (struct pt_task *)(intptr_t)domain_schedmod_pt_create(session_pt, s);
    513 }
    514 
    515 resp_object *domain_session_create(resp_object *args) {
    516   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    517     resp_object *err = resp_array_init();
    518     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'");
    519     return err;
    520   }
    521 
    522   const char *session_id = NULL;
    523   if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) {
    524     session_id = args->u.arr.elem[1].u.s;
    525   }
    526 
    527   if (!session_id) {
    528     resp_object *err = resp_array_init();
    529     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'");
    530     return err;
    531   }
    532 
    533   int idle_expiry = 0;
    534   if (args->u.arr.n >= 3 && args->u.arr.elem[2].type == RESPT_BULK && args->u.arr.elem[2].u.s) {
    535     idle_expiry = atoi(args->u.arr.elem[2].u.s);
    536   }
    537 
    538   session_t *s = create_session(session_id, idle_expiry);
    539   if (!s) {
    540     resp_object *err = resp_array_init();
    541     resp_array_append_simple(err, "ERR failed to create session");
    542     return err;
    543   }
    544 
    545   spawn_session_pt(s);
    546 
    547   resp_object *res = resp_array_init();
    548   resp_array_append_simple(res, "OK");
    549   return res;
    550 }
    551 
    552 resp_object *domain_session_list(resp_object *args) {
    553   (void)args;
    554 
    555   resp_object *res = resp_array_init();
    556   if (!res) return NULL;
    557 
    558   for (size_t i = 0; i < sessions_count; i++) {
    559     session_t *s = sessions[i];
    560     if (!s) continue;
    561     resp_array_append_bulk(res, s->session_id);
    562   }
    563 
    564   return res;
    565 }
    566 
    567 resp_object *domain_session_info(resp_object *args) {
    568   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    569     resp_object *err = resp_array_init();
    570     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'");
    571     return err;
    572   }
    573 
    574   const char *session_id = NULL;
    575   if (args->u.arr.elem[1].type == RESPT_BULK) {
    576     session_id = args->u.arr.elem[1].u.s;
    577   }
    578 
    579   if (!session_id) {
    580     resp_object *err = resp_array_init();
    581     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'");
    582     return err;
    583   }
    584 
    585   session_t *s = find_session(session_id);
    586   if (!s) {
    587     resp_object *err = resp_array_init();
    588     resp_array_append_simple(err, "ERR session not found");
    589     return err;
    590   }
    591 
    592   resp_object *res = resp_array_init();
    593   if (!res) return NULL;
    594 
    595   resp_array_append_bulk(res, "session_id");
    596   resp_array_append_bulk(res, s->session_id);
    597 
    598   resp_array_append_bulk(res, "created");
    599   resp_array_append_int(res, (long long)s->created);
    600 
    601   resp_array_append_bulk(res, "last_activity");
    602   resp_array_append_int(res, (long long)s->last_activity);
    603 
    604   resp_array_append_bulk(res, "idle_expiry");
    605   resp_array_append_int(res, (long long)s->idle_expiry);
    606 
    607   resp_array_append_bulk(res, "sockets");
    608   resp_object *sockets_arr = resp_array_init();
    609   for (size_t i = 0; i < s->sockets_count; i++) {
    610     socket_t *sock = s->sockets[i];
    611     if (!sock) continue;
    612     resp_array_append_bulk(sockets_arr, sock->socket_id);
    613   }
    614   resp_array_append_obj(res, sockets_arr);
    615 
    616   resp_array_append_bulk(res, "forwards");
    617   resp_object *forwards_arr = resp_array_init();
    618   for (size_t i = 0; i < s->forwards_count; i++) {
    619     resp_array_append_bulk(forwards_arr, s->forwards[i].src_socket_id);
    620     resp_array_append_bulk(forwards_arr, s->forwards[i].dst_socket_id);
    621   }
    622   resp_array_append_obj(res, forwards_arr);
    623 
    624   resp_array_append_bulk(res, "marked_for_deletion");
    625   resp_array_append_int(res, s->marked_for_deletion);
    626 
    627   return res;
    628 }
    629 
    630 resp_object *domain_session_destroy(resp_object *args) {
    631   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    632     resp_object *err = resp_array_init();
    633     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'");
    634     return err;
    635   }
    636 
    637   const char *session_id = NULL;
    638   if (args->u.arr.elem[1].type == RESPT_BULK) {
    639     session_id = args->u.arr.elem[1].u.s;
    640   }
    641 
    642   if (!session_id) {
    643     resp_object *err = resp_array_init();
    644     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'");
    645     return err;
    646   }
    647 
    648   session_t *s = find_session(session_id);
    649   if (!s) {
    650     resp_object *err = resp_array_init();
    651     resp_array_append_simple(err, "ERR session not found");
    652     return err;
    653   }
    654 
    655   destroy_session(s);
    656 
    657   resp_object *res = resp_array_init();
    658   resp_array_append_simple(res, "OK");
    659   return res;
    660 }
    661 
    662 resp_object *domain_socket_create_listen(resp_object *args) {
    663   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) {
    664     resp_object *err = resp_array_init();
    665     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'");
    666     return err;
    667   }
    668 
    669   const char *session_id = NULL;
    670   const char *socket_id = NULL;
    671 
    672   if (args->u.arr.elem[1].type == RESPT_BULK) {
    673     session_id = args->u.arr.elem[1].u.s;
    674   }
    675   if (args->u.arr.elem[2].type == RESPT_BULK) {
    676     socket_id = args->u.arr.elem[2].u.s;
    677   }
    678 
    679   if (!session_id || !socket_id) {
    680     resp_object *err = resp_array_init();
    681     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'");
    682     return err;
    683   }
    684 
    685   session_t *s = find_session(session_id);
    686   if (!s) {
    687     resp_object *err = resp_array_init();
    688     resp_array_append_simple(err, "ERR session not found");
    689     return err;
    690   }
    691 
    692   socket_t *sock = create_listen_socket(s, socket_id);
    693   if (!sock) {
    694     resp_object *err = resp_array_init();
    695     resp_array_append_simple(err, "ERR failed to create socket");
    696     return err;
    697   }
    698 
    699   resp_object *res = resp_array_init();
    700   resp_array_append_int(res, sock->local_port);
    701   resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : "");
    702   return res;
    703 }
    704 
    705 resp_object *domain_socket_create_connect(resp_object *args) {
    706   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 5) {
    707     resp_object *err = resp_array_init();
    708     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'");
    709     return err;
    710   }
    711 
    712   const char *session_id = NULL;
    713   const char *socket_id = NULL;
    714   const char *ip = NULL;
    715   const char *port_str = NULL;
    716 
    717   if (args->u.arr.elem[1].type == RESPT_BULK) {
    718     session_id = args->u.arr.elem[1].u.s;
    719   }
    720   if (args->u.arr.elem[2].type == RESPT_BULK) {
    721     socket_id = args->u.arr.elem[2].u.s;
    722   }
    723   if (args->u.arr.elem[3].type == RESPT_BULK) {
    724     ip = args->u.arr.elem[3].u.s;
    725   }
    726   if (args->u.arr.elem[4].type == RESPT_BULK) {
    727     port_str = args->u.arr.elem[4].u.s;
    728   }
    729 
    730   if (!session_id || !socket_id || !ip || !port_str) {
    731     resp_object *err = resp_array_init();
    732     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'");
    733     return err;
    734   }
    735 
    736   int port = atoi(port_str);
    737 
    738   session_t *s = find_session(session_id);
    739   if (!s) {
    740     resp_object *err = resp_array_init();
    741     resp_array_append_simple(err, "ERR session not found");
    742     return err;
    743   }
    744 
    745   socket_t *sock = create_connect_socket(s, socket_id, ip, port);
    746   if (!sock) {
    747     resp_object *err = resp_array_init();
    748     resp_array_append_simple(err, "ERR failed to create socket");
    749     return err;
    750   }
    751 
    752   resp_object *res = resp_array_init();
    753   resp_array_append_int(res, sock->local_port);
    754   resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : "");
    755   return res;
    756 }
    757 
    758 resp_object *domain_socket_destroy(resp_object *args) {
    759   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) {
    760     resp_object *err = resp_array_init();
    761     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'");
    762     return err;
    763   }
    764 
    765   const char *session_id = NULL;
    766   const char *socket_id = NULL;
    767 
    768   if (args->u.arr.elem[1].type == RESPT_BULK) {
    769     session_id = args->u.arr.elem[1].u.s;
    770   }
    771   if (args->u.arr.elem[2].type == RESPT_BULK) {
    772     socket_id = args->u.arr.elem[2].u.s;
    773   }
    774 
    775   if (!session_id || !socket_id) {
    776     resp_object *err = resp_array_init();
    777     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'");
    778     return err;
    779   }
    780 
    781   session_t *s = find_session(session_id);
    782   if (!s) {
    783     resp_object *err = resp_array_init();
    784     resp_array_append_simple(err, "ERR session not found");
    785     return err;
    786   }
    787 
    788   if (destroy_socket(s, socket_id) != 0) {
    789     resp_object *err = resp_array_init();
    790     resp_array_append_simple(err, "ERR socket not found");
    791     return err;
    792   }
    793 
    794   resp_object *res = resp_array_init();
    795   resp_array_append_simple(res, "OK");
    796   return res;
    797 }
    798 
    799 resp_object *domain_forward_list(resp_object *args) {
    800   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    801     resp_object *err = resp_array_init();
    802     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'");
    803     return err;
    804   }
    805 
    806   const char *session_id = NULL;
    807   if (args->u.arr.elem[1].type == RESPT_BULK) {
    808     session_id = args->u.arr.elem[1].u.s;
    809   }
    810 
    811   if (!session_id) {
    812     resp_object *err = resp_array_init();
    813     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'");
    814     return err;
    815   }
    816 
    817   session_t *s = find_session(session_id);
    818   if (!s) {
    819     resp_object *err = resp_array_init();
    820     resp_array_append_simple(err, "ERR session not found");
    821     return err;
    822   }
    823 
    824   resp_object *res = resp_array_init();
    825   for (size_t i = 0; i < s->forwards_count; i++) {
    826     resp_array_append_bulk(res, s->forwards[i].src_socket_id);
    827     resp_array_append_bulk(res, s->forwards[i].dst_socket_id);
    828   }
    829   return res;
    830 }
    831 
    832 resp_object *domain_forward_create(resp_object *args) {
    833   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) {
    834     resp_object *err = resp_array_init();
    835     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'");
    836     return err;
    837   }
    838 
    839   const char *session_id = NULL;
    840   const char *src_socket_id = NULL;
    841   const char *dst_socket_id = NULL;
    842 
    843   if (args->u.arr.elem[1].type == RESPT_BULK) {
    844     session_id = args->u.arr.elem[1].u.s;
    845   }
    846   if (args->u.arr.elem[2].type == RESPT_BULK) {
    847     src_socket_id = args->u.arr.elem[2].u.s;
    848   }
    849   if (args->u.arr.elem[3].type == RESPT_BULK) {
    850     dst_socket_id = args->u.arr.elem[3].u.s;
    851   }
    852 
    853   if (!session_id || !src_socket_id || !dst_socket_id) {
    854     resp_object *err = resp_array_init();
    855     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'");
    856     return err;
    857   }
    858 
    859   session_t *s = find_session(session_id);
    860   if (!s) {
    861     resp_object *err = resp_array_init();
    862     resp_array_append_simple(err, "ERR session not found");
    863     return err;
    864   }
    865 
    866   socket_t *src = find_socket(s, src_socket_id);
    867   if (!src) {
    868     resp_object *err = resp_array_init();
    869     resp_array_append_simple(err, "ERR source socket not found");
    870     return err;
    871   }
    872 
    873   socket_t *dst = find_socket(s, dst_socket_id);
    874   if (!dst) {
    875     resp_object *err = resp_array_init();
    876     resp_array_append_simple(err, "ERR destination socket not found");
    877     return err;
    878   }
    879 
    880   if (add_forward(s, src_socket_id, dst_socket_id) != 0) {
    881     resp_object *err = resp_array_init();
    882     resp_array_append_simple(err, "ERR failed to add forward");
    883     return err;
    884   }
    885 
    886   log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id);
    887 
    888   resp_object *res = resp_array_init();
    889   resp_array_append_simple(res, "OK");
    890   return res;
    891 }
    892 
    893 resp_object *domain_forward_destroy(resp_object *args) {
    894   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) {
    895     resp_object *err = resp_array_init();
    896     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'");
    897     return err;
    898   }
    899 
    900   const char *session_id = NULL;
    901   const char *src_socket_id = NULL;
    902   const char *dst_socket_id = NULL;
    903 
    904   if (args->u.arr.elem[1].type == RESPT_BULK) {
    905     session_id = args->u.arr.elem[1].u.s;
    906   }
    907   if (args->u.arr.elem[2].type == RESPT_BULK) {
    908     src_socket_id = args->u.arr.elem[2].u.s;
    909   }
    910   if (args->u.arr.elem[3].type == RESPT_BULK) {
    911     dst_socket_id = args->u.arr.elem[3].u.s;
    912   }
    913 
    914   if (!session_id || !src_socket_id || !dst_socket_id) {
    915     resp_object *err = resp_array_init();
    916     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'");
    917     return err;
    918   }
    919 
    920   session_t *s = find_session(session_id);
    921   if (!s) {
    922     resp_object *err = resp_array_init();
    923     resp_array_append_simple(err, "ERR session not found");
    924     return err;
    925   }
    926 
    927   if (remove_forward(s, src_socket_id, dst_socket_id) != 0) {
    928     resp_object *err = resp_array_init();
    929     resp_array_append_simple(err, "ERR forward not found");
    930     return err;
    931   }
    932 
    933   resp_object *res = resp_array_init();
    934   resp_array_append_simple(res, "OK");
    935   return res;
    936 }
    937 
    938 resp_object *domain_system_load(resp_object *args) {
    939   (void)args;
    940 
    941   double loadavg[3];
    942   if (getloadavg(loadavg, 3) != 3) {
    943     resp_object *err = resp_array_init();
    944     resp_array_append_simple(err, "ERR failed to get load average");
    945     return err;
    946   }
    947 
    948   resp_object *res = resp_array_init();
    949   char buf[64];
    950 
    951   resp_array_append_bulk(res, "1min");
    952   snprintf(buf, sizeof(buf), "%.2f", loadavg[0]);
    953   resp_array_append_bulk(res, buf);
    954 
    955   resp_array_append_bulk(res, "5min");
    956   snprintf(buf, sizeof(buf), "%.2f", loadavg[1]);
    957   resp_array_append_bulk(res, buf);
    958 
    959   resp_array_append_bulk(res, "15min");
    960   snprintf(buf, sizeof(buf), "%.2f", loadavg[2]);
    961   resp_array_append_bulk(res, buf);
    962 
    963   return res;
    964 }
    965 
    966 resp_object *domain_session_count(resp_object *args) {
    967   (void)args;
    968 
    969   size_t count = 0;
    970   for (size_t i = 0; i < sessions_count; i++) {
    971     if (sessions[i] != NULL) {
    972       count++;
    973     }
    974   }
    975 
    976   resp_object *res = malloc(sizeof(resp_object));
    977   if (!res) return NULL;
    978   res->type = RESPT_INT;
    979   res->u.i = (long long)count;
    980   return res;
    981 }
    982 
    983 PT_THREAD(session_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
    984   (void)timestamp;
    985   log_trace("session_manager: protothread entry");
    986   PT_BEGIN(pt);
    987 
    988   PT_WAIT_UNTIL(pt, domain_cfg);
    989 
    990   running = 1;
    991   log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high);
    992 
    993   int64_t last_cleanup = 0;
    994 
    995   for (;;) {
    996     int64_t now = (int64_t)(time(NULL));
    997     if (now - last_cleanup >= 1) {
    998       cleanup_expired_sessions();
    999       last_cleanup = now;
   1000     }
   1001 
   1002     PT_YIELD(pt);
   1003   }
   1004 
   1005   running = 0;
   1006 
   1007   for (size_t i = 0; i < sessions_count; i++) {
   1008     if (sessions[i]) {
   1009       sessions[i]->marked_for_deletion = 1;
   1010     }
   1011   }
   1012 
   1013   PT_END(pt);
   1014 }