NOTE(review): this patch was reflowed from a whitespace-collapsed copy. Indentation,
blank context lines and whitespace inside string literals are reconstructed; the <...>
header names on context `#include` lines were lost in extraction and must be restored
from the target tree before applying. The added include in bsdi_kafka.c is assumed to
be <errno.h> because the new code uses errno -- confirm.

diff --git a/lib/bgpstream_log.h b/lib/bgpstream_log.h
index 65615b35..fd2592c8 100644
--- a/lib/bgpstream_log.h
+++ b/lib/bgpstream_log.h
@@ -27,6 +27,7 @@
 #ifndef _BGPSTREAM_LOG_H
 #define _BGPSTREAM_LOG_H
 
+#include "config.h"
 #include
 
 #define BGPSTREAM_LOG_ERR 0
@@ -37,7 +38,11 @@
 #define BGPSTREAM_LOG_VFINE 50
 #define BGPSTREAM_LOG_FINEST 60
 
+#ifdef DEBUG
+#define BGPSTREAM_LOG_LEVEL BGPSTREAM_LOG_FINE
+#else
 #define BGPSTREAM_LOG_LEVEL BGPSTREAM_LOG_INFO
+#endif
 
 #define bgpstream_log(level, ...)                                              \
   do {                                                                         \
diff --git a/lib/bgpstream_resource.h b/lib/bgpstream_resource.h
index 52785921..a856996f 100644
--- a/lib/bgpstream_resource.h
+++ b/lib/bgpstream_resource.h
@@ -87,6 +87,9 @@ typedef enum bgpstream_resource_attr_type {
   /** The path toward a local cache */
   BGPSTREAM_RESOURCE_ATTR_CACHE_DIR_PATH = 3,
 
+  /** Kafka message timestamp to begin from */
+  BGPSTREAM_RESOURCE_ATTR_KAFKA_TIMESTAMP_FROM = 4,
+
   /** INTERNAL: The total number of attribute types in use */
   _BGPSTREAM_RESOURCE_ATTR_CNT,
 
diff --git a/lib/datainterfaces/bsdi_kafka.c b/lib/datainterfaces/bsdi_kafka.c
index 3f79e06f..5f8f12a5 100644
--- a/lib/datainterfaces/bsdi_kafka.c
+++ b/lib/datainterfaces/bsdi_kafka.c
@@ -30,6 +30,7 @@
 #include "config.h"
 #include "utils.h"
 #include
+#include <errno.h>
 #include
 #include
 #include
@@ -60,6 +61,7 @@ enum {
   OPTION_TOPIC,          // stored in kafka_topic res attribute
   OPTION_CONSUMER_GROUP, // allow multiple BGPStream instances to load-balance
   OPTION_OFFSET,         // begin, end, committed
+  OPTION_TIMESTAMP_FROM, // msec since epoch (inclusive)
   OPTION_DATA_TYPE,      //
   OPTION_PROJECT,        //
   OPTION_COLLECTOR,      //
@@ -95,6 +97,13 @@ static bgpstream_data_interface_option_t options[] = {
     "offset",                       // name
     "initial offset (earliest/latest) (default: " BGPSTREAM_TRANSPORT_KAFKA_DEFAULT_OFFSET ")",
   },
+  /* Timestamp to start from */
+  {
+    BGPSTREAM_DATA_INTERFACE_KAFKA, // interface ID
+    OPTION_TIMESTAMP_FROM,          // internal ID
+    "timestamp-from",               // name
+    "start from given timestamp (default: unused)",
+  },
   /* Data type */
   {
     BGPSTREAM_DATA_INTERFACE_KAFKA, // interface ID
@@ -140,6 +149,9 @@ typedef struct bsdi_kafka_state {
   // Offset
   char *offset;
 
+  // Seek to message with timestamp
+  char *timestamp_from;
+
   // explicitly set project name
   char *project;
 
@@ -198,6 +210,7 @@ int bsdi_kafka_set_option(bsdi_t *di,
                           const char *option_value)
 {
   int found = 0;
+  int64_t i64;
 
   switch (option_type->id) {
   case OPTION_BROKERS:
@@ -241,6 +254,21 @@ int bsdi_kafka_set_option(bsdi_t *di,
     }
     break;
 
+  case OPTION_TIMESTAMP_FROM:
+    errno = 0;
+    i64 = strtoll(option_value, NULL, 10);
+    if (i64 <= 0 || errno != 0) {
+      fprintf(stderr,
+              "ERROR: Invalid timestamp-from '%s'. Must be msec since epoch\n",
+              option_value);
+      return -1;
+    }
+    free(STATE->timestamp_from);
+    if ((STATE->timestamp_from = strdup(option_value)) == NULL) {
+      return -1;
+    }
+    break;
+
   case OPTION_DATA_TYPE:
     for (bgpstream_resource_format_type_t i = 0; i < ARR_CNT(type_strs); i++) {
       if (strcmp(option_value, type_strs[i]) == 0) {
@@ -294,6 +322,9 @@ void bsdi_kafka_destroy(bsdi_t *di)
   free(STATE->offset);
   STATE->offset = NULL;
 
+  free(STATE->timestamp_from);
+  STATE->timestamp_from = NULL;
+
   free(STATE->project);
   STATE->project = NULL;
 
@@ -343,5 +374,11 @@ int bsdi_kafka_update_resources(bsdi_t *di)
     return -1;
   }
 
+  if (STATE->timestamp_from != NULL &&
+      bgpstream_resource_set_attr(
+        res, BGPSTREAM_RESOURCE_ATTR_KAFKA_TIMESTAMP_FROM, STATE->timestamp_from) != 0) {
+    return -1;
+  }
+
   return 0;
 }
diff --git a/lib/transports/bs_transport_kafka.c b/lib/transports/bs_transport_kafka.c
index 09c04dde..b03b809f 100644
--- a/lib/transports/bs_transport_kafka.c
+++ b/lib/transports/bs_transport_kafka.c
@@ -37,6 +37,7 @@
 #define STATE ((state_t *)(transport->state))
 
 #define POLL_TIMEOUT_MSEC 500
+#define SEEK_TIMEOUT_MSEC (10 * 1000)
 
 typedef struct state {
 
@@ -44,6 +45,7 @@ typedef struct state {
   char *topic;
   char *group;
   char *offset;
+  int64_t timestamp_from;
 
   // rdkafka instance
   rd_kafka_t *rk;
@@ -57,12 +59,17 @@ typedef struct state {
   // has a fatal error occured?
   int fatal_error;
 
+  // have we already performed an initial rebalance
+  // (used when seeking to timestamp)
+  int rebalance_done;
+
 } state_t;
 
 static int parse_attrs(bgpstream_transport_t *transport)
 {
   char buf[1024];
-  uint64_t ts;
+  uint64_t u64;
+  const char *tmpstr;
 
   // Topic Name (required)
   if (bgpstream_resource_get_attr(
@@ -81,9 +88,9 @@ static int parse_attrs(bgpstream_transport_t *transport)
   if (bgpstream_resource_get_attr(
         transport->res, BGPSTREAM_RESOURCE_ATTR_KAFKA_CONSUMER_GROUP) == NULL) {
     // generate a "random" group ID
-    ts = epoch_msec();
-    srand(ts);
-    snprintf(buf, sizeof(buf), "bgpstream-%" PRIx64 "-%x", ts, rand());
+    u64 = epoch_msec();
+    srand(u64);
+    snprintf(buf, sizeof(buf), "bgpstream-%" PRIx64 "-%x", u64, rand());
     if ((STATE->group = strdup(buf)) == NULL) {
       return -1;
     }
@@ -112,10 +119,18 @@ static int parse_attrs(bgpstream_transport_t *transport)
     }
   }
 
+  // Timestamp-from (optional)
+  if ((tmpstr = bgpstream_resource_get_attr(
+         transport->res, BGPSTREAM_RESOURCE_ATTR_KAFKA_TIMESTAMP_FROM)) != NULL) {
+    STATE->timestamp_from = strtoll(tmpstr, NULL, 10);
+  }
+
   bgpstream_log(
     BGPSTREAM_LOG_FINE,
-    "Kafka transport: brokers: '%s', topic: '%s', group: '%s', offset: %s",
-    transport->res->url, STATE->topic, STATE->group, STATE->offset);
+    "Kafka transport: brokers: '%s', topic: '%s', group: '%s', offset: %s, "
+    "timestamp-from: %" PRIi64,
+    transport->res->url, STATE->topic, STATE->group, STATE->offset,
+    STATE->timestamp_from);
 
   return 0;
 }
@@ -145,6 +160,129 @@ static void kafka_error_callback(rd_kafka_t *rk, int err, const char *reason,
                   rd_kafka_err2str(err), err, reason);
 }
 
+#ifdef DEBUG
+/** Log topic, partition id and offset of every entry in the list (FINE level) */
+static void log_partition_list(const rd_kafka_topic_partition_list_t *partitions)
+{
+  int i;
+  for (i = 0; i < partitions->cnt; i++) {
+    bgpstream_log(BGPSTREAM_LOG_FINE, " - %s [%" PRId32 "] offset %" PRId64,
+                  partitions->elems[i].topic,
+                  partitions->elems[i].partition, partitions->elems[i].offset);
+  }
+}
+#endif
+
+/** Replace each partition's offset with the offset librdkafka resolves for
+ * STATE->timestamp_from; failures are logged as a warning and otherwise ignored.
+ *
+ * NOTE(review): on failure the offset fields may still hold the raw timestamp
+ * when the caller assigns the list -- confirm this is acceptable.
+ */
+static void seek_timestamp_offset(bgpstream_transport_t *transport,
+                                  rd_kafka_topic_partition_list_t *partitions)
+{
+#ifdef DEBUG
+  bgpstream_log(BGPSTREAM_LOG_FINE, "Before seeking offsets to timestamps:");
+  log_partition_list(partitions);
+#endif
+  // first, set all the offsets to our timestamp_from value
+  for (int i = 0; i < partitions->cnt; i++) {
+    partitions->elems[i].offset = STATE->timestamp_from;
+  }
+
+  // now ask for those to be replaced with the appropriate offset
+  rd_kafka_resp_err_t ret_err =
+    rd_kafka_offsets_for_times(STATE->rk, partitions, SEEK_TIMEOUT_MSEC);
+
+  switch (ret_err) {
+  case RD_KAFKA_RESP_ERR_NO_ERROR:
+    // all good
+    break;
+
+  case RD_KAFKA_RESP_ERR__TIMED_OUT:
+  case RD_KAFKA_RESP_ERR__INVALID_ARG:
+  case RD_KAFKA_RESP_ERR__UNKNOWN_PARTITION:
+  case RD_KAFKA_RESP_ERR_LEADER_NOT_AVAILABLE:
+  default:
+    // well, at least we tried
+    bgpstream_log(BGPSTREAM_LOG_WARN,
+                  "Failed to seek some topics to initial timestamp: %s",
+                  rd_kafka_err2str(ret_err));
+    break;
+  }
+
+#ifdef DEBUG
+  bgpstream_log(BGPSTREAM_LOG_FINE, "After seeking offsets to timestamps:");
+  log_partition_list(partitions);
+#endif
+}
+
+/** Rebalance callback: on the first partition assignment seek to the
+ * configured timestamp, then assign; on revocation (or error) clear the
+ * assignment. `opaque` must be the bgpstream_transport_t for this consumer.
+ */
+static void rebalance_cb(rd_kafka_t *rk, rd_kafka_resp_err_t err,
+                         rd_kafka_topic_partition_list_t *partitions,
+                         void *opaque)
+{
+  bgpstream_transport_t *transport = (bgpstream_transport_t *)opaque;
+  rd_kafka_error_t *error = NULL;
+  rd_kafka_resp_err_t ret_err = RD_KAFKA_RESP_ERR_NO_ERROR;
+
+  // TODO: only seek to start time once per topic
+  bgpstream_log(BGPSTREAM_LOG_FINE, "Consumer group rebalanced, assigning offsets ");
+
+  switch (err) {
+  case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
+#ifdef DEBUG
+    bgpstream_log(BGPSTREAM_LOG_FINE, "kafka: assigned (%s):", "TODO");
+    // rd_kafka_rebalance_protocol(rk));
+    log_partition_list(partitions);
+#endif
+    if (STATE->rebalance_done == 0) {
+      seek_timestamp_offset(transport, partitions);
+    }
+    STATE->rebalance_done = 1;
+    // XXX TODO: fix this for new (as yet unreleased) librdkafka API!!
+    //if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE"))
+    //  error = rd_kafka_incremental_assign(rk, partitions);
+    //else
+    ret_err = rd_kafka_assign(rk, partitions);
+    break;
+
+  case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
+#ifdef DEBUG
+    bgpstream_log(BGPSTREAM_LOG_FINE, "kafka: revoked (%s):", "TODO");
+    // rd_kafka_rebalance_protocol(rk));
+    log_partition_list(partitions);
+#endif
+
+    // XXX TODO: fix this for new (as yet unreleased) librdkafka API!!
+    //if (!strcmp(rd_kafka_rebalance_protocol(rk), "COOPERATIVE")) {
+    //  error = rd_kafka_incremental_unassign(rk, partitions);
+    //} else {
+    ret_err = rd_kafka_assign(rk, NULL);
+    //}
+    break;
+
+  default:
+    bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: failed: %s",
+                  rd_kafka_err2str(err));
+    rd_kafka_assign(rk, NULL);
+    break;
+  }
+
+  if (error != NULL) {
+    bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: incremental assign failure: %s",
+                  rd_kafka_error_string(error));
+    rd_kafka_error_destroy(error);
+  } else if (ret_err) {
+    bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: assign failure: %s",
+                  rd_kafka_err2str(ret_err));
+  }
+}
+
 static int init_kafka_config(bgpstream_transport_t *transport,
                              rd_kafka_conf_t *conf)
 {
@@ -179,6 +317,11 @@ static int init_kafka_config(bgpstream_transport_t *transport,
     return -1;
   }
 
+  // Set up a rebalance callback if we're going to seek to specific offsets
+  if (STATE->timestamp_from != 0) {
+    rd_kafka_conf_set_rebalance_cb(conf, rebalance_cb);
+  }
+
   // Enable SO_KEEPALIVE in case we're behind a NAT
   if (rd_kafka_conf_set(conf, "socket.keepalive.enable", "true", errstr,
                         sizeof(errstr)) != RD_KAFKA_CONF_OK) {
diff --git a/tools/bgpreader.c b/tools/bgpreader.c
index c6a66da6..3a2b9895 100644
--- a/tools/bgpreader.c
+++ b/tools/bgpreader.c
@@ -283,7 +283,7 @@ static void dump_if_options()
       fprintf(stderr, " [NONE]\n");
     } else {
       for (i = 0; i < opt_cnt; i++) {
-        fprintf(stderr, " %-*s", 15, options[i].name);
+        fprintf(stderr, " %-*s", 16, options[i].name);
         wrap(options[i].description, 18, 18);
         fprintf(stderr, "\n");
       }