udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

server.c (19591B)


      1 /*
      2  * Generic RESP2 API server: TCP listener, connection management, RESP2
      3  * parsing/writing, command hashmap, authentication with per-user permit
      4  * checking, and built-in commands (auth, ping, quit, command).
      5  *
      6  * Runs as a protothread in the main select() loop.
      7  */
      8 
      9 #include "interface/api/server.h"
     10 
     11 #include <arpa/inet.h>
     12 #include <ctype.h>
     13 #include <errno.h>
     14 #include <fcntl.h>
     15 #include <netdb.h>
     16 #include <netinet/in.h>
     17 #include <netinet/tcp.h>
     18 #include <stdio.h>
     19 #include <stdlib.h>
     20 #include <string.h>
     21 #include <sys/socket.h>
     22 #include <sys/types.h>
     23 #include <unistd.h>
     24 
     25 #include "common/resp.h"
     26 #include "common/scheduler.h"
     27 #include "common/socket_util.h"
     28 #include "common/url_utils.h"
     29 #include "domain/config.h"
     30 #include "finwo/mindex.h"
     31 #include "infrastructure/config.h"
     32 #include "rxi/log.h"
     33 
/* Forward declaration of the per-connection protothread; handle_accept()
 * passes it to sched_create() before its definition appears in this file. */
int api_client_pt(int64_t timestamp, struct pt_task *task);

/* NOTE(review): API_MAX_CLIENTS is not referenced by any code visible in this
 * file — confirm whether a connection cap is enforced elsewhere. */
#define API_MAX_CLIENTS 8
/* Size in bytes of the fixed per-client receive buffer (rbuf). */
#define READ_BUF_SIZE   4096
/* Capacity the per-client write buffer starts from on its first growth. */
#define WRITE_BUF_INIT  4096
/* Upper bound on the number of arguments collected from one command array. */
#define MAX_ARGS        32
     40 
/*
 * Per-connection state, allocated zeroed in handle_accept() and released by
 * api_client_pt() when the connection ends.
 */
struct api_client_state {
  int    fd;        /* client socket; set to -1 by client_close() */
  int   *fds;       /* count-prefixed array {1, fd} handed to sched_has_data() */
  int   *ready_fds; /* NOTE(review): not used by the code visible in this file */
  int    ready_fd;  /* NOTE(review): not used by the code visible in this file */
  char  *username;  /* heap copy of the authenticated user name; NULL until AUTH succeeds */
  char   rbuf[READ_BUF_SIZE]; /* bytes received but not yet parsed */
  size_t rlen;      /* number of valid bytes in rbuf */
  char  *wbuf;      /* heap buffer of replies queued for sending */
  size_t wlen;      /* number of queued bytes in wbuf */
  size_t wcap;      /* allocated capacity of wbuf */
};

typedef struct api_client_state api_client_t;
     55 
/*
 * Entry stored in cmd_map. The handler receives the client plus the argument
 * vector (args[0] is the lower-cased command name); dispatch_command() closes
 * the connection when the handler returns 0.
 */
typedef struct {
  const char *name;
  char (*func)(api_client_t *c, char **args, int nargs);
} api_cmd_entry;
     60 
/*
 * Entry stored in domain_cmd_map. dispatch_command() invokes func with the
 * command name and a RESP array of all arguments, then serializes the
 * returned object back to the client.
 */
typedef struct {
  const char   *name;
  domain_cmd_fn func;
} domain_cmd_entry;
     65 
/* Command registries, keyed case-insensitively by name. Both start out NULL
 * and are created on the first call to the matching api_register_* function. */
static struct mindex_t *cmd_map        = NULL;
static struct mindex_t *domain_cmd_map = NULL;
     68 
/*
 * State of the listener protothread (api_server_pt).
 */
typedef struct {
  int *server_fds; /* count-prefixed array: [0] = number of sockets, then the listening fds */
  int *ready_fds;  /* NOTE(review): not used by the code visible in this file */
} api_server_udata_t;
     73 
     74 bool api_write_raw(api_client_t *c, const void *data, size_t len) {
     75   if (c->fd < 0) return false;
     76   if (c->wlen + len > c->wcap) {
     77     size_t need = c->wlen + len;
     78     size_t ncap = c->wcap ? c->wcap : WRITE_BUF_INIT;
     79     while (ncap < need) ncap *= 2;
     80     char *nb = realloc(c->wbuf, ncap);
     81     if (!nb) return false;
     82     c->wbuf = nb;
     83     c->wcap = ncap;
     84   }
     85   memcpy(c->wbuf + c->wlen, data, len);
     86   c->wlen += len;
     87   return true;
     88 }
     89 
     90 bool api_write_cstr(api_client_t *c, const char *s) {
     91   return api_write_raw(c, s, strlen(s));
     92 }
     93 
     94 bool api_write_ok(api_client_t *c) {
     95   return api_write_cstr(c, "+OK\r\n");
     96 }
     97 
     98 bool api_write_err(api_client_t *c, const char *msg) {
     99   if (!api_write_cstr(c, "-ERR ")) return false;
    100   if (!api_write_cstr(c, msg)) return false;
    101   return api_write_cstr(c, "\r\n");
    102 }
    103 
    104 bool api_write_nil(api_client_t *c) {
    105   return api_write_cstr(c, "$-1\r\n");
    106 }
    107 
    108 bool api_write_int(api_client_t *c, int value) {
    109   char buf[32];
    110   snprintf(buf, sizeof(buf), ":%d\r\n", value);
    111   return api_write_cstr(c, buf);
    112 }
    113 
    114 bool api_write_array(api_client_t *c, size_t nitems) {
    115   char buf[32];
    116   snprintf(buf, sizeof(buf), "*%zu\r\n", nitems);
    117   return api_write_cstr(c, buf);
    118 }
    119 
    120 bool api_write_bulk_cstr(api_client_t *c, const char *s) {
    121   if (!s) return api_write_nil(c);
    122   size_t len = strlen(s);
    123   char   prefix[32];
    124   snprintf(prefix, sizeof(prefix), "$%zu\r\n", len);
    125   if (!api_write_cstr(c, prefix)) return false;
    126   if (!api_write_raw(c, s, len)) return false;
    127   return api_write_cstr(c, "\r\n");
    128 }
    129 
    130 bool api_write_bulk_int(api_client_t *c, int val) {
    131   char buf[32];
    132   snprintf(buf, sizeof(buf), "%d", val);
    133   return api_write_bulk_cstr(c, buf);
    134 }
    135 
    136 static void client_close(api_client_t *c) {
    137   if (c->fd >= 0) {
    138     close(c->fd);
    139     c->fd = -1;
    140   }
    141   free(c->wbuf);
    142   c->wbuf = NULL;
    143   c->wlen = c->wcap = 0;
    144   c->rlen           = 0;
    145   free(c->username);
    146   c->username = NULL;
    147 }
    148 
    149 static void client_flush(api_client_t *c) {
    150   if (c->fd < 0 || c->wlen == 0) return;
    151   ssize_t n = send(c->fd, c->wbuf, c->wlen, 0);
    152   if (n > 0) {
    153     if ((size_t)n < c->wlen) memmove(c->wbuf, c->wbuf + n, c->wlen - (size_t)n);
    154     c->wlen -= (size_t)n;
    155   } else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
    156     client_close(c);
    157   }
    158 }
    159 
    160 static bool permit_matches(const char *pattern, const char *cmd) {
    161   size_t plen = strlen(pattern);
    162   if (plen == 1 && pattern[0] == '*') return true;
    163   if (plen >= 2 && pattern[plen - 1] == '*') {
    164     return strncasecmp(pattern, cmd, plen - 1) == 0;
    165   }
    166   return strcasecmp(pattern, cmd) == 0;
    167 }
    168 
    169 static bool user_has_permit(api_client_t *c, const char *cmd) {
    170   char        section[128];
    171   const char *uname = (c->username && c->username[0]) ? c->username : "*";
    172   snprintf(section, sizeof(section), "user:%s", uname);
    173   resp_object *sec = resp_map_get(domain_cfg, section);
    174   if (sec && sec->type == RESPT_ARRAY) {
    175     for (size_t i = 0; i < sec->u.arr.n; i += 2) {
    176       if (i + 1 < sec->u.arr.n) {
    177         resp_object *key = &sec->u.arr.elem[i];
    178         resp_object *val = &sec->u.arr.elem[i + 1];
    179         if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) {
    180           if (val->type == RESPT_ARRAY) {
    181             for (size_t j = 0; j < val->u.arr.n; j++) {
    182               resp_object *p = &val->u.arr.elem[j];
    183               if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) return true;
    184             }
    185           } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) {
    186             return true;
    187           }
    188         }
    189       }
    190     }
    191   }
    192   if (strcmp(uname, "*") != 0) {
    193     resp_object *anon = resp_map_get(domain_cfg, "user:*");
    194     if (anon && anon->type == RESPT_ARRAY) {
    195       for (size_t i = 0; i < anon->u.arr.n; i += 2) {
    196         if (i + 1 < anon->u.arr.n) {
    197           resp_object *key = &anon->u.arr.elem[i];
    198           resp_object *val = &anon->u.arr.elem[i + 1];
    199           if (key->type == RESPT_BULK && key->u.s && strcmp(key->u.s, "permit") == 0) {
    200             if (val->type == RESPT_ARRAY) {
    201               for (size_t j = 0; j < val->u.arr.n; j++) {
    202                 resp_object *p = &val->u.arr.elem[j];
    203                 if (p->type == RESPT_BULK && p->u.s && permit_matches(p->u.s, cmd)) return true;
    204               }
    205             } else if (val->type == RESPT_BULK && val->u.s && permit_matches(val->u.s, cmd)) {
    206               return true;
    207             }
    208           }
    209         }
    210       }
    211     }
    212   }
    213   return false;
    214 }
    215 
    216 static int cmd_compare(const void *a, const void *b, void *udata) {
    217   (void)udata;
    218   const api_cmd_entry *ca = a;
    219   const api_cmd_entry *cb = b;
    220   return strcasecmp(ca->name, cb->name);
    221 }
    222 
    223 static void cmd_purge(void *item, void *udata) {
    224   (void)item;
    225   (void)udata;
    226 }
    227 
    228 void api_register_cmd(const char *name, char (*func)(api_client_t *, char **, int)) {
    229   if (!cmd_map) cmd_map = mindex_init(cmd_compare, cmd_purge, NULL);
    230   api_cmd_entry *entry = malloc(sizeof(api_cmd_entry));
    231   entry->name = name;
    232   entry->func = func;
    233   mindex_set(cmd_map, entry);
    234   log_trace("api: registered command '%s'", name);
    235 }
    236 
    237 static int domain_cmd_compare(const void *a, const void *b, void *udata) {
    238   (void)udata;
    239   const domain_cmd_entry *ca = a;
    240   const domain_cmd_entry *cb = b;
    241   return strcasecmp(ca->name, cb->name);
    242 }
    243 
    244 static void domain_cmd_purge(void *item, void *udata) {
    245   (void)item;
    246   (void)udata;
    247 }
    248 
    249 void api_register_domain_cmd(const char *name, domain_cmd_fn func) {
    250   if (!domain_cmd_map)
    251     domain_cmd_map = mindex_init(domain_cmd_compare, domain_cmd_purge, NULL);
    252   domain_cmd_entry *entry = malloc(sizeof(domain_cmd_entry));
    253   entry->name = name;
    254   entry->func = func;
    255   mindex_set(domain_cmd_map, entry);
    256   log_trace("api: registered domain command '%s'", name);
    257 }
    258 
    259 static char cmdAUTH(api_client_t *c, char **args, int nargs) {
    260   if (nargs != 3) {
    261     api_write_err(c,
    262                   "wrong number of arguments for 'auth' command (AUTH "
    263                   "username password)");
    264     return 1;
    265   }
    266   const char *uname = args[1];
    267   const char *pass  = args[2];
    268   char        section[128];
    269   snprintf(section, sizeof(section), "user:%s", uname);
    270   resp_object *sec    = resp_map_get(domain_cfg, section);
    271   const char  *secret = sec ? resp_map_get_string(sec, "secret") : NULL;
    272   if (secret && pass && strcmp(secret, pass) == 0) {
    273     free(c->username);
    274     c->username = strdup(uname);
    275     if (c->username) {
    276       log_debug("api: client authenticated as '%s'", uname);
    277       return api_write_ok(c) ? 1 : 0;
    278     }
    279   }
    280   return api_write_err(c, "invalid credentials") ? 1 : 0;
    281 }
    282 
    283 static char cmdPING(api_client_t *c, char **args, int nargs) {
    284   (void)args;
    285   if (nargs == 1) return api_write_cstr(c, "+PONG\r\n") ? 1 : 0;
    286   if (nargs == 2) return api_write_bulk_cstr(c, args[1]) ? 1 : 0;
    287   return api_write_err(c, "wrong number of arguments for 'ping' command") ? 1 : 0;
    288 }
    289 
    290 static char cmdQUIT(api_client_t *c, char **args, int nargs) {
    291   (void)args;
    292   (void)nargs;
    293   api_write_ok(c);
    294   return 0;
    295 }
    296 
/* Forward declaration: cmdCOMMAND uses is_builtin() before it is defined. */
static bool is_builtin(const char *name);
    298 
    299 static char cmdCOMMAND(api_client_t *c, char **args, int nargs) {
    300   (void)args;
    301   if (!cmd_map && !domain_cmd_map) return api_write_array(c, 0) ? 1 : 0;
    302 
    303   resp_object *result = resp_array_init();
    304   if (!result) return 0;
    305 
    306   if (domain_cmd_map) {
    307     for (size_t i = 0; i < mindex_length(domain_cmd_map); i++) {
    308       const domain_cmd_entry *e = mindex_nth(domain_cmd_map, i);
    309       if (!user_has_permit(c, e->name)) continue;
    310 
    311       resp_array_append_bulk(result, e->name);
    312       resp_object *meta = resp_array_init();
    313       if (!meta) {
    314         resp_free(result);
    315         return 0;
    316       }
    317       resp_array_append_bulk(meta, "summary");
    318       resp_array_append_bulk(meta, "UDP hole proxy command");
    319       resp_array_append_obj(result, meta);
    320     }
    321   }
    322 
    323   if (cmd_map) {
    324     for (size_t i = 0; i < mindex_length(cmd_map); i++) {
    325       const api_cmd_entry *e = mindex_nth(cmd_map, i);
    326       if (!is_builtin(e->name) && !user_has_permit(c, e->name)) continue;
    327 
    328       resp_array_append_bulk(result, e->name);
    329       resp_object *meta = resp_array_init();
    330       if (!meta) {
    331         resp_free(result);
    332         return 0;
    333       }
    334       resp_array_append_bulk(meta, "summary");
    335       resp_array_append_bulk(meta, "UDP hole proxy command");
    336       resp_array_append_obj(result, meta);
    337     }
    338   }
    339 
    340   char  *out_buf = NULL;
    341   size_t out_len = 0;
    342   if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) {
    343     resp_free(result);
    344     return 0;
    345   }
    346   resp_free(result);
    347 
    348   api_write_raw(c, out_buf, out_len);
    349   free(out_buf);
    350   return 1;
    351 }
    352 
    353 static void init_builtins(void) {
    354   api_register_cmd("auth", cmdAUTH);
    355   api_register_cmd("ping", cmdPING);
    356   api_register_cmd("quit", cmdQUIT);
    357   api_register_cmd("command", cmdCOMMAND);
    358 }
    359 
    360 static bool is_builtin(const char *name) {
    361   return (strcasecmp(name, "auth") == 0 || strcasecmp(name, "ping") == 0 || strcasecmp(name, "quit") == 0 ||
    362           strcasecmp(name, "command") == 0);
    363 }
    364 
    365 static void dispatch_command(api_client_t *c, char **args, int nargs) {
    366   if (nargs <= 0) return;
    367 
    368   for (char *p = args[0]; *p; p++) *p = (char)tolower((unsigned char)*p);
    369 
    370   const domain_cmd_entry *dcmd = mindex_get(domain_cmd_map, &(domain_cmd_entry){.name = args[0]});
    371   if (dcmd) {
    372     if (!is_builtin(args[0])) {
    373       if (!user_has_permit(c, args[0])) {
    374         api_write_err(c, "no permission");
    375         return;
    376       }
    377     }
    378 
    379     resp_object *domain_args = resp_array_init();
    380     if (!domain_args) return;
    381 
    382     for (int i = 0; i < nargs; i++) {
    383       resp_array_append_bulk(domain_args, args[i]);
    384     }
    385 
    386     resp_object *result = dcmd->func(args[0], domain_args);
    387     resp_free(domain_args);
    388 
    389     if (!result) {
    390       api_write_err(c, "command failed");
    391       return;
    392     }
    393 
    394     char  *out_buf = NULL;
    395     size_t out_len = 0;
    396     if (resp_serialize(result, &out_buf, &out_len) != 0 || !out_buf) {
    397       resp_free(result);
    398       api_write_err(c, "command failed");
    399       return;
    400     }
    401     resp_free(result);
    402 
    403     api_write_raw(c, out_buf, out_len);
    404     free(out_buf);
    405     return;
    406   }
    407 
    408   const api_cmd_entry *cmd = mindex_get(cmd_map, &(api_cmd_entry){.name = args[0]});
    409   if (!cmd) {
    410     api_write_err(c, "unknown command");
    411     return;
    412   }
    413 
    414   if (!is_builtin(args[0])) {
    415     if (!user_has_permit(c, args[0])) {
    416       api_write_err(c, "no permission");
    417       return;
    418     }
    419   }
    420 
    421   char result = cmd->func(c, args, nargs);
    422   if (!result) {
    423     client_flush(c);
    424     client_close(c);
    425   }
    426 }
    427 
    428 static int *create_listen_socket(const char *listen_addr) {
    429   const char  *default_port = "6379";
    430   resp_object *api_sec      = resp_map_get(domain_cfg, "udphole");
    431   if (api_sec) {
    432     const char *cfg_port = resp_map_get_string(api_sec, "port");
    433     if (cfg_port && cfg_port[0]) default_port = cfg_port;
    434   }
    435 
    436   struct parsed_url *purl = NULL;
    437   if (parse_address_url(listen_addr, &purl) != 0) {
    438     log_error("api: failed to parse listen address '%s'", listen_addr);
    439     return NULL;
    440   }
    441 
    442   if (purl->scheme && strcmp(purl->scheme, "unix") == 0) {
    443     const char *socket_path  = purl->path;
    444     const char *socket_owner = api_sec ? resp_map_get_string(api_sec, "socket_owner") : NULL;
    445     int        *fds          = unix_listen(socket_path, SOCK_STREAM, socket_owner);
    446     parsed_url_free(purl);
    447     if (!fds) {
    448       return NULL;
    449     }
    450     log_info("api: listening on %s", listen_addr);
    451     return fds;
    452   }
    453 
    454   char addr_buf[512];
    455   if (purl->host && purl->port) {
    456     snprintf(addr_buf, sizeof(addr_buf), "%s:%s", purl->host, purl->port);
    457   } else if (purl->port) {
    458     snprintf(addr_buf, sizeof(addr_buf), ":%s", purl->port);
    459   } else {
    460     snprintf(addr_buf, sizeof(addr_buf), ":%s", default_port);
    461   }
    462 
    463   int *fds = tcp_listen(addr_buf, NULL, default_port);
    464   parsed_url_free(purl);
    465   if (!fds) {
    466     return NULL;
    467   }
    468   log_info("api: listening on %s", listen_addr);
    469   return fds;
    470 }
    471 
    472 static void handle_accept(int ready_fd) {
    473   struct sockaddr_storage addr;
    474   socklen_t               addrlen = sizeof(addr);
    475   int                     fd      = accept(ready_fd, (struct sockaddr *)&addr, &addrlen);
    476   if (fd < 0) return;
    477   set_socket_nonblocking(fd, 1);
    478 
    479   int flag = 1;
    480   setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
    481 
    482   api_client_t *state = calloc(1, sizeof(*state));
    483   if (!state) {
    484     const char *msg = "-ERR out of memory\r\n";
    485     send(fd, msg, strlen(msg), 0);
    486     close(fd);
    487     return;
    488   }
    489   state->fd = fd;
    490 
    491   sched_create(api_client_pt, state);
    492   log_trace("api: accepted connection, spawned client pt");
    493 }
    494 
    495 int api_server_pt(int64_t timestamp, struct pt_task *task) {
    496   (void)timestamp;
    497   api_server_udata_t *udata = task->udata;
    498 
    499   if (!udata) {
    500     udata = calloc(1, sizeof(api_server_udata_t));
    501     if (!udata) {
    502       return SCHED_ERROR;
    503     }
    504     task->udata = udata;
    505   }
    506 
    507   if (udata->server_fds == NULL) {
    508     resp_object *api_sec    = resp_map_get(domain_cfg, "udphole");
    509     resp_object *listen_arr = api_sec ? resp_map_get(api_sec, "listen") : NULL;
    510 
    511     if (!listen_arr || listen_arr->type != RESPT_ARRAY || listen_arr->u.arr.n == 0) {
    512       return SCHED_RUNNING;
    513     }
    514 
    515     int  **socket_arrays = NULL;
    516     size_t num_listeners = 0;
    517 
    518     for (size_t i = 0; i < listen_arr->u.arr.n; i++) {
    519       if (listen_arr->u.arr.elem[i].type != RESPT_BULK || !listen_arr->u.arr.elem[i].u.s) {
    520         continue;
    521       }
    522       const char *listen_addr = listen_arr->u.arr.elem[i].u.s;
    523       int        *fds         = create_listen_socket(listen_addr);
    524       if (!fds) {
    525         log_fatal("api: failed to listen on %s", listen_addr);
    526         for (size_t j = 0; j < num_listeners; j++) {
    527           if (socket_arrays[j]) {
    528             for (int k = 1; k <= socket_arrays[j][0]; k++) {
    529               close(socket_arrays[j][k]);
    530             }
    531             free(socket_arrays[j]);
    532           }
    533         }
    534         free(socket_arrays);
    535         return SCHED_ERROR;
    536       }
    537       socket_arrays                  = realloc(socket_arrays, sizeof(int *) * (num_listeners + 1));
    538       socket_arrays[num_listeners++] = fds;
    539     }
    540 
    541     int total_fds = 0;
    542     for (size_t i = 0; i < num_listeners; i++) {
    543       total_fds += socket_arrays[i][0];
    544     }
    545 
    546     udata->server_fds = calloc(total_fds + 1, sizeof(int));
    547     if (!udata->server_fds) {
    548       log_fatal("api: out of memory for listen sockets");
    549       for (size_t i = 0; i < num_listeners; i++) {
    550         free(socket_arrays[i]);
    551       }
    552       free(socket_arrays);
    553       return SCHED_ERROR;
    554     }
    555     udata->server_fds[0] = 0;
    556 
    557     for (size_t i = 0; i < num_listeners; i++) {
    558       for (int j = 1; j <= socket_arrays[i][0]; j++) {
    559         udata->server_fds[++udata->server_fds[0]] = socket_arrays[i][j];
    560       }
    561       free(socket_arrays[i]);
    562     }
    563     free(socket_arrays);
    564 
    565     if (udata->server_fds[0] == 0) {
    566       log_fatal("api: no listen sockets created");
    567       free(udata->server_fds);
    568       udata->server_fds = NULL;
    569       return SCHED_ERROR;
    570     }
    571 
    572     init_builtins();
    573   }
    574 
    575   if (udata->server_fds && udata->server_fds[0] > 0) {
    576     int ready_fd = sched_has_data(udata->server_fds);
    577     if (ready_fd >= 0) {
    578       handle_accept(ready_fd);
    579     }
    580   }
    581 
    582   return SCHED_RUNNING;
    583 }
    584 
    585 int api_client_pt(int64_t timestamp, struct pt_task *task) {
    586   (void)timestamp;
    587   api_client_t *state = task->udata;
    588 
    589   if (!state->fds) {
    590     state->fds = malloc(sizeof(int) * 2);
    591     if (!state->fds) {
    592       free(state);
    593       return SCHED_DONE;
    594     }
    595     state->fds[0] = 1;
    596     state->fds[1] = state->fd;
    597   }
    598 
    599   int ready_fd = sched_has_data(state->fds);
    600   if (ready_fd < 0) {
    601     return SCHED_RUNNING;
    602   }
    603 
    604   if (ready_fd != state->fd) {
    605     return SCHED_RUNNING;
    606   }
    607 
    608   // Read data into receive buffer
    609   if (state->rlen >= READ_BUF_SIZE) {
    610     // Buffer full, protocol error
    611     goto cleanup;
    612   }
    613 
    614   ssize_t n = recv(state->fd, state->rbuf + state->rlen, READ_BUF_SIZE - state->rlen, 0);
    615   if (n < 0) {
    616     if (errno == EAGAIN || errno == EWOULDBLOCK) {
    617       return SCHED_RUNNING;
    618     }
    619     goto cleanup;
    620   }
    621   if (n == 0) {
    622     goto cleanup;
    623   }
    624   state->rlen += (size_t)n;
    625   log_trace("api: received %zd bytes, buffer len=%zu", n, state->rlen);
    626 
    627   // Try to parse RESP object from buffer
    628   resp_object *cmd = NULL;
    629   int consumed = resp_read_buf(state->rbuf, state->rlen, &cmd);
    630   log_trace("api: resp_read_buf returned %d, cmd=%p", consumed, (void*)cmd);
    631   if (consumed > 0) {
    632     // Successfully parsed - consume bytes from buffer
    633     memmove(state->rbuf, state->rbuf + consumed, state->rlen - consumed);
    634     state->rlen -= consumed;
    635   } else if (consumed < 0) {
    636     // Incomplete - need more data, will retry on next call with same buffer
    637     return SCHED_RUNNING;
    638   } else {
    639     // No data - shouldn't happen, but continue
    640     return SCHED_RUNNING;
    641   }
    642 
    643   if (!cmd) {
    644     return SCHED_RUNNING;
    645   }
    646 
    647   if (cmd->type != RESPT_ARRAY || cmd->u.arr.n == 0) {
    648     log_trace("api: not an array or empty (type=%d, n=%zu), sending protocol error", cmd->type, cmd->u.arr.n);
    649     resp_free(cmd);
    650     api_write_err(state, "Protocol error");
    651     client_flush(state);
    652     return SCHED_RUNNING;
    653   }
    654 
    655   log_trace("api: array has %zu elements", cmd->u.arr.n);
    656 
    657   char *args[MAX_ARGS];
    658   int   nargs = 0;
    659   for (size_t i = 0; i < cmd->u.arr.n && nargs < MAX_ARGS; i++) {
    660     resp_object *elem = &cmd->u.arr.elem[i];
    661     log_trace("api: elem[%zu] type=%d, s=%s", i, elem->type, elem->u.s ? elem->u.s : "(null)");
    662     if (elem->type == RESPT_BULK && elem->u.s) {
    663       args[nargs++] = elem->u.s;
    664       elem->u.s     = NULL;
    665     } else if (elem->type == RESPT_SIMPLE) {
    666       args[nargs++] = elem->u.s ? elem->u.s : "";
    667     }
    668   }
    669 
    670   log_trace("api: dispatching command with %d args", nargs);
    671   if (nargs > 0) {
    672     dispatch_command(state, args, nargs);
    673   }
    674   log_trace("api: command dispatched");
    675 
    676   for (int j = 0; j < nargs; j++) {
    677     free(args[j]);
    678   }
    679   resp_free(cmd);
    680 
    681   client_flush(state);
    682 
    683   if (state->fd < 0) {
    684     goto cleanup;
    685   }
    686 
    687   return SCHED_RUNNING;
    688 
    689 cleanup:
    690   if (state->fd >= 0) {
    691     close(state->fd);
    692   }
    693   free(state->fds);
    694   free(state->wbuf);
    695   free(state->username);
    696   free(state);
    697   return SCHED_DONE;
    698 }