session.c (28160B)
1 #include <stdlib.h> 2 #include <string.h> 3 #include <stdio.h> 4 #include <stdarg.h> 5 #include <unistd.h> 6 #include <errno.h> 7 #include <fcntl.h> 8 #include <time.h> 9 #include <sys/socket.h> 10 #include <sys/un.h> 11 #include <sys/stat.h> 12 #include <netinet/in.h> 13 #include <arpa/inet.h> 14 15 #include "rxi/log.h" 16 #include "domain/protothreads.h" 17 #include "domain/scheduler.h" 18 #include "domain/config.h" 19 #include "common/socket_util.h" 20 #include "common/resp.h" 21 #include "tidwall/hashmap.h" 22 #include "session.h" 23 24 #define SESSION_HASH_SIZE 256 25 #define BUFFER_SIZE 4096 26 #define DEFAULT_IDLE_EXPIRY 60 27 28 typedef struct socket { 29 char *socket_id; 30 int *fds; 31 int local_port; 32 int mode; 33 struct sockaddr_storage remote_addr; 34 socklen_t remote_addrlen; 35 int learned_valid; 36 struct sockaddr_storage learned_addr; 37 socklen_t learned_addrlen; 38 } socket_t; 39 40 typedef struct forward { 41 char *src_socket_id; 42 char *dst_socket_id; 43 } forward_t; 44 45 typedef struct session { 46 char *session_id; 47 time_t idle_expiry; 48 time_t created; 49 time_t last_activity; 50 socket_t **sockets; 51 size_t sockets_count; 52 forward_t *forwards; 53 size_t forwards_count; 54 int marked_for_deletion; 55 int *ready_fds; 56 int *all_fds; 57 struct pt pt; 58 struct pt_task *task; 59 } session_t; 60 61 static session_t **sessions = NULL; 62 static size_t sessions_count = 0; 63 static int running = 0; 64 65 static session_t *find_session(const char *session_id) { 66 for (size_t i = 0; i < sessions_count; i++) { 67 if (strcmp(sessions[i]->session_id, session_id) == 0) { 68 return sessions[i]; 69 } 70 } 71 return NULL; 72 } 73 74 static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) { 75 const socket_t *s = item; 76 return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1); 77 } 78 79 static int socket_compare(const void *a, const void *b, void *udata) { 80 (void)udata; 81 const socket_t *sa = a; 82 const socket_t *sb = b; 83 return strcmp(sa->socket_id, sb->socket_id); 84 } 85 86 static socket_t *find_socket(session_t *s, const char *socket_id) { 87 if (!s || !s->sockets || !socket_id) return NULL; 88 for (size_t i = 0; i < s->sockets_count; i++) { 89 if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) { 90 return s->sockets[i]; 91 } 92 } 93 return NULL; 94 } 95 96 static int alloc_port(void) { 97 if (!domain_cfg) return 0; 98 for (int i = 0; i < domain_cfg->port_high - domain_cfg->port_low; i++) { 99 int port = domain_cfg->port_cur + i; 100 if (port > domain_cfg->port_high) port = domain_cfg->port_low; 101 domain_cfg->port_cur = port + 1; 102 if (domain_cfg->port_cur > domain_cfg->port_high) domain_cfg->port_cur = domain_cfg->port_low; 103 104 struct sockaddr_in addr; 105 memset(&addr, 0, sizeof(addr)); 106 addr.sin_family = AF_INET; 107 addr.sin_addr.s_addr = INADDR_ANY; 108 addr.sin_port = htons(port); 109 110 int udp_fd = socket(AF_INET, SOCK_DGRAM, 0); 111 if (udp_fd < 0) continue; 112 int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0); 113 close(udp_fd); 114 if (!ok) continue; 115 116 return port; 117 } 118 return 0; 119 } 120 121 static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) { 122 memset(addr, 0, sizeof(*addr)); 123 124 struct sockaddr_in *addr4 = (struct sockaddr_in *)addr; 125 if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) { 126 addr4->sin_family = AF_INET; 127 addr4->sin_port = htons(port); 128 *addrlen = sizeof(*addr4); 129 return 0; 130 } 131 132 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr; 133 if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) { 134 addr6->sin6_family = AF_INET6; 135 addr6->sin6_port = htons(port); 136 *addrlen = sizeof(*addr6); 137 return 0; 138 } 139 140 return -1; 141 } 142 143 static void close_socket(socket_t *sock) { 144 if (!sock || !sock->fds) return; 145 for (int i = 1; i <= sock->fds[0]; i++) { 146 if (sock->fds[i] >= 0) { 147 close(sock->fds[i]); 148 } 149 } 150 free(sock->fds); 151 sock->fds = NULL; 152 } 153 154 static void free_socket(socket_t *sock) { 155 if (!sock) return; 156 close_socket(sock); 157 free(sock->socket_id); 158 free(sock); 159 } 160 161 static void destroy_session(session_t *s) { 162 if (!s) return; 163 s->marked_for_deletion = 1; 164 165 for (size_t i = 0; i < s->sockets_count; i++) { 166 if (s->sockets[i]) { 167 free_socket(s->sockets[i]); 168 } 169 } 170 free(s->sockets); 171 172 for (size_t i = 0; i < s->forwards_count; i++) { 173 free(s->forwards[i].src_socket_id); 174 free(s->forwards[i].dst_socket_id); 175 } 176 free(s->forwards); 177 178 free(s->session_id); 179 free(s); 180 181 for (size_t i = 0; i < sessions_count; i++) { 182 if (sessions[i] == s) { 183 for (size_t j = i; j < sessions_count - 1; j++) { 184 sessions[j] = sessions[j + 1]; 185 } 186 sessions_count--; 187 break; 188 } 189 } 190 } 191 192 static session_t *create_session(const char *session_id, int idle_expiry) { 193 const session_t *cs = find_session(session_id); 194 if (cs) return (session_t *)cs; 195 196 session_t *s = calloc(1, sizeof(*s)); 197 if (!s) return NULL; 198 199 s->session_id = strdup(session_id); 200 s->created = time(NULL); 201 s->last_activity = s->created; 202 s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY; 203 204 sessions = realloc(sessions, sizeof(session_t *) * (sessions_count + 1)); 205 sessions[sessions_count++] = s; 206 207 return s; 208 } 209 210 static void cleanup_expired_sessions(void) { 211 if (!sessions) return; 212 time_t now = time(NULL); 213 214 for (size_t i = 0; i < sessions_count; i++) { 215 session_t *s = sessions[i]; 216 if (!s) continue; 217 if (now - s->last_activity > s->idle_expiry) { 218 log_debug("udphole: session %s expired (idle %ld > expiry %ld)", 219 s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry); 220 destroy_session(s); 221 } 222 } 223 } 224 225 static int add_forward(session_t *s, const char *src_id, const char *dst_id) { 226 for (size_t i = 0; i < s->forwards_count; i++) { 227 if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && 228 strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { 229 return 0; 230 } 231 } 232 233 forward_t *new_forwards = realloc(s->forwards, sizeof(forward_t) * (s->forwards_count + 1)); 234 if (!new_forwards) return -1; 235 s->forwards = new_forwards; 236 237 s->forwards[s->forwards_count].src_socket_id = strdup(src_id); 238 s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id); 239 s->forwards_count++; 240 241 return 0; 242 } 243 244 static int remove_forward(session_t *s, const char *src_id, const char *dst_id) { 245 for (size_t i = 0; i < s->forwards_count; i++) { 246 if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && 247 strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { 248 free(s->forwards[i].src_socket_id); 249 free(s->forwards[i].dst_socket_id); 250 for (size_t j = i; j < s->forwards_count - 1; j++) { 251 s->forwards[j] = s->forwards[j + 1]; 252 } 253 s->forwards_count--; 254 return 0; 255 } 256 } 257 return -1; 258 } 259 260 static socket_t *create_listen_socket(session_t *sess, const char *socket_id) { 261 socket_t *existing = find_socket(sess, socket_id); 262 if (existing) return existing; 263 264 int port = alloc_port(); 265 if (!port) { 266 log_error("udphole: no ports available"); 267 return NULL; 268 } 269 270 char port_str[16]; 271 snprintf(port_str, sizeof(port_str), "%d", port); 272 int *fds = udp_recv(port_str, NULL, NULL); 273 if (!fds || fds[0] == 0) { 274 log_error("udphole: failed to create UDP socket on port %d", port); 275 free(fds); 276 return NULL; 277 } 278 279 socket_t *sock = calloc(1, sizeof(*sock)); 280 if (!sock) { 281 free(fds); 282 return NULL; 283 } 284 285 sock->socket_id = strdup(socket_id); 286 sock->fds = fds; 287 sock->local_port = port; 288 sock->mode = 0; 289 sock->learned_valid = 0; 290 291 sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1)); 292 sess->sockets[sess->sockets_count++] = sock; 293 294 log_debug("udphole: created listen socket %s in session %s on port %d", 295 socket_id, sess->session_id, port); 296 return sock; 297 } 298 299 static socket_t *create_connect_socket(session_t *sess, const char *socket_id, 300 const char *ip, int port) { 301 socket_t *existing = find_socket(sess, socket_id); 302 if (existing) return existing; 303 304 int local_port = alloc_port(); 305 if (!local_port) { 306 log_error("udphole: no ports available"); 307 return NULL; 308 } 309 310 char port_str[16]; 311 snprintf(port_str, sizeof(port_str), "%d", local_port); 312 int *fds = udp_recv(port_str, NULL, NULL); 313 if (!fds || fds[0] == 0) { 314 log_error("udphole: failed to create UDP socket on port %d", local_port); 315 free(fds); 316 return NULL; 317 } 318 319 struct sockaddr_storage remote_addr; 320 socklen_t remote_addrlen; 321 if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) { 322 log_error("udphole: invalid remote address %s:%d", ip, port); 323 free(fds); 324 return NULL; 325 } 326 327 socket_t *sock = calloc(1, sizeof(*sock)); 328 if (!sock) { 329 free(fds); 330 return NULL; 331 } 332 333 sock->socket_id = strdup(socket_id); 334 sock->fds = fds; 335 sock->local_port = local_port; 336 sock->mode = 1; 337 sock->remote_addr = remote_addr; 338 sock->remote_addrlen = remote_addrlen; 339 sock->learned_valid = 0; 340 341 sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1)); 342 sess->sockets[sess->sockets_count++] = sock; 343 344 log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", 345 socket_id, sess->session_id, local_port, ip, port); 346 return sock; 347 } 348 349 static int destroy_socket(session_t *sess, const char *socket_id) { 350 socket_t *sock = find_socket(sess, socket_id); 351 if (!sock) return -1; 352 353 for (size_t i = 0; i < sess->sockets_count; i++) { 354 if (sess->sockets[i] == sock) { 355 sess->sockets[i] = NULL; 356 break; 357 } 358 } 359 free_socket(sock); 360 361 for (size_t i = 0; i < sess->forwards_count; ) { 362 if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 || 363 strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) { 364 free(sess->forwards[i].src_socket_id); 365 free(sess->forwards[i].dst_socket_id); 366 for (size_t j = i; j < sess->forwards_count - 1; j++) { 367 sess->forwards[j] = sess->forwards[j + 1]; 368 } 369 sess->forwards_count--; 370 } else { 371 i++; 372 } 373 } 374 375 return 0; 376 } 377 378 static socket_t *find_socket_by_fd(session_t *s, int fd) { 379 if (!s || !s->sockets) return NULL; 380 for (size_t j = 0; j < s->sockets_count; j++) { 381 socket_t *sock = s->sockets[j]; 382 if (!sock || !sock->fds) continue; 383 for (int i = 1; i <= sock->fds[0]; i++) { 384 if (sock->fds[i] == fd) { 385 return sock; 386 } 387 } 388 } 389 return NULL; 390 } 391 392 PT_THREAD(session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { 393 session_t *s = task->udata; 394 395 (void)timestamp; 396 PT_BEGIN(pt); 397 398 char buffer[BUFFER_SIZE]; 399 400 for (;;) { 401 if (s->marked_for_deletion) { 402 break; 403 } 404 405 if (!s->sockets || s->sockets_count == 0) { 406 PT_YIELD(pt); 407 continue; 408 } 409 410 s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1)); 411 if (!s->all_fds) { 412 PT_YIELD(pt); 413 continue; 414 } 415 s->all_fds[0] = 0; 416 417 for (size_t j = 0; j < s->sockets_count; j++) { 418 socket_t *sock = s->sockets[j]; 419 if (!sock || !sock->fds) continue; 420 for (int i = 1; i <= sock->fds[0]; i++) { 421 s->all_fds[++s->all_fds[0]] = sock->fds[i]; 422 } 423 } 424 425 if (s->all_fds[0] == 0) { 426 PT_YIELD(pt); 427 continue; 428 } 429 430 PT_WAIT_UNTIL(pt, domain_schedmod_has_data(s->all_fds, &s->ready_fds) > 0); 431 432 if (!s->ready_fds || s->ready_fds[0] == 0) { 433 PT_YIELD(pt); 434 continue; 435 } 436 437 for (int r = 1; r <= s->ready_fds[0]; r++) { 438 int ready_fd = s->ready_fds[r]; 439 440 socket_t *src_sock = find_socket_by_fd(s, ready_fd); 441 if (!src_sock) continue; 442 443 struct sockaddr_storage from_addr; 444 socklen_t from_len = sizeof(from_addr); 445 ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, 446 (struct sockaddr *)&from_addr, &from_len); 447 448 if (n <= 0) { 449 if (errno != EAGAIN && errno != EWOULDBLOCK) { 450 log_warn("udphole: recvfrom error on socket %s: %s", 451 src_sock->socket_id, strerror(errno)); 452 } 453 continue; 454 } 455 456 s->last_activity = time(NULL); 457 458 if (src_sock->mode == 0 && !src_sock->learned_valid) { 459 src_sock->learned_addr = from_addr; 460 src_sock->learned_addrlen = from_len; 461 src_sock->learned_valid = 1; 462 log_debug("udphole: socket %s learned remote address", src_sock->socket_id); 463 } 464 465 for (size_t i = 0; i < s->forwards_count; i++) { 466 if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) { 467 continue; 468 } 469 470 socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id); 471 if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue; 472 473 struct sockaddr *dest_addr = NULL; 474 socklen_t dest_addrlen = 0; 475 476 if (dst_sock->mode == 1) { 477 dest_addr = (struct sockaddr *)&dst_sock->remote_addr; 478 dest_addrlen = dst_sock->remote_addrlen; 479 } else if (dst_sock->learned_valid) { 480 dest_addr = (struct sockaddr *)&dst_sock->learned_addr; 481 dest_addrlen = dst_sock->learned_addrlen; 482 } 483 484 if (dest_addr && dest_addrlen > 0) { 485 int dst_fd = dst_sock->fds[1]; 486 ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen); 487 if (sent < 0) { 488 log_warn("udphole: forward failed %s -> %s: %s", 489 src_sock->socket_id, dst_sock->socket_id, strerror(errno)); 490 } 491 } 492 } 493 } 494 495 } 496 497 log_debug("udphole: session %s protothread exiting", s->session_id); 498 499 if (s->all_fds) { 500 free(s->all_fds); 501 s->all_fds = NULL; 502 } 503 if (s->ready_fds) { 504 free(s->ready_fds); 505 s->ready_fds = NULL; 506 } 507 508 PT_END(pt); 509 } 510 511 static void spawn_session_pt(session_t *s) { 512 s->task = (struct pt_task *)(intptr_t)domain_schedmod_pt_create(session_pt, s); 513 } 514 515 resp_object *domain_session_create(resp_object *args) { 516 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 517 resp_object *err = resp_array_init(); 518 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'"); 519 return err; 520 } 521 522 const char *session_id = NULL; 523 if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) { 524 session_id = args->u.arr.elem[1].u.s; 525 } 526 527 if (!session_id) { 528 resp_object *err = resp_array_init(); 529 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'"); 530 return err; 531 } 532 533 int idle_expiry = 0; 534 if (args->u.arr.n >= 3 && args->u.arr.elem[2].type == RESPT_BULK && args->u.arr.elem[2].u.s) { 535 idle_expiry = atoi(args->u.arr.elem[2].u.s); 536 } 537 538 session_t *s = create_session(session_id, idle_expiry); 539 if (!s) { 540 resp_object *err = resp_array_init(); 541 resp_array_append_simple(err, "ERR failed to create session"); 542 return err; 543 } 544 545 spawn_session_pt(s); 546 547 resp_object *res = resp_array_init(); 548 resp_array_append_simple(res, "OK"); 549 return res; 550 } 551 552 resp_object *domain_session_list(resp_object *args) { 553 (void)args; 554 555 resp_object *res = resp_array_init(); 556 if (!res) return NULL; 557 558 for (size_t i = 0; i < sessions_count; i++) { 559 session_t *s = sessions[i]; 560 if (!s) continue; 561 resp_array_append_bulk(res, s->session_id); 562 } 563 564 return res; 565 } 566 567 resp_object *domain_session_info(resp_object *args) { 568 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 569 resp_object *err = resp_array_init(); 570 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'"); 571 return err; 572 } 573 574 const char *session_id = NULL; 575 if (args->u.arr.elem[1].type == RESPT_BULK) { 576 session_id = args->u.arr.elem[1].u.s; 577 } 578 579 if (!session_id) { 580 resp_object *err = resp_array_init(); 581 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'"); 582 return err; 583 } 584 585 session_t *s = find_session(session_id); 586 if (!s) { 587 resp_object *err = resp_array_init(); 588 resp_array_append_simple(err, "ERR session not found"); 589 return err; 590 } 591 592 resp_object *res = resp_array_init(); 593 if (!res) return NULL; 594 595 resp_array_append_bulk(res, "session_id"); 596 resp_array_append_bulk(res, s->session_id); 597 598 resp_array_append_bulk(res, "created"); 599 resp_array_append_int(res, (long long)s->created); 600 601 resp_array_append_bulk(res, "last_activity"); 602 resp_array_append_int(res, (long long)s->last_activity); 603 604 resp_array_append_bulk(res, "idle_expiry"); 605 resp_array_append_int(res, (long long)s->idle_expiry); 606 607 resp_array_append_bulk(res, "sockets"); 608 resp_object *sockets_arr = resp_array_init(); 609 for (size_t i = 0; i < s->sockets_count; i++) { 610 socket_t *sock = s->sockets[i]; 611 if (!sock) continue; 612 resp_array_append_bulk(sockets_arr, sock->socket_id); 613 } 614 resp_array_append_obj(res, sockets_arr); 615 616 resp_array_append_bulk(res, "forwards"); 617 resp_object *forwards_arr = resp_array_init(); 618 for (size_t i = 0; i < s->forwards_count; i++) { 619 resp_array_append_bulk(forwards_arr, s->forwards[i].src_socket_id); 620 resp_array_append_bulk(forwards_arr, s->forwards[i].dst_socket_id); 621 } 622 resp_array_append_obj(res, forwards_arr); 623 624 resp_array_append_bulk(res, "marked_for_deletion"); 625 resp_array_append_int(res, s->marked_for_deletion); 626 627 return res; 628 } 629 630 resp_object *domain_session_destroy(resp_object *args) { 631 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 632 resp_object *err = resp_array_init(); 633 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'"); 634 return err; 635 } 636 637 const char *session_id = NULL; 638 if (args->u.arr.elem[1].type == RESPT_BULK) { 639 session_id = args->u.arr.elem[1].u.s; 640 } 641 642 if (!session_id) { 643 resp_object *err = resp_array_init(); 644 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'"); 645 return err; 646 } 647 648 session_t *s = find_session(session_id); 649 if (!s) { 650 resp_object *err = resp_array_init(); 651 resp_array_append_simple(err, "ERR session not found"); 652 return err; 653 } 654 655 destroy_session(s); 656 657 resp_object *res = resp_array_init(); 658 resp_array_append_simple(res, "OK"); 659 return res; 660 } 661 662 resp_object *domain_socket_create_listen(resp_object *args) { 663 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) { 664 resp_object *err = resp_array_init(); 665 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'"); 666 return err; 667 } 668 669 const char *session_id = NULL; 670 const char *socket_id = NULL; 671 672 if (args->u.arr.elem[1].type == RESPT_BULK) { 673 session_id = args->u.arr.elem[1].u.s; 674 } 675 if (args->u.arr.elem[2].type == RESPT_BULK) { 676 socket_id = args->u.arr.elem[2].u.s; 677 } 678 679 if (!session_id || !socket_id) { 680 resp_object *err = resp_array_init(); 681 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'"); 682 return err; 683 } 684 685 session_t *s = find_session(session_id); 686 if (!s) { 687 resp_object *err = resp_array_init(); 688 resp_array_append_simple(err, "ERR session not found"); 689 return err; 690 } 691 692 socket_t *sock = create_listen_socket(s, socket_id); 693 if (!sock) { 694 resp_object *err = resp_array_init(); 695 resp_array_append_simple(err, "ERR failed to create socket"); 696 return err; 697 } 698 699 resp_object *res = resp_array_init(); 700 resp_array_append_int(res, sock->local_port); 701 resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : ""); 702 return res; 703 } 704 705 resp_object *domain_socket_create_connect(resp_object *args) { 706 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 5) { 707 resp_object *err = resp_array_init(); 708 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'"); 709 return err; 710 } 711 712 const char *session_id = NULL; 713 const char *socket_id = NULL; 714 const char *ip = NULL; 715 const char *port_str = NULL; 716 717 if (args->u.arr.elem[1].type == RESPT_BULK) { 718 session_id = args->u.arr.elem[1].u.s; 719 } 720 if (args->u.arr.elem[2].type == RESPT_BULK) { 721 socket_id = args->u.arr.elem[2].u.s; 722 } 723 if (args->u.arr.elem[3].type == RESPT_BULK) { 724 ip = args->u.arr.elem[3].u.s; 725 } 726 if (args->u.arr.elem[4].type == RESPT_BULK) { 727 port_str = args->u.arr.elem[4].u.s; 728 } 729 730 if (!session_id || !socket_id || !ip || !port_str) { 731 resp_object *err = resp_array_init(); 732 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'"); 733 return err; 734 } 735 736 int port = atoi(port_str); 737 738 session_t *s = find_session(session_id); 739 if (!s) { 740 resp_object *err = resp_array_init(); 741 resp_array_append_simple(err, "ERR session not found"); 742 return err; 743 } 744 745 socket_t *sock = create_connect_socket(s, socket_id, ip, port); 746 if (!sock) { 747 resp_object *err = resp_array_init(); 748 resp_array_append_simple(err, "ERR failed to create socket"); 749 return err; 750 } 751 752 resp_object *res = resp_array_init(); 753 resp_array_append_int(res, sock->local_port); 754 resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : ""); 755 return res; 756 } 757 758 resp_object *domain_socket_destroy(resp_object *args) { 759 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) { 760 resp_object *err = resp_array_init(); 761 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'"); 762 return err; 763 } 764 765 const char *session_id = NULL; 766 const char *socket_id = NULL; 767 768 if (args->u.arr.elem[1].type == RESPT_BULK) { 769 session_id = args->u.arr.elem[1].u.s; 770 } 771 if (args->u.arr.elem[2].type == RESPT_BULK) { 772 socket_id = args->u.arr.elem[2].u.s; 773 } 774 775 if (!session_id || !socket_id) { 776 resp_object *err = resp_array_init(); 777 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'"); 778 return err; 779 } 780 781 session_t *s = find_session(session_id); 782 if (!s) { 783 resp_object *err = resp_array_init(); 784 resp_array_append_simple(err, "ERR session not found"); 785 return err; 786 } 787 788 if (destroy_socket(s, socket_id) != 0) { 789 resp_object *err = resp_array_init(); 790 resp_array_append_simple(err, "ERR socket not found"); 791 return err; 792 } 793 794 resp_object *res = resp_array_init(); 795 resp_array_append_simple(res, "OK"); 796 return res; 797 } 798 799 resp_object *domain_forward_list(resp_object *args) { 800 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 801 resp_object *err = resp_array_init(); 802 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'"); 803 return err; 804 } 805 806 const char *session_id = NULL; 807 if (args->u.arr.elem[1].type == RESPT_BULK) { 808 session_id = args->u.arr.elem[1].u.s; 809 } 810 811 if (!session_id) { 812 resp_object *err = resp_array_init(); 813 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'"); 814 return err; 815 } 816 817 session_t *s = find_session(session_id); 818 if (!s) { 819 resp_object *err = resp_array_init(); 820 resp_array_append_simple(err, "ERR session not found"); 821 return err; 822 } 823 824 resp_object *res = resp_array_init(); 825 for (size_t i = 0; i < s->forwards_count; i++) { 826 resp_array_append_bulk(res, s->forwards[i].src_socket_id); 827 resp_array_append_bulk(res, s->forwards[i].dst_socket_id); 828 } 829 return res; 830 } 831 832 resp_object *domain_forward_create(resp_object *args) { 833 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) { 834 resp_object *err = resp_array_init(); 835 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'"); 836 return err; 837 } 838 839 const char *session_id = NULL; 840 const char *src_socket_id = NULL; 841 const char *dst_socket_id = NULL; 842 843 if (args->u.arr.elem[1].type == RESPT_BULK) { 844 session_id = args->u.arr.elem[1].u.s; 845 } 846 if (args->u.arr.elem[2].type == RESPT_BULK) { 847 src_socket_id = args->u.arr.elem[2].u.s; 848 } 849 if (args->u.arr.elem[3].type == RESPT_BULK) { 850 dst_socket_id = args->u.arr.elem[3].u.s; 851 } 852 853 if (!session_id || !src_socket_id || !dst_socket_id) { 854 resp_object *err = resp_array_init(); 855 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'"); 856 return err; 857 } 858 859 session_t *s = find_session(session_id); 860 if (!s) { 861 resp_object *err = resp_array_init(); 862 resp_array_append_simple(err, "ERR session not found"); 863 return err; 864 } 865 866 socket_t *src = find_socket(s, src_socket_id); 867 if (!src) { 868 resp_object *err = resp_array_init(); 869 resp_array_append_simple(err, "ERR source socket not found"); 870 return err; 871 } 872 873 socket_t *dst = find_socket(s, dst_socket_id); 874 if (!dst) { 875 resp_object *err = resp_array_init(); 876 resp_array_append_simple(err, "ERR destination socket not found"); 877 return err; 878 } 879 880 if (add_forward(s, src_socket_id, dst_socket_id) != 0) { 881 resp_object *err = resp_array_init(); 882 resp_array_append_simple(err, "ERR failed to add forward"); 883 return err; 884 } 885 886 log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id); 887 888 resp_object *res = resp_array_init(); 889 resp_array_append_simple(res, "OK"); 890 return res; 891 } 892 893 resp_object *domain_forward_destroy(resp_object *args) { 894 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) { 895 resp_object *err = resp_array_init(); 896 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'"); 897 return err; 898 } 899 900 const char *session_id = NULL; 901 const char *src_socket_id = NULL; 902 const char *dst_socket_id = NULL; 903 904 if (args->u.arr.elem[1].type == RESPT_BULK) { 905 session_id = args->u.arr.elem[1].u.s; 906 } 907 if (args->u.arr.elem[2].type == RESPT_BULK) { 908 src_socket_id = args->u.arr.elem[2].u.s; 909 } 910 if (args->u.arr.elem[3].type == RESPT_BULK) { 911 dst_socket_id = args->u.arr.elem[3].u.s; 912 } 913 914 if (!session_id || !src_socket_id || !dst_socket_id) { 915 resp_object *err = resp_array_init(); 916 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'"); 917 return err; 918 } 919 920 session_t *s = find_session(session_id); 921 if (!s) { 922 resp_object *err = resp_array_init(); 923 resp_array_append_simple(err, "ERR session not found"); 924 return err; 925 } 926 927 if (remove_forward(s, src_socket_id, dst_socket_id) != 0) { 928 resp_object *err = resp_array_init(); 929 resp_array_append_simple(err, "ERR forward not found"); 930 return err; 931 } 932 933 resp_object *res = resp_array_init(); 934 resp_array_append_simple(res, "OK"); 935 return res; 936 } 937 938 resp_object *domain_system_load(resp_object *args) { 939 (void)args; 940 941 double loadavg[3]; 942 if (getloadavg(loadavg, 3) != 3) { 943 resp_object *err = resp_array_init(); 944 resp_array_append_simple(err, "ERR failed to get load average"); 945 return err; 946 } 947 948 resp_object *res = resp_array_init(); 949 char buf[64]; 950 951 resp_array_append_bulk(res, "1min"); 952 snprintf(buf, sizeof(buf), "%.2f", loadavg[0]); 953 resp_array_append_bulk(res, buf); 954 955 resp_array_append_bulk(res, "5min"); 956 snprintf(buf, sizeof(buf), "%.2f", loadavg[1]); 957 resp_array_append_bulk(res, buf); 958 959 resp_array_append_bulk(res, "15min"); 960 snprintf(buf, sizeof(buf), "%.2f", loadavg[2]); 961 resp_array_append_bulk(res, buf); 962 963 return res; 964 } 965 966 resp_object *domain_session_count(resp_object *args) { 967 (void)args; 968 969 size_t count = 0; 970 for (size_t i = 0; i < sessions_count; i++) { 971 if (sessions[i] != NULL) { 972 count++; 973 } 974 } 975 976 resp_object *res = malloc(sizeof(resp_object)); 977 if (!res) return NULL; 978 res->type = RESPT_INT; 979 res->u.i = (long long)count; 980 return res; 981 } 982 983 PT_THREAD(session_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { 984 (void)timestamp; 985 log_trace("session_manager: protothread entry"); 986 PT_BEGIN(pt); 987 988 PT_WAIT_UNTIL(pt, domain_cfg); 989 990 running = 1; 991 log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high); 992 993 int64_t last_cleanup = 0; 994 995 for (;;) { 996 int64_t now = (int64_t)(time(NULL)); 997 if (now - last_cleanup >= 1) { 998 cleanup_expired_sessions(); 999 last_cleanup = now; 1000 } 1001 1002 PT_YIELD(pt); 1003 } 1004 1005 running = 0; 1006 1007 for (size_t i = 0; i < sessions_count; i++) { 1008 if (sessions[i]) { 1009 sessions[i]->marked_for_deletion = 1; 1010 } 1011 } 1012 1013 PT_END(pt); 1014 }