Skip to content

Commit

Permalink
relay: implement 'file' cluster type, issue #78
Browse files Browse the repository at this point in the history
Allow writing to a file as cluster target.  The ip option is
non-functional and will be removed in the next commit.
  • Loading branch information
grobian committed Jul 5, 2015
1 parent 825d019 commit e3c12f9
Show file tree
Hide file tree
Showing 3 changed files with 100 additions and 36 deletions.
2 changes: 1 addition & 1 deletion relay.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

enum rmode { NORMAL, DEBUG, SUBMISSION, TEST };

typedef enum { CON_TCP, CON_UDP, CON_PIPE } serv_ctype;
typedef enum { CON_TCP, CON_UDP, CON_PIPE, CON_FILE } serv_ctype;

extern char relay_hostname[];
extern enum rmode mode;
Expand Down
123 changes: 88 additions & 35 deletions router.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ enum clusttype {
BLACKHOLE, /* /dev/null-like destination */
GROUP, /* pseudo type to create a matching tree */
FORWARD,
FILELOG, /* like forward, write metric to file */
FILELOGIP, /* like forward, write ip metric to file */
CARBON_CH, /* room for a better/different hash definition */
FNV1A_CH, /* FNV1a-based consistent-hash */
ANYOF, /* FNV1a-based hash, but with backup by others */
Expand Down Expand Up @@ -210,6 +212,10 @@ determine_if_regex(route *r, char *pat, int flags)
* (forward | any_of [useall] | failover | (carbon|fnv1a)_ch [replication (count)])
* (ip:port[=instance] [proto (tcp | udp)] ...)
* ;
* cluster (name)
* log [ip]
* (/path/to/file ...)
* ;
* match (* | regex)
* send to (cluster | blackhole)
* [stop]
Expand Down Expand Up @@ -384,6 +390,25 @@ router_readconfig(cluster **clret, route **rret,
}
cl->type = FAILOVER;
cl->members.anyof = NULL;
} else if (strncmp(p, "file", 4) == 0 && isspace(*(p + 4))) {
p += 5;

for (; *p != '\0' && isspace(*p); p++)
;
if ((cl = cl->next = malloc(sizeof(cluster))) == NULL) {
logerr("malloc failed in cluster file\n");
free(buf);
return 0;
}
if (strncmp(p, "ip", 2) == 0 && isspace(*(p + 2))) {
p += 3;
for (; *p != '\0' && isspace(*p); p++)
;
cl->type = FILELOGIP;
} else {
cl->type = FILELOG;
}
cl->members.forward = NULL;
} else {
char *type = p;
for (; *p != '\0' && !isspace(*p); p++)
Expand Down Expand Up @@ -504,36 +529,45 @@ router_readconfig(cluster **clret, route **rret,
}
}

/* resolve host/IP */
memset(&hint, 0, sizeof(hint));
if (cl->type != FILELOG && cl->type != FILELOGIP) {
/* resolve host/IP */
memset(&hint, 0, sizeof(hint));

hint.ai_family = PF_UNSPEC;
hint.ai_socktype = *proto == 'u' ? SOCK_DGRAM : SOCK_STREAM;
hint.ai_protocol = *proto == 'u' ? IPPROTO_UDP : IPPROTO_TCP;
hint.ai_flags = AI_NUMERICSERV;
snprintf(sport, sizeof(sport), "%u", port); /* for default */
hint.ai_family = PF_UNSPEC;
hint.ai_socktype = *proto == 'u' ? SOCK_DGRAM : SOCK_STREAM;
hint.ai_protocol = *proto == 'u' ? IPPROTO_UDP : IPPROTO_TCP;
hint.ai_flags = AI_NUMERICSERV;
snprintf(sport, sizeof(sport), "%u", port); /* for default */

if ((err = getaddrinfo(ip, sport, &hint, &saddrs)) != 0) {
logerr("failed to resolve server %s:%s (%s) "
"for cluster %s: %s\n",
ip, sport, proto, name, gai_strerror(err));
free(cl);
free(buf);
return 0;
}
if ((err = getaddrinfo(ip, sport, &hint, &saddrs)) != 0) {
logerr("failed to resolve server %s:%s (%s) "
"for cluster %s: %s\n",
ip, sport, proto, name, gai_strerror(err));
free(cl);
free(buf);
return 0;
}

if (!useall && saddrs->ai_next != NULL) {
/* take first result only */
freeaddrinfo(saddrs->ai_next);
saddrs->ai_next = NULL;
if (!useall && saddrs->ai_next != NULL) {
/* take first result only */
freeaddrinfo(saddrs->ai_next);
saddrs->ai_next = NULL;
}
} else {
/* TODO: try to create/append to file */

proto = "file";
saddrs = (void *)1;
}

walk = saddrs;
while (walk != NULL) {
/* disconnect from the rest to avoid double
* frees by freeaddrinfo() in server_destroy() */
next = walk->ai_next;
walk->ai_next = NULL;
if (walk != (void *)1) {
next = walk->ai_next;
walk->ai_next = NULL;
}

if (useall) {
/* unfold whatever we resolved, for human
Expand All @@ -557,7 +591,9 @@ router_readconfig(cluster **clret, route **rret,
}

newserver = server_new(ip, (unsigned short)port,
*proto == 'u' ? CON_UDP : CON_TCP, walk,
*proto == 'f' ? CON_FILE :
*proto == 'u' ? CON_UDP : CON_TCP,
walk == (void *)1 ? NULL : walk,
queuesize, batchsize);
if (newserver == NULL) {
logerr("failed to add server %s:%d (%s) "
Expand Down Expand Up @@ -599,7 +635,9 @@ router_readconfig(cluster **clret, route **rret,
}
} else if (cl->type == FORWARD ||
cl->type == ANYOF ||
cl->type == FAILOVER)
cl->type == FAILOVER ||
cl->type == FILELOG ||
cl->type == FILELOGIP)
{
if (w == NULL) {
w = malloc(sizeof(servers));
Expand All @@ -609,26 +647,30 @@ router_readconfig(cluster **clret, route **rret,
if (w == NULL) {
logerr("malloc failed for %s %s\n",
cl->type == FORWARD ? "forward" :
cl->type == ANYOF ? "any_of" : "failover",
cl->type == ANYOF ? "any_of" :
cl->type == FAILOVER ? "failover" :
"file",
ip);
free(cl);
free(buf);
return 0;
}
w->next = NULL;
w->server = newserver;
if (cl->type == FORWARD && cl->members.forward == NULL)
if ((cl->type == FORWARD ||
cl->type == FILELOG || cl->type == FILELOGIP)
&& cl->members.forward == NULL)
cl->members.forward = w;
if ((cl->type == ANYOF || cl->type == FAILOVER) &&
cl->members.anyof == NULL)
{
cl->members.anyof = malloc(sizeof(serverlist));
cl->members.anyof->count = 0;
cl->members.anyof->servers = NULL;
cl->members.anyof->list = w;
if (cl->type == ANYOF || cl->type == FAILOVER) {
if (cl->members.anyof == NULL) {
cl->members.anyof = malloc(sizeof(serverlist));
cl->members.anyof->count = 0;
cl->members.anyof->servers = NULL;
cl->members.anyof->list = w;
} else {
cl->members.anyof->count++;
}
}
if (cl->type == ANYOF || cl->type == FAILOVER)
cl->members.anyof->count++;
}

walk = next;
Expand Down Expand Up @@ -1388,7 +1430,9 @@ router_getservers(cluster *clusters)
for (c = clusters; c != NULL; c = c->next) {
if (c->type == BLACKHOLE || c->type == REWRITE)
continue;
if (c->type == FORWARD) {
if (c->type == FORWARD ||
c->type == FILELOG || c->type == FILELOGIP)
{
for (s = c->members.forward; s != NULL; s = s->next)
add_server(s->server);
} else if (c->type == ANYOF || c->type == FAILOVER) {
Expand Down Expand Up @@ -1428,6 +1472,11 @@ router_printconfig(FILE *f, char all, cluster *clusters, route *routes)
for (s = c->members.forward; s != NULL; s = s->next)
fprintf(f, " %s:%d%s\n",
server_ip(s->server), server_port(s->server), PPROTO);
} else if (c->type == FILELOG || c->type == FILELOGIP) {
fprintf(f, " file%s\n", c->type == FILELOGIP ? " ip" : "");
for (s = c->members.forward; s != NULL; s = s->next)
fprintf(f, " %s\n",
server_ip(s->server));
} else if (c->type == ANYOF || c->type == FAILOVER) {
fprintf(f, " %s\n", c->type == ANYOF ? "any_of" : "failover");
for (s = c->members.anyof->list; s != NULL; s = s->next)
Expand Down Expand Up @@ -1540,6 +1589,8 @@ router_free(cluster *clusters, route *routes)
free(clusters->members.ch);
break;
case FORWARD:
case FILELOG:
case FILELOGIP:
case BLACKHOLE:
while (clusters->members.forward) {
server_shutdown(clusters->members.forward->server);
Expand Down Expand Up @@ -1801,6 +1852,8 @@ router_route_intern(
case BLACKHOLE: {
/* maybe just record we're dropping this metric? */
} break;
case FILELOGIP:
case FILELOG:
case FORWARD: {
/* simple case, no logic necessary */
servers *s;
Expand Down
11 changes: 11 additions & 0 deletions server.c
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,17 @@ server_queuereader(void *d)
self->failure += self->failure >= FAIL_WAIT_TIME ? 0 : 1;
continue;
}
} else if (self->ctype == CON_FILE) {
if ((self->fd = open(self->ip,
O_WRONLY | O_APPEND | O_CREAT,
S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) < 0)
{
if (!self->failure)
logerr("failed to open file '%s': %s\n",
self->ip, strerror(errno));
self->failure += self->failure >= FAIL_WAIT_TIME ? 0 : 1;
continue;
}
} else {
int ret;
int args;
Expand Down

0 comments on commit e3c12f9

Please sign in to comment.