session.c (28520B)
1 #include <stdlib.h> 2 #include <string.h> 3 #include <stdio.h> 4 #include <stdarg.h> 5 #include <unistd.h> 6 #include <errno.h> 7 #include <fcntl.h> 8 #include <time.h> 9 #include <sys/socket.h> 10 #include <sys/un.h> 11 #include <sys/stat.h> 12 #include <netinet/in.h> 13 #include <arpa/inet.h> 14 15 #include "rxi/log.h" 16 #include "domain/protothreads.h" 17 #include "domain/scheduler.h" 18 #include "domain/config.h" 19 #include "common/socket_util.h" 20 #include "common/resp.h" 21 #include "tidwall/hashmap.h" 22 #include "session.h" 23 24 #define SESSION_HASH_SIZE 256 25 #define BUFFER_SIZE 4096 26 #define DEFAULT_IDLE_EXPIRY 60 27 28 typedef struct socket { 29 char *socket_id; 30 int *fds; 31 int local_port; 32 int mode; 33 struct sockaddr_storage remote_addr; 34 socklen_t remote_addrlen; 35 int learned_valid; 36 struct sockaddr_storage learned_addr; 37 socklen_t learned_addrlen; 38 } socket_t; 39 40 typedef struct forward { 41 char *src_socket_id; 42 char *dst_socket_id; 43 } forward_t; 44 45 typedef struct session { 46 char *session_id; 47 time_t idle_expiry; 48 time_t created; 49 time_t last_activity; 50 socket_t **sockets; 51 size_t sockets_count; 52 forward_t *forwards; 53 size_t forwards_count; 54 int marked_for_deletion; 55 int *ready_fds; 56 int *all_fds; 57 struct pt pt; 58 struct pt_task *task; 59 } session_t; 60 61 static session_t **sessions = NULL; 62 static size_t sessions_count = 0; 63 static int running = 0; 64 65 static session_t *find_session(const char *session_id) { 66 for (size_t i = 0; i < sessions_count; i++) { 67 if (strcmp(sessions[i]->session_id, session_id) == 0) { 68 return sessions[i]; 69 } 70 } 71 return NULL; 72 } 73 74 static uint64_t socket_hash(const void *item, uint64_t seed0, uint64_t seed1) { 75 const socket_t *s = item; 76 return hashmap_sip(s->socket_id, strlen(s->socket_id), seed0, seed1); 77 } 78 79 static int socket_compare(const void *a, const void *b, void *udata) { 80 (void)udata; 81 const socket_t *sa = a; 82 const socket_t *sb = b; 83 return strcmp(sa->socket_id, sb->socket_id); 84 } 85 86 static socket_t *find_socket(session_t *s, const char *socket_id) { 87 if (!s || !s->sockets || !socket_id) return NULL; 88 for (size_t i = 0; i < s->sockets_count; i++) { 89 if (s->sockets[i] && strcmp(s->sockets[i]->socket_id, socket_id) == 0) { 90 return s->sockets[i]; 91 } 92 } 93 return NULL; 94 } 95 96 static int alloc_port(void) { 97 if (!domain_cfg) return 0; 98 for (int i = 0; i < domain_cfg->port_high - domain_cfg->port_low; i++) { 99 int port = domain_cfg->port_cur + i; 100 if (port > domain_cfg->port_high) port = domain_cfg->port_low; 101 domain_cfg->port_cur = port + 1; 102 if (domain_cfg->port_cur > domain_cfg->port_high) domain_cfg->port_cur = domain_cfg->port_low; 103 104 struct sockaddr_in addr; 105 memset(&addr, 0, sizeof(addr)); 106 addr.sin_family = AF_INET; 107 addr.sin_addr.s_addr = INADDR_ANY; 108 addr.sin_port = htons(port); 109 110 int udp_fd = socket(AF_INET, SOCK_DGRAM, 0); 111 if (udp_fd < 0) continue; 112 int ok = (bind(udp_fd, (struct sockaddr *)&addr, sizeof(addr)) == 0); 113 close(udp_fd); 114 if (!ok) continue; 115 116 return port; 117 } 118 return 0; 119 } 120 121 static int parse_ip_addr(const char *ip_str, int port, struct sockaddr_storage *addr, socklen_t *addrlen) { 122 memset(addr, 0, sizeof(*addr)); 123 124 struct sockaddr_in *addr4 = (struct sockaddr_in *)addr; 125 if (inet_pton(AF_INET, ip_str, &addr4->sin_addr) == 1) { 126 addr4->sin_family = AF_INET; 127 addr4->sin_port = htons(port); 128 *addrlen = sizeof(*addr4); 129 return 0; 130 } 131 132 struct sockaddr_in6 *addr6 = (struct sockaddr_in6 *)addr; 133 if (inet_pton(AF_INET6, ip_str, &addr6->sin6_addr) == 1) { 134 addr6->sin6_family = AF_INET6; 135 addr6->sin6_port = htons(port); 136 *addrlen = sizeof(*addr6); 137 return 0; 138 } 139 140 return -1; 141 } 142 143 static void close_socket(socket_t *sock) { 144 if (!sock || !sock->fds) return; 145 for (int i = 1; i <= sock->fds[0]; i++) { 146 if (sock->fds[i] >= 0) { 147 close(sock->fds[i]); 148 } 149 } 150 free(sock->fds); 151 sock->fds = NULL; 152 } 153 154 static void free_socket(socket_t *sock) { 155 if (!sock) return; 156 close_socket(sock); 157 free(sock->socket_id); 158 free(sock); 159 } 160 161 static void destroy_session(session_t *s) { 162 if (!s) return; 163 s->marked_for_deletion = 1; 164 165 for (size_t i = 0; i < s->sockets_count; i++) { 166 if (s->sockets[i]) { 167 free_socket(s->sockets[i]); 168 } 169 } 170 free(s->sockets); 171 172 for (size_t i = 0; i < s->forwards_count; i++) { 173 free(s->forwards[i].src_socket_id); 174 free(s->forwards[i].dst_socket_id); 175 } 176 free(s->forwards); 177 178 free(s->session_id); 179 free(s); 180 181 for (size_t i = 0; i < sessions_count; i++) { 182 if (sessions[i] == s) { 183 for (size_t j = i; j < sessions_count - 1; j++) { 184 sessions[j] = sessions[j + 1]; 185 } 186 sessions_count--; 187 break; 188 } 189 } 190 } 191 192 static session_t *create_session(const char *session_id, int idle_expiry) { 193 const session_t *cs = find_session(session_id); 194 if (cs) return (session_t *)cs; 195 196 session_t *s = calloc(1, sizeof(*s)); 197 if (!s) return NULL; 198 199 s->session_id = strdup(session_id); 200 s->created = time(NULL); 201 s->last_activity = s->created; 202 s->idle_expiry = idle_expiry > 0 ? idle_expiry : DEFAULT_IDLE_EXPIRY; 203 204 sessions = realloc(sessions, sizeof(session_t *) * (sessions_count + 1)); 205 sessions[sessions_count++] = s; 206 207 return s; 208 } 209 210 static void cleanup_expired_sessions(void) { 211 if (!sessions) return; 212 time_t now = time(NULL); 213 214 for (size_t i = 0; i < sessions_count; i++) { 215 session_t *s = sessions[i]; 216 if (!s) continue; 217 if (now - s->last_activity > s->idle_expiry) { 218 log_debug("udphole: session %s expired (idle %ld > expiry %ld)", 219 s->session_id, (long)(now - s->last_activity), (long)s->idle_expiry); 220 destroy_session(s); 221 } 222 } 223 } 224 225 static int add_forward(session_t *s, const char *src_id, const char *dst_id) { 226 for (size_t i = 0; i < s->forwards_count; i++) { 227 if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && 228 strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { 229 return 0; 230 } 231 } 232 233 forward_t *new_forwards = realloc(s->forwards, sizeof(forward_t) * (s->forwards_count + 1)); 234 if (!new_forwards) return -1; 235 s->forwards = new_forwards; 236 237 s->forwards[s->forwards_count].src_socket_id = strdup(src_id); 238 s->forwards[s->forwards_count].dst_socket_id = strdup(dst_id); 239 s->forwards_count++; 240 241 return 0; 242 } 243 244 static int remove_forward(session_t *s, const char *src_id, const char *dst_id) { 245 for (size_t i = 0; i < s->forwards_count; i++) { 246 if (strcmp(s->forwards[i].src_socket_id, src_id) == 0 && 247 strcmp(s->forwards[i].dst_socket_id, dst_id) == 0) { 248 free(s->forwards[i].src_socket_id); 249 free(s->forwards[i].dst_socket_id); 250 for (size_t j = i; j < s->forwards_count - 1; j++) { 251 s->forwards[j] = s->forwards[j + 1]; 252 } 253 s->forwards_count--; 254 return 0; 255 } 256 } 257 return -1; 258 } 259 260 static socket_t *create_listen_socket(session_t *sess, const char *socket_id) { 261 socket_t *existing = find_socket(sess, socket_id); 262 if (existing) return existing; 263 264 int port = alloc_port(); 265 if (!port) { 266 log_error("udphole: no ports available"); 267 return NULL; 268 } 269 270 char port_str[16]; 271 snprintf(port_str, sizeof(port_str), "%d", port); 272 int *fds = udp_recv(port_str, NULL, NULL); 273 if (!fds || fds[0] == 0) { 274 log_error("udphole: failed to create UDP socket on port %d", port); 275 free(fds); 276 return NULL; 277 } 278 279 socket_t *sock = calloc(1, sizeof(*sock)); 280 if (!sock) { 281 free(fds); 282 return NULL; 283 } 284 285 sock->socket_id = strdup(socket_id); 286 sock->fds = fds; 287 sock->local_port = port; 288 sock->mode = 0; 289 sock->learned_valid = 0; 290 291 sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1)); 292 sess->sockets[sess->sockets_count++] = sock; 293 294 log_debug("udphole: created listen socket %s in session %s on port %d", 295 socket_id, sess->session_id, port); 296 return sock; 297 } 298 299 static socket_t *create_connect_socket(session_t *sess, const char *socket_id, 300 const char *ip, int port) { 301 socket_t *existing = find_socket(sess, socket_id); 302 if (existing) return existing; 303 304 int local_port = alloc_port(); 305 if (!local_port) { 306 log_error("udphole: no ports available"); 307 return NULL; 308 } 309 310 char port_str[16]; 311 snprintf(port_str, sizeof(port_str), "%d", local_port); 312 int *fds = udp_recv(port_str, NULL, NULL); 313 if (!fds || fds[0] == 0) { 314 log_error("udphole: failed to create UDP socket on port %d", local_port); 315 free(fds); 316 return NULL; 317 } 318 319 struct sockaddr_storage remote_addr; 320 socklen_t remote_addrlen; 321 if (parse_ip_addr(ip, port, &remote_addr, &remote_addrlen) != 0) { 322 log_error("udphole: invalid remote address %s:%d", ip, port); 323 free(fds); 324 return NULL; 325 } 326 327 socket_t *sock = calloc(1, sizeof(*sock)); 328 if (!sock) { 329 free(fds); 330 return NULL; 331 } 332 333 sock->socket_id = strdup(socket_id); 334 sock->fds = fds; 335 sock->local_port = local_port; 336 sock->mode = 1; 337 sock->remote_addr = remote_addr; 338 sock->remote_addrlen = remote_addrlen; 339 sock->learned_valid = 0; 340 341 sess->sockets = realloc(sess->sockets, sizeof(socket_t *) * (sess->sockets_count + 1)); 342 sess->sockets[sess->sockets_count++] = sock; 343 344 log_debug("udphole: created connect socket %s in session %s on port %d -> %s:%d", 345 socket_id, sess->session_id, local_port, ip, port); 346 return sock; 347 } 348 349 static int destroy_socket(session_t *sess, const char *socket_id) { 350 socket_t *sock = find_socket(sess, socket_id); 351 if (!sock) return -1; 352 353 for (size_t i = 0; i < sess->sockets_count; i++) { 354 if (sess->sockets[i] == sock) { 355 sess->sockets[i] = NULL; 356 break; 357 } 358 } 359 free_socket(sock); 360 361 for (size_t i = 0; i < sess->forwards_count; ) { 362 if (strcmp(sess->forwards[i].src_socket_id, socket_id) == 0 || 363 strcmp(sess->forwards[i].dst_socket_id, socket_id) == 0) { 364 free(sess->forwards[i].src_socket_id); 365 free(sess->forwards[i].dst_socket_id); 366 for (size_t j = i; j < sess->forwards_count - 1; j++) { 367 sess->forwards[j] = sess->forwards[j + 1]; 368 } 369 sess->forwards_count--; 370 } else { 371 i++; 372 } 373 } 374 375 return 0; 376 } 377 378 static socket_t *find_socket_by_fd(session_t *s, int fd) { 379 if (!s || !s->sockets) return NULL; 380 for (size_t j = 0; j < s->sockets_count; j++) { 381 socket_t *sock = s->sockets[j]; 382 if (!sock || !sock->fds) continue; 383 for (int i = 1; i <= sock->fds[0]; i++) { 384 if (sock->fds[i] == fd) { 385 return sock; 386 } 387 } 388 } 389 return NULL; 390 } 391 392 PT_THREAD(session_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { 393 session_t *s = task->udata; 394 395 (void)timestamp; 396 PT_BEGIN(pt); 397 398 char buffer[BUFFER_SIZE]; 399 400 for (;;) { 401 if (s->marked_for_deletion) { 402 break; 403 } 404 405 if (!s->sockets || s->sockets_count == 0) { 406 PT_YIELD(pt); 407 continue; 408 } 409 410 s->all_fds = realloc(s->all_fds, sizeof(int) * (s->sockets_count * 2 + 1)); 411 if (!s->all_fds) { 412 PT_YIELD(pt); 413 continue; 414 } 415 s->all_fds[0] = 0; 416 417 for (size_t j = 0; j < s->sockets_count; j++) { 418 socket_t *sock = s->sockets[j]; 419 if (!sock || !sock->fds) continue; 420 for (int i = 1; i <= sock->fds[0]; i++) { 421 s->all_fds[++s->all_fds[0]] = sock->fds[i]; 422 } 423 } 424 425 if (s->all_fds[0] == 0) { 426 PT_YIELD(pt); 427 continue; 428 } 429 430 PT_WAIT_UNTIL(pt, domain_schedmod_has_data(s->all_fds, &s->ready_fds) > 0); 431 432 if (!s->ready_fds || s->ready_fds[0] == 0) { 433 PT_YIELD(pt); 434 continue; 435 } 436 437 for (int r = 1; r <= s->ready_fds[0]; r++) { 438 int ready_fd = s->ready_fds[r]; 439 440 socket_t *src_sock = find_socket_by_fd(s, ready_fd); 441 if (!src_sock) continue; 442 443 struct sockaddr_storage from_addr; 444 socklen_t from_len = sizeof(from_addr); 445 ssize_t n = recvfrom(ready_fd, buffer, sizeof(buffer) - 1, 0, 446 (struct sockaddr *)&from_addr, &from_len); 447 448 if (n <= 0) { 449 if (errno != EAGAIN && errno != EWOULDBLOCK) { 450 log_warn("udphole: recvfrom error on socket %s: %s", 451 src_sock->socket_id, strerror(errno)); 452 } 453 continue; 454 } 455 456 s->last_activity = time(NULL); 457 458 if (src_sock->mode == 0 && !src_sock->learned_valid) { 459 src_sock->learned_addr = from_addr; 460 src_sock->learned_addrlen = from_len; 461 src_sock->learned_valid = 1; 462 log_debug("udphole: socket %s learned remote address", src_sock->socket_id); 463 } 464 465 for (size_t i = 0; i < s->forwards_count; i++) { 466 if (strcmp(s->forwards[i].src_socket_id, src_sock->socket_id) != 0) { 467 continue; 468 } 469 470 socket_t *dst_sock = find_socket(s, s->forwards[i].dst_socket_id); 471 if (!dst_sock || !dst_sock->fds || dst_sock->fds[0] == 0) continue; 472 473 struct sockaddr *dest_addr = NULL; 474 socklen_t dest_addrlen = 0; 475 476 if (dst_sock->mode == 1) { 477 dest_addr = (struct sockaddr *)&dst_sock->remote_addr; 478 dest_addrlen = dst_sock->remote_addrlen; 479 } else if (dst_sock->learned_valid) { 480 dest_addr = (struct sockaddr *)&dst_sock->learned_addr; 481 dest_addrlen = dst_sock->learned_addrlen; 482 } 483 484 if (dest_addr && dest_addrlen > 0) { 485 int dst_fd = dst_sock->fds[1]; 486 ssize_t sent = sendto(dst_fd, buffer, n, 0, dest_addr, dest_addrlen); 487 if (sent < 0) { 488 log_warn("udphole: forward failed %s -> %s: %s", 489 src_sock->socket_id, dst_sock->socket_id, strerror(errno)); 490 } 491 } 492 } 493 } 494 495 } 496 497 log_debug("udphole: session %s protothread exiting", s->session_id); 498 499 if (s->all_fds) { 500 free(s->all_fds); 501 s->all_fds = NULL; 502 } 503 if (s->ready_fds) { 504 free(s->ready_fds); 505 s->ready_fds = NULL; 506 } 507 508 PT_END(pt); 509 } 510 511 static void spawn_session_pt(session_t *s) { 512 s->task = (struct pt_task *)(intptr_t)domain_schedmod_pt_create(session_pt, s); 513 } 514 515 resp_object *domain_session_create(const char *cmd, resp_object *args) { 516 (void)cmd; 517 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 518 resp_object *err = resp_array_init(); 519 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'"); 520 return err; 521 } 522 523 const char *session_id = NULL; 524 if (args->u.arr.n > 1 && args->u.arr.elem[1].type == RESPT_BULK) { 525 session_id = args->u.arr.elem[1].u.s; 526 } 527 528 if (!session_id) { 529 resp_object *err = resp_array_init(); 530 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.create'"); 531 return err; 532 } 533 534 int idle_expiry = 0; 535 if (args->u.arr.n >= 3 && args->u.arr.elem[2].type == RESPT_BULK && args->u.arr.elem[2].u.s) { 536 idle_expiry = atoi(args->u.arr.elem[2].u.s); 537 } 538 539 session_t *s = create_session(session_id, idle_expiry); 540 if (!s) { 541 resp_object *err = resp_array_init(); 542 resp_array_append_simple(err, "ERR failed to create session"); 543 return err; 544 } 545 546 spawn_session_pt(s); 547 548 resp_object *res = resp_array_init(); 549 resp_array_append_simple(res, "OK"); 550 return res; 551 } 552 553 resp_object *domain_session_list(const char *cmd, resp_object *args) { 554 (void)cmd; 555 (void)args; 556 557 resp_object *res = resp_array_init(); 558 if (!res) return NULL; 559 560 for (size_t i = 0; i < sessions_count; i++) { 561 session_t *s = sessions[i]; 562 if (!s) continue; 563 resp_array_append_bulk(res, s->session_id); 564 } 565 566 return res; 567 } 568 569 resp_object *domain_session_info(const char *cmd, resp_object *args) { 570 (void)cmd; 571 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 572 resp_object *err = resp_array_init(); 573 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'"); 574 return err; 575 } 576 577 const char *session_id = NULL; 578 if (args->u.arr.elem[1].type == RESPT_BULK) { 579 session_id = args->u.arr.elem[1].u.s; 580 } 581 582 if (!session_id) { 583 resp_object *err = resp_array_init(); 584 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.info'"); 585 return err; 586 } 587 588 session_t *s = find_session(session_id); 589 if (!s) { 590 resp_object *err = resp_array_init(); 591 resp_array_append_simple(err, "ERR session not found"); 592 return err; 593 } 594 595 resp_object *res = resp_array_init(); 596 if (!res) return NULL; 597 598 resp_array_append_bulk(res, "session_id"); 599 resp_array_append_bulk(res, s->session_id); 600 601 resp_array_append_bulk(res, "created"); 602 resp_array_append_int(res, (long long)s->created); 603 604 resp_array_append_bulk(res, "last_activity"); 605 resp_array_append_int(res, (long long)s->last_activity); 606 607 resp_array_append_bulk(res, "idle_expiry"); 608 resp_array_append_int(res, (long long)s->idle_expiry); 609 610 resp_array_append_bulk(res, "sockets"); 611 resp_object *sockets_arr = resp_array_init(); 612 for (size_t i = 0; i < s->sockets_count; i++) { 613 socket_t *sock = s->sockets[i]; 614 if (!sock) continue; 615 resp_array_append_bulk(sockets_arr, sock->socket_id); 616 } 617 resp_array_append_obj(res, sockets_arr); 618 619 resp_array_append_bulk(res, "forwards"); 620 resp_object *forwards_arr = resp_array_init(); 621 for (size_t i = 0; i < s->forwards_count; i++) { 622 resp_array_append_bulk(forwards_arr, s->forwards[i].src_socket_id); 623 resp_array_append_bulk(forwards_arr, s->forwards[i].dst_socket_id); 624 } 625 resp_array_append_obj(res, forwards_arr); 626 627 resp_array_append_bulk(res, "marked_for_deletion"); 628 resp_array_append_int(res, s->marked_for_deletion); 629 630 return res; 631 } 632 633 resp_object *domain_session_destroy(const char *cmd, resp_object *args) { 634 (void)cmd; 635 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 636 resp_object *err = resp_array_init(); 637 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'"); 638 return err; 639 } 640 641 const char *session_id = NULL; 642 if (args->u.arr.elem[1].type == RESPT_BULK) { 643 session_id = args->u.arr.elem[1].u.s; 644 } 645 646 if (!session_id) { 647 resp_object *err = resp_array_init(); 648 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.destroy'"); 649 return err; 650 } 651 652 session_t *s = find_session(session_id); 653 if (!s) { 654 resp_object *err = resp_array_init(); 655 resp_array_append_simple(err, "ERR session not found"); 656 return err; 657 } 658 659 destroy_session(s); 660 661 resp_object *res = resp_array_init(); 662 resp_array_append_simple(res, "OK"); 663 return res; 664 } 665 666 resp_object *domain_socket_create_listen(const char *cmd, resp_object *args) { 667 (void)cmd; 668 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) { 669 resp_object *err = resp_array_init(); 670 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'"); 671 return err; 672 } 673 674 const char *session_id = NULL; 675 const char *socket_id = NULL; 676 677 if (args->u.arr.elem[1].type == RESPT_BULK) { 678 session_id = args->u.arr.elem[1].u.s; 679 } 680 if (args->u.arr.elem[2].type == RESPT_BULK) { 681 socket_id = args->u.arr.elem[2].u.s; 682 } 683 684 if (!session_id || !socket_id) { 685 resp_object *err = resp_array_init(); 686 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.listen'"); 687 return err; 688 } 689 690 session_t *s = find_session(session_id); 691 if (!s) { 692 resp_object *err = resp_array_init(); 693 resp_array_append_simple(err, "ERR session not found"); 694 return err; 695 } 696 697 socket_t *sock = create_listen_socket(s, socket_id); 698 if (!sock) { 699 resp_object *err = resp_array_init(); 700 resp_array_append_simple(err, "ERR failed to create socket"); 701 return err; 702 } 703 704 resp_object *res = resp_array_init(); 705 resp_array_append_int(res, sock->local_port); 706 resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : ""); 707 return res; 708 } 709 710 resp_object *domain_socket_create_connect(const char *cmd, resp_object *args) { 711 (void)cmd; 712 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 5) { 713 resp_object *err = resp_array_init(); 714 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'"); 715 return err; 716 } 717 718 const char *session_id = NULL; 719 const char *socket_id = NULL; 720 const char *ip = NULL; 721 const char *port_str = NULL; 722 723 if (args->u.arr.elem[1].type == RESPT_BULK) { 724 session_id = args->u.arr.elem[1].u.s; 725 } 726 if (args->u.arr.elem[2].type == RESPT_BULK) { 727 socket_id = args->u.arr.elem[2].u.s; 728 } 729 if (args->u.arr.elem[3].type == RESPT_BULK) { 730 ip = args->u.arr.elem[3].u.s; 731 } 732 if (args->u.arr.elem[4].type == RESPT_BULK) { 733 port_str = args->u.arr.elem[4].u.s; 734 } 735 736 if (!session_id || !socket_id || !ip || !port_str) { 737 resp_object *err = resp_array_init(); 738 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.create.connect'"); 739 return err; 740 } 741 742 int port = atoi(port_str); 743 744 session_t *s = find_session(session_id); 745 if (!s) { 746 resp_object *err = resp_array_init(); 747 resp_array_append_simple(err, "ERR session not found"); 748 return err; 749 } 750 751 socket_t *sock = create_connect_socket(s, socket_id, ip, port); 752 if (!sock) { 753 resp_object *err = resp_array_init(); 754 resp_array_append_simple(err, "ERR failed to create socket"); 755 return err; 756 } 757 758 resp_object *res = resp_array_init(); 759 resp_array_append_int(res, sock->local_port); 760 resp_array_append_bulk(res, domain_cfg && domain_cfg->advertise_addr ? domain_cfg->advertise_addr : ""); 761 return res; 762 } 763 764 resp_object *domain_socket_destroy(const char *cmd, resp_object *args) { 765 (void)cmd; 766 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 3) { 767 resp_object *err = resp_array_init(); 768 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'"); 769 return err; 770 } 771 772 const char *session_id = NULL; 773 const char *socket_id = NULL; 774 775 if (args->u.arr.elem[1].type == RESPT_BULK) { 776 session_id = args->u.arr.elem[1].u.s; 777 } 778 if (args->u.arr.elem[2].type == RESPT_BULK) { 779 socket_id = args->u.arr.elem[2].u.s; 780 } 781 782 if (!session_id || !socket_id) { 783 resp_object *err = resp_array_init(); 784 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.socket.destroy'"); 785 return err; 786 } 787 788 session_t *s = find_session(session_id); 789 if (!s) { 790 resp_object *err = resp_array_init(); 791 resp_array_append_simple(err, "ERR session not found"); 792 return err; 793 } 794 795 if (destroy_socket(s, socket_id) != 0) { 796 resp_object *err = resp_array_init(); 797 resp_array_append_simple(err, "ERR socket not found"); 798 return err; 799 } 800 801 resp_object *res = resp_array_init(); 802 resp_array_append_simple(res, "OK"); 803 return res; 804 } 805 806 resp_object *domain_forward_list(const char *cmd, resp_object *args) { 807 (void)cmd; 808 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 2) { 809 resp_object *err = resp_array_init(); 810 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'"); 811 return err; 812 } 813 814 const char *session_id = NULL; 815 if (args->u.arr.elem[1].type == RESPT_BULK) { 816 session_id = args->u.arr.elem[1].u.s; 817 } 818 819 if (!session_id) { 820 resp_object *err = resp_array_init(); 821 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.list'"); 822 return err; 823 } 824 825 session_t *s = find_session(session_id); 826 if (!s) { 827 resp_object *err = resp_array_init(); 828 resp_array_append_simple(err, "ERR session not found"); 829 return err; 830 } 831 832 resp_object *res = resp_array_init(); 833 for (size_t i = 0; i < s->forwards_count; i++) { 834 resp_array_append_bulk(res, s->forwards[i].src_socket_id); 835 resp_array_append_bulk(res, s->forwards[i].dst_socket_id); 836 } 837 return res; 838 } 839 840 resp_object *domain_forward_create(const char *cmd, resp_object *args) { 841 (void)cmd; 842 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) { 843 resp_object *err = resp_array_init(); 844 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'"); 845 return err; 846 } 847 848 const char *session_id = NULL; 849 const char *src_socket_id = NULL; 850 const char *dst_socket_id = NULL; 851 852 if (args->u.arr.elem[1].type == RESPT_BULK) { 853 session_id = args->u.arr.elem[1].u.s; 854 } 855 if (args->u.arr.elem[2].type == RESPT_BULK) { 856 src_socket_id = args->u.arr.elem[2].u.s; 857 } 858 if (args->u.arr.elem[3].type == RESPT_BULK) { 859 dst_socket_id = args->u.arr.elem[3].u.s; 860 } 861 862 if (!session_id || !src_socket_id || !dst_socket_id) { 863 resp_object *err = resp_array_init(); 864 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.create'"); 865 return err; 866 } 867 868 session_t *s = find_session(session_id); 869 if (!s) { 870 resp_object *err = resp_array_init(); 871 resp_array_append_simple(err, "ERR session not found"); 872 return err; 873 } 874 875 socket_t *src = find_socket(s, src_socket_id); 876 if (!src) { 877 resp_object *err = resp_array_init(); 878 resp_array_append_simple(err, "ERR source socket not found"); 879 return err; 880 } 881 882 socket_t *dst = find_socket(s, dst_socket_id); 883 if (!dst) { 884 resp_object *err = resp_array_init(); 885 resp_array_append_simple(err, "ERR destination socket not found"); 886 return err; 887 } 888 889 if (add_forward(s, src_socket_id, dst_socket_id) != 0) { 890 resp_object *err = resp_array_init(); 891 resp_array_append_simple(err, "ERR failed to add forward"); 892 return err; 893 } 894 895 log_debug("udphole: created forward %s -> %s in session %s", src_socket_id, dst_socket_id, session_id); 896 897 resp_object *res = resp_array_init(); 898 resp_array_append_simple(res, "OK"); 899 return res; 900 } 901 902 resp_object *domain_forward_destroy(const char *cmd, resp_object *args) { 903 (void)cmd; 904 if (!args || args->type != RESPT_ARRAY || args->u.arr.n < 4) { 905 resp_object *err = resp_array_init(); 906 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'"); 907 return err; 908 } 909 910 const char *session_id = NULL; 911 const char *src_socket_id = NULL; 912 const char *dst_socket_id = NULL; 913 914 if (args->u.arr.elem[1].type == RESPT_BULK) { 915 session_id = args->u.arr.elem[1].u.s; 916 } 917 if (args->u.arr.elem[2].type == RESPT_BULK) { 918 src_socket_id = args->u.arr.elem[2].u.s; 919 } 920 if (args->u.arr.elem[3].type == RESPT_BULK) { 921 dst_socket_id = args->u.arr.elem[3].u.s; 922 } 923 924 if (!session_id || !src_socket_id || !dst_socket_id) { 925 resp_object *err = resp_array_init(); 926 resp_array_append_simple(err, "ERR wrong number of arguments for 'session.forward.destroy'"); 927 return err; 928 } 929 930 session_t *s = find_session(session_id); 931 if (!s) { 932 resp_object *err = resp_array_init(); 933 resp_array_append_simple(err, "ERR session not found"); 934 return err; 935 } 936 937 if (remove_forward(s, src_socket_id, dst_socket_id) != 0) { 938 resp_object *err = resp_array_init(); 939 resp_array_append_simple(err, "ERR forward not found"); 940 return err; 941 } 942 943 resp_object *res = resp_array_init(); 944 resp_array_append_simple(res, "OK"); 945 return res; 946 } 947 948 resp_object *domain_system_load(const char *cmd, resp_object *args) { 949 (void)cmd; 950 (void)args; 951 952 double loadavg[3]; 953 if (getloadavg(loadavg, 3) != 3) { 954 resp_object *err = resp_array_init(); 955 resp_array_append_simple(err, "ERR failed to get load average"); 956 return err; 957 } 958 959 resp_object *res = resp_array_init(); 960 char buf[64]; 961 962 resp_array_append_bulk(res, "1min"); 963 snprintf(buf, sizeof(buf), "%.2f", loadavg[0]); 964 resp_array_append_bulk(res, buf); 965 966 resp_array_append_bulk(res, "5min"); 967 snprintf(buf, sizeof(buf), "%.2f", loadavg[1]); 968 resp_array_append_bulk(res, buf); 969 970 resp_array_append_bulk(res, "15min"); 971 snprintf(buf, sizeof(buf), "%.2f", loadavg[2]); 972 resp_array_append_bulk(res, buf); 973 974 return res; 975 } 976 977 resp_object *domain_session_count(const char *cmd, resp_object *args) { 978 (void)cmd; 979 (void)args; 980 981 size_t count = 0; 982 for (size_t i = 0; i < sessions_count; i++) { 983 if (sessions[i] != NULL) { 984 count++; 985 } 986 } 987 988 resp_object *res = malloc(sizeof(resp_object)); 989 if (!res) return NULL; 990 res->type = RESPT_INT; 991 res->u.i = (long long)count; 992 return res; 993 } 994 995 PT_THREAD(session_manager_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { 996 (void)timestamp; 997 log_trace("session_manager: protothread entry"); 998 PT_BEGIN(pt); 999 1000 PT_WAIT_UNTIL(pt, domain_cfg); 1001 1002 running = 1; 1003 log_info("udphole: manager started with port range %d-%d", domain_cfg->port_low, domain_cfg->port_high); 1004 1005 int64_t last_cleanup = 0; 1006 1007 for (;;) { 1008 int64_t now = (int64_t)(time(NULL)); 1009 if (now - last_cleanup >= 1) { 1010 cleanup_expired_sessions(); 1011 last_cleanup = now; 1012 } 1013 1014 PT_YIELD(pt); 1015 } 1016 1017 running = 0; 1018 1019 for (size_t i = 0; i < sessions_count; i++) { 1020 if (sessions[i]) { 1021 sessions[i]->marked_for_deletion = 1; 1022 } 1023 } 1024 1025 PT_END(pt); 1026 }