[FLINK] don't send RUNNING events after COMPLETE (OpenLineage#2075)
Signed-off-by: Pawel Leszczynski <[email protected]>
Signed-off-by: Sheeri K. Cabral <[email protected]>
pawel-big-lebowski authored and Sheeri committed Nov 22, 2023
1 parent 861317e commit e95f993
Showing 25 changed files with 690 additions and 126 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -5,6 +5,10 @@
 * **Spark: Improve RDDs on S3 integration. (TODO PR number)** [`#2039`](https://github.com/OpenLineage/OpenLineage/pull/2039) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
     *Prepare integration test to access S3, fix input dataset duplicates and other minor fixes.*
 
+### Fixed
+* **Flink: prevent sending `running` events after job completes.** [`#2075`](https://github.com/OpenLineage/OpenLineage/pull/2075) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
+    *Flink checkpoint tracking thread did not get stopped properly on job complete.*
+
 ## [1.1.0](https://github.com/OpenLineage/OpenLineage/compare/1.0.0...1.1.0) - 2023-08-23
 ### Added
 * **Flink: create Openlineage configuration based on Flink configuration** [`#2033`](https://github.com/OpenLineage/OpenLineage/pull/2033) [@pawel-big-lebowski](https://github.com/pawel-big-lebowski)
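For context on the fix: the OpenLineage job listener starts a checkpoint-tracking thread that periodically emits `running` events, and before this change nothing stopped that thread once the job reached a terminal state. A minimal sketch of the required lifecycle, using hypothetical class and method names rather than the actual OpenLineage implementation:

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Sketch: a periodic tracker that must be shut down when the job completes.
class CheckpointTrackerSketch {
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();

  void start(Runnable emitRunningEvent, long intervalSeconds) {
    // Emits a RUNNING lineage event at a fixed interval while the job runs.
    scheduler.scheduleAtFixedRate(
        emitRunningEvent, intervalSeconds, intervalSeconds, TimeUnit.SECONDS);
  }

  void stop() {
    // Must be invoked from JobListener.onJobExecuted; skipping this was the
    // bug fixed here -- RUNNING events kept flowing after COMPLETE.
    scheduler.shutdownNow();
  }
}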
2 changes: 1 addition & 1 deletion integration/flink/examples/stateful/build.gradle
@@ -66,7 +66,7 @@ dependencies {
     testRuntimeOnly "org.junit.jupiter:junit-jupiter:5.10.0"
     testCompileOnly 'org.codehaus.groovy:groovy-all:3.0.19'
     testImplementation 'org.spockframework:spock-core:2.3-groovy-4.0'
-    testImplementation 'org.awaitility:awaitility:4.2.0'
+    implementation 'org.awaitility:awaitility:4.2.0'
 
     implementation("io.openlineage:openlineage-java:$project.version")
     compileOnly "org.apache.flink:flink-java:$flinkVersion"
@@ -5,10 +5,11 @@
 
 package io.openlineage.flink;
 
+import io.openlineage.flink.FlinkFakeApplication.FakeSink;
+import io.openlineage.flink.FlinkFakeApplication.FakeSource;
 import io.openlineage.flink.api.DatasetFactory;
 import io.openlineage.flink.api.LineageProvider;
-import io.openlineage.util.FlinkListenerUtils;
-import org.apache.flink.core.execution.JobListener;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -24,8 +25,13 @@ public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = setupEnv(args);
     env.addSource(new FakeSource()).addSink(new FakeSink());
 
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
-    env.execute("flink-fake-application");
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName("flink-crushing-lineage-job")
+            .build()
+    );
   }
 
   static class FakeSource implements SourceFunction<Integer> {
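Every example application below follows the same migration: the one-line `FlinkListenerUtils.instantiate(env)` registration is replaced with the explicit `OpenLineageFlinkJobListenerBuilder`, which makes the job name, and where needed the tracking interval, part of the listener configuration. A condensed before/after sketch of the pattern; the `jobTrackingInterval` call and its `Duration` import are taken from `FlinkStoppableApplication` further down, not from this file:

import io.openlineage.util.FlinkListenerUtils; // removed by this commit
import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

class ListenerMigrationSketch {

  // Before: listener came from a utility; the job name lived only in execute()
  static void before(StreamExecutionEnvironment env) throws Exception {
    env.registerJobListener(FlinkListenerUtils.instantiate(env));
    env.execute("flink-fake-application");
  }

  // After: explicit builder; jobTrackingInterval is optional and mirrors
  // the FlinkStoppableApplication usage below
  static void after(StreamExecutionEnvironment env) throws Exception {
    env.registerJobListener(
        OpenLineageFlinkJobListenerBuilder.create()
            .executionEnvironment(env)
            .jobName("flink-fake-application")
            .jobTrackingInterval(Duration.ofSeconds(1))
            .build());
    env.execute("flink-fake-application");
  }
}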
@@ -5,12 +5,8 @@
 
 package io.openlineage.flink;
 
-import io.openlineage.util.FlinkListenerUtils;
-import org.apache.flink.api.common.functions.MapFunction;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -32,23 +28,29 @@ public static void main(String[] args) throws Exception {
     TableLoader sinkLoader = TableLoader.fromHadoopTable("/tmp/warehouse/db/sink");
 
     DataStream<RowData> stream = FlinkSource.forRowData()
-        .env(env)
-        .tableLoader(sourceLoader)
-        .streaming(true)
-        .build();
+            .env(env)
+            .tableLoader(sourceLoader)
+            .streaming(true)
+            .build();
 
     DataStream<RowData> failedTransform = stream.map(
-        row -> {
-          throw new RuntimeException("fail");
-        }
+            row -> {
+              throw new RuntimeException("fail");
+            }
     );
 
     FlinkSink.forRowData(failedTransform)
-        .tableLoader(sinkLoader)
-        .overwrite(true)
-        .append();
-
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
-    env.execute("flink-failed-job");
+            .tableLoader(sinkLoader)
+            .overwrite(true)
+            .append();
+
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName("flink_failed_job")
+            .build()
+    );
+    env.execute("flink_failed_job");
   }
 }
@@ -5,10 +5,7 @@
 
 package io.openlineage.flink;
 
-import io.openlineage.util.FlinkListenerUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.core.execution.JobListener;
-import org.apache.flink.streaming.api.CheckpointingMode;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
@@ -21,8 +18,13 @@ public static void main(String[] args) throws Exception {
     StreamExecutionEnvironment env = setupEnv(args);
 
     env.addSource(new FakeSource()).addSink(new FakeSink());
-
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName("flink-fake-application")
+            .build()
+    );
     env.execute("flink-fake-application");
   }
 
@@ -5,8 +5,7 @@
 
 package io.openlineage.flink;
 
-import io.openlineage.util.FlinkListenerUtils;
-import org.apache.flink.core.execution.JobListener;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.data.RowData;
@@ -35,7 +34,13 @@ public static void main(String[] args) throws Exception {
         .overwrite(true)
         .append();
 
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
-    env.execute("flink-examples-iceberg");
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName("flink_examples_iceberg")
+            .build()
+    );
+    env.execute("flink_examples_iceberg");
   }
 }
@@ -6,17 +6,14 @@
 package io.openlineage.flink;
 
 import io.openlineage.flink.avro.event.InputEvent;
-import io.openlineage.util.FlinkListenerUtils;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.core.execution.JobListener;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 import static io.openlineage.flink.StreamEnvironment.setupEnv;
 import static io.openlineage.kafka.KafkaClientProvider.legacyKafkaSink;
 import static io.openlineage.kafka.KafkaClientProvider.legacyKafkaSource;
-import static org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION;
 
 public class FlinkLegacyKafkaApplication {
 
@@ -32,8 +29,14 @@ public static void main(String[] args) throws Exception {
         .process(new StatefulCounter()).name("process").uid("process")
         .addSink(legacyKafkaSink(parameters.getRequired("output-topic"))).name("kafka-sink").uid("kafka-sink");
 
-
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
-    env.execute("flink-legacy-stateful");
+    String jobName = parameters.get("job-name", "flink_legacy_stateful");
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName(jobName)
+            .build()
+    );
+    env.execute(jobName);
   }
 }
@@ -2,21 +2,13 @@
 
 import io.openlineage.flink.avro.event.InputEvent;
 import io.openlineage.flink.avro.event.OutputEvent;
-import io.openlineage.util.FlinkListenerUtils;
-import java.util.Collections;
-import java.util.Properties;
-import org.apache.avro.Schema;
-import org.apache.avro.Schema.Type;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.connector.kafka.source.KafkaSource;
-import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
 import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
-import org.apache.flink.formats.avro.AvroDeserializationSchema;
 import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
-import org.apache.flink.util.Collector;
 
 import static io.openlineage.common.config.ConfigWrapper.fromResource;
 import static io.openlineage.flink.StreamEnvironment.setupEnv;
@@ -50,7 +42,14 @@ public static void main(String[] args) throws Exception {
         .process(new StatefulCounter()).name("process").uid("process")
         .sinkTo(aKafkaSink(parameters.getRequired("output-topic"))).name("kafka-sink").uid("kafka-sink");
 
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
-    env.execute("flink-source-with-generic-record");
+    String jobName = parameters.get("job-name", "flink_source_with_generic_record");
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName(jobName)
+            .build()
+    );
+    env.execute(jobName);
   }
 }
@@ -6,7 +6,7 @@
 package io.openlineage.flink;
 
 import io.openlineage.flink.avro.event.InputEvent;
-import io.openlineage.util.FlinkListenerUtils;
+import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
@@ -28,8 +28,14 @@ public static void main(String[] args) throws Exception {
         .process(new StatefulCounter()).name("process").uid("process")
         .sinkTo(aKafkaSink(parameters.getRequired("output-topic"))).name("kafka-sink").uid("kafka-sink");
 
-
-    env.registerJobListener(FlinkListenerUtils.instantiate(env));
-    env.execute("flink-examples-stateful");
+    String jobName = parameters.get("job-name", "flink_examples_stateful");
+    env.registerJobListener(
+        OpenLineageFlinkJobListenerBuilder
+            .create()
+            .executionEnvironment(env)
+            .jobName(jobName)
+            .build()
+    );
+    env.execute(jobName);
  }
 }
@@ -0,0 +1,125 @@
/*
/* Copyright 2018-2023 contributors to the OpenLineage project
/* SPDX-License-Identifier: Apache-2.0
*/

package io.openlineage.flink;

import static io.openlineage.flink.StreamEnvironment.setupEnv;
import static io.openlineage.kafka.KafkaClientProvider.aKafkaSink;
import static io.openlineage.kafka.KafkaClientProvider.aKafkaSource;
import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.openlineage.flink.avro.event.InputEvent;
import io.openlineage.flink.tracker.restapi.Checkpoints;
import io.openlineage.util.OpenLineageFlinkJobListenerBuilder;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.JobListener;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.hc.client5.http.classic.methods.HttpGet;
import org.apache.hc.client5.http.impl.classic.CloseableHttpClient;
import org.apache.hc.client5.http.impl.classic.CloseableHttpResponse;
import org.apache.hc.client5.http.impl.classic.HttpClients;
import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkStoppableApplication {
  private static final String TOPIC_PARAM_SEPARATOR = ",";
  private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStoppableApplication.class);

  public static void main(String[] args) throws Exception {
    ParameterTool parameters = ParameterTool.fromArgs(args);
    StreamExecutionEnvironment env = setupEnv(args);

    env.fromSource(aKafkaSource(parameters.getRequired("input-topics").split(TOPIC_PARAM_SEPARATOR)), noWatermarks(), "kafka-source").uid("kafka-source")
        .keyBy(InputEvent::getId)
        .process(new StatefulCounter()).name("process").uid("process")
        .sinkTo(aKafkaSink(parameters.getRequired("output-topic"))).name("kafka-sink").uid("kafka-sink");

    env.registerJobListener(
        OpenLineageFlinkJobListenerBuilder
            .create()
            .executionEnvironment(env)
            .jobName("flink-stoppable-job")
            .jobTrackingInterval(Duration.ofSeconds(1))
            .build()
    );

    JobClient jobClient = env.executeAsync("flink-stoppable-job");

    // wait until the job is running
    Awaitility.await().until(() -> jobClient.getJobStatus().get().equals(JobStatus.RUNNING));

    // wait for some checkpoints to be written
    CloseableHttpClient httpClient = HttpClients.createDefault();
    String checkpointApiUrl =
        String.format(
            "http://%s:%s/jobs/%s/checkpoints",
            env.getConfiguration().get(RestOptions.ADDRESS),
            env.getConfiguration().get(RestOptions.PORT),
            jobClient.getJobID().toString());
    HttpGet request = new HttpGet(checkpointApiUrl);

    Awaitility
        .await()
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> {
          CloseableHttpResponse response = httpClient.execute(request);
          String json = EntityUtils.toString(response.getEntity());
          Checkpoints checkpoints = new ObjectMapper()
              .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
              .readValue(json, Checkpoints.class);

          return checkpoints != null && checkpoints.getCounts().getCompleted() > 0;
        });

    // stop the job gracefully with a savepoint
    LOGGER.info("Stopping gracefully");
    jobClient.stopWithSavepoint(
        false,
        "/tmp/savepoint_" + UUID.randomUUID(),
        SavepointFormatType.DEFAULT
    ).get();

    // wait until the job is finished
    Awaitility.await().until(() -> jobClient.getJobStatus().get().equals(JobStatus.FINISHED));
    LOGGER.info("Application finished");

    // manually call the listeners: executeAsync is required for the graceful stop above,
    // but Flink invokes registered listeners automatically only from execute()
    Field field = FieldUtils.getField(StreamExecutionEnvironment.class, "jobListeners", true);
    List<JobListener> jobListeners = (List<JobListener>) field.get(env);

    LOGGER.info("calling onJobExecuted on listeners");
    jobListeners.forEach(
        jobListener -> {
          try {
            jobListener.onJobExecuted(jobClient.getJobExecutionResult().get(), null);
          } catch (InterruptedException e) {
            throw new RuntimeException(e);
          } catch (ExecutionException e) {
            throw new RuntimeException(e);
          }
        }
    );

    // wait a few more seconds to verify the tracker thread stopped emitting running events
    // (the checkpoint tracking thread is triggered to run every second)
    Thread.sleep(5000);
  }
}
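The `Checkpoints` class imported from `io.openlineage.flink.tracker.restapi` maps Flink's `/jobs/:jobid/checkpoints` REST response, of which only the completed-checkpoint count is read above. A rough sketch of the shape this code depends on; the field names follow Flink's REST API, while the sketch classes here are illustrative stand-ins for the real OpenLineage ones:

// Sketch of the subset of the checkpoint-statistics response used above.
// Jackson maps {"counts": {"completed": 2, ...}, ...} onto these fields;
// everything else is ignored because FAIL_ON_UNKNOWN_PROPERTIES is disabled.
class CheckpointsSketch {
  private CountsSketch counts;

  public CountsSketch getCounts() { return counts; }
  public void setCounts(CountsSketch counts) { this.counts = counts; }

  static class CountsSketch {
    private int completed;

    public int getCompleted() { return completed; }
    public void setCompleted(int completed) { this.completed = completed; }
  }
}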