node.c (6343B)
1 #include <stdlib.h> 2 #include <string.h> 3 #include <stdio.h> 4 #include <unistd.h> 5 #include <fcntl.h> 6 #include <sys/socket.h> 7 #include <sys/un.h> 8 #include <netinet/in.h> 9 #include <arpa/inet.h> 10 #include <netdb.h> 11 #include <ctype.h> 12 #include <stdarg.h> 13 #include <errno.h> 14 15 #include "rxi/log.h" 16 #include "domain/cluster/node.h" 17 #include "common/resp.h" 18 19 #define MAX_HOST_LEN 256 20 #define MAX_PORT_LEN 16 21 #define MAX_PATH_LEN 256 22 23 static int parse_url(const char *url, char *host, size_t host_len, int *port, char *unix_path, size_t path_len) { 24 if (!url) return -1; 25 26 if (strncmp(url, "unix://", 7) == 0) { 27 const char *p = url + 7; 28 size_t path_len_calc = strlen(p); 29 if (path_len_calc >= path_len) path_len_calc = path_len - 1; 30 strncpy(unix_path, p, path_len_calc); 31 unix_path[path_len_calc] = '\0'; 32 return 1; 33 } 34 35 if (strncmp(url, "tcp://", 6) != 0) { 36 return -1; 37 } 38 39 const char *p = url + 6; 40 const char *colon = strchr(p, ':'); 41 if (!colon) return -1; 42 43 size_t host_len_calc = colon - p; 44 if (host_len_calc >= host_len) host_len_calc = host_len - 1; 45 strncpy(host, p, host_len_calc); 46 host[host_len_calc] = '\0'; 47 48 *port = atoi(colon + 1); 49 if (*port <= 0) return -1; 50 51 return 0; 52 } 53 54 cluster_node_t *cluster_node_create(const char *name, const char *url, int weight, const char *user, const char *secret) { 55 cluster_node_t *node = calloc(1, sizeof(*node)); 56 if (!node) return NULL; 57 58 node->name = strdup(name); 59 node->url = strdup(url); 60 node->weight = weight > 0 ? weight : 1; 61 node->user = user ? strdup(user) : NULL; 62 node->secret = secret ? strdup(secret) : NULL; 63 node->fd = -1; 64 node->session_count = 0; 65 node->available = false; 66 node->consecutive_failures = 0; 67 node->last_check = 0; 68 node->next = NULL; 69 70 char host[MAX_HOST_LEN]; 71 int port; 72 char unix_path[MAX_PATH_LEN]; 73 if (parse_url(url, host, sizeof(host), &port, unix_path, sizeof(unix_path)) < 0) { 74 log_error("cluster: failed to parse URL %s", url); 75 cluster_node_free(node); 76 return NULL; 77 } 78 79 return node; 80 } 81 82 void cluster_node_free(cluster_node_t *node) { 83 if (!node) return; 84 cluster_node_disconnect(node); 85 free(node->name); 86 free(node->url); 87 free(node->user); 88 free(node->secret); 89 free(node); 90 } 91 92 int cluster_node_connect(cluster_node_t *node) { 93 if (!node) return -1; 94 95 cluster_node_disconnect(node); 96 97 char host[MAX_HOST_LEN]; 98 int port; 99 char unix_path[MAX_PATH_LEN]; 100 int url_type = parse_url(node->url, host, sizeof(host), &port, unix_path, sizeof(unix_path)); 101 if (url_type < 0) { 102 return -1; 103 } 104 105 int fd; 106 if (url_type == 1) { 107 fd = socket(AF_UNIX, SOCK_STREAM, 0); 108 if (fd < 0) return -1; 109 110 struct sockaddr_un addr; 111 memset(&addr, 0, sizeof(addr)); 112 addr.sun_family = AF_UNIX; 113 strncpy(addr.sun_path, unix_path, sizeof(addr.sun_path) - 1); 114 115 int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); 116 if (ret < 0) { 117 close(fd); 118 return -1; 119 } 120 121 node->fd = fd; 122 return 0; 123 } 124 125 struct sockaddr_in addr; 126 memset(&addr, 0, sizeof(addr)); 127 addr.sin_family = AF_INET; 128 addr.sin_port = htons(port); 129 130 if (inet_pton(AF_INET, host, &addr.sin_addr) <= 0) { 131 struct hostent *he = gethostbyname(host); 132 if (!he) return -1; 133 memcpy(&addr.sin_addr, he->h_addr_list[0], he->h_length); 134 } 135 136 fd = socket(AF_INET, SOCK_STREAM, 0); 137 if (fd < 0) return -1; 138 139 int flags = fcntl(fd, F_GETFL, 0); 140 fcntl(fd, F_SETFL, flags | O_NONBLOCK); 141 142 int ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr)); 143 if (ret < 0 && errno != EINPROGRESS) { 144 close(fd); 145 return -1; 146 } 147 148 node->fd = fd; 149 return 0; 150 } 151 152 void cluster_node_disconnect(cluster_node_t *node) { 153 if (!node || node->fd < 0) return; 154 close(node->fd); 155 node->fd = -1; 156 } 157 158 int cluster_node_ping(cluster_node_t *node) { 159 if (!node || node->fd < 0) { 160 if (node && cluster_node_connect(node) != 0) { 161 return -1; 162 } 163 } 164 165 resp_object *resp = cluster_node_send_command(node, "ping"); 166 if (!resp) return -1; 167 168 int ok = (resp->type == RESPT_SIMPLE && resp->u.s && strcmp(resp->u.s, "PONG") == 0); 169 resp_free(resp); 170 171 return ok ? 0 : -1; 172 } 173 174 int cluster_node_get_session_count(cluster_node_t *node) { 175 if (!node || node->fd < 0) return -1; 176 177 resp_object *resp = cluster_node_send_command(node, "session.count"); 178 if (!resp) return -1; 179 180 int count = -1; 181 if (resp->type == RESPT_INT) { 182 count = (int)resp->u.i; 183 } 184 resp_free(resp); 185 186 return count; 187 } 188 189 resp_object *cluster_node_send_command(cluster_node_t *node, const char *cmd, ...) { 190 if (!node || node->fd < 0 || !cmd) return NULL; 191 192 char *buf = NULL; 193 size_t len = 0; 194 195 va_list args; 196 va_start(args, cmd); 197 198 int argc = 1; 199 const char *arg = cmd; 200 while (va_arg(args, const char *) != NULL) { 201 argc++; 202 } 203 va_end(args); 204 205 resp_object **argv = malloc(sizeof(resp_object *) * argc); 206 if (!argv) return NULL; 207 208 argv[0] = resp_array_init(); 209 resp_array_append_bulk(argv[0], cmd); 210 211 va_start(args, cmd); 212 int i = 1; 213 while ((arg = va_arg(args, const char *)) != NULL) { 214 argv[i] = resp_array_init(); 215 resp_array_append_bulk(argv[i], arg); 216 i++; 217 } 218 va_end(args); 219 220 resp_object *cmd_arr = resp_array_init(); 221 for (i = 0; i < argc; i++) { 222 resp_array_append_obj(cmd_arr, argv[i]); 223 } 224 225 resp_encode_array(argc, (const resp_object *const *)argv, &buf, &len); 226 227 for (i = 0; i < argc; i++) { 228 resp_free(argv[i]); 229 } 230 free(argv); 231 resp_free(cmd_arr); 232 233 if (!buf) return NULL; 234 235 ssize_t written = send(node->fd, buf, len, 0); 236 free(buf); 237 238 if (written != (ssize_t)len) { 239 return NULL; 240 } 241 242 return resp_read(node->fd); 243 } 244 245 void cluster_node_set_available(cluster_node_t *node, bool available) { 246 if (!node) return; 247 node->available = available; 248 if (!available) { 249 node->consecutive_failures++; 250 } else { 251 node->consecutive_failures = 0; 252 } 253 } 254 255 bool cluster_node_select_weighted_lowest(cluster_node_t *node, cluster_node_t *best) { 256 if (!node || !node->available) return false; 257 258 if (!best) return true; 259 260 double node_ratio = (double)node->session_count / node->weight; 261 double best_ratio = (double)best->session_count / best->weight; 262 263 if (node_ratio < best_ratio) return true; 264 if (node_ratio > best_ratio) return false; 265 266 if (node->weight > best->weight) return true; 267 if (node->weight < best->weight) return false; 268 269 return false; 270 }