diff --git a/.github/workflows/ci-e2e-kafka.yml b/.github/workflows/ci-e2e-kafka.yml index 9e4ee1262ec..9877511a236 100644 --- a/.github/workflows/ci-e2e-kafka.yml +++ b/.github/workflows/ci-e2e-kafka.yml @@ -36,7 +36,7 @@ jobs: - name: Run Kafka integration tests id: test-execution - run: bash scripts/kafka-integration-test.sh -k ${{ matrix.jaeger-version }} + run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }} - name: Output Kafka logs on failure run: docker compose -f ${{ steps.test-execution.outputs.docker_compose_file }} logs @@ -46,4 +46,4 @@ jobs: uses: ./.github/actions/upload-codecov with: files: cover.out - flags: kafka-${{ matrix.jaeger-version }} \ No newline at end of file + flags: kafka-${{ matrix.jaeger-version }} diff --git a/cmd/jaeger/collector-with-kafka.yaml b/cmd/jaeger/collector-with-kafka.yaml index fb16234bc8c..a248357b978 100644 --- a/cmd/jaeger/collector-with-kafka.yaml +++ b/cmd/jaeger/collector-with-kafka.yaml @@ -4,7 +4,12 @@ service: receivers: [otlp, jaeger] processors: [batch] exporters: [kafka] - + telemetry: + resource: + service.name: jaeger_collector + metrics: + level: detailed + receivers: otlp: protocols: diff --git a/cmd/jaeger/internal/integration/e2e_integration.go b/cmd/jaeger/internal/integration/e2e_integration.go index 308c7bdf68c..ea7459c268f 100644 --- a/cmd/jaeger/internal/integration/e2e_integration.go +++ b/cmd/jaeger/internal/integration/e2e_integration.go @@ -37,9 +37,11 @@ const otlpPort = 4317 // - At last, clean up anything declared in its own test functions. // (e.g. close remote-storage) type E2EStorageIntegration struct { - SkipStorageCleaner bool integration.StorageIntegration - ConfigFile string + + SkipStorageCleaner bool + ConfigFile string + HealthCheckEndpoint string } // e2eInitialize starts the Jaeger-v2 collector with the provided config file, @@ -52,11 +54,10 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { configFile = createStorageCleanerConfig(t, s.ConfigFile, storage) } - // Ensure the configuration file exists - absConfigFile, err := filepath.Abs(configFile) + configFile, err := filepath.Abs(configFile) require.NoError(t, err, "Failed to get absolute path of the config file") - require.FileExists(t, absConfigFile, "Config file does not exist at the resolved path") - + require.FileExists(t, configFile, "Config file does not exist at the resolved path") + t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile) outFile, err := os.OpenFile( @@ -87,12 +88,17 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { } t.Logf("Running command: %v", cmd.Args) require.NoError(t, cmd.Start()) + + // Wait for the binary to start and become ready to serve requests. + healthCheckEndpoint := s.HealthCheckEndpoint + if healthCheckEndpoint == "" { + healthCheckEndpoint = fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP) + } require.Eventually(t, func() bool { - url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP) - t.Logf("Checking if Jaeger-v2 is available on %s", url) + t.Logf("Checking if Jaeger-v2 is available on %s", healthCheckEndpoint) ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) defer cancel() - req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil) + req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckEndpoint, nil) if err != nil { t.Logf("HTTP request creation failed: %v", err) return false @@ -104,7 +110,7 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) { } defer resp.Body.Close() return resp.StatusCode == http.StatusOK - }, 60*time.Second, 1*time.Second, "Jaeger-v2 did not start") + }, 60*time.Second, 3*time.Second, "Jaeger-v2 did not start") t.Log("Jaeger-v2 is ready") t.Cleanup(func() { if err := cmd.Process.Kill(); err != nil { diff --git a/cmd/jaeger/internal/integration/kafka_test.go b/cmd/jaeger/internal/integration/kafka_test.go index 099bddba3e1..e3c25457c1e 100644 --- a/cmd/jaeger/internal/integration/kafka_test.go +++ b/cmd/jaeger/internal/integration/kafka_test.go @@ -12,12 +12,16 @@ import ( func TestKafkaStorage(t *testing.T) { integration.SkipUnlessEnv(t, "kafka") + // TODO these config files use topic: "jaeger-spans", + // but for integration tests we want to use random topic in each run. + collectorConfig := "../../collector-with-kafka.yaml" ingesterConfig := "../../ingester-remote-storage.yaml" collector := &E2EStorageIntegration{ - SkipStorageCleaner: true, - ConfigFile: collectorConfig, + SkipStorageCleaner: true, + ConfigFile: collectorConfig, + HealthCheckEndpoint: "http://localhost:8888/metrics", } // Initialize and start the collector diff --git a/scripts/kafka-integration-test.sh b/scripts/kafka-integration-test.sh index 03be9d70aee..eed185727a7 100755 --- a/scripts/kafka-integration-test.sh +++ b/scripts/kafka-integration-test.sh @@ -1,26 +1,43 @@ #!/bin/bash -set -e +set -euf -o pipefail -export STORAGE=kafka compose_file="docker-compose/kafka-integration-test/docker-compose.yml" +echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}" -usage() { - echo $"Usage: $0 -k " +jaeger_version="" +manage_kafka="true" + +print_help() { + echo "Usage: $0 [-K] -j " + echo " -K: do not start or stop Kafka container (useful for local testing)" + echo " -j: major version of Jaeger to test (v1|v2)" exit 1 } -check_args() { - if [ $# -ne 2 ]; then - echo "ERROR: need exactly two arguments, -k and " - usage +parse_args() { + while getopts "j:Kh" opt; do + case "${opt}" in + j) + jaeger_version=${OPTARG} + ;; + K) + manage_kafka="false" + ;; + *) + print_help + ;; + esac + done + if [ "$jaeger_version" != "v1" ] && [ "$jaeger_version" != "v2" ]; then + echo "Error: Invalid Jaeger version. Valid options are v1 or v2" + print_help fi } setup_kafka() { echo "Starting Kafka using Docker Compose..." docker compose -f "${compose_file}" up -d kafka - echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}" } teardown_kafka() { @@ -43,43 +60,41 @@ wait_for_kafka() { while [ $SECONDS -lt $end_time ]; do if is_kafka_ready; then - break + return fi echo "Kafka broker not ready, waiting ${interval} seconds" sleep $interval done - if ! is_kafka_ready; then - echo "Timed out waiting for Kafka to start" - exit 1 - fi + echo "Timed out waiting for Kafka to start" + exit 1 } run_integration_test() { - local version=$1 - - if [ "${version}" = "v1" ]; then - STORAGE=kafka make storage-integration-test - elif [ "${version}" = "v2" ]; then - STORAGE=kafka make jaeger-v2-storage-integration-test + export STORAGE=kafka + if [ "${jaeger_version}" = "v1" ]; then + make storage-integration-test + elif [ "${jaeger_version}" = "v2" ]; then + make jaeger-v2-storage-integration-test else - echo "Unknown test version ${version}. Valid options are v1 or v2" - exit 1 + echo "Unknown Jaeger version ${jaeger_version}." + print_help fi } main() { - check_args "$@" + parse_args "$@" echo "Executing Kafka integration test for version $2" + set -x - if [ "$1" == "-k" ]; then + if [[ "$manage_kafka" == "true" ]]; then setup_kafka trap 'teardown_kafka' EXIT - wait_for_kafka fi + wait_for_kafka - run_integration_test "$2" + run_integration_test } main "$@"