session.c (28102B)
1 #include "session.h" 2 3 #include <arpa/inet.h> 4 #include <errno.h> 5 #include <fcntl.h> 6 #include <netinet/in.h> 7 #include <stdarg.h> 8 #include <stdio.h> 9 #include <stdlib.h> 10 #include <string.h> 11 #include <sys/socket.h> 12 #include <sys/stat.h> 13 #include <sys/un.h> 14 #include <time.h> 15 #include <unistd.h> 16 17 #include "common/resp.h" 18 #include "common/scheduler.h" 19 #include "common/socket_util.h" 20 #include "domain/config.h" 21 #include "rxi/log.h" 22 23 static resp_object *get_udphole_cfg(void) { 24 return domain_cfg ? resp_map_get(domain_cfg, "udphole") : NULL; 25 } 26 27 #define SESSION_HASH_SIZE 256 28 #define BUFFER_SIZE 4096 29 #define DEFAULT_IDLE_EXPIRY 60 30 31 typedef struct socket { 32 char *socket_id; 33 int *fds; 34 int local_port; 35 int mode; 36 struct sockaddr_storage remote_addr; 37 socklen_t remote_addrlen; 38 int learned_valid; 39 struct sockaddr_storage learned_addr; 40 socklen_t learned_addrlen; 41 } socket_t; 42 43 typedef struct forward { 44 char *src_socket_id; 45 char *dst_socket_id; 46 } forward_t; 47 48 typedef struct session { 49 char *session_id; 50 time_t idle_expiry; 51 time_t created; 52 time_t last_activity; 53 socket_t **sockets; 54 size_t sockets_count; 55 forward_t *forwards; 56 size_t forwards_count; 57 int marked_for_deletion; 58 int *ready_fds; 59 int *all_fds; 60 struct pt_task *task; 61 } session_t; 62 63 static session_t **sessions = NULL; 64 static size_t sessions_count = 0; 65 66 static session_t *find_session(const char *session_id) { 67 for (size_t i = 0; i < sessions_count; i++) { 68 if (strcmp(sessions[i]->session_id, session_id) == 0) { 69 return sessions[i]; 70 } 71 } 72 return NULL; 73 } 74 75 static socket_t *find_socket(session_t *s, const char *socket_id) { 76 if (!s || !s->sockets || !socket_id) return NULL; 77 for (size_t i = 0; i < s->sockets_count; i++) { 78 if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) { 79 return s->sockets[i]; 80 } 81 } 82 return NULL; 83 } 84 85 // port_cur = last-assigned-port 86 int port_cur = 0; 87 static int alloc_port(void) { 88 resp_object *udphole = get_udphole_cfg(); 89 if (!udphole) return 0; 90 const char *ports_str = resp_map_get_string(udphole, "ports"); 91 int port_low = 7000, port_high = 7999; 92 if (ports_str) sscanf(ports_str, "%d-%d", &port_low, &port_high); 93 94 // Limit port tries 95 for (int i = 0; i < (port_high - port_low); i++) { 96 97 // Select next port 98 port_cur++; 99 if (port_cur < port_low ) port_cur = port_low; 100 if (port_cur > port_high) port_cur = port_low; 101 int port = port_cur; 102 103 // Build sockaddr 104 struct sockaddr_in addr; 105 memset(&addr, 0, sizeof(addr)); 106 addr.sin_family = AF_INET; 107 addr.sin_addr.s_addr = INADDR_ANY; 108 addr.sin_port = htons(port); 109 110 // Check if the port is free 111 int udp_fd = socket(AF_INET, SOCK_DGRAM, 0); 112 if (udp_fd < 0) continue; 113 int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0); 114 close(udp_fd); 115 if (!ok) continue; 116 117 return port; 118 } 119 120 // Nothing found 121 return 0; 122 } 123 124 static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) { 125 memset(addr, 0, sizeof(*addr)); 126 127 struct sockaddr_in *addr4 = (struct sockaddr_in *)addr; 128 if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) { 129 addr4->sin_family = AF_INET; 130 addr4->sin_port = htons(port); 131 *addrlen = sizeof(*addr4); 132 return 0; 133 } 134 135 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr; 136 if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) { 137 addr6->sin6_family = AF_INET6; 138 addr6->sin6_port = htons(port); 139 *addrlen = sizeof(*addr6); 140 return 0; 141 } 142 143 return -1; 144 } 145 146 static void close_socket(socket_t *sock) { 147 if (!sock || !sock->fds) return; 148 for (int i = 1; i <= sock->fds[0]; i++) { 149 if (sock->fds[i] >= 0) { 150 close(sock->fds[i]); 151 } 152 } 153 free(sock->fds); 154 sock->fds = NULL; 155 } 156 157 static void free_socket(socket_t *sock) { 158 if (!sock) return; 159 close_socket(sock); 160 free(sock->socket_id); 161 free(sock); 162 } 163 164 static void destroy_session(session_t *s) { 165 if (!s) return; 166 s->marked_for_deletion = 1; 167 168 for (size_t i = 0; i < s->sockets_count; i++) { 169 if (s->sockets[i]) { 170 free_socket(s->sockets[i]); 171 } 172 } 173 free(s->sockets); 174 175 for (size_t i = 0; i < s->forwards_count; i++) { 176 free(s->forwards[i].src_socket_id); 177 free(s->forwards[i].dst_socket_id); 178 } 179 free(s->forwards); 180 181 free(s->session_id); 182 free(s); 183 184 for (size_t i = 0; i < sessions_count; i++) { 185 if (sessions[i] == s) { 186 for (size_t j = i; j < sessions_count - 1; j++) { 187 sessions[j] = sessions[j + 1]; 188 } 189 sessions_count--; 190 break; 191 } 192 } 193 } 194 195 static session_t *create_session(const char *session_id, int idle_expiry) { 196 const session_t *cs = find_session(session_id); 197 if (cs) return (session_t *)cs; 198 199 session_t *s = calloc(1, sizeof(*s)); 200 if (!s) return NULL; 201 202 s->session_id = strdup(session_id); 203 s->created = time(NULL); 204 s->last_activity = s->created; 205 s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY; 206 207 sessions = realloc(sessions, sizeof(session_t *) * (sessions_count + 1)); 208 sessions[sessions_count++] = s; 209 210 return s; 211 } 212 213 static void cleanup_expired_sessions(void) { 214 if (!sessions) return; 215 time_t now = time(NULL); 216 217 for (size_t i = 0; i < sessions_count; i++) { 218 session_t *s = sessions[i]; 219 if (!s) continue; 220 if (now - s->last_activity > s->idle_expiry) { 221 log_debug("udphole: session %s expired (idle %ld > expiry %ld)", s->session_id, (long)(now - s->last_activity), 222 (long)s->idle_expiry); 223 destroy_session(s); 224 } 225 } 226 } 227 228 static int add_forward(session_t *s, const char *src_id, const char *dst_id) { 229 for (size_t i = 0; i < s->forwards_count; i++) { 230 if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { 231 return 0; 232 } 233 } 234 235 forward_t *new_forwards = realloc(s->forwards, sizeof(forward_t) * (s->forwards_count + 1)); 236 if (!new_forwards) return -1; 237 s->forwards = new_forwards; 238 239 s->forwards[s->forwards_count].src_socket_id = strdup(src_id); 240 s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id); 241 s->forwards_count++; 242 243 return 0; 244 } 245 246 static int remove_forward(session_t *s, const char *src_id, const char *dst_id) { 247 for (size_t i = 0; i < s->forwards_count; i++) { 248 if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { 249 free(s->forwards[i].src_socket_id); 250 free(s->forwards[i].dst_socket_id); 251 for (size_t j = i; j < s->forwards_count - 1; j++) { 252 s->forwards[j] = s->forwards[j + 1]; 253 } 254 s->forwards_count--; 255 return 0; 256 } 257 } 258 return -1; 259 } 260 261 static socket_t *create_listen_socket(session_t *sess, const char *socket_id) { 262 socket_t *existing = find_socket(sess, socket_id); 263 if (existing) return existing; 264 265 int port = alloc_port(); 266 if (!port) { 267 log_error("udphole: no ports available"); 268 return NULL; 269 } 270 271 char port_str[16]; 272 snprintf(port_str, sizeof(port_str), "%d", port); 273 int *fds = udp_recv(port_str, NULL, NULL); 274 if (!fds || fds[0] == 0) { 275 log_error("udphole: failed to create UDP socket on port %d", port); 276 free(fds); 277 return NULL; 278 } 279 280 socket_t *sock = calloc(1, sizeof(*sock)); 281 if (!sock) { 282 free(fds); 283 return NULL; 284 } 285 286 sock->socket_id = strdup(socket_id); 287 sock->fds = fds; 288 sock->local_port = port; 289 sock->mode = 0; 290 sock->learned_valid = 0; 291 292 sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1)); 293 sess->sockets[sess->sockets_count++] = sock; 294 295 log_debug("udphole: created listen socket %s in session %s on port %d", socket_id, sess->session_id, port); 296 return sock; 297 } 298 299 static socket_t *create_connect_socket(session_t *sess, const char *socket_id, const char *ip, int port) { 300 socket_t *existing = find_socket(sess, socket_id); 301 if (existing) return existing; 302 303 int local_port = alloc_port(); 304 if (!local_port) { 305 log_error("udphole: no ports available"); 306 return NULL; 307 } 308 309 char port_str[16]; 310 snprintf(port_str, sizeof(port_str), "%d", local_port); 311 int *fds = udp_recv(port_str, NULL, NULL); 312 if (!fds || fds[0] == 0) { 313 log_error("udphole: failed to create UDP socket on port %d", local_port); 314 free(fds); 315 return NULL; 316 } 317 318 struct sockaddr_storage remote_addr; 319 socklen_t remote_addrlen; 320 if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) { 321 log_error("udphole: invalid remote address %s:%d", ip, port); 322 free(fds); 323 return NULL; 324 } 325 326 socket_t *sock = calloc(1, sizeof(*sock)); 327 if (!sock) { 328 free(fds); 329 return NULL; 330 } 331 332 sock->socket_id = strdup(socket_id); 333 sock->fds = fds; 334 sock->local_port = local_port; 335 sock->mode = 1; 336 sock->remote_addr = remote_addr; 337 sock->remote_addrlen = remote_addrlen; 338 sock->learned_valid = 0; 339 340 sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1)); 341 sess->sockets[sess->sockets_count++] = sock; 342 343 log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", socket_id, sess->session_id, 344 local_port, ip, port); 345 return sock; 346 } 347 348 static int destroy_socket(session_t *sess, const char *socket_id) { 349 socket_t *sock = find_socket(sess, socket_id); 350 if (!sock) return -1; 351 352 for (size_t i = 0; i < sess->sockets_count; i++) { 353 if (sess->sockets[i] == sock) { 354 sess->sockets[i] = NULL; 355 break; 356 } 357 } 358 free_socket(sock); 359 360 for (size_t i = 0; i < sess->forwards_count;) { 361 if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 || 362 strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) { 363 free(sess->forwards[i].src_socket_id); 364 free(sess->forwards[i].dst_socket_id); 365 for (size_t j = i; j < sess->forwards_count - 1; j++) { 366 sess->forwards[j] = sess->forwards[j + 1]; 367 } 368 sess->forwards_count--; 369 } else { 370 i++; 371 } 372 } 373 374 return 0; 375 } 376 377 static socket_t *find_socket_by_fd(session_t *s, int fd) { 378 if (!s || !s->sockets) return NULL; 379 for (size_t j = 0; j < s->sockets_count; j++) { 380 socket_t *sock = s->sockets[j]; 381 if (!sock || !sock->fds) continue; 382 for (int i = 1; i <= sock->fds[0]; i++) { 383 if (sock->fds[i] == fd) { 384 return sock; 385 } 386 } 387 } 388 return NULL; 389 } 390 391 int session_pt(int64_t timestamp, struct pt_task *task) { 392 (void)timestamp; 393 session_t *s = task->udata; 394 395 if (s->marked_for_deletion) { 396 goto cleanup; 397 } 398 399 if (!s->sockets || s->sockets_count == 0) { 400 return SCHED_RUNNING; 401 } 402 403 s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1)); 404 if (!s->all_fds) { 405 return SCHED_RUNNING; 406 } 407 s->all_fds[0] = 0; 408 409 for (size_t j = 0; j < s->sockets_count; j++) { 410 socket_t *sock = s->sockets[j]; 411 if (!sock || !sock->fds) continue; 412 for (int i = 1; i <= sock->fds[0]; i++) { 413 s->all_fds[++s->all_fds[0]] = sock->fds[i]; 414 } 415 } 416 417 if (s->all_fds[0] == 0) { 418 return SCHED_RUNNING; 419 } 420 421 int ready_fd = sched_has_data(s->all_fds); 422 if (ready_fd < 0) { 423 return SCHED_RUNNING; 424 } 425 426 char buffer[BUFFER_SIZE]; 427 socket_t *src_sock = find_socket_by_fd(s, ready_fd); 428 if (!src_sock) { 429 return SCHED_RUNNING; 430 } 431 432 struct sockaddr_storage from_addr; 433 socklen_t from_len = sizeof(from_addr); 434 ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, (struct sockaddr *)&from_addr, &from_len); 435 436 if (n <= 0) { 437 if (errno != EAGAIN && errno != EWOULDBLOCK) { 438 log_warn("udphole: recvfrom error on socket %s: %s", src_sock->socket_id, strerror(errno)); 439 } 440 return SCHED_RUNNING; 441 } 442 443 s->last_activity = time(NULL); 444 445 if (src_sock->mode == 0 && !src_sock->learned_valid) { 446 src_sock->learned_addr = from_addr; 447 src_sock->learned_addrlen = from_len; 448 src_sock->learned_valid = 1; 449 log_debug("udphole: socket %s learned remote address", src_sock->socket_id); 450 } 451 452 for (size_t i = 0; i < s->forwards_count; i++) { 453 if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) { 454 continue; 455 } 456 457 socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id); 458 if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue; 459 460 struct sockaddr *dest_addr = NULL; 461 socklen_t dest_addrlen = 0; 462 463 if (dst_sock->mode == 1) { 464 dest_addr = (struct sockaddr *)&dst_sock->remote_addr; 465 dest_addrlen = dst_sock->remote_addrlen; 466 } else if (dst_sock->learned_valid) { 467 dest_addr = (struct sockaddr *)&dst_sock->learned_addr; 468 dest_addrlen = dst_sock->learned_addrlen; 469 } 470 471 if (dest_addr && dest_addrlen > 0) { 472 int dst_fd = dst_sock->fds[1]; 473 ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen); 474 if (sent < 0) { 475 log_warn("udphole: forward failed %s -> %s: %s", src_sock->socket_id, dst_sock->socket_id, strerror(errno)); 476 } 477 } 478 } 479 480 return SCHED_RUNNING; 481 482 cleanup: 483 log_debug("udphole: session %s exiting", s->session_id); 484 485 if (s->all_fds) { 486 free(s->all_fds); 487 s->all_fds = NULL; 488 } 489 if (s->ready_fds) { 490 free(s->ready_fds); 491 s->ready_fds = NULL; 492 } 493 494 return SCHED_DONE; 495 } 496 497 static void spawn_session_pt(session_t *s) { 498 s->task = (struct pt_task *)(intptr_t)sched_create(session_pt, s); 499 } 500 501 resp_object *domain_session_create(const char *cmd, resp_object *args) { 502 (void)cmd; 503 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 504 resp_object *err = resp_array_init(); 505 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'"); 506 return err; 507 } 508 509 const char *session_id = NULL; 510 if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) { 511 session_id = args->u.arr.elem[1].u.s; 512 } 513 514 if (!session_id) { 515 resp_object *err = resp_array_init(); 516 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'"); 517 return err; 518 } 519 520 int idle_expiry = 0; 521 if (args->u.arr.n >= 3 && args->u.arr.elem[2].type == RESPT_BULK && args->u.arr.elem[2].u.s) { 522 idle_expiry = atoi(args->u.arr.elem[2].u.s); 523 } 524 525 session_t *s = create_session(session_id, idle_expiry); 526 if (!s) { 527 resp_object *err = resp_array_init(); 528 resp_array_append_simple(err, "ERR failed to create session"); 529 return err; 530 } 531 532 spawn_session_pt(s); 533 534 resp_object *res = resp_simple_init("OK"); 535 return res; 536 } 537 538 resp_object *domain_session_list(const char *cmd, resp_object *args) { 539 (void)cmd; 540 (void)args; 541 542 resp_object *res = resp_array_init(); 543 if (!res) return NULL; 544 545 for (size_t i = 0; i < sessions_count; i++) { 546 session_t *s = sessions[i]; 547 if (!s) continue; 548 resp_array_append_bulk(res, s->session_id); 549 } 550 551 return res; 552 } 553 554 resp_object *domain_session_info(const char *cmd, resp_object *args) { 555 (void)cmd; 556 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 557 resp_object *err = resp_array_init(); 558 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'"); 559 return err; 560 } 561 562 const char *session_id = NULL; 563 if (args->u.arr.elem[1].type == RESPT_BULK) { 564 session_id = args->u.arr.elem[1].u.s; 565 } 566 567 if (!session_id) { 568 resp_object *err = resp_array_init(); 569 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'"); 570 return err; 571 } 572 573 session_t *s = find_session(session_id); 574 if (!s) { 575 resp_object *err = resp_error_init("ERR session not found"); 576 return err; 577 } 578 579 resp_object *res = resp_array_init(); 580 if (!res) return NULL; 581 582 resp_array_append_bulk(res, "session_id"); 583 resp_array_append_bulk(res, s->session_id); 584 585 resp_array_append_bulk(res, "created"); 586 resp_array_append_int(res, (long long)s->created); 587 588 resp_array_append_bulk(res, "last_activity"); 589 resp_array_append_int(res, (long long)s->last_activity); 590 591 resp_array_append_bulk(res, "idle_expiry"); 592 resp_array_append_int(res, (long long)s->idle_expiry); 593 594 resp_array_append_bulk(res, "sockets"); 595 resp_object *sockets_arr = resp_array_init(); 596 for (size_t i = 0; i < s->sockets_count; i++) { 597 socket_t *sock = s->sockets[i]; 598 if (!sock) continue; 599 resp_array_append_bulk(sockets_arr, sock->socket_id); 600 } 601 resp_array_append_obj(res, sockets_arr); 602 603 resp_array_append_bulk(res, "forwards"); 604 resp_object *forwards_arr = resp_array_init(); 605 for (size_t i = 0; i < s->forwards_count; i++) { 606 resp_array_append_bulk(forwards_arr, s->forwards[i].src_socket_id); 607 resp_array_append_bulk(forwards_arr, s->forwards[i].dst_socket_id); 608 } 609 resp_array_append_obj(res, forwards_arr); 610 611 resp_array_append_bulk(res, "marked_for_deletion"); 612 resp_array_append_int(res, s->marked_for_deletion); 613 614 return res; 615 } 616 617 resp_object *domain_session_destroy(const char *cmd, resp_object *args) { 618 (void)cmd; 619 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 620 resp_object *err = resp_array_init(); 621 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'"); 622 return err; 623 } 624 625 const char *session_id = NULL; 626 if (args->u.arr.elem[1].type == RESPT_BULK) { 627 session_id = args->u.arr.elem[1].u.s; 628 } 629 630 if (!session_id) { 631 resp_object *err = resp_array_init(); 632 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'"); 633 return err; 634 } 635 636 session_t *s = find_session(session_id); 637 if (!s) { 638 resp_object *err = resp_error_init("ERR session not found"); 639 return err; 640 } 641 642 destroy_session(s); 643 644 resp_object *res = resp_simple_init("OK"); 645 return res; 646 } 647 648 resp_object *domain_socket_create_listen(const char *cmd, resp_object *args) { 649 (void)cmd; 650 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) { 651 resp_object *err = resp_array_init(); 652 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'"); 653 return err; 654 } 655 656 const char *session_id = NULL; 657 const char *socket_id = NULL; 658 659 if (args->u.arr.elem[1].type == RESPT_BULK) { 660 session_id = args->u.arr.elem[1].u.s; 661 } 662 if (args->u.arr.elem[2].type == RESPT_BULK) { 663 socket_id = args->u.arr.elem[2].u.s; 664 } 665 666 if (!session_id || !socket_id) { 667 resp_object *err = resp_array_init(); 668 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'"); 669 return err; 670 } 671 672 session_t *s = find_session(session_id); 673 if (!s) { 674 resp_object *err = resp_error_init("ERR session not found"); 675 return err; 676 } 677 678 socket_t *sock = create_listen_socket(s, socket_id); 679 if (!sock) { 680 resp_object *err = resp_array_init(); 681 resp_array_append_simple(err, "ERR failed to create socket"); 682 return err; 683 } 684 685 resp_object *res = resp_array_init(); 686 resp_array_append_int(res, sock->local_port); 687 resp_object *udphole = get_udphole_cfg(); 688 const char *advertise = udphole ? resp_map_get_string(udphole, "advertise") : NULL; 689 resp_array_append_bulk(res, advertise ? advertise : ""); 690 return res; 691 } 692 693 resp_object *domain_socket_create_connect(const char *cmd, resp_object *args) { 694 (void)cmd; 695 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 5) { 696 resp_object *err = resp_array_init(); 697 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'"); 698 return err; 699 } 700 701 const char *session_id = NULL; 702 const char *socket_id = NULL; 703 const char *ip = NULL; 704 const char *port_str = NULL; 705 706 if (args->u.arr.elem[1].type == RESPT_BULK) { 707 session_id = args->u.arr.elem[1].u.s; 708 } 709 if (args->u.arr.elem[2].type == RESPT_BULK) { 710 socket_id = args->u.arr.elem[2].u.s; 711 } 712 if (args->u.arr.elem[3].type == RESPT_BULK) { 713 ip = args->u.arr.elem[3].u.s; 714 } 715 if (args->u.arr.elem[4].type == RESPT_BULK) { 716 port_str = args->u.arr.elem[4].u.s; 717 } 718 719 if (!session_id || !socket_id || !ip || !port_str) { 720 resp_object *err = resp_array_init(); 721 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'"); 722 return err; 723 } 724 725 int port = atoi(port_str); 726 727 session_t *s = find_session(session_id); 728 if (!s) { 729 resp_object *err = resp_error_init("ERR session not found"); 730 return err; 731 } 732 733 socket_t *sock = create_connect_socket(s, socket_id, ip, port); 734 if (!sock) { 735 resp_object *err = resp_array_init(); 736 resp_array_append_simple(err, "ERR failed to create socket"); 737 return err; 738 } 739 740 resp_object *res = resp_array_init(); 741 resp_array_append_int(res, sock->local_port); 742 resp_object *udphole = get_udphole_cfg(); 743 const char *advertise = udphole ? resp_map_get_string(udphole, "advertise") : NULL; 744 resp_array_append_bulk(res, advertise ? advertise : ""); 745 return res; 746 } 747 748 resp_object *domain_socket_destroy(const char *cmd, resp_object *args) { 749 (void)cmd; 750 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) { 751 resp_object *err = resp_array_init(); 752 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'"); 753 return err; 754 } 755 756 const char *session_id = NULL; 757 const char *socket_id = NULL; 758 759 if (args->u.arr.elem[1].type == RESPT_BULK) { 760 session_id = args->u.arr.elem[1].u.s; 761 } 762 if (args->u.arr.elem[2].type == RESPT_BULK) { 763 socket_id = args->u.arr.elem[2].u.s; 764 } 765 766 if (!session_id || !socket_id) { 767 resp_object *err = resp_array_init(); 768 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'"); 769 return err; 770 } 771 772 session_t *s = find_session(session_id); 773 if (!s) { 774 resp_object *err = resp_error_init("ERR session not found"); 775 return err; 776 } 777 778 if (destroy_socket(s, socket_id) != 0) { 779 resp_object *err = resp_array_init(); 780 resp_array_append_simple(err, "ERR socket not found"); 781 return err; 782 } 783 784 resp_object *res = resp_simple_init("OK"); 785 return res; 786 } 787 788 resp_object *domain_forward_list(const char *cmd, resp_object *args) { 789 (void)cmd; 790 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 791 resp_object *err = resp_array_init(); 792 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'"); 793 return err; 794 } 795 796 const char *session_id = NULL; 797 if (args->u.arr.elem[1].type == RESPT_BULK) { 798 session_id = args->u.arr.elem[1].u.s; 799 } 800 801 if (!session_id) { 802 resp_object *err = resp_array_init(); 803 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'"); 804 return err; 805 } 806 807 session_t *s = find_session(session_id); 808 if (!s) { 809 resp_object *err = resp_error_init("ERR session not found"); 810 return err; 811 } 812 813 resp_object *res = resp_array_init(); 814 for (size_t i = 0; i < s->forwards_count; i++) { 815 resp_array_append_bulk(res, s->forwards[i].src_socket_id); 816 resp_array_append_bulk(res, s->forwards[i].dst_socket_id); 817 } 818 return res; 819 } 820 821 resp_object *domain_forward_create(const char *cmd, resp_object *args) { 822 (void)cmd; 823 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) { 824 resp_object *err = resp_array_init(); 825 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'"); 826 return err; 827 } 828 829 const char *session_id = NULL; 830 const char *src_socket_id = NULL; 831 const char *dst_socket_id = NULL; 832 833 if (args->u.arr.elem[1].type == RESPT_BULK) { 834 session_id = args->u.arr.elem[1].u.s; 835 } 836 if (args->u.arr.elem[2].type == RESPT_BULK) { 837 src_socket_id = args->u.arr.elem[2].u.s; 838 } 839 if (args->u.arr.elem[3].type == RESPT_BULK) { 840 dst_socket_id = args->u.arr.elem[3].u.s; 841 } 842 843 if (!session_id || !src_socket_id || !dst_socket_id) { 844 resp_object *err = resp_array_init(); 845 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'"); 846 return err; 847 } 848 849 session_t *s = find_session(session_id); 850 if (!s) { 851 resp_object *err = resp_error_init("ERR session not found"); 852 return err; 853 } 854 855 socket_t *src = find_socket(s, src_socket_id); 856 if (!src) { 857 resp_object *err = resp_array_init(); 858 resp_array_append_simple(err, "ERR source socket not found"); 859 return err; 860 } 861 862 socket_t *dst = find_socket(s, dst_socket_id); 863 if (!dst) { 864 resp_object *err = resp_array_init(); 865 resp_array_append_simple(err, "ERR destination socket not found"); 866 return err; 867 } 868 869 if (add_forward(s, src_socket_id, dst_socket_id) != 0) { 870 resp_object *err = resp_array_init(); 871 resp_array_append_simple(err, "ERR failed to add forward"); 872 return err; 873 } 874 875 log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id); 876 877 resp_object *res = resp_simple_init("OK"); 878 return res; 879 } 880 881 resp_object *domain_forward_destroy(const char *cmd, resp_object *args) { 882 (void)cmd; 883 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) { 884 resp_object *err = resp_array_init(); 885 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'"); 886 return err; 887 } 888 889 const char *session_id = NULL; 890 const char *src_socket_id = NULL; 891 const char *dst_socket_id = NULL; 892 893 if (args->u.arr.elem[1].type == RESPT_BULK) { 894 session_id = args->u.arr.elem[1].u.s; 895 } 896 if (args->u.arr.elem[2].type == RESPT_BULK) { 897 src_socket_id = args->u.arr.elem[2].u.s; 898 } 899 if (args->u.arr.elem[3].type == RESPT_BULK) { 900 dst_socket_id = args->u.arr.elem[3].u.s; 901 } 902 903 if (!session_id || !src_socket_id || !dst_socket_id) { 904 resp_object *err = resp_array_init(); 905 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'"); 906 return err; 907 } 908 909 session_t *s = find_session(session_id); 910 if (!s) { 911 resp_object *err = resp_error_init("ERR session not found"); 912 return err; 913 } 914 915 if (remove_forward(s, src_socket_id, dst_socket_id) != 0) { 916 resp_object *err = resp_array_init(); 917 resp_array_append_simple(err, "ERR forward not found"); 918 return err; 919 } 920 921 resp_object *res = resp_simple_init("OK"); 922 return res; 923 } 924 925 resp_object *domain_system_load(const char *cmd, resp_object *args) { 926 (void)cmd; 927 (void)args; 928 929 double loadavg[3]; 930 if (getloadavg(loadavg, 3) != 3) { 931 resp_object *err = resp_array_init(); 932 resp_array_append_simple(err, "ERR failed to get load average"); 933 return err; 934 } 935 936 resp_object *res = resp_array_init(); 937 char buf[64]; 938 939 resp_array_append_bulk(res, "1min"); 940 snprintf(buf, sizeof(buf), "%.2f", loadavg[0]); 941 resp_array_append_bulk(res, buf); 942 943 resp_array_append_bulk(res, "5min"); 944 snprintf(buf, sizeof(buf), "%.2f", loadavg[1]); 945 resp_array_append_bulk(res, buf); 946 947 resp_array_append_bulk(res, "15min"); 948 snprintf(buf, sizeof(buf), "%.2f", loadavg[2]); 949 resp_array_append_bulk(res, buf); 950 951 return res; 952 } 953 954 resp_object *domain_session_count(const char *cmd, resp_object *args) { 955 (void)cmd; 956 (void)args; 957 958 size_t count = 0; 959 for (size_t i = 0; i < sessions_count; i++) { 960 if (sessions[i] != NULL) { 961 count++; 962 } 963 } 964 965 resp_object *res = malloc(sizeof(resp_object)); 966 if (!res) return NULL; 967 res->type = RESPT_INT; 968 res->u.i = (long long)count; 969 return res; 970 } 971 972 int session_manager_pt(int64_t timestamp, struct pt_task *task) { 973 session_manager_udata_t *udata = task->udata; 974 975 resp_object *udphole = get_udphole_cfg(); 976 if (!udphole) { 977 return SCHED_RUNNING; 978 } 979 980 if (!udata->initialized) { 981 const char *ports_str = resp_map_get_string(udphole, "ports"); 982 int port_low = 7000, port_high = 7999; 983 if (ports_str) sscanf(ports_str, "%d-%d", &port_low, &port_high); 984 log_info("udphole: manager started with port range %d-%d", port_low, port_high); 985 udata->initialized = 1; 986 } 987 988 if (timestamp - udata->last_cleanup >= 1000) { 989 cleanup_expired_sessions(); 990 udata->last_cleanup = timestamp; 991 } 992 993 return SCHED_RUNNING; 994 }