udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

session.c (28520B)


      1 #include <stdlib.h>
      2 #include <string.h>
      3 #include <stdio.h>
      4 #include <stdarg.h>
      5 #include <unistd.h>
      6 #include <errno.h>
      7 #include <fcntl.h>
      8 #include <time.h>
      9 #include <sys/socket.h>
     10 #include <sys/un.h>
     11 #include <sys/stat.h>
     12 #include <netinet/in.h>
     13 #include <arpa/inet.h>
     14 
     15 #include "rxi/log.h"
     16 #include "domain/protothreads.h"
     17 #include "domain/scheduler.h"
     18 #include "domain/config.h"
     19 #include "common/socket_util.h"
     20 #include "common/resp.h"
     21 #include "tidwall/hashmap.h"
     22 #include "session.h"
     23 
/* Presumably a hash-table size; not referenced in the reviewed part of this file. */
#define SESSION_HASH_SIZE 256
/* Size in bytes of the datagram buffer declared in session_pt(). */
#define BUFFER_SIZE 4096
/* Idle timeout (seconds) used by create_session() when no positive expiry is given. */
#define DEFAULT_IDLE_EXPIRY 60
     27 
/*
 * One UDP endpoint belonging to a session.
 *
 * Created by create_listen_socket() / create_connect_socket() and released
 * by free_socket().
 */
typedef struct socket {
  char *socket_id;                      /* heap copy (strdup) of the caller-supplied id */
  int *fds;                             /* count-prefixed list from udp_recv(): fds[0] = count, fds[1..] = descriptors */
  int local_port;                       /* local port obtained from alloc_port() */
  int mode;                             /* 0 = listen socket, 1 = connect socket */
  struct sockaddr_storage remote_addr;  /* fixed destination; only set for mode 1 */
  socklen_t remote_addrlen;             /* length of remote_addr */
  int learned_valid;                    /* non-zero once learned_addr has been filled in */
  struct sockaddr_storage learned_addr; /* sender of the first datagram seen on a mode-0 socket */
  socklen_t learned_addrlen;            /* length of learned_addr */
} socket_t;
     39 
/*
 * A one-directional forwarding rule: datagrams received on the source socket
 * are re-sent through the destination socket (see session_pt()).
 * Both ids are heap copies owned by the rule.
 */
typedef struct forward {
  char *src_socket_id; /* id of the socket datagrams are read from */
  char *dst_socket_id; /* id of the socket datagrams are sent through */
} forward_t;
     44 
/*
 * A named group of sockets and forwarding rules serviced by one protothread.
 */
typedef struct session {
  char *session_id;        /* heap copy (strdup) of the session name */
  time_t idle_expiry;      /* seconds of inactivity after which the session expires */
  time_t created;          /* time(NULL) at creation */
  time_t last_activity;    /* time(NULL) of the last datagram received */
  socket_t **sockets;      /* socket pointers; a slot is NULL after destroy_socket() */
  size_t sockets_count;    /* number of slots in sockets (including NULL slots) */
  forward_t *forwards;     /* forwarding rules */
  size_t forwards_count;   /* number of entries in forwards */
  int marked_for_deletion; /* set by destroy_session(); polled by session_pt() */
  int *ready_fds;          /* count-prefixed list filled in by domain_schedmod_has_data() */
  int *all_fds;            /* count-prefixed list of every descriptor, rebuilt by session_pt() */
  struct pt pt;            /* protothread state; not touched in the reviewed part of this file */
  struct pt_task *task;    /* handle returned by domain_schedmod_pt_create() */
} session_t;
     60 
/* Table of live sessions; grown in create_session(), compacted in destroy_session(). */
static session_t **sessions = NULL;
/* Number of valid entries in sessions. */
static size_t sessions_count = 0;
/* Not referenced in the reviewed part of this file. */
static int running = 0;
     64 
     65 static session_t *find_session(const char *session_id) {
     66   for (size_t i = 0; i < sessions_count; i++) {
     67     if (strcmp(sessions[i]->session_id, session_id) == 0) {
     68       return sessions[i];
     69     }
     70   }
     71   return NULL;
     72 }
     73 
     74 static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) {
     75   const socket_t *s = item;
     76   return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1);
     77 }
     78 
     79 static int socket_compare(const void *a, const void *b, void *udata) {
     80   (void)udata;
     81   const socket_t *sa = a;
     82   const socket_t *sb = b;
     83   return strcmp(sa->socket_id, sb->socket_id);
     84 }
     85 
     86 static socket_t *find_socket(session_t *s, const char *socket_id) {
     87   if (!s || !s->sockets || !socket_id) return NULL;
     88   for (size_t i = 0; i < s->sockets_count; i++) {
     89     if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) {
     90       return s->sockets[i];
     91     }
     92   }
     93   return NULL;
     94 }
     95 
     96 static int alloc_port(void) {
     97   if (!domain_cfg) return 0;
     98   for (int i = 0; i < domain_cfg->port_high - domain_cfg->port_low; i++) {
     99     int port = domain_cfg->port_cur + i;
    100     if (port > domain_cfg->port_high) port = domain_cfg->port_low;
    101     domain_cfg->port_cur = port + 1;
    102     if (domain_cfg->port_cur > domain_cfg->port_high) domain_cfg->port_cur = domain_cfg->port_low;
    103 
    104     struct sockaddr_in addr;
    105     memset(&addr, 0, sizeof(addr));
    106     addr.sin_family = AF_INET;
    107     addr.sin_addr.s_addr = INADDR_ANY;
    108     addr.sin_port = htons(port);
    109 
    110     int udp_fd = socket(AF_INET, SOCK_DGRAM, 0);
    111     if (udp_fd < 0) continue;
    112     int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0);
    113     close(udp_fd);
    114     if (!ok) continue;
    115 
    116     return port;
    117   }
    118   return 0;
    119 }
    120 
    121 static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) {
    122   memset(addr, 0, sizeof(*addr));
    123 
    124   struct sockaddr_in *addr4 = (struct sockaddr_in *)addr;
    125   if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) {
    126     addr4->sin_family = AF_INET;
    127     addr4->sin_port = htons(port);
    128     *addrlen = sizeof(*addr4);
    129     return 0;
    130   }
    131 
    132   struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr;
    133   if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) {
    134     addr6->sin6_family = AF_INET6;
    135     addr6->sin6_port = htons(port);
    136     *addrlen = sizeof(*addr6);
    137     return 0;
    138   }
    139 
    140   return -1;
    141 }
    142 
    143 static void close_socket(socket_t *sock) {
    144   if (!sock || !sock->fds) return;
    145   for (int i = 1; i <= sock->fds[0]; i++) {
    146     if (sock->fds[i] >= 0) {
    147       close(sock->fds[i]);
    148     }
    149   }
    150   free(sock->fds);
    151   sock->fds = NULL;
    152 }
    153 
    154 static void free_socket(socket_t *sock) {
    155   if (!sock) return;
    156   close_socket(sock);
    157   free(sock->socket_id);
    158   free(sock);
    159 }
    160 
    161 static void destroy_session(session_t *s) {
    162   if (!s) return;
    163   s->marked_for_deletion = 1;
    164 
    165   for (size_t i = 0; i < s->sockets_count; i++) {
    166     if (s->sockets[i]) {
    167       free_socket(s->sockets[i]);
    168     }
    169   }
    170   free(s->sockets);
    171 
    172   for (size_t i = 0; i < s->forwards_count; i++) {
    173     free(s->forwards[i].src_socket_id);
    174     free(s->forwards[i].dst_socket_id);
    175   }
    176   free(s->forwards);
    177 
    178   free(s->session_id);
    179   free(s);
    180 
    181   for (size_t i = 0; i < sessions_count; i++) {
    182     if (sessions[i] == s) {
    183       for (size_t j = i; j < sessions_count - 1; j++) {
    184         sessions[j] = sessions[j + 1];
    185       }
    186       sessions_count--;
    187       break;
    188     }
    189   }
    190 }
    191 
    192 static session_t *create_session(const char *session_id, int idle_expiry) {
    193   const session_t *cs = find_session(session_id);
    194   if (cs) return (session_t *)cs;
    195 
    196   session_t *s = calloc(1, sizeof(*s));
    197   if (!s) return NULL;
    198 
    199   s->session_id = strdup(session_id);
    200   s->created = time(NULL);
    201   s->last_activity = s->created;
    202   s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY;
    203 
    204   sessions = realloc(sessions, sizeof(session_t *) * (sessions_count + 1));
    205   sessions[sessions_count++] = s;
    206 
    207   return s;
    208 }
    209 
    210 static void cleanup_expired_sessions(void) {
    211   if (!sessions) return;
    212   time_t now = time(NULL);
    213 
    214   for (size_t i = 0; i < sessions_count; i++) {
    215     session_t *s = sessions[i];
    216     if (!s) continue;
    217     if (now - s->last_activity > s->idle_expiry) {
    218       log_debug("udphole: session %s expired (idle %ld > expiry %ld)",
    219                 s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry);
    220       destroy_session(s);
    221     }
    222   }
    223 }
    224 
    225 static int add_forward(session_t *s, const char *src_id, const char *dst_id) {
    226   for (size_t i = 0; i < s->forwards_count; i++) {
    227     if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 &&
    228         strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) {
    229       return 0;
    230     }
    231   }
    232 
    233   forward_t *new_forwards = realloc(s->forwards, sizeof(forward_t) * (s->forwards_count + 1));
    234   if (!new_forwards) return -1;
    235   s->forwards = new_forwards;
    236 
    237   s->forwards[s->forwards_count].src_socket_id = strdup(src_id);
    238   s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id);
    239   s->forwards_count++;
    240 
    241   return 0;
    242 }
    243 
    244 static int remove_forward(session_t *s, const char *src_id, const char *dst_id) {
    245   for (size_t i = 0; i < s->forwards_count; i++) {
    246     if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 &&
    247         strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) {
    248       free(s->forwards[i].src_socket_id);
    249       free(s->forwards[i].dst_socket_id);
    250       for (size_t j = i; j < s->forwards_count - 1; j++) {
    251         s->forwards[j] = s->forwards[j + 1];
    252       }
    253       s->forwards_count--;
    254       return 0;
    255     }
    256   }
    257   return -1;
    258 }
    259 
    260 static socket_t *create_listen_socket(session_t *sess, const char *socket_id) {
    261   socket_t *existing = find_socket(sess, socket_id);
    262   if (existing) return existing;
    263 
    264   int port = alloc_port();
    265   if (!port) {
    266     log_error("udphole: no ports available");
    267     return NULL;
    268   }
    269 
    270   char port_str[16];
    271   snprintf(port_str, sizeof(port_str), "%d", port);
    272   int *fds = udp_recv(port_str, NULL, NULL);
    273   if (!fds || fds[0] == 0) {
    274     log_error("udphole: failed to create UDP socket on port %d", port);
    275     free(fds);
    276     return NULL;
    277   }
    278 
    279   socket_t *sock = calloc(1, sizeof(*sock));
    280   if (!sock) {
    281     free(fds);
    282     return NULL;
    283   }
    284 
    285   sock->socket_id = strdup(socket_id);
    286   sock->fds = fds;
    287   sock->local_port = port;
    288   sock->mode = 0;
    289   sock->learned_valid = 0;
    290 
    291   sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1));
    292   sess->sockets[sess->sockets_count++] = sock;
    293 
    294   log_debug("udphole: created listen socket %s in session %s on port %d",
    295             socket_id, sess->session_id, port);
    296   return sock;
    297 }
    298 
    299 static socket_t *create_connect_socket(session_t *sess, const char *socket_id,
    300                                        const char *ip, int port) {
    301   socket_t *existing = find_socket(sess, socket_id);
    302   if (existing) return existing;
    303 
    304   int local_port = alloc_port();
    305   if (!local_port) {
    306     log_error("udphole: no ports available");
    307     return NULL;
    308   }
    309 
    310   char port_str[16];
    311   snprintf(port_str, sizeof(port_str), "%d", local_port);
    312   int *fds = udp_recv(port_str, NULL, NULL);
    313   if (!fds || fds[0] == 0) {
    314     log_error("udphole: failed to create UDP socket on port %d", local_port);
    315     free(fds);
    316     return NULL;
    317   }
    318 
    319   struct sockaddr_storage remote_addr;
    320   socklen_t remote_addrlen;
    321   if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) {
    322     log_error("udphole: invalid remote address %s:%d", ip, port);
    323     free(fds);
    324     return NULL;
    325   }
    326 
    327   socket_t *sock = calloc(1, sizeof(*sock));
    328   if (!sock) {
    329     free(fds);
    330     return NULL;
    331   }
    332 
    333   sock->socket_id = strdup(socket_id);
    334   sock->fds = fds;
    335   sock->local_port = local_port;
    336   sock->mode = 1;
    337   sock->remote_addr = remote_addr;
    338   sock->remote_addrlen = remote_addrlen;
    339   sock->learned_valid = 0;
    340 
    341   sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1));
    342   sess->sockets[sess->sockets_count++] = sock;
    343 
    344   log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d",
    345             socket_id, sess->session_id, local_port, ip, port);
    346   return sock;
    347 }
    348 
    349 static int destroy_socket(session_t *sess, const char *socket_id) {
    350   socket_t *sock = find_socket(sess, socket_id);
    351   if (!sock) return -1;
    352 
    353   for (size_t i = 0; i < sess->sockets_count; i++) {
    354     if (sess->sockets[i] == sock) {
    355       sess->sockets[i] = NULL;
    356       break;
    357     }
    358   }
    359   free_socket(sock);
    360 
    361   for (size_t i = 0; i < sess->forwards_count; ) {
    362     if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 ||
    363         strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) {
    364       free(sess->forwards[i].src_socket_id);
    365       free(sess->forwards[i].dst_socket_id);
    366       for (size_t j = i; j < sess->forwards_count - 1; j++) {
    367         sess->forwards[j] = sess->forwards[j + 1];
    368       }
    369       sess->forwards_count--;
    370     } else {
    371       i++;
    372     }
    373   }
    374 
    375   return 0;
    376 }
    377 
    378 static socket_t *find_socket_by_fd(session_t *s, int fd) {
    379   if (!s || !s->sockets) return NULL;
    380   for (size_t j = 0; j < s->sockets_count; j++) {
    381     socket_t *sock = s->sockets[j];
    382     if (!sock || !sock->fds) continue;
    383     for (int i = 1; i <= sock->fds[0]; i++) {
    384       if (sock->fds[i] == fd) {
    385         return sock;
    386       }
    387     }
    388   }
    389   return NULL;
    390 }
    391 
/*
 * Protothread servicing one session (task->udata is the session_t).
 *
 * Each pass of the loop:
 *   1. stops when the session is marked for deletion;
 *   2. rebuilds s->all_fds, a count-prefixed list of every descriptor of
 *      every live socket;
 *   3. waits until domain_schedmod_has_data() reports ready descriptors in
 *      s->ready_fds;
 *   4. reads one datagram from each ready descriptor, refreshes
 *      last_activity, lets a listen socket learn its peer from the first
 *      datagram, and re-sends the datagram through the destination socket of
 *      every forwarding rule whose source is the receiving socket.
 *
 * On exit all_fds and ready_fds are freed.
 *
 * NOTE(review): s is read on every resume; destroy_session() frees the
 * session without any visible hand-off to this task — confirm the scheduler
 * never resumes the task after that.
 */
PT_THREAD(session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
  session_t *s = task->udata;

  (void)timestamp;
  PT_BEGIN(pt);

  /* Only used within a single pass (filled by recvfrom, consumed by sendto);
   * no yield happens between the two. */
  char buffer[BUFFER_SIZE];

  for (;;) {
    if (s->marked_for_deletion) {
      break;
    }

    /* Nothing to poll yet: give the scheduler a turn and look again. */
    if (!s->sockets || s->sockets_count == 0) {
      PT_YIELD(pt);
      continue;
    }

    /* NOTE(review): capacity is 2 descriptors per socket plus the count
     * slot — presumably udp_recv() returns at most two (IPv4 + IPv6);
     * confirm, the fill loop below does not re-check the bound.
     * NOTE(review): on realloc failure the previous array is lost. */
    s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1));
    if (!s->all_fds) {
      PT_YIELD(pt);
      continue;
    }
    s->all_fds[0] = 0;

    /* Flatten every socket's descriptor list into all_fds; slot 0 is the
     * running count. */
    for (size_t j = 0; j < s->sockets_count; j++) {
      socket_t *sock = s->sockets[j];
      if (!sock || !sock->fds) continue;
      for (int i = 1; i <= sock->fds[0]; i++) {
        s->all_fds[++s->all_fds[0]] = sock->fds[i];
      }
    }

    if (s->all_fds[0] == 0) {
      PT_YIELD(pt);
      continue;
    }

    /* The condition is re-evaluated on each resume with the all_fds built
     * above; sockets added meanwhile are picked up on the next pass. */
    PT_WAIT_UNTIL(pt, domain_schedmod_has_data(s->all_fds, &s->ready_fds) > 0);

    if (!s->ready_fds || s->ready_fds[0] == 0) {
      PT_YIELD(pt);
      continue;
    }

    for (int r = 1; r <= s->ready_fds[0]; r++) {
      int ready_fd = s->ready_fds[r];

      /* The socket may have been destroyed while waiting. */
      socket_t *src_sock = find_socket_by_fd(s, ready_fd);
      if (!src_sock) continue;

      struct sockaddr_storage from_addr;
      socklen_t from_len = sizeof(from_addr);
      /* One byte of the buffer is left unused; nothing visible here
       * NUL-terminates it. */
      ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0,
                           (struct sockaddr *)&from_addr, &from_len);

      /* NOTE(review): n == 0 (empty datagram) also lands here, where errno
       * is whatever an earlier call left behind. */
      if (n <= 0) {
        if (errno != EAGAIN && errno != EWOULDBLOCK) {
          log_warn("udphole: recvfrom error on socket %s: %s",
                   src_sock->socket_id, strerror(errno));
        }
        continue;
      }

      s->last_activity = time(NULL);

      /* A listen socket adopts the sender of its first datagram as peer and
       * keeps it from then on. */
      if (src_sock->mode == 0 && !src_sock->learned_valid) {
        src_sock->learned_addr = from_addr;
        src_sock->learned_addrlen = from_len;
        src_sock->learned_valid = 1;
        log_debug("udphole: socket %s learned remote address", src_sock->socket_id);
      }

      /* Fan the datagram out to every rule whose source is this socket. */
      for (size_t i = 0; i < s->forwards_count; i++) {
        if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) {
          continue;
        }

        socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id);
        if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue;

        struct sockaddr *dest_addr = NULL;
        socklen_t dest_addrlen = 0;

        /* Connect sockets send to their configured peer, listen sockets to
         * the learned one; a listen socket without a learned peer drops the
         * datagram. */
        if (dst_sock->mode == 1) {
          dest_addr = (struct sockaddr *)&dst_sock->remote_addr;
          dest_addrlen = dst_sock->remote_addrlen;
        } else if (dst_sock->learned_valid) {
          dest_addr = (struct sockaddr *)&dst_sock->learned_addr;
          dest_addrlen = dst_sock->learned_addrlen;
        }

        if (dest_addr && dest_addrlen > 0) {
          /* Always the first descriptor of the destination socket.
           * NOTE(review): its address family is not matched against
           * dest_addr — confirm. */
          int dst_fd = dst_sock->fds[1];
          ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen);
          if (sent < 0) {
            log_warn("udphole: forward failed %s -> %s: %s",
                     src_sock->socket_id, dst_sock->socket_id, strerror(errno));
          }
        }
      }
    }

  }

  log_debug("udphole: session %s protothread exiting", s->session_id);

  if (s->all_fds) {
    free(s->all_fds);
    s->all_fds = NULL;
  }
  if (s->ready_fds) {
    free(s->ready_fds);
    s->ready_fds = NULL;
  }

  PT_END(pt);
}
    510 
    511 static void spawn_session_pt(session_t *s) {
    512   s->task = (struct pt_task *)(intptr_t)domain_schedmod_pt_create(session_pt, s);
    513 }
    514 
    515 resp_object *domain_session_create(const char *cmd, resp_object *args) {
    516   (void)cmd;
    517   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    518     resp_object *err = resp_array_init();
    519     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'");
    520     return err;
    521   }
    522 
    523   const char *session_id = NULL;
    524   if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) {
    525     session_id = args->u.arr.elem[1].u.s;
    526   }
    527 
    528   if (!session_id) {
    529     resp_object *err = resp_array_init();
    530     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'");
    531     return err;
    532   }
    533 
    534   int idle_expiry = 0;
    535   if (args->u.arr.n >= 3 && args->u.arr.elem[2].type == RESPT_BULK && args->u.arr.elem[2].u.s) {
    536     idle_expiry = atoi(args->u.arr.elem[2].u.s);
    537   }
    538 
    539   session_t *s = create_session(session_id, idle_expiry);
    540   if (!s) {
    541     resp_object *err = resp_array_init();
    542     resp_array_append_simple(err, "ERR failed to create session");
    543     return err;
    544   }
    545 
    546   spawn_session_pt(s);
    547 
    548   resp_object *res = resp_array_init();
    549   resp_array_append_simple(res, "OK");
    550   return res;
    551 }
    552 
    553 resp_object *domain_session_list(const char *cmd, resp_object *args) {
    554   (void)cmd;
    555   (void)args;
    556 
    557   resp_object *res = resp_array_init();
    558   if (!res) return NULL;
    559 
    560   for (size_t i = 0; i < sessions_count; i++) {
    561     session_t *s = sessions[i];
    562     if (!s) continue;
    563     resp_array_append_bulk(res, s->session_id);
    564   }
    565 
    566   return res;
    567 }
    568 
    569 resp_object *domain_session_info(const char *cmd, resp_object *args) {
    570   (void)cmd;
    571   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    572     resp_object *err = resp_array_init();
    573     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'");
    574     return err;
    575   }
    576 
    577   const char *session_id = NULL;
    578   if (args->u.arr.elem[1].type == RESPT_BULK) {
    579     session_id = args->u.arr.elem[1].u.s;
    580   }
    581 
    582   if (!session_id) {
    583     resp_object *err = resp_array_init();
    584     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'");
    585     return err;
    586   }
    587 
    588   session_t *s = find_session(session_id);
    589   if (!s) {
    590     resp_object *err = resp_array_init();
    591     resp_array_append_simple(err, "ERR session not found");
    592     return err;
    593   }
    594 
    595   resp_object *res = resp_array_init();
    596   if (!res) return NULL;
    597 
    598   resp_array_append_bulk(res, "session_id");
    599   resp_array_append_bulk(res, s->session_id);
    600 
    601   resp_array_append_bulk(res, "created");
    602   resp_array_append_int(res, (long long)s->created);
    603 
    604   resp_array_append_bulk(res, "last_activity");
    605   resp_array_append_int(res, (long long)s->last_activity);
    606 
    607   resp_array_append_bulk(res, "idle_expiry");
    608   resp_array_append_int(res, (long long)s->idle_expiry);
    609 
    610   resp_array_append_bulk(res, "sockets");
    611   resp_object *sockets_arr = resp_array_init();
    612   for (size_t i = 0; i < s->sockets_count; i++) {
    613     socket_t *sock = s->sockets[i];
    614     if (!sock) continue;
    615     resp_array_append_bulk(sockets_arr, sock->socket_id);
    616   }
    617   resp_array_append_obj(res, sockets_arr);
    618 
    619   resp_array_append_bulk(res, "forwards");
    620   resp_object *forwards_arr = resp_array_init();
    621   for (size_t i = 0; i < s->forwards_count; i++) {
    622     resp_array_append_bulk(forwards_arr, s->forwards[i].src_socket_id);
    623     resp_array_append_bulk(forwards_arr, s->forwards[i].dst_socket_id);
    624   }
    625   resp_array_append_obj(res, forwards_arr);
    626 
    627   resp_array_append_bulk(res, "marked_for_deletion");
    628   resp_array_append_int(res, s->marked_for_deletion);
    629 
    630   return res;
    631 }
    632 
    633 resp_object *domain_session_destroy(const char *cmd, resp_object *args) {
    634   (void)cmd;
    635   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    636     resp_object *err = resp_array_init();
    637     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'");
    638     return err;
    639   }
    640 
    641   const char *session_id = NULL;
    642   if (args->u.arr.elem[1].type == RESPT_BULK) {
    643     session_id = args->u.arr.elem[1].u.s;
    644   }
    645 
    646   if (!session_id) {
    647     resp_object *err = resp_array_init();
    648     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'");
    649     return err;
    650   }
    651 
    652   session_t *s = find_session(session_id);
    653   if (!s) {
    654     resp_object *err = resp_array_init();
    655     resp_array_append_simple(err, "ERR session not found");
    656     return err;
    657   }
    658 
    659   destroy_session(s);
    660 
    661   resp_object *res = resp_array_init();
    662   resp_array_append_simple(res, "OK");
    663   return res;
    664 }
    665 
    666 resp_object *domain_socket_create_listen(const char *cmd, resp_object *args) {
    667   (void)cmd;
    668   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) {
    669     resp_object *err = resp_array_init();
    670     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'");
    671     return err;
    672   }
    673 
    674   const char *session_id = NULL;
    675   const char *socket_id = NULL;
    676 
    677   if (args->u.arr.elem[1].type == RESPT_BULK) {
    678     session_id = args->u.arr.elem[1].u.s;
    679   }
    680   if (args->u.arr.elem[2].type == RESPT_BULK) {
    681     socket_id = args->u.arr.elem[2].u.s;
    682   }
    683 
    684   if (!session_id || !socket_id) {
    685     resp_object *err = resp_array_init();
    686     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'");
    687     return err;
    688   }
    689 
    690   session_t *s = find_session(session_id);
    691   if (!s) {
    692     resp_object *err = resp_array_init();
    693     resp_array_append_simple(err, "ERR session not found");
    694     return err;
    695   }
    696 
    697   socket_t *sock = create_listen_socket(s, socket_id);
    698   if (!sock) {
    699     resp_object *err = resp_array_init();
    700     resp_array_append_simple(err, "ERR failed to create socket");
    701     return err;
    702   }
    703 
    704   resp_object *res = resp_array_init();
    705   resp_array_append_int(res, sock->local_port);
    706   resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : "");
    707   return res;
    708 }
    709 
    710 resp_object *domain_socket_create_connect(const char *cmd, resp_object *args) {
    711   (void)cmd;
    712   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 5) {
    713     resp_object *err = resp_array_init();
    714     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'");
    715     return err;
    716   }
    717 
    718   const char *session_id = NULL;
    719   const char *socket_id = NULL;
    720   const char *ip = NULL;
    721   const char *port_str = NULL;
    722 
    723   if (args->u.arr.elem[1].type == RESPT_BULK) {
    724     session_id = args->u.arr.elem[1].u.s;
    725   }
    726   if (args->u.arr.elem[2].type == RESPT_BULK) {
    727     socket_id = args->u.arr.elem[2].u.s;
    728   }
    729   if (args->u.arr.elem[3].type == RESPT_BULK) {
    730     ip = args->u.arr.elem[3].u.s;
    731   }
    732   if (args->u.arr.elem[4].type == RESPT_BULK) {
    733     port_str = args->u.arr.elem[4].u.s;
    734   }
    735 
    736   if (!session_id || !socket_id || !ip || !port_str) {
    737     resp_object *err = resp_array_init();
    738     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'");
    739     return err;
    740   }
    741 
    742   int port = atoi(port_str);
    743 
    744   session_t *s = find_session(session_id);
    745   if (!s) {
    746     resp_object *err = resp_array_init();
    747     resp_array_append_simple(err, "ERR session not found");
    748     return err;
    749   }
    750 
    751   socket_t *sock = create_connect_socket(s, socket_id, ip, port);
    752   if (!sock) {
    753     resp_object *err = resp_array_init();
    754     resp_array_append_simple(err, "ERR failed to create socket");
    755     return err;
    756   }
    757 
    758   resp_object *res = resp_array_init();
    759   resp_array_append_int(res, sock->local_port);
    760   resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : "");
    761   return res;
    762 }
    763 
    764 resp_object *domain_socket_destroy(const char *cmd, resp_object *args) {
    765   (void)cmd;
    766   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) {
    767     resp_object *err = resp_array_init();
    768     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'");
    769     return err;
    770   }
    771 
    772   const char *session_id = NULL;
    773   const char *socket_id = NULL;
    774 
    775   if (args->u.arr.elem[1].type == RESPT_BULK) {
    776     session_id = args->u.arr.elem[1].u.s;
    777   }
    778   if (args->u.arr.elem[2].type == RESPT_BULK) {
    779     socket_id = args->u.arr.elem[2].u.s;
    780   }
    781 
    782   if (!session_id || !socket_id) {
    783     resp_object *err = resp_array_init();
    784     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'");
    785     return err;
    786   }
    787 
    788   session_t *s = find_session(session_id);
    789   if (!s) {
    790     resp_object *err = resp_array_init();
    791     resp_array_append_simple(err, "ERR session not found");
    792     return err;
    793   }
    794 
    795   if (destroy_socket(s, socket_id) != 0) {
    796     resp_object *err = resp_array_init();
    797     resp_array_append_simple(err, "ERR socket not found");
    798     return err;
    799   }
    800 
    801   resp_object *res = resp_array_init();
    802   resp_array_append_simple(res, "OK");
    803   return res;
    804 }
    805 
    806 resp_object *domain_forward_list(const char *cmd, resp_object *args) {
    807   (void)cmd;
    808   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) {
    809     resp_object *err = resp_array_init();
    810     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'");
    811     return err;
    812   }
    813 
    814   const char *session_id = NULL;
    815   if (args->u.arr.elem[1].type == RESPT_BULK) {
    816     session_id = args->u.arr.elem[1].u.s;
    817   }
    818 
    819   if (!session_id) {
    820     resp_object *err = resp_array_init();
    821     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'");
    822     return err;
    823   }
    824 
    825   session_t *s = find_session(session_id);
    826   if (!s) {
    827     resp_object *err = resp_array_init();
    828     resp_array_append_simple(err, "ERR session not found");
    829     return err;
    830   }
    831 
    832   resp_object *res = resp_array_init();
    833   for (size_t i = 0; i < s->forwards_count; i++) {
    834     resp_array_append_bulk(res, s->forwards[i].src_socket_id);
    835     resp_array_append_bulk(res, s->forwards[i].dst_socket_id);
    836   }
    837   return res;
    838 }
    839 
    840 resp_object *domain_forward_create(const char *cmd, resp_object *args) {
    841   (void)cmd;
    842   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) {
    843     resp_object *err = resp_array_init();
    844     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'");
    845     return err;
    846   }
    847 
    848   const char *session_id = NULL;
    849   const char *src_socket_id = NULL;
    850   const char *dst_socket_id = NULL;
    851 
    852   if (args->u.arr.elem[1].type == RESPT_BULK) {
    853     session_id = args->u.arr.elem[1].u.s;
    854   }
    855   if (args->u.arr.elem[2].type == RESPT_BULK) {
    856     src_socket_id = args->u.arr.elem[2].u.s;
    857   }
    858   if (args->u.arr.elem[3].type == RESPT_BULK) {
    859     dst_socket_id = args->u.arr.elem[3].u.s;
    860   }
    861 
    862   if (!session_id || !src_socket_id || !dst_socket_id) {
    863     resp_object *err = resp_array_init();
    864     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'");
    865     return err;
    866   }
    867 
    868   session_t *s = find_session(session_id);
    869   if (!s) {
    870     resp_object *err = resp_array_init();
    871     resp_array_append_simple(err, "ERR session not found");
    872     return err;
    873   }
    874 
    875   socket_t *src = find_socket(s, src_socket_id);
    876   if (!src) {
    877     resp_object *err = resp_array_init();
    878     resp_array_append_simple(err, "ERR source socket not found");
    879     return err;
    880   }
    881 
    882   socket_t *dst = find_socket(s, dst_socket_id);
    883   if (!dst) {
    884     resp_object *err = resp_array_init();
    885     resp_array_append_simple(err, "ERR destination socket not found");
    886     return err;
    887   }
    888 
    889   if (add_forward(s, src_socket_id, dst_socket_id) != 0) {
    890     resp_object *err = resp_array_init();
    891     resp_array_append_simple(err, "ERR failed to add forward");
    892     return err;
    893   }
    894 
    895   log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id);
    896 
    897   resp_object *res = resp_array_init();
    898   resp_array_append_simple(res, "OK");
    899   return res;
    900 }
    901 
    902 resp_object *domain_forward_destroy(const char *cmd, resp_object *args) {
    903   (void)cmd;
    904   if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) {
    905     resp_object *err = resp_array_init();
    906     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'");
    907     return err;
    908   }
    909 
    910   const char *session_id = NULL;
    911   const char *src_socket_id = NULL;
    912   const char *dst_socket_id = NULL;
    913 
    914   if (args->u.arr.elem[1].type == RESPT_BULK) {
    915     session_id = args->u.arr.elem[1].u.s;
    916   }
    917   if (args->u.arr.elem[2].type == RESPT_BULK) {
    918     src_socket_id = args->u.arr.elem[2].u.s;
    919   }
    920   if (args->u.arr.elem[3].type == RESPT_BULK) {
    921     dst_socket_id = args->u.arr.elem[3].u.s;
    922   }
    923 
    924   if (!session_id || !src_socket_id || !dst_socket_id) {
    925     resp_object *err = resp_array_init();
    926     resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'");
    927     return err;
    928   }
    929 
    930   session_t *s = find_session(session_id);
    931   if (!s) {
    932     resp_object *err = resp_array_init();
    933     resp_array_append_simple(err, "ERR session not found");
    934     return err;
    935   }
    936 
    937   if (remove_forward(s, src_socket_id, dst_socket_id) != 0) {
    938     resp_object *err = resp_array_init();
    939     resp_array_append_simple(err, "ERR forward not found");
    940     return err;
    941   }
    942 
    943   resp_object *res = resp_array_init();
    944   resp_array_append_simple(res, "OK");
    945   return res;
    946 }
    947 
    948 resp_object *domain_system_load(const char *cmd, resp_object *args) {
    949   (void)cmd;
    950   (void)args;
    951 
    952   double loadavg[3];
    953   if (getloadavg(loadavg, 3) != 3) {
    954     resp_object *err = resp_array_init();
    955     resp_array_append_simple(err, "ERR failed to get load average");
    956     return err;
    957   }
    958 
    959   resp_object *res = resp_array_init();
    960   char buf[64];
    961 
    962   resp_array_append_bulk(res, "1min");
    963   snprintf(buf, sizeof(buf), "%.2f", loadavg[0]);
    964   resp_array_append_bulk(res, buf);
    965 
    966   resp_array_append_bulk(res, "5min");
    967   snprintf(buf, sizeof(buf), "%.2f", loadavg[1]);
    968   resp_array_append_bulk(res, buf);
    969 
    970   resp_array_append_bulk(res, "15min");
    971   snprintf(buf, sizeof(buf), "%.2f", loadavg[2]);
    972   resp_array_append_bulk(res, buf);
    973 
    974   return res;
    975 }
    976 
    977 resp_object *domain_session_count(const char *cmd, resp_object *args) {
    978   (void)cmd;
    979   (void)args;
    980 
    981   size_t count = 0;
    982   for (size_t i = 0; i < sessions_count; i++) {
    983     if (sessions[i] != NULL) {
    984       count++;
    985     }
    986   }
    987 
    988   resp_object *res = malloc(sizeof(resp_object));
    989   if (!res) return NULL;
    990   res->type = RESPT_INT;
    991   res->u.i = (long long)count;
    992   return res;
    993 }
    994 
    995 PT_THREAD(session_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
    996   (void)timestamp;
    997   log_trace("session_manager: protothread entry");
    998   PT_BEGIN(pt);
    999 
   1000   PT_WAIT_UNTIL(pt, domain_cfg);
   1001 
   1002   running = 1;
   1003   log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high);
   1004 
   1005   int64_t last_cleanup = 0;
   1006 
   1007   for (;;) {
   1008     int64_t now = (int64_t)(time(NULL));
   1009     if (now - last_cleanup >= 1) {
   1010       cleanup_expired_sessions();
   1011       last_cleanup = now;
   1012     }
   1013 
   1014     PT_YIELD(pt);
   1015   }
   1016 
   1017   running = 0;
   1018 
   1019   for (size_t i = 0; i < sessions_count; i++) {
   1020     if (sessions[i]) {
   1021       sessions[i]->marked_for_deletion = 1;
   1022     }
   1023   }
   1024 
   1025   PT_END(pt);
   1026 }