Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add option to seek Kafka transport to timestamp-based offsets #205

Open
wants to merge 3 commits into
base: master
Choose a base branch
from

Conversation

alistairking
Copy link
Member

This is a prototype feature and is not yet integrated with the normal BGPStream time interval filter.
It must be enabled directly using the -o timestamp-from data interface option like:

bgpreader -d kafka \
  -o brokers=stream.routeviews.org \
  -o topic='^routeviews\.linx\..+\.bmp_raw' \
  -o timestamp-from=1611446400000

Note that the timestamp here is msec since unix epoch. If a topic doesn't have a given timestamp, it will seek to the end.

Previously we ignored the `--enable-debug` configure flag.
These are currently only exposed through the `kafka` data-interface.
@salcock
Copy link
Contributor

salcock commented Jan 24, 2021

Just a heads up that this doesn't build on Ubuntu 16.04:

bs_transport_kafka.c: In function ‘rebalance_cb’:
bs_transport_kafka.c:218:3: error: unknown type name ‘rd_kafka_error_t’
   rd_kafka_error_t *error = NULL;
   ^
In file included from bs_transport_kafka.c:29:0:
bs_transport_kafka.c:266:19: warning: implicit declaration of function ‘rd_kafka_error_string’ [-Wimplicit-function-declaration]
                   rd_kafka_error_string(error));
                   ^
../../lib/bgpstream_log.h:50:55: note: in definition of macro ‘bgpstream_log’
       bgpstream_log_func((level), __FILE__, __LINE__, __VA_ARGS__);            
                                                       ^
bs_transport_kafka.c:265:38: warning: format ‘%s’ expects argument of type ‘char *’, but argument 5 has type ‘int’ [-Wformat=]
     bgpstream_log(BGPSTREAM_LOG_ERR, "kafka: incremental assign failure: %s",
                                      ^
../../lib/bgpstream_log.h:50:55: note: in definition of macro ‘bgpstream_log’
       bgpstream_log_func((level), __FILE__, __LINE__, __VA_ARGS__);            
                                                       ^
bs_transport_kafka.c:267:5: warning: implicit declaration of function ‘rd_kafka_error_destroy’ [-Wimplicit-function-declaration]
     rd_kafka_error_destroy(error);

Guess I could just upgrade...

@alistairking
Copy link
Member Author

@salcock somehow i missed your message.
yeah, i think we really need to upgrade the version of librdkafka we require/backport

@digizeph
Copy link
Contributor

digizeph commented Apr 6, 2021

@alistairking I resolved the merge conflict for this pull request. Could you take a look at d634a30 and see if you see any issue?

@digizeph
Copy link
Contributor

digizeph commented Apr 6, 2021

This does not build on Ubuntu 20.04. Same error message show commented by @salcock (rd_kafka_error_t). librdkafka version shown as below:

image

image

@digizeph
Copy link
Contributor

digizeph commented Apr 6, 2021

The rd_kafka_error_t was introduced in this commit: confluentinc/librdkafka@13b9e92 and is part of the v1.6.1 release (https://github.com/edenhill/librdkafka/releases/tag/v1.6.1) a couple months ago.

To support release this new, we might need to ask users (Ubuntu/Debian) to add their apt repository and pull the deb from there (https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.html#get-the-software).

Another option will be add a submodule to libbgpstream and build librdkafka from source as part of the build process.

@digizeph
Copy link
Contributor

digizeph commented Apr 6, 2021

I was able to build this with the updated librdkafka (built from source). However:

To support release this new, we might need to ask users (Ubuntu/Debian) to add their apt repository and pull the deb from there (https://docs.confluent.io/current/installation/installing_cp/deb-ubuntu.html#get-the-software).

Another option will be add a submodule to libbgpstream and build librdkafka from source as part of the build process.

I realized that this might not be a good idea since users may not be able to upgrade their local librdkafka version for various reasons.

@alistairking
Copy link
Member Author

Right, there's a couple things here.

  1. In this particular case we don't need the latest rdkafka. I can just rewrite this code to use the older error API. But,
  2. We do need to use the latest rdkafka for other reasons (rebalance strategies for example).

I think the idea of a submodule is a good one. We may be able to get it to build a static lib and then link against that. Maybe let's create a new issue to track that work and I'll try and find the time to fix this code to work on older versions of rdkafka?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants