node.c (8955B)
1 #include "node.h" 2 3 #include <arpa/inet.h> 4 #include <errno.h> 5 #include <fcntl.h> 6 #include <netdb.h> 7 #include <netinet/in.h> 8 #include <netinet/tcp.h> 9 #include <stdio.h> 10 #include <stdlib.h> 11 #include <string.h> 12 #include <sys/socket.h> 13 #include <sys/un.h> 14 #include <unistd.h> 15 16 #include "common/resp.h" 17 #include "common/socket_util.h" 18 #include "common/url_utils.h" 19 #include "rxi/log.h" 20 21 #define QUERY_TIMEOUT_MS 500 22 #define HEALTHCHECK_INTERVAL_MS 5000 23 24 static int parse_address(const char *address, char **host, int *port, char **unix_path) { 25 if (!address) return -1; 26 27 struct parsed_url *purl = NULL; 28 if (parse_address_url(address, &purl) != 0) { 29 return -1; 30 } 31 32 if (purl->scheme && strcmp(purl->scheme, "unix") == 0) { 33 *unix_path = strdup(purl->path ? purl->path : ""); 34 *host = NULL; 35 *port = 0; 36 parsed_url_free(purl); 37 return 0; 38 } 39 40 if (!purl->host) { 41 log_error("cluster: no host in address '%s'", address); 42 parsed_url_free(purl); 43 return -1; 44 } 45 46 *host = strdup(purl->host); 47 *port = purl->port ? atoi(purl->port) : 0; 48 *unix_path = NULL; 49 50 if (*port <= 0) { 51 log_error("cluster: invalid port in address '%s'", address); 52 free(*host); 53 *host = NULL; 54 parsed_url_free(purl); 55 return -1; 56 } 57 58 parsed_url_free(purl); 59 return 0; 60 } 61 62 int cluster_node_init(cluster_node_t *node, const char *name, const char *address, const char *username, 63 const char *password) { 64 memset(node, 0, sizeof(*node)); 65 66 node->name = strdup(name); 67 node->address = address ? strdup(address) : NULL; 68 node->username = username ? strdup(username) : NULL; 69 node->password = password ? strdup(password) : NULL; 70 71 if (!node->address) { 72 log_error("cluster: node '%s' has no address configured", name); 73 return -1; 74 } 75 76 if (parse_address(node->address, &node->host, &node->port, &node->unix_path) != 0) { 77 return -1; 78 } 79 80 node->fd = -1; 81 node->available = 0; 82 node->last_ping = 0; 83 node->last_check = 0; 84 node->weight = 1; 85 86 return 0; 87 } 88 89 void cluster_node_free(cluster_node_t *node) { 90 if (!node) return; 91 cluster_node_disconnect(node); 92 free(node->name); 93 free(node->address); 94 free(node->host); 95 free(node->unix_path); 96 free(node->username); 97 free(node->password); 98 } 99 100 static int connect_tcp(const char *host, int port) { 101 struct addrinfo hints, *res, *res0; 102 int sockfd, error; 103 104 memset(&hints, 0, sizeof(hints)); 105 hints.ai_family = AF_UNSPEC; 106 hints.ai_socktype = SOCK_STREAM; 107 hints.ai_protocol = 0; 108 109 char port_str[16]; 110 snprintf(port_str, sizeof(port_str), "%d", port); 111 112 error = getaddrinfo(host, port_str, &hints, &res0); 113 if (error) { 114 log_error("cluster: getaddrinfo(%s:%s): %s", host, port_str, gai_strerror(error)); 115 return -1; 116 } 117 118 sockfd = -1; 119 for (res = res0; res; res = res->ai_next) { 120 sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); 121 if (sockfd < 0) continue; 122 123 if (connect(sockfd, res->ai_addr, res->ai_addrlen) == 0) break; 124 125 close(sockfd); 126 sockfd = -1; 127 } 128 129 freeaddrinfo(res0); 130 131 if (sockfd >= 0) { 132 int flag = 1; 133 setsockopt(sockfd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); 134 } 135 136 return sockfd; 137 } 138 139 static int connect_unix(const char *path) { 140 struct sockaddr_un addr; 141 int sockfd; 142 143 if (strlen(path) >= sizeof(addr.sun_path)) { 144 log_error("cluster: unix socket path too long: %s", path); 145 return -1; 146 } 147 148 sockfd = socket(AF_UNIX, SOCK_STREAM, 0); 149 if (sockfd < 0) { 150 return -1; 151 } 152 153 memset(&addr, 0, sizeof(addr)); 154 addr.sun_family = AF_UNIX; 155 strncpy(addr.sun_path, path, sizeof(addr.sun_path) - 1); 156 157 if (connect(sockfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { 158 close(sockfd); 159 return -1; 160 } 161 162 return sockfd; 163 } 164 165 int cluster_node_connect(cluster_node_t *node) { 166 if (node->fd >= 0) { 167 return 0; 168 } 169 170 if (node->unix_path) { 171 node->fd = connect_unix(node->unix_path); 172 } else { 173 node->fd = connect_tcp(node->host, node->port); 174 } 175 176 if (node->fd < 0) { 177 log_debug("cluster: failed to connect to node '%s' at %s", node->name, node->address); 178 return -1; 179 } 180 181 log_info("cluster: connected to node '%s' at %s", node->name, node->address); 182 183 if (node->username && node->password) { 184 char *cmd_str = NULL; 185 size_t cmd_len = 0; 186 resp_object *cmd = resp_array_init(); 187 resp_array_append_bulk(cmd, "auth"); 188 resp_array_append_bulk(cmd, node->username); 189 resp_array_append_bulk(cmd, node->password); 190 resp_serialize(cmd, &cmd_str, &cmd_len); 191 resp_free(cmd); 192 193 if (cmd_str) { 194 ssize_t n = send(node->fd, cmd_str, cmd_len, 0); 195 if (n > 0) { 196 resp_object *resp = resp_read(node->fd); 197 if (!resp || resp->type == RESPT_ERROR) { 198 log_warn("cluster: authentication failed for node '%s'", node->name); 199 close(node->fd); 200 node->fd = -1; 201 free(cmd_str); 202 return -1; 203 } 204 resp_free(resp); 205 } 206 free(cmd_str); 207 } 208 } 209 210 return 0; 211 } 212 213 void cluster_node_disconnect(cluster_node_t *node) { 214 if (node->fd >= 0) { 215 close(node->fd); 216 node->fd = -1; 217 } 218 } 219 220 static int send_resp_command(int fd, const char *cmd) { 221 size_t cmd_len = strlen(cmd); 222 ssize_t n = send(fd, cmd, cmd_len, 0); 223 return (n == (ssize_t)cmd_len) ? 0 : -1; 224 } 225 226 int cluster_node_send_command(cluster_node_t *node, const char *cmd, resp_object **out_response) { 227 if (node->fd < 0) { 228 if (cluster_node_connect(node) != 0) { 229 return -1; 230 } 231 } 232 233 if (send_resp_command(node->fd, cmd) != 0) { 234 log_debug("cluster: failed to send command to node '%s'", node->name); 235 cluster_node_disconnect(node); 236 return -1; 237 } 238 239 resp_object *resp = resp_read(node->fd); 240 if (!resp) { 241 log_debug("cluster: no response from node '%s'", node->name); 242 cluster_node_disconnect(node); 243 return -1; 244 } 245 246 *out_response = resp; 247 return 0; 248 } 249 250 int cluster_node_healthcheck_pt(int64_t timestamp, struct pt_task *task) { 251 cluster_node_t *node = task->udata; 252 253 if (!node) { 254 return SCHED_DONE; 255 } 256 257 if (node->last_check > 0 && timestamp - node->last_check < HEALTHCHECK_INTERVAL_MS) { 258 return SCHED_RUNNING; 259 } 260 node->last_check = timestamp; 261 262 if (node->fd < 0) { 263 if (cluster_node_connect(node) != 0) { 264 node->available = 0; 265 return SCHED_RUNNING; 266 } 267 } 268 269 char *cmd_str = NULL; 270 size_t cmd_len = 0; 271 resp_object *cmd = resp_array_init(); 272 resp_array_append_bulk(cmd, "ping"); 273 resp_serialize(cmd, &cmd_str, &cmd_len); 274 resp_free(cmd); 275 276 if (!cmd_str) { 277 node->available = 0; 278 cluster_node_disconnect(node); 279 return SCHED_RUNNING; 280 } 281 282 ssize_t n = send(node->fd, cmd_str, cmd_len, 0); 283 free(cmd_str); 284 285 if (n <= 0) { 286 node->available = 0; 287 cluster_node_disconnect(node); 288 return SCHED_RUNNING; 289 } 290 291 resp_object *resp = resp_read(node->fd); 292 if (!resp || (resp->type != RESPT_SIMPLE && resp->type != RESPT_BULK)) { 293 node->available = 0; 294 if (resp) resp_free(resp); 295 cluster_node_disconnect(node); 296 return SCHED_RUNNING; 297 } 298 299 node->available = 1; 300 node->last_ping = timestamp; 301 log_trace("cluster: node '%s' healthcheck OK", node->name); 302 resp_free(resp); 303 304 return SCHED_RUNNING; 305 } 306 307 cluster_nodes_t *cluster_nodes_create(void) { 308 cluster_nodes_t *cnodes = calloc(1, sizeof(cluster_nodes_t)); 309 return cnodes; 310 } 311 312 void cluster_nodes_free(cluster_nodes_t *cnodes) { 313 if (!cnodes) return; 314 for (size_t i = 0; i < cnodes->nodes_count; i++) { 315 if (cnodes->nodes[i]) { 316 cluster_node_free(cnodes->nodes[i]); 317 free(cnodes->nodes[i]); 318 } 319 } 320 free(cnodes->nodes); 321 free(cnodes); 322 } 323 324 int cluster_nodes_add(cluster_nodes_t *cnodes, cluster_node_t *node) { 325 cluster_node_t **new_nodes = realloc(cnodes->nodes, sizeof(cluster_node_t *) * (cnodes->nodes_count + 1)); 326 if (!new_nodes) return -1; 327 cnodes->nodes = new_nodes; 328 cnodes->nodes[cnodes->nodes_count++] = node; 329 return 0; 330 } 331 332 cluster_node_t *cluster_nodes_get(cluster_nodes_t *cnodes, const char *name) { 333 for (size_t i = 0; i < cnodes->nodes_count; i++) { 334 if (cnodes->nodes[i] && strcmp(cnodes->nodes[i]->name, name) == 0) { 335 return cnodes->nodes[i]; 336 } 337 } 338 return NULL; 339 } 340 341 cluster_node_t **cluster_nodes_get_available(cluster_nodes_t *cnodes, size_t *out_count) { 342 size_t count = 0; 343 for (size_t i = 0; i < cnodes->nodes_count; i++) { 344 if (cnodes->nodes[i] && cnodes->nodes[i]->available) { 345 count++; 346 } 347 } 348 349 if (count == 0) { 350 *out_count = 0; 351 return NULL; 352 } 353 354 cluster_node_t **available = malloc(sizeof(cluster_node_t *) * count); 355 if (!available) { 356 *out_count = 0; 357 return NULL; 358 } 359 360 count = 0; 361 for (size_t i = 0; i < cnodes->nodes_count; i++) { 362 if (cnodes->nodes[i] && cnodes->nodes[i]->available) { 363 available[count++] = cnodes->nodes[i]; 364 } 365 } 366 367 *out_count = count; 368 return available; 369 }