udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

node.c (8955B)


      1 #include "node.h"
      2 
      3 #include <arpa/inet.h>
      4 #include <errno.h>
      5 #include <fcntl.h>
      6 #include <netdb.h>
      7 #include <netinet/in.h>
      8 #include <netinet/tcp.h>
      9 #include <stdio.h>
     10 #include <stdlib.h>
     11 #include <string.h>
     12 #include <sys/socket.h>
     13 #include <sys/un.h>
     14 #include <unistd.h>
     15 
     16 #include "common/resp.h"
     17 #include "common/socket_util.h"
     18 #include "common/url_utils.h"
     19 #include "rxi/log.h"
     20 
     21 #define QUERY_TIMEOUT_MS        500
     22 #define HEALTHCHECK_INTERVAL_MS 5000
     23 
     24 static int parse_address(const char *address, char **host, int *port, char **unix_path) {
     25   if (!address) return -1;
     26 
     27   struct parsed_url *purl = NULL;
     28   if (parse_address_url(address, &purl) != 0) {
     29     return -1;
     30   }
     31 
     32   if (purl->scheme && strcmp(purl->scheme, "unix") == 0) {
     33     *unix_path = strdup(purl->path ? purl->path : "");
     34     *host      = NULL;
     35     *port      = 0;
     36     parsed_url_free(purl);
     37     return 0;
     38   }
     39 
     40   if (!purl->host) {
     41     log_error("cluster: no host in address '%s'", address);
     42     parsed_url_free(purl);
     43     return -1;
     44   }
     45 
     46   *host      = strdup(purl->host);
     47   *port      = purl->port ? atoi(purl->port) : 0;
     48   *unix_path = NULL;
     49 
     50   if (*port <= 0) {
     51     log_error("cluster: invalid port in address '%s'", address);
     52     free(*host);
     53     *host = NULL;
     54     parsed_url_free(purl);
     55     return -1;
     56   }
     57 
     58   parsed_url_free(purl);
     59   return 0;
     60 }
     61 
     62 int cluster_node_init(cluster_node_t *node, const char *name, const char *address, const char *username,
     63                       const char *password) {
     64   memset(node, 0, sizeof(*node));
     65 
     66   node->name     = strdup(name);
     67   node->address  = address ? strdup(address) : NULL;
     68   node->username = username ? strdup(username) : NULL;
     69   node->password = password ? strdup(password) : NULL;
     70 
     71   if (!node->address) {
     72     log_error("cluster: node '%s' has no address configured", name);
     73     return -1;
     74   }
     75 
     76   if (parse_address(node->address, &node->host, &node->port, &node->unix_path) != 0) {
     77     return -1;
     78   }
     79 
     80   node->fd         = -1;
     81   node->available  = 0;
     82   node->last_ping  = 0;
     83   node->last_check = 0;
     84   node->weight     = 1;
     85 
     86   return 0;
     87 }
     88 
     89 void cluster_node_free(cluster_node_t *node) {
     90   if (!node) return;
     91   cluster_node_disconnect(node);
     92   free(node->name);
     93   free(node->address);
     94   free(node->host);
     95   free(node->unix_path);
     96   free(node->username);
     97   free(node->password);
     98 }
     99 
    100 static int connect_tcp(const char *host, int port) {
    101   struct addrinfo hints, *res, *res0;
    102   int             sockfd, error;
    103 
    104   memset(&hints, 0, sizeof(hints));
    105   hints.ai_family   = AF_UNSPEC;
    106   hints.ai_socktype = SOCK_STREAM;
    107   hints.ai_protocol = 0;
    108 
    109   char port_str[16];
    110   snprintf(port_str, sizeof(port_str), "%d", port);
    111 
    112   error = getaddrinfo(host, port_str, &hints, &res0);
    113   if (error) {
    114     log_error("cluster: getaddrinfo(%s:%s): %s", host, port_str, gai_strerror(error));
    115     return -1;
    116   }
    117 
    118   sockfd = -1;
    119   for (res = res0; res; res = res->ai_next) {
    120     sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol);
    121     if (sockfd < 0) continue;
    122 
    123     if (connect(sockfd, res->ai_addr, res->ai_addrlen) == 0) break;
    124 
    125     close(sockfd);
    126     sockfd = -1;
    127   }
    128 
    129   freeaddrinfo(res0);
    130 
    131   if (sockfd >= 0) {
    132     int flag = 1;
    133     setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
    134   }
    135 
    136   return sockfd;
    137 }
    138 
    139 static int connect_unix(const char *path) {
    140   struct sockaddr_un addr;
    141   int                sockfd;
    142 
    143   if (strlen(path) >= sizeof(addr.sun_path)) {
    144     log_error("cluster: unix socket path too long: %s", path);
    145     return -1;
    146   }
    147 
    148   sockfd = socket(AF_UNIX, SOCK_STREAM, 0);
    149   if (sockfd < 0) {
    150     return -1;
    151   }
    152 
    153   memset(&addr, 0, sizeof(addr));
    154   addr.sun_family = AF_UNIX;
    155   strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1);
    156 
    157   if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
    158     close(sockfd);
    159     return -1;
    160   }
    161 
    162   return sockfd;
    163 }
    164 
    165 int cluster_node_connect(cluster_node_t *node) {
    166   if (node->fd >= 0) {
    167     return 0;
    168   }
    169 
    170   if (node->unix_path) {
    171     node->fd = connect_unix(node->unix_path);
    172   } else {
    173     node->fd = connect_tcp(node->host, node->port);
    174   }
    175 
    176   if (node->fd < 0) {
    177     log_debug("cluster: failed to connect to node '%s' at %s", node->name, node->address);
    178     return -1;
    179   }
    180 
    181   log_info("cluster: connected to node '%s' at %s", node->name, node->address);
    182 
    183   if (node->username && node->password) {
    184     char        *cmd_str = NULL;
    185     size_t       cmd_len = 0;
    186     resp_object *cmd     = resp_array_init();
    187     resp_array_append_bulk(cmd, "auth");
    188     resp_array_append_bulk(cmd, node->username);
    189     resp_array_append_bulk(cmd, node->password);
    190     resp_serialize(cmd, &cmd_str, &cmd_len);
    191     resp_free(cmd);
    192 
    193     if (cmd_str) {
    194       ssize_t n = send(node->fd, cmd_str, cmd_len, 0);
    195       if (n > 0) {
    196         resp_object *resp = resp_read(node->fd);
    197         if (!resp || resp->type == RESPT_ERROR) {
    198           log_warn("cluster: authentication failed for node '%s'", node->name);
    199           close(node->fd);
    200           node->fd = -1;
    201           free(cmd_str);
    202           return -1;
    203         }
    204         resp_free(resp);
    205       }
    206       free(cmd_str);
    207     }
    208   }
    209 
    210   return 0;
    211 }
    212 
    213 void cluster_node_disconnect(cluster_node_t *node) {
    214   if (node->fd >= 0) {
    215     close(node->fd);
    216     node->fd = -1;
    217   }
    218 }
    219 
    220 static int send_resp_command(int fd, const char *cmd) {
    221   size_t  cmd_len = strlen(cmd);
    222   ssize_t n       = send(fd, cmd, cmd_len, 0);
    223   return (n == (ssize_t)cmd_len) ? 0 : -1;
    224 }
    225 
    226 int cluster_node_send_command(cluster_node_t *node, const char *cmd, resp_object **out_response) {
    227   if (node->fd < 0) {
    228     if (cluster_node_connect(node) != 0) {
    229       return -1;
    230     }
    231   }
    232 
    233   if (send_resp_command(node->fd, cmd) != 0) {
    234     log_debug("cluster: failed to send command to node '%s'", node->name);
    235     cluster_node_disconnect(node);
    236     return -1;
    237   }
    238 
    239   resp_object *resp = resp_read(node->fd);
    240   if (!resp) {
    241     log_debug("cluster: no response from node '%s'", node->name);
    242     cluster_node_disconnect(node);
    243     return -1;
    244   }
    245 
    246   *out_response = resp;
    247   return 0;
    248 }
    249 
    250 int cluster_node_healthcheck_pt(int64_t timestamp, struct pt_task *task) {
    251   cluster_node_t *node = task->udata;
    252 
    253   if (!node) {
    254     return SCHED_DONE;
    255   }
    256 
    257   if (node->last_check > 0 && timestamp - node->last_check < HEALTHCHECK_INTERVAL_MS) {
    258     return SCHED_RUNNING;
    259   }
    260   node->last_check = timestamp;
    261 
    262   if (node->fd < 0) {
    263     if (cluster_node_connect(node) != 0) {
    264       node->available = 0;
    265       return SCHED_RUNNING;
    266     }
    267   }
    268 
    269   char        *cmd_str = NULL;
    270   size_t       cmd_len = 0;
    271   resp_object *cmd     = resp_array_init();
    272   resp_array_append_bulk(cmd, "ping");
    273   resp_serialize(cmd, &cmd_str, &cmd_len);
    274   resp_free(cmd);
    275 
    276   if (!cmd_str) {
    277     node->available = 0;
    278     cluster_node_disconnect(node);
    279     return SCHED_RUNNING;
    280   }
    281 
    282   ssize_t n = send(node->fd, cmd_str, cmd_len, 0);
    283   free(cmd_str);
    284 
    285   if (n <= 0) {
    286     node->available = 0;
    287     cluster_node_disconnect(node);
    288     return SCHED_RUNNING;
    289   }
    290 
    291   resp_object *resp = resp_read(node->fd);
    292   if (!resp || (resp->type != RESPT_SIMPLE && resp->type != RESPT_BULK)) {
    293     node->available = 0;
    294     if (resp) resp_free(resp);
    295     cluster_node_disconnect(node);
    296     return SCHED_RUNNING;
    297   }
    298 
    299   node->available = 1;
    300   node->last_ping = timestamp;
    301   log_trace("cluster: node '%s' healthcheck OK", node->name);
    302   resp_free(resp);
    303 
    304   return SCHED_RUNNING;
    305 }
    306 
    307 cluster_nodes_t *cluster_nodes_create(void) {
    308   cluster_nodes_t *cnodes = calloc(1, sizeof(cluster_nodes_t));
    309   return cnodes;
    310 }
    311 
    312 void cluster_nodes_free(cluster_nodes_t *cnodes) {
    313   if (!cnodes) return;
    314   for (size_t i = 0; i < cnodes->nodes_count; i++) {
    315     if (cnodes->nodes[i]) {
    316       cluster_node_free(cnodes->nodes[i]);
    317       free(cnodes->nodes[i]);
    318     }
    319   }
    320   free(cnodes->nodes);
    321   free(cnodes);
    322 }
    323 
    324 int cluster_nodes_add(cluster_nodes_t *cnodes, cluster_node_t *node) {
    325   cluster_node_t **new_nodes = realloc(cnodes->nodes, sizeof(cluster_node_t *) * (cnodes->nodes_count + 1));
    326   if (!new_nodes) return -1;
    327   cnodes->nodes                        = new_nodes;
    328   cnodes->nodes[cnodes->nodes_count++] = node;
    329   return 0;
    330 }
    331 
    332 cluster_node_t *cluster_nodes_get(cluster_nodes_t *cnodes, const char *name) {
    333   for (size_t i = 0; i < cnodes->nodes_count; i++) {
    334     if (cnodes->nodes[i] && strcmp(cnodes->nodes[i]->name, name) == 0) {
    335       return cnodes->nodes[i];
    336     }
    337   }
    338   return NULL;
    339 }
    340 
    341 cluster_node_t **cluster_nodes_get_available(cluster_nodes_t *cnodes, size_t *out_count) {
    342   size_t count = 0;
    343   for (size_t i = 0; i < cnodes->nodes_count; i++) {
    344     if (cnodes->nodes[i] && cnodes->nodes[i]->available) {
    345       count++;
    346     }
    347   }
    348 
    349   if (count == 0) {
    350     *out_count = 0;
    351     return NULL;
    352   }
    353 
    354   cluster_node_t **available = malloc(sizeof(cluster_node_t *) * count);
    355   if (!available) {
    356     *out_count = 0;
    357     return NULL;
    358   }
    359 
    360   count = 0;
    361   for (size_t i = 0; i < cnodes->nodes_count; i++) {
    362     if (cnodes->nodes[i] && cnodes->nodes[i]->available) {
    363       available[count++] = cnodes->nodes[i];
    364     }
    365   }
    366 
    367   *out_count = count;
    368   return available;
    369 }