udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

server.c (19886B)


      1 /*
      2  * Generic RESP2 API server: TCP listener, connection management, RESP2
      3  * parsing/writing, command hashmap, authentication with per-user permit
      4  * checking, and built-in commands (auth, ping, quit, command).
      5  *
      6  * Runs as a protothread in the main select() loop.
      7  */
      8 
      9 #include <stdlib.h>
     10 #include <string.h>
     11 #include <stdio.h>
     12 #include <ctype.h>
     13 #include <unistd.h>
     14 #include <errno.h>
     15 #include <fcntl.h>
     16 #include <sys/socket.h>
     17 #include <sys/types.h>
     18 #include <netinet/in.h>
     19 #include <arpa/inet.h>
     20 #include <netdb.h>
     21 
     22 #include "rxi/log.h"
     23 #include "tidwall/hashmap.h"
     24 #include "domain/protothreads.h"
     25 #include "domain/scheduler.h"
     26 #include "common/socket_util.h"
     27 #include "infrastructure/config.h"
     28 #include "interface/api/server.h"
     29 #include "common/resp.h"
     30 
     31 struct pt_task;
     32 PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task));
     33 
     34 #define API_MAX_CLIENTS   8
     35 #define READ_BUF_SIZE     4096
     36 #define WRITE_BUF_INIT    4096
     37 #define MAX_ARGS          32
     38 
     39 struct api_client_state {
     40   int       fd;
     41   int      *fds;
     42   int      *ready_fds;
     43   int       ready_fd;
     44   char     *username;
     45   char      rbuf[READ_BUF_SIZE];
     46   size_t    rlen;
     47   char     *wbuf;
     48   size_t    wlen;
     49   size_t    wcap;
     50 };
     51 
     52 typedef struct api_client_state api_client_t;
     53 
     54 typedef struct {
     55   const char  *name;
     56   char       (*func)(api_client_t *c, char **args, int nargs);
     57 } api_cmd_entry;
     58 
     59 typedef struct {
     60   const char  *name;
     61   domain_cmd_fn func;
     62 } domain_cmd_entry;
     63 
     64 static char     *current_listen = NULL;
     65 static struct hashmap  *cmd_map = NULL;
     66 static struct hashmap  *domain_cmd_map = NULL;
     67 
     68 typedef struct {
     69   int      *server_fds;
     70   int      *ready_fds;
     71 } api_server_udata_t;
     72 
     73 bool api_write_raw(api_client_t *c, const void *data, size_t len) {
     74   if (c->fd < 0) return false;
     75   if (c->wlen + len > c->wcap) {
     76     size_t need = c->wlen + len;
     77     size_t ncap = c->wcap ? c->wcap : WRITE_BUF_INIT;
     78     while (ncap < need) ncap *= 2;
     79     char *nb = realloc(c->wbuf, ncap);
     80     if (!nb) return false;
     81     c->wbuf = nb;
     82     c->wcap = ncap;
     83   }
     84   memcpy(c->wbuf + c->wlen, data, len);
     85   c->wlen += len;
     86   return true;
     87 }
     88 
     89 bool api_write_cstr(api_client_t *c, const char *s) {
     90   return api_write_raw(c, s, strlen(s));
     91 }
     92 
     93 bool api_write_ok(api_client_t *c) {
     94   return api_write_cstr(c, "+OK\r\n");
     95 }
     96 
     97 bool api_write_err(api_client_t *c, const char *msg) {
     98   if (!api_write_cstr(c, "-ERR ")) return false;
     99   if (!api_write_cstr(c, msg)) return false;
    100   return api_write_cstr(c, "\r\n");
    101 }
    102 
    103 bool api_write_nil(api_client_t *c) {
    104   return api_write_cstr(c, "$-1\r\n");
    105 }
    106 
    107 bool api_write_int(api_client_t *c, int value) {
    108   char buf[32];
    109   snprintf(buf, sizeof(buf), ":%d\r\n", value);
    110   return api_write_cstr(c, buf);
    111 }
    112 
    113 bool api_write_array(api_client_t *c, size_t nitems) {
    114   char buf[32];
    115   snprintf(buf, sizeof(buf), "*%zu\r\n", nitems);
    116   return api_write_cstr(c, buf);
    117 }
    118 
    119 bool api_write_bulk_cstr(api_client_t *c, const char *s) {
    120   if (!s) return api_write_nil(c);
    121   size_t len = strlen(s);
    122   char prefix[32];
    123   snprintf(prefix, sizeof(prefix), "$%zu\r\n", len);
    124   if (!api_write_cstr(c, prefix)) return false;
    125   if (!api_write_raw(c, s, len)) return false;
    126   return api_write_cstr(c, "\r\n");
    127 }
    128 
    129 bool api_write_bulk_int(api_client_t *c, int val) {
    130   char buf[32];
    131   snprintf(buf, sizeof(buf), "%d", val);
    132   return api_write_bulk_cstr(c, buf);
    133 }
    134 
    135 static void client_close(api_client_t *c) {
    136   if (c->fd >= 0) {
    137     close(c->fd);
    138     c->fd = -1;
    139   }
    140   free(c->wbuf);
    141   c->wbuf = NULL;
    142   c->wlen = c->wcap = 0;
    143   c->rlen = 0;
    144   free(c->username);
    145   c->username = NULL;
    146 }
    147 
    148 static void client_flush(api_client_t *c) {
    149   if (c->fd < 0 || c->wlen == 0) return;
    150   ssize_t n = send(c->fd, c->wbuf, c->wlen, 0);
    151   if (n > 0) {
    152     if ((size_t)n < c->wlen)
    153       memmove(c->wbuf, c->wbuf + n, c->wlen - (size_t)n);
    154     c->wlen -= (size_t)n;
    155   } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
    156     client_close(c);
    157   }
    158 }
    159 
    160 static int parse_inline(const char *line, size_t len, char **args, int max_args) {
    161   int nargs = 0;
    162   const char *p = line;
    163   const char *end = line + len;
    164   while (p < end && nargs < max_args) {
    165     while (p < end && (*p == ' ' || *p == '\t')) p++;
    166     if (p >= end) break;
    167     const char *start;
    168     const char *tok_end;
    169     if (*p == '"' || *p == '\'') {
    170       char quote = *p++;
    171       start = p;
    172       while (p < end && *p != quote) p++;
    173       tok_end = p;
    174       if (p < end) p++;
    175     } else {
    176       start = p;
    177       while (p < end && *p != ' ' && *p != '\t') p++;
    178       tok_end = p;
    179     }
    180     size_t tlen = (size_t)(tok_end - start);
    181     char *arg = malloc(tlen + 1);
    182     if (!arg) return -1;
    183     memcpy(arg, start, tlen);
    184     arg[tlen] = '\0';
    185     args[nargs++] = arg;
    186   }
    187   return nargs;
    188 }
    189 
    190 static int parse_resp_command(api_client_t *c, char **args, int max_args, int *nargs) {
    191   *nargs = 0;
    192   if (c->rlen == 0) return 0;
    193 
    194   if (c->rbuf[0] != '*') {
    195     char *nl = memchr(c->rbuf, '\n', c->rlen);
    196     if (!nl) return 0;
    197     size_t line_len = (size_t)(nl - c->rbuf);
    198     size_t trim = line_len;
    199     if (trim > 0 && c->rbuf[trim - 1] == '\r') trim--;
    200     int n = parse_inline(c->rbuf, trim, args, max_args);
    201     if (n < 0) return -1;
    202     *nargs = n;
    203     size_t consumed = line_len + 1;
    204     c->rlen -= consumed;
    205     if (c->rlen > 0) memmove(c->rbuf, c->rbuf + consumed, c->rlen);
    206     return n > 0 ? 1 : 0;
    207   }
    208 
    209   size_t pos = 0;
    210   char *nl = memchr(c->rbuf + pos, '\n', c->rlen - pos);
    211   if (!nl) return 0;
    212   int count = atoi(c->rbuf + 1);
    213   if (count <= 0 || count > max_args) return -1;
    214   pos = (size_t)(nl - c->rbuf) + 1;
    215 
    216   for (int i = 0; i < count; i++) {
    217     if (pos >= c->rlen) return 0;
    218     if (c->rbuf[pos] != '$') return -1;
    219     nl = memchr(c->rbuf + pos, '\n', c->rlen - pos);
    220     if (!nl) return 0;
    221     int blen = atoi(c->rbuf + pos + 1);
    222     if (blen < 0) return -1;
    223     size_t hdr_end = (size_t)(nl - c->rbuf) + 1;
    224     if (hdr_end + (size_t)blen + 2 > c->rlen) return 0;
    225     char *arg = malloc((size_t)blen + 1);
    226     if (!arg) return -1;
    227     memcpy(arg, c->rbuf + hdr_end, (size_t)blen);
    228     arg[blen] = '\0';
    229     args[i] = arg;
    230     pos = hdr_end + (size_t)blen + 2;
    231   }
    232 
    233   *nargs = count;
    234   c->rlen -= pos;
    235   if (c->rlen > 0) memmove(c->rbuf, c->rbuf + pos, c->rlen);
    236   return 1;
    237 }
    238 
    239 static bool permit_matches(const char *pattern, const char *cmd) {
    240   size_t plen = strlen(pattern);
    241   if (plen == 1 && pattern[0] == '*')
    242     return true;
    243   if (plen >= 2 && pattern[plen - 1] == '*') {
    244     return strncasecmp(pattern, cmd, plen - 1) == 0;
    245   }
    246   return strcasecmp(pattern, cmd) == 0;
    247 }
    248 
    249 static bool user_has_permit(api_client_t *c, const char *cmd) {
    250   char section[128];
    251   const char *uname = (c->username && c->username[0]) ? c->username : "*";
    252   snprintf(section, sizeof(section), "user:%s", uname);
    253   resp_object *sec = resp_map_get(global_cfg, section);
    254   if (sec && sec->type == RESPT_ARRAY) {
    255     for (size_t i = 0; i < sec->u.arr.n; i += 2) {
    256       if (i + 1 < sec->u.arr.n) {
    257         resp_object *key = &sec->u.arr.elem[i];
    258         resp_object *val = &sec->u.arr.elem[i + 1];
    259         if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) {
    260           if (val->type == RESPT_ARRAY) {
    261             for (size_t j = 0; j < val->u.arr.n; j++) {
    262               resp_object *p = &val->u.arr.elem[j];
    263               if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd))
    264                 return true;
    265             }
    266           } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) {
    267             return true;
    268           }
    269         }
    270       }
    271     }
    272   }
    273   if (strcmp(uname, "*") != 0) {
    274     resp_object *anon = resp_map_get(global_cfg, "user:*");
    275     if (anon && anon->type == RESPT_ARRAY) {
    276       for (size_t i = 0; i < anon->u.arr.n; i += 2) {
    277         if (i + 1 < anon->u.arr.n) {
    278           resp_object *key = &anon->u.arr.elem[i];
    279           resp_object *val = &anon->u.arr.elem[i + 1];
    280           if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) {
    281             if (val->type == RESPT_ARRAY) {
    282               for (size_t j = 0; j < val->u.arr.n; j++) {
    283                 resp_object *p = &val->u.arr.elem[j];
    284                 if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd))
    285                   return true;
    286               }
    287             } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) {
    288               return true;
    289             }
    290           }
    291         }
    292       }
    293     }
    294   }
    295   return false;
    296 }
    297 
    298 static uint64_t cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) {
    299   const api_cmd_entry *cmd = item;
    300   return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1);
    301 }
    302 
    303 static int cmd_compare(const void *a, const void *b, void *udata) {
    304   (void)udata;
    305   const api_cmd_entry *ca = a;
    306   const api_cmd_entry *cb = b;
    307   return strcasecmp(ca->name, cb->name);
    308 }
    309 
    310 void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)) {
    311   if (!cmd_map)
    312     cmd_map = hashmap_new(sizeof(api_cmd_entry), 0, 0, 0, cmd_hash, cmd_compare, NULL, NULL);
    313   hashmap_set(cmd_map, &(api_cmd_entry){ .name = name, .func = func });
    314   log_trace("api: registered command '%s'", name);
    315 }
    316 
    317 static uint64_t domain_cmd_hash(const void *item, uint64_t seed0, uint64_t seed1) {
    318   const domain_cmd_entry *cmd = item;
    319   return hashmap_sip(cmd->name, strlen(cmd->name), seed0, seed1);
    320 }
    321 
    322 static int domain_cmd_compare(const void *a, const void *b, void *udata) {
    323   (void)udata;
    324   const domain_cmd_entry *ca = a;
    325   const domain_cmd_entry *cb = b;
    326   return strcasecmp(ca->name, cb->name);
    327 }
    328 
    329 void api_register_domain_cmd(const char *name, domain_cmd_fn func) {
    330   if (!domain_cmd_map)
    331     domain_cmd_map = hashmap_new(sizeof(domain_cmd_entry), 0, 0, 0, domain_cmd_hash, domain_cmd_compare, NULL, NULL);
    332   hashmap_set(domain_cmd_map, &(domain_cmd_entry){ .name = name, .func = func });
    333   log_trace("api: registered domain command '%s'", name);
    334 }
    335 
    336 static char cmdAUTH(api_client_t *c, char **args, int nargs) {
    337   if (nargs != 3) {
    338     api_write_err(c, "wrong number of arguments for 'auth' command (AUTH username password)");
    339     return 1;
    340   }
    341   const char *uname = args[1];
    342   const char *pass  = args[2];
    343   char section[128];
    344   snprintf(section, sizeof(section), "user:%s", uname);
    345   resp_object *sec = resp_map_get(global_cfg, section);
    346   const char *secret = sec ? resp_map_get_string(sec, "secret") : NULL;
    347   if (secret && pass && strcmp(secret, pass) == 0) {
    348     free(c->username);
    349     c->username = strdup(uname);
    350     if (c->username) {
    351       log_debug("api: client authenticated as '%s'", uname);
    352       return api_write_ok(c) ? 1 : 0;
    353     }
    354   }
    355   return api_write_err(c, "invalid credentials") ? 1 : 0;
    356 }
    357 
    358 static char cmdPING(api_client_t *c, char **args, int nargs) {
    359   (void)args;
    360   if (nargs == 1)
    361     return api_write_cstr(c, "+PONG\r\n") ? 1 : 0;
    362   if (nargs == 2)
    363     return api_write_bulk_cstr(c, args[1]) ? 1 : 0;
    364   return api_write_err(c, "wrong number of arguments for 'ping' command") ? 1 : 0;
    365 }
    366 
    367 static char cmdQUIT(api_client_t *c, char **args, int nargs) {
    368   (void)args; (void)nargs;
    369   api_write_ok(c);
    370   return 0;
    371 }
    372 
    373 static bool is_builtin(const char *name);
    374 
    375 static char cmdCOMMAND(api_client_t *c, char **args, int nargs) {
    376   (void)args;
    377   if (!cmd_map && !domain_cmd_map)
    378     return api_write_array(c, 0) ? 1 : 0;
    379 
    380   resp_object *result = resp_array_init();
    381   if (!result) return 0;
    382 
    383   if (domain_cmd_map) {
    384     size_t iter = 0;
    385     void *item;
    386     while (hashmap_iter(domain_cmd_map, &iter, &item)) {
    387       const domain_cmd_entry *e = item;
    388       if (!user_has_permit(c, e->name))
    389         continue;
    390 
    391       resp_array_append_bulk(result, e->name);
    392       resp_object *meta = resp_array_init();
    393       if (!meta) { resp_free(result); return 0; }
    394       resp_array_append_bulk(meta, "summary");
    395       resp_array_append_bulk(meta, "UDP hole proxy command");
    396       resp_array_append_obj(result, meta);
    397     }
    398   }
    399 
    400   if (cmd_map) {
    401     size_t iter = 0;
    402     void *item;
    403     while (hashmap_iter(cmd_map, &iter, &item)) {
    404       const api_cmd_entry *e = item;
    405       if (!is_builtin(e->name) && !user_has_permit(c, e->name))
    406         continue;
    407 
    408       resp_array_append_bulk(result, e->name);
    409       resp_object *meta = resp_array_init();
    410       if (!meta) { resp_free(result); return 0; }
    411       resp_array_append_bulk(meta, "summary");
    412       resp_array_append_bulk(meta, "UDP hole proxy command");
    413       resp_array_append_obj(result, meta);
    414     }
    415   }
    416 
    417   char *out_buf = NULL;
    418   size_t out_len = 0;
    419   if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) {
    420     resp_free(result);
    421     return 0;
    422   }
    423   resp_free(result);
    424 
    425   api_write_raw(c, out_buf, out_len);
    426   free(out_buf);
    427   return 1;
    428 }
    429 
    430 static void init_builtins(void) {
    431   api_register_cmd("auth",    cmdAUTH);
    432   api_register_cmd("ping",    cmdPING);
    433   api_register_cmd("quit",    cmdQUIT);
    434   api_register_cmd("command", cmdCOMMAND);
    435 }
    436 
    437 static bool is_builtin(const char *name) {
    438   return (strcasecmp(name, "auth") == 0 ||
    439           strcasecmp(name, "ping") == 0 ||
    440           strcasecmp(name, "quit") == 0 ||
    441           strcasecmp(name, "command") == 0);
    442 }
    443 
    444 static void dispatch_command(api_client_t *c, char **args, int nargs) {
    445   if (nargs <= 0) return;
    446 
    447   for (char *p = args[0]; *p; p++) *p = (char)tolower((unsigned char)*p);
    448 
    449   const domain_cmd_entry *dcmd = hashmap_get(domain_cmd_map, &(domain_cmd_entry){ .name = args[0] });
    450   if (dcmd) {
    451     if (!is_builtin(args[0])) {
    452       if (!user_has_permit(c, args[0])) {
    453         api_write_err(c, "no permission");
    454         return;
    455       }
    456     }
    457 
    458     resp_object *domain_args = resp_array_init();
    459     if (!domain_args) return;
    460 
    461     for (int i = 0; i < nargs; i++) {
    462       resp_array_append_bulk(domain_args, args[i]);
    463     }
    464 
    465     resp_object *result = dcmd->func(args[0], domain_args);
    466     resp_free(domain_args);
    467 
    468     if (!result) {
    469       api_write_err(c, "command failed");
    470       return;
    471     }
    472 
    473     char *out_buf = NULL;
    474     size_t out_len = 0;
    475     if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) {
    476       resp_free(result);
    477       api_write_err(c, "command failed");
    478       return;
    479     }
    480     resp_free(result);
    481 
    482     api_write_raw(c, out_buf, out_len);
    483     free(out_buf);
    484     return;
    485   }
    486 
    487   const api_cmd_entry *cmd = hashmap_get(cmd_map, &(api_cmd_entry){ .name = args[0] });
    488   if (!cmd) {
    489     api_write_err(c, "unknown command");
    490     return;
    491   }
    492 
    493   if (!is_builtin(args[0])) {
    494     if (!user_has_permit(c, args[0])) {
    495       api_write_err(c, "no permission");
    496       return;
    497     }
    498   }
    499 
    500   char result = cmd->func(c, args, nargs);
    501   if (!result) {
    502     client_flush(c);
    503     client_close(c);
    504   }
    505 }
    506 
    507 static int *create_listen_socket(const char *listen_addr) {
    508   const char *default_port = "6379";
    509   resp_object *api_sec = resp_map_get(global_cfg, "udphole");
    510   if (api_sec) {
    511     const char *cfg_port = resp_map_get_string(api_sec, "port");
    512     if (cfg_port && cfg_port[0]) default_port = cfg_port;
    513   }
    514 
    515   if (listen_addr && strncmp(listen_addr, "unix://", 7) == 0) {
    516     const char *socket_path = listen_addr + 7;
    517     const char *socket_owner = api_sec ? resp_map_get_string(api_sec, "socket_owner") : NULL;
    518     int *fds = unix_listen(socket_path, SOCK_STREAM, socket_owner);
    519     if (!fds) {
    520       return NULL;
    521     }
    522     log_info("api: listening on %s", listen_addr);
    523     return fds;
    524   }
    525 
    526   int *fds = tcp_listen(listen_addr, NULL, default_port);
    527   if (!fds) {
    528     return NULL;
    529   }
    530   log_info("api: listening on %s", listen_addr);
    531   return fds;
    532 }
    533 
    534 static void handle_accept(int ready_fd) {
    535   struct sockaddr_storage addr;
    536   socklen_t addrlen = sizeof(addr);
    537   int fd = accept(ready_fd, (struct sockaddr *)&addr, &addrlen);
    538   if (fd < 0) return;
    539   set_socket_nonblocking(fd, 1);
    540 
    541   api_client_t *state = calloc(1, sizeof(*state));
    542   if (!state) {
    543     const char *msg = "-ERR out of memory\r\n";
    544     send(fd, msg, strlen(msg), 0);
    545     close(fd);
    546     return;
    547   }
    548   state->fd = fd;
    549 
    550   domain_schedmod_pt_create(api_client_pt, state);
    551   log_trace("api: accepted connection, spawned client pt");
    552 }
    553 
    554 PT_THREAD(api_server_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
    555   api_server_udata_t *udata = task->udata;
    556   log_trace("api_server: protothread entry");
    557   PT_BEGIN(pt);
    558 
    559   if (!udata) {
    560     udata = calloc(1, sizeof(api_server_udata_t));
    561     if (!udata) {
    562       PT_EXIT(pt);
    563     }
    564     task->udata = udata;
    565   }
    566 
    567   resp_object *api_sec = resp_map_get(global_cfg, "udphole");
    568   const char *listen_str = api_sec ? resp_map_get_string(api_sec, "listen") : NULL;
    569   if (!listen_str || !listen_str[0]) {
    570     log_info("api: no listen address configured, API server disabled");
    571     PT_EXIT(pt);
    572   }
    573 
    574   if (!current_listen) {
    575     current_listen = strdup(listen_str);
    576     if (!current_listen) {
    577       PT_EXIT(pt);
    578     }
    579     init_builtins();
    580     udata->server_fds = create_listen_socket(current_listen);
    581     if (!udata->server_fds) {
    582       log_fatal("api: failed to listen on %s", current_listen);
    583       free(current_listen);
    584       current_listen = NULL;
    585       exit(1);
    586     }
    587   }
    588 
    589   for (;;) {
    590     (void)timestamp;
    591     if (udata->server_fds && udata->server_fds[0] > 0) {
    592       PT_WAIT_UNTIL(pt, domain_schedmod_has_data(udata->server_fds, &udata->ready_fds) > 0);
    593       if (udata->ready_fds && udata->ready_fds[0] > 0) {
    594         for (int i = 1; i <= udata->ready_fds[0]; i++) {
    595           handle_accept(udata->ready_fds[i]);
    596         }
    597       }
    598     if (udata->ready_fds) free(udata->ready_fds);
    599       udata->ready_fds = NULL;
    600     } else {
    601       PT_YIELD(pt);
    602     }
    603   }
    604   if (udata->server_fds) {
    605     for (int i = 1; i <= udata->server_fds[0]; i++) {
    606       close(udata->server_fds[i]);
    607     }
    608     free(udata->server_fds);
    609   }
    610   free(udata->ready_fds);
    611   free(udata);
    612   free(current_listen);
    613   current_listen = NULL;
    614   if (cmd_map) {
    615     hashmap_free(cmd_map);
    616     cmd_map = NULL;
    617   }
    618   if (domain_cmd_map) {
    619     hashmap_free(domain_cmd_map);
    620     domain_cmd_map = NULL;
    621   }
    622 
    623   PT_END(pt);
    624 }
    625 
    626 PT_THREAD(api_client_pt(struct pt *pt, int64_t timestamp, struct pt_task *task)) {
    627   (void)timestamp;
    628   api_client_t *state = task->udata;
    629 
    630   log_trace("api_client: protothread entry fd=%d", state->fd);
    631   PT_BEGIN(pt);
    632 
    633   state->fds = malloc(sizeof(int) * 2);
    634   if (!state->fds) {
    635     free(state);
    636     PT_EXIT(pt);
    637   }
    638   state->fds[0] = 1;
    639   state->fds[1] = state->fd;
    640 
    641   for (;;) {
    642     state->ready_fds = NULL;
    643     PT_WAIT_UNTIL(pt, domain_schedmod_has_data(state->fds, &state->ready_fds) > 0);
    644 
    645     state->ready_fd = -1;
    646     if (state->ready_fds && state->ready_fds[0] > 0) {
    647       for (int i = 1; i <= state->ready_fds[0]; i++) {
    648         if (state->ready_fds[i] == state->fd) {
    649           state->ready_fd = state->fd;
    650           break;
    651         }
    652       }
    653     }
    654     free(state->ready_fds);
    655     state->ready_fds = NULL;
    656 
    657     char buf[1];
    658     ssize_t n = recv(state->fd, buf, 1, MSG_PEEK);
    659     if (n <= 0) {
    660       break;
    661     }
    662 
    663     resp_object *cmd = resp_read(state->fd);
    664     if (!cmd) {
    665       if (errno == EAGAIN || errno == EWOULDBLOCK) {
    666         PT_YIELD(pt);
    667         continue;
    668       }
    669       break;
    670     }
    671 
    672     if (cmd->type != RESPT_ARRAY || cmd->u.arr.n == 0) {
    673       resp_free(cmd);
    674       api_write_err(state, "Protocol error");
    675       client_flush(state);
    676       continue;
    677     }
    678 
    679     char *args[MAX_ARGS];
    680     int nargs = 0;
    681     for (size_t i = 0; i < cmd->u.arr.n && nargs < MAX_ARGS; i++) {
    682       resp_object *elem = &cmd->u.arr.elem[i];
    683       if (elem->type == RESPT_BULK && elem->u.s) {
    684         args[nargs++] = elem->u.s;
    685         elem->u.s = NULL;
    686       } else if (elem->type == RESPT_SIMPLE) {
    687         args[nargs++] = elem->u.s ? elem->u.s : "";
    688       }
    689     }
    690 
    691     if (nargs > 0) {
    692       dispatch_command(state, args, nargs);
    693     }
    694 
    695     for (int j = 0; j < nargs; j++) {
    696       free(args[j]);
    697     }
    698     resp_free(cmd);
    699 
    700     client_flush(state);
    701 
    702     if (state->fd < 0) break;
    703   }
    704 
    705   if (state->fd >= 0) {
    706     close(state->fd);
    707   }
    708   free(state->fds);
    709   free(state->wbuf);
    710   free(state->username);
    711   free(state);
    712 
    713   PT_END(pt);
    714 }