server.c (19886B)
1 /* 2 * Generic RESP2 API server: TCP listener, connection management, RESP2 3 * parsing/writing, command hashmap, authentication with per-user permit 4 * checking, and built-in commands (auth, ping, quit, command). 5 * 6 * Runs as a protothread in the main select() loop. 7 */ 8 9 #include <stdlib.h> 10 #include <string.h> 11 #include <stdio.h> 12 #include <ctype.h> 13 #include <unistd.h> 14 #include <errno.h> 15 #include <fcntl.h> 16 #include <sys/socket.h> 17 #include <sys/types.h> 18 #include <netinet/in.h> 19 #include <arpa/inet.h> 20 #include <netdb.h> 21 22 #include "rxi/log.h" 23 #include "tidwall/hashmap.h" 24 #include "domain/protothreads.h" 25 #include "domain/scheduler.h" 26 #include "common/socket_util.h" 27 #include "infrastructure/config.h" 28 #include "interface/api/server.h" 29 #include "common/resp.h" 30 31 struct pt_task; 32 PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)); 33 34 #define API_MAX_CLIENTS 8 35 #define READ_BUF_SIZE 4096 36 #define WRITE_BUF_INIT 4096 37 #define MAX_ARGS 32 38 39 struct api_client_state { 40 int fd; 41 int *fds; 42 int *ready_fds; 43 int ready_fd; 44 char *username; 45 char rbuf[READ_BUF_SIZE]; 46 size_t rlen; 47 char *wbuf; 48 size_t wlen; 49 size_t wcap; 50 }; 51 52 typedef struct api_client_state api_client_t; 53 54 typedef struct { 55 const char *name; 56 char (*func)(api_client_t *c, char **args, int nargs); 57 } api_cmd_entry; 58 59 typedef struct { 60 const char *name; 61 domain_cmd_fn func; 62 } domain_cmd_entry; 63 64 static char *current_listen = NULL; 65 static struct hashmap *cmd_map = NULL; 66 static struct hashmap *domain_cmd_map = NULL; 67 68 typedef struct { 69 int *server_fds; 70 int *ready_fds; 71 } api_server_udata_t; 72 73 bool api_write_raw(api_client_t *c, const void *data, size_t len) { 74 if (c->fd < 0) return false; 75 if (c->wlen + len > c->wcap) { 76 size_t need = c->wlen + len; 77 size_t ncap = c->wcap ? c->wcap : WRITE_BUF_INIT; 78 while (ncap < need) ncap *= 2; 79 char *nb = realloc(c->wbuf, ncap); 80 if (!nb) return false; 81 c->wbuf = nb; 82 c->wcap = ncap; 83 } 84 memcpy(c->wbuf + c->wlen, data, len); 85 c->wlen += len; 86 return true; 87 } 88 89 bool api_write_cstr(api_client_t *c, const char *s) { 90 return api_write_raw(c, s, strlen(s)); 91 } 92 93 bool api_write_ok(api_client_t *c) { 94 return api_write_cstr(c, "+OK\r\n"); 95 } 96 97 bool api_write_err(api_client_t *c, const char *msg) { 98 if (!api_write_cstr(c, "-ERR ")) return false; 99 if (!api_write_cstr(c, msg)) return false; 100 return api_write_cstr(c, "\r\n"); 101 } 102 103 bool api_write_nil(api_client_t *c) { 104 return api_write_cstr(c, "$-1\r\n"); 105 } 106 107 bool api_write_int(api_client_t *c, int value) { 108 char buf[32]; 109 snprintf(buf, sizeof(buf), ":%d\r\n", value); 110 return api_write_cstr(c, buf); 111 } 112 113 bool api_write_array(api_client_t *c, size_t nitems) { 114 char buf[32]; 115 snprintf(buf, sizeof(buf), "*%zu\r\n", nitems); 116 return api_write_cstr(c, buf); 117 } 118 119 bool api_write_bulk_cstr(api_client_t *c, const char *s) { 120 if (!s) return api_write_nil(c); 121 size_t len = strlen(s); 122 char prefix[32]; 123 snprintf(prefix, sizeof(prefix), "$%zu\r\n", len); 124 if (!api_write_cstr(c, prefix)) return false; 125 if (!api_write_raw(c, s, len)) return false; 126 return api_write_cstr(c, "\r\n"); 127 } 128 129 bool api_write_bulk_int(api_client_t *c, int val) { 130 char buf[32]; 131 snprintf(buf, sizeof(buf), "%d", val); 132 return api_write_bulk_cstr(c, buf); 133 } 134 135 static void client_close(api_client_t *c) { 136 if (c->fd >= 0) { 137 close(c->fd); 138 c->fd = -1; 139 } 140 free(c->wbuf); 141 c->wbuf = NULL; 142 c->wlen = c->wcap = 0; 143 c->rlen = 0; 144 free(c->username); 145 c->username = NULL; 146 } 147 148 static void client_flush(api_client_t *c) { 149 if (c->fd < 0 || c->wlen == 0) return; 150 ssize_t n = send(c->fd, c->wbuf, c->wlen, 0); 151 if (n > 0) { 152 if ((size_t)n < c->wlen) 153 memmove(c->wbuf, c->wbuf + n, c->wlen - (size_t)n); 154 c->wlen -= (size_t)n; 155 } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { 156 client_close(c); 157 } 158 } 159 160 static int parse_inline(const char *line, size_t len, char **args, int max_args) { 161 int nargs = 0; 162 const char *p = line; 163 const char *end = line + len; 164 while (p < end && nargs < max_args) { 165 while (p < end && (*p == ' ' || *p == '\t')) p++; 166 if (p >= end) break; 167 const char *start; 168 const char *tok_end; 169 if (*p == '"' || *p == '\'') { 170 char quote = *p++; 171 start = p; 172 while (p < end && *p != quote) p++; 173 tok_end = p; 174 if (p < end) p++; 175 } else { 176 start = p; 177 while (p < end && *p != ' ' && *p != '\t') p++; 178 tok_end = p; 179 } 180 size_t tlen = (size_t)(tok_end - start); 181 char *arg = malloc(tlen + 1); 182 if (!arg) return -1; 183 memcpy(arg, start, tlen); 184 arg[tlen] = '\0'; 185 args[nargs++] = arg; 186 } 187 return nargs; 188 } 189 190 static int parse_resp_command(api_client_t *c, char **args, int max_args, int *nargs) { 191 *nargs = 0; 192 if (c->rlen == 0) return 0; 193 194 if (c->rbuf[0] != '*') { 195 char *nl = memchr(c->rbuf, '\n', c->rlen); 196 if (!nl) return 0; 197 size_t line_len = (size_t)(nl - c->rbuf); 198 size_t trim = line_len; 199 if (trim > 0 && c->rbuf[trim - 1] == '\r') trim--; 200 int n = parse_inline(c->rbuf, trim, args, max_args); 201 if (n < 0) return -1; 202 *nargs = n; 203 size_t consumed = line_len + 1; 204 c->rlen -= consumed; 205 if (c->rlen > 0) memmove(c->rbuf, c->rbuf + consumed, c->rlen); 206 return n > 0 ? 1 : 0; 207 } 208 209 size_t pos = 0; 210 char *nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); 211 if (!nl) return 0; 212 int count = atoi(c->rbuf + 1); 213 if (count <= 0 || count > max_args) return -1; 214 pos = (size_t)(nl - c->rbuf) + 1; 215 216 for (int i = 0; i < count; i++) { 217 if (pos >= c->rlen) return 0; 218 if (c->rbuf[pos] != '$') return -1; 219 nl = memchr(c->rbuf + pos, '\n', c->rlen - pos); 220 if (!nl) return 0; 221 int blen = atoi(c->rbuf + pos + 1); 222 if (blen < 0) return -1; 223 size_t hdr_end = (size_t)(nl - c->rbuf) + 1; 224 if (hdr_end + (size_t)blen + 2 > c->rlen) return 0; 225 char *arg = malloc((size_t)blen + 1); 226 if (!arg) return -1; 227 memcpy(arg, c->rbuf + hdr_end, (size_t)blen); 228 arg[blen] = '\0'; 229 args[i] = arg; 230 pos = hdr_end + (size_t)blen + 2; 231 } 232 233 *nargs = count; 234 c->rlen -= pos; 235 if (c->rlen > 0) memmove(c->rbuf, c->rbuf + pos, c->rlen); 236 return 1; 237 } 238 239 static bool permit_matches(const char *pattern, const char *cmd) { 240 size_t plen = strlen(pattern); 241 if (plen == 1 && pattern[0] == '*') 242 return true; 243 if (plen >= 2 && pattern[plen - 1] == '*') { 244 return strncasecmp(pattern, cmd, plen - 1) == 0; 245 } 246 return strcasecmp(pattern, cmd) == 0; 247 } 248 249 static bool user_has_permit(api_client_t *c, const char *cmd) { 250 char section[128]; 251 const char *uname = (c->username && c->username[0]) ? c->username : "*"; 252 snprintf(section, sizeof(section), "user:%s", uname); 253 resp_object *sec = resp_map_get(global_cfg, section); 254 if (sec && sec->type == RESPT_ARRAY) { 255 for (size_t i = 0; i < sec->u.arr.n; i += 2) { 256 if (i + 1 < sec->u.arr.n) { 257 resp_object *key = &sec->u.arr.elem[i]; 258 resp_object *val = &sec->u.arr.elem[i + 1]; 259 if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { 260 if (val->type == RESPT_ARRAY) { 261 for (size_t j = 0; j < val->u.arr.n; j++) { 262 resp_object *p = &val->u.arr.elem[j]; 263 if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) 264 return true; 265 } 266 } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) { 267 return true; 268 } 269 } 270 } 271 } 272 } 273 if (strcmp(uname, "*") != 0) { 274 resp_object *anon = resp_map_get(global_cfg, "user:*"); 275 if (anon && anon->type == RESPT_ARRAY) { 276 for (size_t i = 0; i < anon->u.arr.n; i += 2) { 277 if (i + 1 < anon->u.arr.n) { 278 resp_object *key = &anon->u.arr.elem[i]; 279 resp_object *val = &anon->u.arr.elem[i + 1]; 280 if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) { 281 if (val->type == RESPT_ARRAY) { 282 for (size_t j = 0; j < val->u.arr.n; j++) { 283 resp_object *p = &val->u.arr.elem[j]; 284 if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) 285 return true; 286 } 287 } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) { 288 return true; 289 } 290 } 291 } 292 } 293 } 294 } 295 return false; 296 } 297 298 static uint64_t cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) { 299 const api_cmd_entry *cmd = item; 300 return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1); 301 } 302 303 static int cmd_compare(const void *a, const void *b, void *udata) { 304 (void)udata; 305 const api_cmd_entry *ca = a; 306 const api_cmd_entry *cb = b; 307 return strcasecmp(ca->name, cb->name); 308 } 309 310 void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)) { 311 if (!cmd_map) 312 cmd_map = hashmap_new(sizeof(api_cmd_entry), 0, 0, 0, cmd_hash, cmd_compare, NULL, NULL); 313 hashmap_set(cmd_map, &(api_cmd_entry){ .name = name, .func = func }); 314 log_trace("api: registered command '%s'", name); 315 } 316 317 static uint64_t domain_cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) { 318 const domain_cmd_entry *cmd = item; 319 return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1); 320 } 321 322 static int domain_cmd_compare(const void *a, const void *b, void *udata) { 323 (void)udata; 324 const domain_cmd_entry *ca = a; 325 const domain_cmd_entry *cb = b; 326 return strcasecmp(ca->name, cb->name); 327 } 328 329 void api_register_domain_cmd(const char *name, domain_cmd_fn func) { 330 if (!domain_cmd_map) 331 domain_cmd_map = hashmap_new(sizeof(domain_cmd_entry), 0, 0, 0, domain_cmd_hash, domain_cmd_compare, NULL, NULL); 332 hashmap_set(domain_cmd_map, &(domain_cmd_entry){ .name = name, .func = func }); 333 log_trace("api: registered domain command '%s'", name); 334 } 335 336 static char cmdAUTH(api_client_t *c, char **args, int nargs) { 337 if (nargs != 3) { 338 api_write_err(c, "wrong number of arguments for 'auth' command (AUTH username password)"); 339 return 1; 340 } 341 const char *uname = args[1]; 342 const char *pass = args[2]; 343 char section[128]; 344 snprintf(section, sizeof(section), "user:%s", uname); 345 resp_object *sec = resp_map_get(global_cfg, section); 346 const char *secret = sec ? resp_map_get_string(sec, "secret") : NULL; 347 if (secret && pass && strcmp(secret, pass) == 0) { 348 free(c->username); 349 c->username = strdup(uname); 350 if (c->username) { 351 log_debug("api: client authenticated as '%s'", uname); 352 return api_write_ok(c) ? 1 : 0; 353 } 354 } 355 return api_write_err(c, "invalid credentials") ? 1 : 0; 356 } 357 358 static char cmdPING(api_client_t *c, char **args, int nargs) { 359 (void)args; 360 if (nargs == 1) 361 return api_write_cstr(c, "+PONG\r\n") ? 1 : 0; 362 if (nargs == 2) 363 return api_write_bulk_cstr(c, args[1]) ? 1 : 0; 364 return api_write_err(c, "wrong number of arguments for 'ping' command") ? 1 : 0; 365 } 366 367 static char cmdQUIT(api_client_t *c, char **args, int nargs) { 368 (void)args; (void)nargs; 369 api_write_ok(c); 370 return 0; 371 } 372 373 static bool is_builtin(const char *name); 374 375 static char cmdCOMMAND(api_client_t *c, char **args, int nargs) { 376 (void)args; 377 if (!cmd_map && !domain_cmd_map) 378 return api_write_array(c, 0) ? 1 : 0; 379 380 resp_object *result = resp_array_init(); 381 if (!result) return 0; 382 383 if (domain_cmd_map) { 384 size_t iter = 0; 385 void *item; 386 while (hashmap_iter(domain_cmd_map, &iter, &item)) { 387 const domain_cmd_entry *e = item; 388 if (!user_has_permit(c, e->name)) 389 continue; 390 391 resp_array_append_bulk(result, e->name); 392 resp_object *meta = resp_array_init(); 393 if (!meta) { resp_free(result); return 0; } 394 resp_array_append_bulk(meta, "summary"); 395 resp_array_append_bulk(meta, "UDP hole proxy command"); 396 resp_array_append_obj(result, meta); 397 } 398 } 399 400 if (cmd_map) { 401 size_t iter = 0; 402 void *item; 403 while (hashmap_iter(cmd_map, &iter, &item)) { 404 const api_cmd_entry *e = item; 405 if (!is_builtin(e->name) && !user_has_permit(c, e->name)) 406 continue; 407 408 resp_array_append_bulk(result, e->name); 409 resp_object *meta = resp_array_init(); 410 if (!meta) { resp_free(result); return 0; } 411 resp_array_append_bulk(meta, "summary"); 412 resp_array_append_bulk(meta, "UDP hole proxy command"); 413 resp_array_append_obj(result, meta); 414 } 415 } 416 417 char *out_buf = NULL; 418 size_t out_len = 0; 419 if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) { 420 resp_free(result); 421 return 0; 422 } 423 resp_free(result); 424 425 api_write_raw(c, out_buf, out_len); 426 free(out_buf); 427 return 1; 428 } 429 430 static void init_builtins(void) { 431 api_register_cmd("auth", cmdAUTH); 432 api_register_cmd("ping", cmdPING); 433 api_register_cmd("quit", cmdQUIT); 434 api_register_cmd("command", cmdCOMMAND); 435 } 436 437 static bool is_builtin(const char *name) { 438 return (strcasecmp(name, "auth") == 0 || 439 strcasecmp(name, "ping") == 0 || 440 strcasecmp(name, "quit") == 0 || 441 strcasecmp(name, "command") == 0); 442 } 443 444 static void dispatch_command(api_client_t *c, char **args, int nargs) { 445 if (nargs <= 0) return; 446 447 for (char *p = args[0]; *p; p++) *p = (char)tolower((unsigned char)*p); 448 449 const domain_cmd_entry *dcmd = hashmap_get(domain_cmd_map, &(domain_cmd_entry){ .name = args[0] }); 450 if (dcmd) { 451 if (!is_builtin(args[0])) { 452 if (!user_has_permit(c, args[0])) { 453 api_write_err(c, "no permission"); 454 return; 455 } 456 } 457 458 resp_object *domain_args = resp_array_init(); 459 if (!domain_args) return; 460 461 for (int i = 0; i < nargs; i++) { 462 resp_array_append_bulk(domain_args, args[i]); 463 } 464 465 resp_object *result = dcmd->func(args[0], domain_args); 466 resp_free(domain_args); 467 468 if (!result) { 469 api_write_err(c, "command failed"); 470 return; 471 } 472 473 char *out_buf = NULL; 474 size_t out_len = 0; 475 if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) { 476 resp_free(result); 477 api_write_err(c, "command failed"); 478 return; 479 } 480 resp_free(result); 481 482 api_write_raw(c, out_buf, out_len); 483 free(out_buf); 484 return; 485 } 486 487 const api_cmd_entry *cmd = hashmap_get(cmd_map, &(api_cmd_entry){ .name = args[0] }); 488 if (!cmd) { 489 api_write_err(c, "unknown command"); 490 return; 491 } 492 493 if (!is_builtin(args[0])) { 494 if (!user_has_permit(c, args[0])) { 495 api_write_err(c, "no permission"); 496 return; 497 } 498 } 499 500 char result = cmd->func(c, args, nargs); 501 if (!result) { 502 client_flush(c); 503 client_close(c); 504 } 505 } 506 507 static int *create_listen_socket(const char *listen_addr) { 508 const char *default_port = "6379"; 509 resp_object *api_sec = resp_map_get(global_cfg, "udphole"); 510 if (api_sec) { 511 const char *cfg_port = resp_map_get_string(api_sec, "port"); 512 if (cfg_port && cfg_port[0]) default_port = cfg_port; 513 } 514 515 if (listen_addr && strncmp(listen_addr, "unix://", 7) == 0) { 516 const char *socket_path = listen_addr + 7; 517 const char *socket_owner = api_sec ? resp_map_get_string(api_sec, "socket_owner") : NULL; 518 int *fds = unix_listen(socket_path, SOCK_STREAM, socket_owner); 519 if (!fds) { 520 return NULL; 521 } 522 log_info("api: listening on %s", listen_addr); 523 return fds; 524 } 525 526 int *fds = tcp_listen(listen_addr, NULL, default_port); 527 if (!fds) { 528 return NULL; 529 } 530 log_info("api: listening on %s", listen_addr); 531 return fds; 532 } 533 534 static void handle_accept(int ready_fd) { 535 struct sockaddr_storage addr; 536 socklen_t addrlen = sizeof(addr); 537 int fd = accept(ready_fd, (struct sockaddr *)&addr, &addrlen); 538 if (fd < 0) return; 539 set_socket_nonblocking(fd, 1); 540 541 api_client_t *state = calloc(1, sizeof(*state)); 542 if (!state) { 543 const char *msg = "-ERR out of memory\r\n"; 544 send(fd, msg, strlen(msg), 0); 545 close(fd); 546 return; 547 } 548 state->fd = fd; 549 550 domain_schedmod_pt_create(api_client_pt, state); 551 log_trace("api: accepted connection, spawned client pt"); 552 } 553 554 PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { 555 api_server_udata_t *udata = task->udata; 556 log_trace("api_server: protothread entry"); 557 PT_BEGIN(pt); 558 559 if (!udata) { 560 udata = calloc(1, sizeof(api_server_udata_t)); 561 if (!udata) { 562 PT_EXIT(pt); 563 } 564 task->udata = udata; 565 } 566 567 resp_object *api_sec = resp_map_get(global_cfg, "udphole"); 568 const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL; 569 if (!listen_str || !listen_str[0]) { 570 log_info("api: no listen address configured, API server disabled"); 571 PT_EXIT(pt); 572 } 573 574 if (!current_listen) { 575 current_listen = strdup(listen_str); 576 if (!current_listen) { 577 PT_EXIT(pt); 578 } 579 init_builtins(); 580 udata->server_fds = create_listen_socket(current_listen); 581 if (!udata->server_fds) { 582 log_fatal("api: failed to listen on %s", current_listen); 583 free(current_listen); 584 current_listen = NULL; 585 exit(1); 586 } 587 } 588 589 for (;;) { 590 (void)timestamp; 591 if (udata->server_fds && udata->server_fds[0] > 0) { 592 PT_WAIT_UNTIL(pt, domain_schedmod_has_data(udata->server_fds, &udata->ready_fds) > 0); 593 if (udata->ready_fds && udata->ready_fds[0] > 0) { 594 for (int i = 1; i <= udata->ready_fds[0]; i++) { 595 handle_accept(udata->ready_fds[i]); 596 } 597 } 598 if (udata->ready_fds) free(udata->ready_fds); 599 udata->ready_fds = NULL; 600 } else { 601 PT_YIELD(pt); 602 } 603 } 604 if (udata->server_fds) { 605 for (int i = 1; i <= udata->server_fds[0]; i++) { 606 close(udata->server_fds[i]); 607 } 608 free(udata->server_fds); 609 } 610 free(udata->ready_fds); 611 free(udata); 612 free(current_listen); 613 current_listen = NULL; 614 if (cmd_map) { 615 hashmap_free(cmd_map); 616 cmd_map = NULL; 617 } 618 if (domain_cmd_map) { 619 hashmap_free(domain_cmd_map); 620 domain_cmd_map = NULL; 621 } 622 623 PT_END(pt); 624 } 625 626 PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) { 627 (void)timestamp; 628 api_client_t *state = task->udata; 629 630 log_trace("api_client: protothread entry fd=%d", state->fd); 631 PT_BEGIN(pt); 632 633 state->fds = malloc(sizeof(int) * 2); 634 if (!state->fds) { 635 free(state); 636 PT_EXIT(pt); 637 } 638 state->fds[0] = 1; 639 state->fds[1] = state->fd; 640 641 for (;;) { 642 state->ready_fds = NULL; 643 PT_WAIT_UNTIL(pt, domain_schedmod_has_data(state->fds, &state->ready_fds) > 0); 644 645 state->ready_fd = -1; 646 if (state->ready_fds && state->ready_fds[0] > 0) { 647 for (int i = 1; i <= state->ready_fds[0]; i++) { 648 if (state->ready_fds[i] == state->fd) { 649 state->ready_fd = state->fd; 650 break; 651 } 652 } 653 } 654 free(state->ready_fds); 655 state->ready_fds = NULL; 656 657 char buf[1]; 658 ssize_t n = recv(state->fd, buf, 1, MSG_PEEK); 659 if (n <= 0) { 660 break; 661 } 662 663 resp_object *cmd = resp_read(state->fd); 664 if (!cmd) { 665 if (errno == EAGAIN || errno == EWOULDBLOCK) { 666 PT_YIELD(pt); 667 continue; 668 } 669 break; 670 } 671 672 if (cmd->type != RESPT_ARRAY || cmd->u.arr.n == 0) { 673 resp_free(cmd); 674 api_write_err(state, "Protocol error"); 675 client_flush(state); 676 continue; 677 } 678 679 char *args[MAX_ARGS]; 680 int nargs = 0; 681 for (size_t i = 0; i < cmd->u.arr.n && nargs < MAX_ARGS; i++) { 682 resp_object *elem = &cmd->u.arr.elem[i]; 683 if (elem->type == RESPT_BULK && elem->u.s) { 684 args[nargs++] = elem->u.s; 685 elem->u.s = NULL; 686 } else if (elem->type == RESPT_SIMPLE) { 687 args[nargs++] = elem->u.s ? elem->u.s : ""; 688 } 689 } 690 691 if (nargs > 0) { 692 dispatch_command(state, args, nargs); 693 } 694 695 for (int j = 0; j < nargs; j++) { 696 free(args[j]); 697 } 698 resp_free(cmd); 699 700 client_flush(state); 701 702 if (state->fd < 0) break; 703 } 704 705 if (state->fd >= 0) { 706 close(state->fd); 707 } 708 free(state->fds); 709 free(state->wbuf); 710 free(state->username); 711 free(state); 712 713 PT_END(pt); 714 }