udphole

Basic UDP wormhole proxy
git clone git://git.finwo.net/app/udphole
Log | Files | Refs | README | LICENSE

node.c (6343B)


      1 #include <stdlib.h>
      2 #include <string.h>
      3 #include <stdio.h>
      4 #include <unistd.h>
      5 #include <fcntl.h>
      6 #include <sys/socket.h>
      7 #include <sys/un.h>
      8 #include <netinet/in.h>
      9 #include <arpa/inet.h>
     10 #include <netdb.h>
     11 #include <ctype.h>
     12 #include <stdarg.h>
     13 #include <errno.h>
     14 
     15 #include "rxi/log.h"
     16 #include "domain/cluster/node.h"
     17 #include "common/resp.h"
     18 
     19 #define MAX_HOST_LEN 256
     20 #define MAX_PORT_LEN 16
     21 #define MAX_PATH_LEN 256
     22 
     23 static int parse_url(const char *url, char *host, size_t host_len, int *port, char *unix_path, size_t path_len) {
     24   if (!url) return -1;
     25 
     26   if (strncmp(url, "unix://", 7) == 0) {
     27     const char *p = url + 7;
     28     size_t path_len_calc = strlen(p);
     29     if (path_len_calc >= path_len) path_len_calc = path_len - 1;
     30     strncpy(unix_path, p, path_len_calc);
     31     unix_path[path_len_calc] = '\0';
     32     return 1;
     33   }
     34 
     35   if (strncmp(url, "tcp://", 6) != 0) {
     36     return -1;
     37   }
     38 
     39   const char *p = url + 6;
     40   const char *colon = strchr(p, ':');
     41   if (!colon) return -1;
     42 
     43   size_t host_len_calc = colon - p;
     44   if (host_len_calc >= host_len) host_len_calc = host_len - 1;
     45   strncpy(host, p, host_len_calc);
     46   host[host_len_calc] = '\0';
     47 
     48   *port = atoi(colon + 1);
     49   if (*port <= 0) return -1;
     50 
     51   return 0;
     52 }
     53 
     54 cluster_node_t *cluster_node_create(const char *name, const char *url, int weight, const char *user, const char *secret) {
     55   cluster_node_t *node = calloc(1, sizeof(*node));
     56   if (!node) return NULL;
     57 
     58   node->name = strdup(name);
     59   node->url = strdup(url);
     60   node->weight = weight > 0 ? weight : 1;
     61   node->user = user ? strdup(user) : NULL;
     62   node->secret = secret ? strdup(secret) : NULL;
     63   node->fd = -1;
     64   node->session_count = 0;
     65   node->available = false;
     66   node->consecutive_failures = 0;
     67   node->last_check = 0;
     68   node->next = NULL;
     69 
     70   char host[MAX_HOST_LEN];
     71   int port;
     72   char unix_path[MAX_PATH_LEN];
     73   if (parse_url(url, host, sizeof(host), &port, unix_path, sizeof(unix_path)) < 0) {
     74     log_error("cluster: failed to parse URL %s", url);
     75     cluster_node_free(node);
     76     return NULL;
     77   }
     78 
     79   return node;
     80 }
     81 
     82 void cluster_node_free(cluster_node_t *node) {
     83   if (!node) return;
     84   cluster_node_disconnect(node);
     85   free(node->name);
     86   free(node->url);
     87   free(node->user);
     88   free(node->secret);
     89   free(node);
     90 }
     91 
     92 int cluster_node_connect(cluster_node_t *node) {
     93   if (!node) return -1;
     94 
     95   cluster_node_disconnect(node);
     96 
     97   char host[MAX_HOST_LEN];
     98   int port;
     99   char unix_path[MAX_PATH_LEN];
    100   int url_type = parse_url(node->url, host, sizeof(host), &port, unix_path, sizeof(unix_path));
    101   if (url_type < 0) {
    102     return -1;
    103   }
    104 
    105   int fd;
    106   if (url_type == 1) {
    107     fd = socket(AF_UNIX, SOCK_STREAM, 0);
    108     if (fd < 0) return -1;
    109 
    110     struct sockaddr_un addr;
    111     memset(&addr, 0, sizeof(addr));
    112     addr.sun_family = AF_UNIX;
    113     strncpy(addr.sun_path, unix_path, sizeof(addr.sun_path) - 1);
    114 
    115     int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
    116     if (ret < 0) {
    117       close(fd);
    118       return -1;
    119     }
    120 
    121     node->fd = fd;
    122     return 0;
    123   }
    124 
    125   struct sockaddr_in addr;
    126   memset(&addr, 0, sizeof(addr));
    127   addr.sin_family = AF_INET;
    128   addr.sin_port = htons(port);
    129 
    130   if (inet_pton(AF_INET, host, &addr.sin_addr) <= 0) {
    131     struct hostent *he = gethostbyname(host);
    132     if (!he) return -1;
    133     memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length);
    134   }
    135 
    136   fd = socket(AF_INET, SOCK_STREAM, 0);
    137   if (fd < 0) return -1;
    138 
    139   int flags = fcntl(fd, F_GETFL, 0);
    140   fcntl(fd, F_SETFL, flags | O_NONBLOCK);
    141 
    142   int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
    143   if (ret < 0 && errno != EINPROGRESS) {
    144     close(fd);
    145     return -1;
    146   }
    147 
    148   node->fd = fd;
    149   return 0;
    150 }
    151 
    152 void cluster_node_disconnect(cluster_node_t *node) {
    153   if (!node || node->fd < 0) return;
    154   close(node->fd);
    155   node->fd = -1;
    156 }
    157 
    158 int cluster_node_ping(cluster_node_t *node) {
    159   if (!node || node->fd < 0) {
    160     if (node && cluster_node_connect(node) != 0) {
    161       return -1;
    162     }
    163   }
    164 
    165   resp_object *resp = cluster_node_send_command(node, "ping");
    166   if (!resp) return -1;
    167 
    168   int ok = (resp->type == RESPT_SIMPLE && resp->u.s && strcmp(resp->u.s, "PONG") == 0);
    169   resp_free(resp);
    170 
    171   return ok ? 0 : -1;
    172 }
    173 
    174 int cluster_node_get_session_count(cluster_node_t *node) {
    175   if (!node || node->fd < 0) return -1;
    176 
    177   resp_object *resp = cluster_node_send_command(node, "session.count");
    178   if (!resp) return -1;
    179 
    180   int count = -1;
    181   if (resp->type == RESPT_INT) {
    182     count = (int)resp->u.i;
    183   }
    184   resp_free(resp);
    185 
    186   return count;
    187 }
    188 
    189 resp_object *cluster_node_send_command(cluster_node_t *node, const char *cmd, ...) {
    190   if (!node || node->fd < 0 || !cmd) return NULL;
    191 
    192   char *buf = NULL;
    193   size_t len = 0;
    194 
    195   va_list args;
    196   va_start(args, cmd);
    197 
    198   int argc = 1;
    199   const char *arg = cmd;
    200   while (va_arg(args, const char *) != NULL) {
    201     argc++;
    202   }
    203   va_end(args);
    204 
    205   resp_object **argv = malloc(sizeof(resp_object *) * argc);
    206   if (!argv) return NULL;
    207 
    208   argv[0] = resp_array_init();
    209   resp_array_append_bulk(argv[0], cmd);
    210 
    211   va_start(args, cmd);
    212   int i = 1;
    213   while ((arg = va_arg(args, const char *)) != NULL) {
    214     argv[i] = resp_array_init();
    215     resp_array_append_bulk(argv[i], arg);
    216     i++;
    217   }
    218   va_end(args);
    219 
    220   resp_object *cmd_arr = resp_array_init();
    221   for (i = 0; i < argc; i++) {
    222     resp_array_append_obj(cmd_arr, argv[i]);
    223   }
    224 
    225   resp_encode_array(argc, (const resp_object *const *)argv, &buf, &len);
    226 
    227   for (i = 0; i < argc; i++) {
    228     resp_free(argv[i]);
    229   }
    230   free(argv);
    231   resp_free(cmd_arr);
    232 
    233   if (!buf) return NULL;
    234 
    235   ssize_t written = send(node->fd, buf, len, 0);
    236   free(buf);
    237 
    238   if (written != (ssize_t)len) {
    239     return NULL;
    240   }
    241 
    242   return resp_read(node->fd);
    243 }
    244 
    245 void cluster_node_set_available(cluster_node_t *node, bool available) {
    246   if (!node) return;
    247   node->available = available;
    248   if (!available) {
    249     node->consecutive_failures++;
    250   } else {
    251     node->consecutive_failures = 0;
    252   }
    253 }
    254 
    255 bool cluster_node_select_weighted_lowest(cluster_node_t *node, cluster_node_t *best) {
    256   if (!node || !node->available) return false;
    257 
    258   if (!best) return true;
    259 
    260   double node_ratio = (double)node->session_count / node->weight;
    261   double best_ratio = (double)best->session_count / best->weight;
    262 
    263   if (node_ratio < best_ratio) return true;
    264   if (node_ratio > best_ratio) return false;
    265 
    266   if (node->weight > best->weight) return true;
    267   if (node->weight < best->weight) return false;
    268 
    269   return false;
    270 }