Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Shkuro <[email protected]>
  • Loading branch information
yurishkuro committed Jul 28, 2024
1 parent 6163bb9 commit 30ceb02
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 41 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci-e2e-kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ jobs:

- name: Run Kafka integration tests
id: test-execution
run: bash scripts/kafka-integration-test.sh -k ${{ matrix.jaeger-version }}
run: bash scripts/kafka-integration-test.sh -j ${{ matrix.jaeger-version }}

- name: Output Kafka logs on failure
run: docker compose -f ${{ steps.test-execution.outputs.docker_compose_file }} logs
Expand All @@ -46,4 +46,4 @@ jobs:
uses: ./.github/actions/upload-codecov
with:
files: cover.out
flags: kafka-${{ matrix.jaeger-version }}
flags: kafka-${{ matrix.jaeger-version }}
7 changes: 6 additions & 1 deletion cmd/jaeger/collector-with-kafka.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,12 @@ service:
receivers: [otlp, jaeger]
processors: [batch]
exporters: [kafka]

telemetry:
resource:
service.name: jaeger_collector
metrics:
level: detailed

receivers:
otlp:
protocols:
Expand Down
26 changes: 16 additions & 10 deletions cmd/jaeger/internal/integration/e2e_integration.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,11 @@ const otlpPort = 4317
// - At last, clean up anything declared in its own test functions.
// (e.g. close remote-storage)
type E2EStorageIntegration struct {
SkipStorageCleaner bool
integration.StorageIntegration
ConfigFile string

SkipStorageCleaner bool
ConfigFile string
HealthCheckEndpoint string
}

// e2eInitialize starts the Jaeger-v2 collector with the provided config file,
Expand All @@ -52,11 +54,10 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
configFile = createStorageCleanerConfig(t, s.ConfigFile, storage)
}

// Ensure the configuration file exists
absConfigFile, err := filepath.Abs(configFile)
configFile, err := filepath.Abs(configFile)
require.NoError(t, err, "Failed to get absolute path of the config file")
require.FileExists(t, absConfigFile, "Config file does not exist at the resolved path")
require.FileExists(t, configFile, "Config file does not exist at the resolved path")

t.Logf("Starting Jaeger-v2 in the background with config file %s", configFile)

outFile, err := os.OpenFile(
Expand Down Expand Up @@ -87,12 +88,17 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
}
t.Logf("Running command: %v", cmd.Args)
require.NoError(t, cmd.Start())

// Wait for the binary to start and become ready to serve requests.
healthCheckEndpoint := s.HealthCheckEndpoint
if healthCheckEndpoint == "" {
healthCheckEndpoint = fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
}
require.Eventually(t, func() bool {
url := fmt.Sprintf("http://localhost:%d/", ports.QueryHTTP)
t.Logf("Checking if Jaeger-v2 is available on %s", url)
t.Logf("Checking if Jaeger-v2 is available on %s", healthCheckEndpoint)
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthCheckEndpoint, nil)
if err != nil {
t.Logf("HTTP request creation failed: %v", err)
return false
Expand All @@ -104,7 +110,7 @@ func (s *E2EStorageIntegration) e2eInitialize(t *testing.T, storage string) {
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}, 60*time.Second, 1*time.Second, "Jaeger-v2 did not start")
}, 60*time.Second, 3*time.Second, "Jaeger-v2 did not start")
t.Log("Jaeger-v2 is ready")
t.Cleanup(func() {
if err := cmd.Process.Kill(); err != nil {
Expand Down
8 changes: 6 additions & 2 deletions cmd/jaeger/internal/integration/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@ import (
func TestKafkaStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "kafka")

// TODO these config files use topic: "jaeger-spans",
// but for integration tests we want to use random topic in each run.

collectorConfig := "../../collector-with-kafka.yaml"
ingesterConfig := "../../ingester-remote-storage.yaml"

collector := &E2EStorageIntegration{
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
SkipStorageCleaner: true,
ConfigFile: collectorConfig,
HealthCheckEndpoint: "http://localhost:8888/metrics",
}

// Initialize and start the collector
Expand Down
67 changes: 41 additions & 26 deletions scripts/kafka-integration-test.sh
Original file line number Diff line number Diff line change
@@ -1,26 +1,43 @@
#!/bin/bash

set -e
set -euf -o pipefail

export STORAGE=kafka
compose_file="docker-compose/kafka-integration-test/docker-compose.yml"
echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}"

usage() {
echo $"Usage: $0 -k <jaeger_version>"
jaeger_version=""
manage_kafka="true"

print_help() {
echo "Usage: $0 [-K] -j <jaeger_version>"
echo " -K: do not start or stop Kafka container (useful for local testing)"
echo " -j: major version of Jaeger to test (v1|v2)"
exit 1
}

check_args() {
if [ $# -ne 2 ]; then
echo "ERROR: need exactly two arguments, -k and <jaeger_version>"
usage
parse_args() {
while getopts "j:Kh" opt; do
case "${opt}" in
j)
jaeger_version=${OPTARG}
;;
K)
manage_kafka="false"
;;
*)
print_help
;;
esac
done
if [ "$jaeger_version" != "v1" ] && [ "$jaeger_version" != "v2" ]; then
echo "Error: Invalid Jaeger version. Valid options are v1 or v2"
print_help
fi
}

setup_kafka() {
echo "Starting Kafka using Docker Compose..."
docker compose -f "${compose_file}" up -d kafka
echo "docker_compose_file=${compose_file}" >> "${GITHUB_OUTPUT:-/dev/null}"
}

teardown_kafka() {
Expand All @@ -43,43 +60,41 @@ wait_for_kafka() {

while [ $SECONDS -lt $end_time ]; do
if is_kafka_ready; then
break
return
fi
echo "Kafka broker not ready, waiting ${interval} seconds"
sleep $interval
done

if ! is_kafka_ready; then
echo "Timed out waiting for Kafka to start"
exit 1
fi
echo "Timed out waiting for Kafka to start"
exit 1
}

run_integration_test() {
local version=$1

if [ "${version}" = "v1" ]; then
STORAGE=kafka make storage-integration-test
elif [ "${version}" = "v2" ]; then
STORAGE=kafka make jaeger-v2-storage-integration-test
export STORAGE=kafka
if [ "${jaeger_version}" = "v1" ]; then
make storage-integration-test
elif [ "${jaeger_version}" = "v2" ]; then
make jaeger-v2-storage-integration-test
else
echo "Unknown test version ${version}. Valid options are v1 or v2"
exit 1
echo "Unknown Jaeger version ${jaeger_version}."
print_help
fi
}

main() {
check_args "$@"
parse_args "$@"

echo "Executing Kafka integration test for version $2"
set -x

if [ "$1" == "-k" ]; then
if [[ "$manage_kafka" == "true" ]]; then
setup_kafka
trap 'teardown_kafka' EXIT
wait_for_kafka
fi
wait_for_kafka

run_integration_test "$2"
run_integration_test
}

main "$@"

0 comments on commit 30ceb02

Please sign in to comment.