udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

session.c (28102B)


      1 #include "session.h"
      2 
      3 #include <arpa/inet.h>
      4 #include <errno.h>
      5 #include <fcntl.h>
      6 #include <netinet/in.h>
      7 #include <stdarg.h>
      8 #include <stdio.h>
      9 #include <stdlib.h>
     10 #include <string.h>
     11 #include <sys/socket.h>
     12 #include <sys/stat.h>
     13 #include <sys/un.h>
     14 #include <time.h>
     15 #include <unistd.h>
     16 
     17 #include "common/resp.h"
     18 #include "common/scheduler.h"
     19 #include "common/socket_util.h"
     20 #include "domain/config.h"
     21 #include "rxi/log.h"
     22 
     23 static resp_object *get_udphole_cfg(void) {
     24   return domain_cfg ? resp_map_get(domain_cfg, "udphole") : NULL;
     25 }
     26 
     27 #define SESSION_HASH_SIZE   256
     28 #define BUFFER_SIZE         4096
     29 #define DEFAULT_IDLE_EXPIRY 60
     30 
     31 typedef struct socket {
     32   char                   *socket_id;
     33   int                    *fds;
     34   int                     local_port;
     35   int                     mode;
     36   struct sockaddr_storage remote_addr;
     37   socklen_t               remote_addrlen;
     38   int                     learned_valid;
     39   struct sockaddr_storage learned_addr;
     40   socklen_t               learned_addrlen;
     41 } socket_t;
     42 
     43 typedef struct forward {
     44   char *src_socket_id;
     45   char *dst_socket_id;
     46 } forward_t;
     47 
     48 typedef struct session {
     49   char           *session_id;
     50   time_t          idle_expiry;
     51   time_t          created;
     52   time_t          last_activity;
     53   socket_t      **sockets;
     54   size_t          sockets_count;
     55   forward_t      *forwards;
     56   size_t          forwards_count;
     57   int             marked_for_deletion;
     58   int            *ready_fds;
     59   int            *all_fds;
     60   struct pt_task *task;
     61 } session_t;
     62 
     63 static session_t **sessions       = NULL;
     64 static size_t      sessions_count = 0;
     65 
     66 static session_t *find_session(const char *session_id) {
     67   for (size_t i = 0; i < sessions_count; i++) {
     68     if (strcmp(sessions[i]->session_id, session_id) == 0) {
     69       return sessions[i];
     70     }
     71   }
     72   return NULL;
     73 }
     74 
     75 static socket_t *find_socket(session_t *s, const char *socket_id) {
     76   if (!s || !s->sockets || !socket_id) return NULL;
     77   for (size_t i = 0; i < s->sockets_count; i++) {
     78     if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) {
     79       return s->sockets[i];
     80     }
     81   }
     82   return NULL;
     83 }
     84 
     85 // port_cur = last-assigned-port
     86 int port_cur = 0;
     87 static int alloc_port(void) {
     88   resp_object *udphole = get_udphole_cfg();
     89   if (!udphole) return 0;
     90   const char *ports_str = resp_map_get_string(udphole, "ports");
     91   int         port_low = 7000, port_high = 7999;
     92   if (ports_str) sscanf(ports_str, "%d-%d", &port_low, &port_high);
     93 
     94   // Limit port tries
     95   for (int i = 0; i < (port_high - port_low); i++) {
     96 
     97     // Select next port
     98     port_cur++;
     99     if (port_cur < port_low ) port_cur = port_low;
    100     if (port_cur > port_high) port_cur = port_low;
    101     int port = port_cur;
    102 
    103     // Build sockaddr
    104     struct sockaddr_in addr;
    105     memset(&addr, 0, sizeof(addr));
    106     addr.sin_family      = AF_INET;
    107     addr.sin_addr.s_addr = INADDR_ANY;
    108     addr.sin_port        = htons(port);
    109 
    110     // Check if the port is free
    111     int udp_fd = socket(AF_INET, SOCK_DGRAM, 0);
    112     if (udp_fd < 0) continue;
    113     int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0);
    114     close(udp_fd);
    115     if (!ok) continue;
    116 
    117     return port;
    118   }
    119 
    120   // Nothing found
    121   return 0;
    122 }
    123 
    124 static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) {
    125   memset(addr, 0, sizeof(*addr));
    126 
    127   struct sockaddr_in *addr4 = (struct sockaddr_in *)addr;
    128   if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) {
    129     addr4->sin_family = AF_INET;
    130     addr4->sin_port   = htons(port);
    131     *addrlen          = sizeof(*addr4);
    132     return 0;
    133   }
    134 
    135   struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr;
    136   if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) {
    137     addr6->sin6_family = AF_INET6;
    138     addr6->sin6_port   = htons(port);
    139     *addrlen           = sizeof(*addr6);
    140     return 0;
    141   }
    142 
    143   return -1;
    144 }
    145 
    146 static void close_socket(socket_t *sock) {
    147   if (!sock || !sock->fds) return;
    148   for (int i = 1; i <= sock->fds[0]; i++) {
    149     if (sock->fds[i] >= 0) {
    150       close(sock->fds[i]);
    151     }
    152   }
    153   free(sock->fds);
    154   sock->fds = NULL;
    155 }
    156 
    157 static void free_socket(socket_t *sock) {
    158   if (!sock) return;
    159   close_socket(sock);
    160   free(sock->socket_id);
    161   free(sock);
    162 }
    163 
    164 static void destroy_session(session_t *s) {
    165   if (!s) return;
    166   s->marked_for_deletion = 1;
    167 
    168   for (size_t i = 0; i < s->sockets_count; i++) {
    169     if (s->sockets[i]) {
    170       free_socket(s->sockets[i]);
    171     }
    172   }
    173   free(s->sockets);
    174 
    175   for (size_t i = 0; i < s->forwards_count; i++) {
    176     free(s->forwards[i].src_socket_id);
    177     free(s->forwards[i].dst_socket_id);
    178   }
    179   free(s->forwards);
    180 
    181   free(s->session_id);
    182   free(s);
    183 
    184   for (size_t i = 0; i < sessions_count; i++) {
    185     if (sessions[i] == s) {
    186       for (size_t j = i; j < sessions_count - 1; j++) {
    187         sessions[j] = sessions[j + 1];
    188       }
    189       sessions_count--;
    190       break;
    191     }
    192   }
    193 }
    194 
    195 static session_t *create_session(const char *session_id, int idle_expiry) {
    196   const session_t *cs = find_session(session_id);
    197   if (cs) return (session_t *)cs;
    198 
    199   session_t *s = calloc(1, sizeof(*s));
    200   if (!s) return NULL;
    201 
    202   s->session_id    = strdup(session_id);
    203   s->created       = time(NULL);
    204   s->last_activity = s->created;
    205   s->idle_expiry   = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY;
    206 
    207   sessions                   = realloc(sessions, sizeof(session_t *) * (sessions_count + 1));
    208   sessions[sessions_count++] = s;
    209 
    210   return s;
    211 }
    212 
    213 static void cleanup_expired_sessions(void) {
    214   if (!sessions) return;
    215   time_t now = time(NULL);
    216 
    217   for (size_t i = 0; i < sessions_count; i++) {
    218     session_t *s = sessions[i];
    219     if (!s) continue;
    220     if (now - s->last_activity > s->idle_expiry) {
    221       log_debug("udphole: session %s expired (idle %ld > expiry %ld)", s->session_id, (long)(now - s->last_activity),
    222                 (long)s->idle_expiry);
    223       destroy_session(s);
    224     }
    225   }
    226 }
    227 
    228 static int add_forward(session_t *s, const char *src_id, const char *dst_id) {
    229   for (size_t i = 0; i < s->forwards_count; i++) {
    230     if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) {
    231       return 0;
    232     }
    233   }
    234 
    235   forward_t *new_forwards = realloc(s->forwards, sizeof(forward_t) * (s->forwards_count + 1));
    236   if (!new_forwards) return -1;
    237   s->forwards = new_forwards;
    238 
    239   s->forwards[s->forwards_count].src_socket_id = strdup(src_id);
    240   s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id);
    241   s->forwards_count++;
    242 
    243   return 0;
    244 }
    245 
    246 static int remove_forward(session_t *s, const char *src_id, const char *dst_id) {
    247   for (size_t i = 0; i < s->forwards_count; i++) {
    248     if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) {
    249       free(s->forwards[i].src_socket_id);
    250       free(s->forwards[i].dst_socket_id);
    251       for (size_t j = i; j < s->forwards_count - 1; j++) {
    252         s->forwards[j] = s->forwards[j + 1];
    253       }
    254       s->forwards_count--;
    255       return 0;
    256     }
    257   }
    258   return -1;
    259 }
    260 
    261 static socket_t *create_listen_socket(session_t *sess, const char *socket_id) {
    262   socket_t *existing = find_socket(sess, socket_id);
    263   if (existing) return existing;
    264 
    265   int port = alloc_port();
    266   if (!port) {
    267     log_error("udphole: no ports available");
    268     return NULL;
    269   }
    270 
    271   char port_str[16];
    272   snprintf(port_str, sizeof(port_str), "%d", port);
    273   int *fds = udp_recv(port_str, NULL, NULL);
    274   if (!fds || fds[0] == 0) {
    275     log_error("udphole: failed to create UDP socket on port %d", port);
    276     free(fds);
    277     return NULL;
    278   }
    279 
    280   socket_t *sock = calloc(1, sizeof(*sock));
    281   if (!sock) {
    282     free(fds);
    283     return NULL;
    284   }
    285 
    286   sock->socket_id     = strdup(socket_id);
    287   sock->fds           = fds;
    288   sock->local_port    = port;
    289   sock->mode          = 0;
    290   sock->learned_valid = 0;
    291 
    292   sess->sockets                        = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1));
    293   sess->sockets[sess->sockets_count++] = sock;
    294 
    295   log_debug("udphole: created listen socket %s in session %s on port %d", socket_id, sess->session_id, port);
    296   return sock;
    297 }
    298 
    299 static socket_t *create_connect_socket(session_t *sess, const char *socket_id, const char *ip, int port) {
    300   socket_t *existing = find_socket(sess, socket_id);
    301   if (existing) return existing;
    302 
    303   int local_port = alloc_port();
    304   if (!local_port) {
    305     log_error("udphole: no ports available");
    306     return NULL;
    307   }
    308 
    309   char port_str[16];
    310   snprintf(port_str, sizeof(port_str), "%d", local_port);
    311   int *fds = udp_recv(port_str, NULL, NULL);
    312   if (!fds || fds[0] == 0) {
    313     log_error("udphole: failed to create UDP socket on port %d", local_port);
    314     free(fds);
    315     return NULL;
    316   }
    317 
    318   struct sockaddr_storage remote_addr;
    319   socklen_t               remote_addrlen;
    320   if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) {
    321     log_error("udphole: invalid remote address %s:%d", ip, port);
    322     free(fds);
    323     return NULL;
    324   }
    325 
    326   socket_t *sock = calloc(1, sizeof(*sock));
    327   if (!sock) {
    328     free(fds);
    329     return NULL;
    330   }
    331 
    332   sock->socket_id      = strdup(socket_id);
    333   sock->fds            = fds;
    334   sock->local_port     = local_port;
    335   sock->mode           = 1;
    336   sock->remote_addr    = remote_addr;
    337   sock->remote_addrlen = remote_addrlen;
    338   sock->learned_valid  = 0;
    339 
    340   sess->sockets                        = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1));
    341   sess->sockets[sess->sockets_count++] = sock;
    342 
    343   log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", socket_id, sess->session_id,
    344             local_port, ip, port);
    345   return sock;
    346 }
    347 
    348 static int destroy_socket(session_t *sess, const char *socket_id) {
    349   socket_t *sock = find_socket(sess, socket_id);
    350   if (!sock) return -1;
    351 
    352   for (size_t i = 0; i < sess->sockets_count; i++) {
    353     if (sess->sockets[i] == sock) {
    354       sess->sockets[i] = NULL;
    355       break;
    356     }
    357   }
    358   free_socket(sock);
    359 
    360   for (size_t i = 0; i < sess->forwards_count;) {
    361     if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 ||
    362         strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) {
    363       free(sess->forwards[i].src_socket_id);
    364       free(sess->forwards[i].dst_socket_id);
    365       for (size_t j = i; j < sess->forwards_count - 1; j++) {
    366         sess->forwards[j] = sess->forwards[j + 1];
    367       }
    368       sess->forwards_count--;
    369     } else {
    370       i++;
    371     }
    372   }
    373 
    374   return 0;
    375 }
    376 
    377 static socket_t *find_socket_by_fd(session_t *s, int fd) {
    378   if (!s || !s->sockets) return NULL;
    379   for (size_t j = 0; j < s->sockets_count; j++) {
    380     socket_t *sock = s->sockets[j];
    381     if (!sock || !sock->fds) continue;
    382     for (int i = 1; i <= sock->fds[0]; i++) {
    383       if (sock->fds[i] == fd) {
    384         return sock;
    385       }
    386     }
    387   }
    388   return NULL;
    389 }
    390 
    391 int session_pt(int64_t timestamp, struct pt_task *task) {
    392   (void)timestamp;
    393   session_t *s = task->udata;
    394 
    395   if (s->marked_for_deletion) {
    396     goto cleanup;
    397   }
    398 
    399   if (!s->sockets || s->sockets_count == 0) {
    400     return SCHED_RUNNING;
    401   }
    402 
    403   s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1));
    404   if (!s->all_fds) {
    405     return SCHED_RUNNING;
    406   }
    407   s->all_fds[0] = 0;
    408 
    409   for (size_t j = 0; j < s->sockets_count; j++) {
    410     socket_t *sock = s->sockets[j];
    411     if (!sock || !sock->fds) continue;
    412     for (int i = 1; i <= sock->fds[0]; i++) {
    413       s->all_fds[++s->all_fds[0]] = sock->fds[i];
    414     }
    415   }
    416 
    417   if (s->all_fds[0] == 0) {
    418     return SCHED_RUNNING;
    419   }
    420 
    421   int ready_fd = sched_has_data(s->all_fds);
    422   if (ready_fd < 0) {
    423     return SCHED_RUNNING;
    424   }
    425 
    426   char      buffer[BUFFER_SIZE];
    427   socket_t *src_sock = find_socket_by_fd(s, ready_fd);
    428   if (!src_sock) {
    429     return SCHED_RUNNING;
    430   }
    431 
    432   struct sockaddr_storage from_addr;
    433   socklen_t               from_len = sizeof(from_addr);
    434   ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&from_addr, &from_len);
    435 
    436   if (n <= 0) {
    437     if (errno != EAGAIN && errno != EWOULDBLOCK) {
    438       log_warn("udphole: recvfrom error on socket %s: %s", src_sock->socket_id, strerror(errno));
    439     }
    440     return SCHED_RUNNING;
    441   }
    442 
    443   s->last_activity = time(NULL);
    444 
    445   if (src_sock->mode == 0 && !src_sock->learned_valid) {
    446     src_sock->learned_addr    = from_addr;
    447     src_sock->learned_addrlen = from_len;
    448     src_sock->learned_valid   = 1;
    449     log_debug("udphole: socket %s learned remote address", src_sock->socket_id);
    450   }
    451 
    452   for (size_t i = 0; i < s->forwards_count; i++) {
    453     if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) {
    454       continue;
    455     }
    456 
    457     socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id);
    458     if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue;
    459 
    460     struct sockaddr *dest_addr    = NULL;
    461     socklen_t        dest_addrlen = 0;
    462 
    463     if (dst_sock->mode == 1) {
    464       dest_addr    = (struct sockaddr *)&dst_sock->remote_addr;
    465       dest_addrlen = dst_sock->remote_addrlen;
    466     } else if (dst_sock->learned_valid) {
    467       dest_addr    = (struct sockaddr *)&dst_sock->learned_addr;
    468       dest_addrlen = dst_sock->learned_addrlen;
    469     }
    470 
    471     if (dest_addr && dest_addrlen > 0) {
    472       int     dst_fd = dst_sock->fds[1];
    473       ssize_t sent   = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen);
    474       if (sent < 0) {
    475         log_warn("udphole: forward failed %s -> %s: %s", src_sock->socket_id, dst_sock->socket_id, strerror(errno));
    476       }
    477     }
    478   }
    479 
    480   return SCHED_RUNNING;
    481 
    482 cleanup:
    483   log_debug("udphole: session %s exiting", s->session_id);
    484 
    485   if (s->all_fds) {
    486     free(s->all_fds);
    487     s->all_fds = NULL;
    488   }
    489   if (s->ready_fds) {
    490     free(s->ready_fds);
    491     s->ready_fds = NULL;
    492   }
    493 
    494   return SCHED_DONE;
    495 }
    496 
    497 static void spawn_session_pt(session_t *s) {
    498   s->task = (struct pt_task *)(intptr_t)sched_create(session_pt, s);
    499 }
    500 
    501 resp_object *domain_session_create(const char *cmd, resp_object *args) {
    502   (void)cmd;
    503   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    504     resp_object *err = resp_array_init();
    505     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'");
    506     return err;
    507   }
    508 
    509   const char *session_id = NULL;
    510   if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) {
    511     session_id = args->u.arr.elem[1].u.s;
    512   }
    513 
    514   if (!session_id) {
    515     resp_object *err = resp_array_init();
    516     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'");
    517     return err;
    518   }
    519 
    520   int idle_expiry = 0;
    521   if (args->u.arr.n >= 3 && args->u.arr.elem[2].type == RESPT_BULK && args->u.arr.elem[2].u.s) {
    522     idle_expiry = atoi(args->u.arr.elem[2].u.s);
    523   }
    524 
    525   session_t *s = create_session(session_id, idle_expiry);
    526   if (!s) {
    527     resp_object *err = resp_array_init();
    528     resp_array_append_simple(err, "ERR failed to create session");
    529     return err;
    530   }
    531 
    532   spawn_session_pt(s);
    533 
    534   resp_object *res = resp_simple_init("OK");
    535   return res;
    536 }
    537 
    538 resp_object *domain_session_list(const char *cmd, resp_object *args) {
    539   (void)cmd;
    540   (void)args;
    541 
    542   resp_object *res = resp_array_init();
    543   if (!res) return NULL;
    544 
    545   for (size_t i = 0; i < sessions_count; i++) {
    546     session_t *s = sessions[i];
    547     if (!s) continue;
    548     resp_array_append_bulk(res, s->session_id);
    549   }
    550 
    551   return res;
    552 }
    553 
    554 resp_object *domain_session_info(const char *cmd, resp_object *args) {
    555   (void)cmd;
    556   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    557     resp_object *err = resp_array_init();
    558     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'");
    559     return err;
    560   }
    561 
    562   const char *session_id = NULL;
    563   if (args->u.arr.elem[1].type == RESPT_BULK) {
    564     session_id = args->u.arr.elem[1].u.s;
    565   }
    566 
    567   if (!session_id) {
    568     resp_object *err = resp_array_init();
    569     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'");
    570     return err;
    571   }
    572 
    573   session_t *s = find_session(session_id);
    574   if (!s) {
    575     resp_object *err = resp_error_init("ERR session not found");
    576     return err;
    577   }
    578 
    579   resp_object *res = resp_array_init();
    580   if (!res) return NULL;
    581 
    582   resp_array_append_bulk(res, "session_id");
    583   resp_array_append_bulk(res, s->session_id);
    584 
    585   resp_array_append_bulk(res, "created");
    586   resp_array_append_int(res, (long long)s->created);
    587 
    588   resp_array_append_bulk(res, "last_activity");
    589   resp_array_append_int(res, (long long)s->last_activity);
    590 
    591   resp_array_append_bulk(res, "idle_expiry");
    592   resp_array_append_int(res, (long long)s->idle_expiry);
    593 
    594   resp_array_append_bulk(res, "sockets");
    595   resp_object *sockets_arr = resp_array_init();
    596   for (size_t i = 0; i < s->sockets_count; i++) {
    597     socket_t *sock = s->sockets[i];
    598     if (!sock) continue;
    599     resp_array_append_bulk(sockets_arr, sock->socket_id);
    600   }
    601   resp_array_append_obj(res, sockets_arr);
    602 
    603   resp_array_append_bulk(res, "forwards");
    604   resp_object *forwards_arr = resp_array_init();
    605   for (size_t i = 0; i < s->forwards_count; i++) {
    606     resp_array_append_bulk(forwards_arr, s->forwards[i].src_socket_id);
    607     resp_array_append_bulk(forwards_arr, s->forwards[i].dst_socket_id);
    608   }
    609   resp_array_append_obj(res, forwards_arr);
    610 
    611   resp_array_append_bulk(res, "marked_for_deletion");
    612   resp_array_append_int(res, s->marked_for_deletion);
    613 
    614   return res;
    615 }
    616 
    617 resp_object *domain_session_destroy(const char *cmd, resp_object *args) {
    618   (void)cmd;
    619   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    620     resp_object *err = resp_array_init();
    621     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'");
    622     return err;
    623   }
    624 
    625   const char *session_id = NULL;
    626   if (args->u.arr.elem[1].type == RESPT_BULK) {
    627     session_id = args->u.arr.elem[1].u.s;
    628   }
    629 
    630   if (!session_id) {
    631     resp_object *err = resp_array_init();
    632     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'");
    633     return err;
    634   }
    635 
    636   session_t *s = find_session(session_id);
    637   if (!s) {
    638     resp_object *err = resp_error_init("ERR session not found");
    639     return err;
    640   }
    641 
    642   destroy_session(s);
    643 
    644   resp_object *res = resp_simple_init("OK");
    645   return res;
    646 }
    647 
    648 resp_object *domain_socket_create_listen(const char *cmd, resp_object *args) {
    649   (void)cmd;
    650   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) {
    651     resp_object *err = resp_array_init();
    652     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'");
    653     return err;
    654   }
    655 
    656   const char *session_id = NULL;
    657   const char *socket_id  = NULL;
    658 
    659   if (args->u.arr.elem[1].type == RESPT_BULK) {
    660     session_id = args->u.arr.elem[1].u.s;
    661   }
    662   if (args->u.arr.elem[2].type == RESPT_BULK) {
    663     socket_id = args->u.arr.elem[2].u.s;
    664   }
    665 
    666   if (!session_id || !socket_id) {
    667     resp_object *err = resp_array_init();
    668     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'");
    669     return err;
    670   }
    671 
    672   session_t *s = find_session(session_id);
    673   if (!s) {
    674     resp_object *err = resp_error_init("ERR session not found");
    675     return err;
    676   }
    677 
    678   socket_t *sock = create_listen_socket(s, socket_id);
    679   if (!sock) {
    680     resp_object *err = resp_array_init();
    681     resp_array_append_simple(err, "ERR failed to create socket");
    682     return err;
    683   }
    684 
    685   resp_object *res = resp_array_init();
    686   resp_array_append_int(res, sock->local_port);
    687   resp_object *udphole   = get_udphole_cfg();
    688   const char  *advertise = udphole ? resp_map_get_string(udphole, "advertise") : NULL;
    689   resp_array_append_bulk(res, advertise ? advertise : "");
    690   return res;
    691 }
    692 
    693 resp_object *domain_socket_create_connect(const char *cmd, resp_object *args) {
    694   (void)cmd;
    695   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 5) {
    696     resp_object *err = resp_array_init();
    697     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'");
    698     return err;
    699   }
    700 
    701   const char *session_id = NULL;
    702   const char *socket_id  = NULL;
    703   const char *ip         = NULL;
    704   const char *port_str   = NULL;
    705 
    706   if (args->u.arr.elem[1].type == RESPT_BULK) {
    707     session_id = args->u.arr.elem[1].u.s;
    708   }
    709   if (args->u.arr.elem[2].type == RESPT_BULK) {
    710     socket_id = args->u.arr.elem[2].u.s;
    711   }
    712   if (args->u.arr.elem[3].type == RESPT_BULK) {
    713     ip = args->u.arr.elem[3].u.s;
    714   }
    715   if (args->u.arr.elem[4].type == RESPT_BULK) {
    716     port_str = args->u.arr.elem[4].u.s;
    717   }
    718 
    719   if (!session_id || !socket_id || !ip || !port_str) {
    720     resp_object *err = resp_array_init();
    721     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'");
    722     return err;
    723   }
    724 
    725   int port = atoi(port_str);
    726 
    727   session_t *s = find_session(session_id);
    728   if (!s) {
    729     resp_object *err = resp_error_init("ERR session not found");
    730     return err;
    731   }
    732 
    733   socket_t *sock = create_connect_socket(s, socket_id, ip, port);
    734   if (!sock) {
    735     resp_object *err = resp_array_init();
    736     resp_array_append_simple(err, "ERR failed to create socket");
    737     return err;
    738   }
    739 
    740   resp_object *res = resp_array_init();
    741   resp_array_append_int(res, sock->local_port);
    742   resp_object *udphole   = get_udphole_cfg();
    743   const char  *advertise = udphole ? resp_map_get_string(udphole, "advertise") : NULL;
    744   resp_array_append_bulk(res, advertise ? advertise : "");
    745   return res;
    746 }
    747 
    748 resp_object *domain_socket_destroy(const char *cmd, resp_object *args) {
    749   (void)cmd;
    750   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) {
    751     resp_object *err = resp_array_init();
    752     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'");
    753     return err;
    754   }
    755 
    756   const char *session_id = NULL;
    757   const char *socket_id  = NULL;
    758 
    759   if (args->u.arr.elem[1].type == RESPT_BULK) {
    760     session_id = args->u.arr.elem[1].u.s;
    761   }
    762   if (args->u.arr.elem[2].type == RESPT_BULK) {
    763     socket_id = args->u.arr.elem[2].u.s;
    764   }
    765 
    766   if (!session_id || !socket_id) {
    767     resp_object *err = resp_array_init();
    768     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'");
    769     return err;
    770   }
    771 
    772   session_t *s = find_session(session_id);
    773   if (!s) {
    774     resp_object *err = resp_error_init("ERR session not found");
    775     return err;
    776   }
    777 
    778   if (destroy_socket(s, socket_id) != 0) {
    779     resp_object *err = resp_array_init();
    780     resp_array_append_simple(err, "ERR socket not found");
    781     return err;
    782   }
    783 
    784   resp_object *res = resp_simple_init("OK");
    785   return res;
    786 }
    787 
    788 resp_object *domain_forward_list(const char *cmd, resp_object *args) {
    789   (void)cmd;
    790   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    791     resp_object *err = resp_array_init();
    792     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'");
    793     return err;
    794   }
    795 
    796   const char *session_id = NULL;
    797   if (args->u.arr.elem[1].type == RESPT_BULK) {
    798     session_id = args->u.arr.elem[1].u.s;
    799   }
    800 
    801   if (!session_id) {
    802     resp_object *err = resp_array_init();
    803     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'");
    804     return err;
    805   }
    806 
    807   session_t *s = find_session(session_id);
    808   if (!s) {
    809     resp_object *err = resp_error_init("ERR session not found");
    810     return err;
    811   }
    812 
    813   resp_object *res = resp_array_init();
    814   for (size_t i = 0; i < s->forwards_count; i++) {
    815     resp_array_append_bulk(res, s->forwards[i].src_socket_id);
    816     resp_array_append_bulk(res, s->forwards[i].dst_socket_id);
    817   }
    818   return res;
    819 }
    820 
    821 resp_object *domain_forward_create(const char *cmd, resp_object *args) {
    822   (void)cmd;
    823   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) {
    824     resp_object *err = resp_array_init();
    825     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'");
    826     return err;
    827   }
    828 
    829   const char *session_id    = NULL;
    830   const char *src_socket_id = NULL;
    831   const char *dst_socket_id = NULL;
    832 
    833   if (args->u.arr.elem[1].type == RESPT_BULK) {
    834     session_id = args->u.arr.elem[1].u.s;
    835   }
    836   if (args->u.arr.elem[2].type == RESPT_BULK) {
    837     src_socket_id = args->u.arr.elem[2].u.s;
    838   }
    839   if (args->u.arr.elem[3].type == RESPT_BULK) {
    840     dst_socket_id = args->u.arr.elem[3].u.s;
    841   }
    842 
    843   if (!session_id || !src_socket_id || !dst_socket_id) {
    844     resp_object *err = resp_array_init();
    845     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'");
    846     return err;
    847   }
    848 
    849   session_t *s = find_session(session_id);
    850   if (!s) {
    851     resp_object *err = resp_error_init("ERR session not found");
    852     return err;
    853   }
    854 
    855   socket_t *src = find_socket(s, src_socket_id);
    856   if (!src) {
    857     resp_object *err = resp_array_init();
    858     resp_array_append_simple(err, "ERR source socket not found");
    859     return err;
    860   }
    861 
    862   socket_t *dst = find_socket(s, dst_socket_id);
    863   if (!dst) {
    864     resp_object *err = resp_array_init();
    865     resp_array_append_simple(err, "ERR destination socket not found");
    866     return err;
    867   }
    868 
    869   if (add_forward(s, src_socket_id, dst_socket_id) != 0) {
    870     resp_object *err = resp_array_init();
    871     resp_array_append_simple(err, "ERR failed to add forward");
    872     return err;
    873   }
    874 
    875   log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id);
    876 
    877   resp_object *res = resp_simple_init("OK");
    878   return res;
    879 }
    880 
    881 resp_object *domain_forward_destroy(const char *cmd, resp_object *args) {
    882   (void)cmd;
    883   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) {
    884     resp_object *err = resp_array_init();
    885     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'");
    886     return err;
    887   }
    888 
    889   const char *session_id    = NULL;
    890   const char *src_socket_id = NULL;
    891   const char *dst_socket_id = NULL;
    892 
    893   if (args->u.arr.elem[1].type == RESPT_BULK) {
    894     session_id = args->u.arr.elem[1].u.s;
    895   }
    896   if (args->u.arr.elem[2].type == RESPT_BULK) {
    897     src_socket_id = args->u.arr.elem[2].u.s;
    898   }
    899   if (args->u.arr.elem[3].type == RESPT_BULK) {
    900     dst_socket_id = args->u.arr.elem[3].u.s;
    901   }
    902 
    903   if (!session_id || !src_socket_id || !dst_socket_id) {
    904     resp_object *err = resp_array_init();
    905     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'");
    906     return err;
    907   }
    908 
    909   session_t *s = find_session(session_id);
    910   if (!s) {
    911     resp_object *err = resp_error_init("ERR session not found");
    912     return err;
    913   }
    914 
    915   if (remove_forward(s, src_socket_id, dst_socket_id) != 0) {
    916     resp_object *err = resp_array_init();
    917     resp_array_append_simple(err, "ERR forward not found");
    918     return err;
    919   }
    920 
    921   resp_object *res = resp_simple_init("OK");
    922   return res;
    923 }
    924 
    925 resp_object *domain_system_load(const char *cmd, resp_object *args) {
    926   (void)cmd;
    927   (void)args;
    928 
    929   double loadavg[3];
    930   if (getloadavg(loadavg, 3) != 3) {
    931     resp_object *err = resp_array_init();
    932     resp_array_append_simple(err, "ERR failed to get load average");
    933     return err;
    934   }
    935 
    936   resp_object *res = resp_array_init();
    937   char         buf[64];
    938 
    939   resp_array_append_bulk(res, "1min");
    940   snprintf(buf, sizeof(buf), "%.2f", loadavg[0]);
    941   resp_array_append_bulk(res, buf);
    942 
    943   resp_array_append_bulk(res, "5min");
    944   snprintf(buf, sizeof(buf), "%.2f", loadavg[1]);
    945   resp_array_append_bulk(res, buf);
    946 
    947   resp_array_append_bulk(res, "15min");
    948   snprintf(buf, sizeof(buf), "%.2f", loadavg[2]);
    949   resp_array_append_bulk(res, buf);
    950 
    951   return res;
    952 }
    953 
    954 resp_object *domain_session_count(const char *cmd, resp_object *args) {
    955   (void)cmd;
    956   (void)args;
    957 
    958   size_t count = 0;
    959   for (size_t i = 0; i < sessions_count; i++) {
    960     if (sessions[i] != NULL) {
    961       count++;
    962     }
    963   }
    964 
    965   resp_object *res = malloc(sizeof(resp_object));
    966   if (!res) return NULL;
    967   res->type = RESPT_INT;
    968   res->u.i  = (long long)count;
    969   return res;
    970 }
    971 
    972 int session_manager_pt(int64_t timestamp, struct pt_task *task) {
    973   session_manager_udata_t *udata = task->udata;
    974 
    975   resp_object *udphole = get_udphole_cfg();
    976   if (!udphole) {
    977     return SCHED_RUNNING;
    978   }
    979 
    980   if (!udata->initialized) {
    981     const char *ports_str = resp_map_get_string(udphole, "ports");
    982     int         port_low = 7000, port_high = 7999;
    983     if (ports_str) sscanf(ports_str, "%d-%d", &port_low, &port_high);
    984     log_info("udphole: manager started with port range %d-%d", port_low, port_high);
    985     udata->initialized = 1;
    986   }
    987 
    988   if (timestamp - udata->last_cleanup >= 1000) {
    989     cleanup_expired_sessions();
    990     udata->last_cleanup = timestamp;
    991   }
    992 
    993   return SCHED_RUNNING;
    994 }