From 5531fde2d86d8e82537ff69b984af6a1fb0cb351 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Wed, 25 May 2022 14:22:26 +0200 Subject: [PATCH] clients: add OPENLINEAGE_DISABLED environment variable which overrides config to NoopTransport Signed-off-by: Maciej Obuchowski --- client/java/build.gradle | 1 + .../java/io/openlineage/client/Clients.java | 5 ++++ .../io/openlineage/client/Environment.java | 8 ++++++ .../client/transports/NoopTransport.java | 15 ++++++++++ .../client/transports/Transport.java | 3 +- .../io/openlineage/client/ConfigTest.java | 28 +++++++++++++++++++ .../openlineage/client/transport/__init__.py | 2 ++ .../openlineage/client/transport/factory.py | 4 +++ .../openlineage/client/transport/noop.py | 24 ++++++++++++++++ client/python/tests/test_factory.py | 21 ++++++++++++++ .../airflow/openlineage/airflow/plugin.py | 9 +++++- 11 files changed, 118 insertions(+), 2 deletions(-) create mode 100644 client/java/src/main/java/io/openlineage/client/Environment.java create mode 100644 client/java/src/main/java/io/openlineage/client/transports/NoopTransport.java create mode 100644 client/python/openlineage/client/transport/noop.py diff --git a/client/java/build.gradle b/client/java/build.gradle index a5d2dcdd75..55676531c2 100644 --- a/client/java/build.gradle +++ b/client/java/build.gradle @@ -59,6 +59,7 @@ dependencies { testImplementation "org.assertj:assertj-core:${assertjVersion}" testImplementation "org.junit.jupiter:junit-jupiter:${junit5Version}" testImplementation "org.mockito:mockito-core:${mockitoVersion}" + testImplementation "org.mockito:mockito-inline:${mockitoVersion}" testImplementation "org.mockito:mockito-junit-jupiter:${mockitoVersion}" testImplementation "org.projectlombok:lombok:${lombokVersion}" testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}" diff --git a/client/java/src/main/java/io/openlineage/client/Clients.java b/client/java/src/main/java/io/openlineage/client/Clients.java index 9f127a6b68..0b3aa73919 100644 --- a/client/java/src/main/java/io/openlineage/client/Clients.java +++ b/client/java/src/main/java/io/openlineage/client/Clients.java @@ -1,5 +1,6 @@ package io.openlineage.client; +import io.openlineage.client.transports.NoopTransport; import io.openlineage.client.transports.Transport; import io.openlineage.client.transports.TransportFactory; @@ -13,6 +14,10 @@ public static OpenLineageClient newClient() { } public static OpenLineageClient newClient(ConfigPathProvider configPathProvider) { + String isDisabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED"); + if (isDisabled != null && (isDisabled.equals("true") || isDisabled.equals("True"))) { + return OpenLineageClient.builder().transport(new NoopTransport()).build(); + } final OpenLineageYaml openLineageYaml = Utils.loadOpenLineageYaml(configPathProvider); final TransportFactory factory = new TransportFactory(openLineageYaml.getTransportConfig()); final Transport transport = factory.build(); diff --git a/client/java/src/main/java/io/openlineage/client/Environment.java b/client/java/src/main/java/io/openlineage/client/Environment.java new file mode 100644 index 0000000000..cd13c1d69f --- /dev/null +++ b/client/java/src/main/java/io/openlineage/client/Environment.java @@ -0,0 +1,8 @@ +package io.openlineage.client; + +// This class exists because it's not possible to mock System +public class Environment { + public static String getEnvironmentVariable(String key) { + return System.getenv(key); + } +} diff --git a/client/java/src/main/java/io/openlineage/client/transports/NoopTransport.java b/client/java/src/main/java/io/openlineage/client/transports/NoopTransport.java new file mode 100644 index 0000000000..8c8094942a --- /dev/null +++ b/client/java/src/main/java/io/openlineage/client/transports/NoopTransport.java @@ -0,0 +1,15 @@ +package io.openlineage.client.transports; + +import io.openlineage.client.OpenLineage; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class NoopTransport extends Transport { + public NoopTransport() { + super(Type.NOOP); + log.info("OpenLineage client is disabled"); + } + + @Override + public void emit(OpenLineage.RunEvent runEvent) {} +} diff --git a/client/java/src/main/java/io/openlineage/client/transports/Transport.java b/client/java/src/main/java/io/openlineage/client/transports/Transport.java index 3a95ace30c..92b030f560 100644 --- a/client/java/src/main/java/io/openlineage/client/transports/Transport.java +++ b/client/java/src/main/java/io/openlineage/client/transports/Transport.java @@ -7,7 +7,8 @@ public abstract class Transport { enum Type { CONSOLE, HTTP, - KAFKA + KAFKA, + NOOP }; private final Type type; diff --git a/client/java/src/test/java/io/openlineage/client/ConfigTest.java b/client/java/src/test/java/io/openlineage/client/ConfigTest.java index 1263a2cc80..b88b4e53cf 100644 --- a/client/java/src/test/java/io/openlineage/client/ConfigTest.java +++ b/client/java/src/test/java/io/openlineage/client/ConfigTest.java @@ -1,14 +1,18 @@ package io.openlineage.client; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; import io.openlineage.client.transports.HttpTransport; +import io.openlineage.client.transports.NoopTransport; import java.net.URISyntaxException; import java.nio.file.Path; import java.nio.file.Paths; import java.util.Collections; import java.util.List; import org.junit.jupiter.api.Test; +import org.mockito.MockedStatic; public class ConfigTest { @Test @@ -19,6 +23,30 @@ void testLoadConfigFromYaml() throws URISyntaxException { assertThat(client.transport).isInstanceOf(HttpTransport.class); } + @Test + void testDisableOverridesConfigFromYaml() throws URISyntaxException { + try (MockedStatic mocked = mockStatic(Environment.class)) { + when(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED")).thenReturn("true"); + + Path configPath = + Paths.get(this.getClass().getClassLoader().getResource("config/http.yaml").toURI()); + OpenLineageClient client = Clients.newClient(new TestConfigPathProvider(configPath)); + assertThat(client.transport).isInstanceOf(NoopTransport.class); + } + } + + @Test + void testWrongDoesNotDisableConfigFromYaml() throws URISyntaxException { + try (MockedStatic mocked = mockStatic(Environment.class)) { + when(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED")).thenReturn("anything_else"); + + Path configPath = + Paths.get(this.getClass().getClassLoader().getResource("config/http.yaml").toURI()); + OpenLineageClient client = Clients.newClient(new TestConfigPathProvider(configPath)); + assertThat(client.transport).isInstanceOf(HttpTransport.class); + } + } + static class TestConfigPathProvider implements ConfigPathProvider { private final Path path; diff --git a/client/python/openlineage/client/transport/__init__.py b/client/python/openlineage/client/transport/__init__.py index dba3f37125..dc83706751 100644 --- a/client/python/openlineage/client/transport/__init__.py +++ b/client/python/openlineage/client/transport/__init__.py @@ -1,6 +1,7 @@ # flake8: noqa from typing import Type +from openlineage.client.transport.noop import NoopTransport from openlineage.client.transport.transport import Transport, Config, TransportFactory from openlineage.client.transport.factory import DefaultTransportFactory from openlineage.client.transport.http import HttpTransport, HttpConfig @@ -12,6 +13,7 @@ _factory.register_transport(HttpTransport.kind, HttpTransport) _factory.register_transport(KafkaTransport.kind, KafkaTransport) _factory.register_transport(ConsoleTransport.kind, ConsoleTransport) +_factory.register_transport(NoopTransport.kind, NoopTransport) def get_default_factory(): diff --git a/client/python/openlineage/client/transport/factory.py b/client/python/openlineage/client/transport/factory.py index 9cad1d9748..38723d7659 100644 --- a/client/python/openlineage/client/transport/factory.py +++ b/client/python/openlineage/client/transport/factory.py @@ -4,6 +4,7 @@ import logging from typing import Type, Union, Optional +from openlineage.client.transport.noop import NoopConfig, NoopTransport from openlineage.client.transport.transport import Config, Transport, TransportFactory from openlineage.client.utils import try_import_from_string @@ -25,6 +26,9 @@ def register_transport(self, type: str, clazz: Union[Type[Transport], str]): self.transports[type] = clazz def create(self) -> Transport: + if os.environ.get("OPENLINEAGE_DISABLED", False) in [True, "true", "True"]: + return NoopTransport(NoopConfig()) + if yaml: yml_config = self._try_config_from_yaml() if yml_config: diff --git a/client/python/openlineage/client/transport/noop.py b/client/python/openlineage/client/transport/noop.py new file mode 100644 index 0000000000..fa9deffe5b --- /dev/null +++ b/client/python/openlineage/client/transport/noop.py @@ -0,0 +1,24 @@ +# SPDX-License-Identifier: Apache-2.0. +import logging + +from openlineage.client.run import RunEvent +from openlineage.client.serde import Serde +from openlineage.client.transport.transport import Transport, Config + + +log = logging.getLogger(__name__) + + +class NoopConfig(Config): + pass + + +class NoopTransport(Transport): + kind = 'noop' + config = NoopConfig + + def __init__(self, config: NoopConfig): + log.debug("OpenLineage client is disabled. NoopTransport.") + + def emit(self, event: RunEvent): + pass diff --git a/client/python/tests/test_factory.py b/client/python/tests/test_factory.py index 73287f92bf..1b7bdfc525 100644 --- a/client/python/tests/test_factory.py +++ b/client/python/tests/test_factory.py @@ -6,6 +6,7 @@ from openlineage.client.transport import DefaultTransportFactory, \ get_default_factory, KafkaTransport from openlineage.client.transport.http import HttpTransport +from openlineage.client.transport.noop import NoopTransport from tests.transport import AccumulatingTransport, FakeTransport current_path = os.path.join(os.getcwd(), "tests") @@ -87,3 +88,23 @@ def test_transport_decorator_registers(join, listdir, yaml): transport = get_default_factory().create() assert isinstance(transport, FakeTransport) + + +@patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"}) +def test_env_disables_client(): + transport = get_default_factory().create() + assert isinstance(transport, NoopTransport) + + +@patch.dict(os.environ, { + "OPENLINEAGE_DISABLED": "true", + "OPENLINEAGE_CONFIG": "tests/config/config.yml" +}) +def test_env_disabled_ignores_config(): + factory = DefaultTransportFactory() + factory.register_transport( + "fake", + clazz="tests.transport.FakeTransport" + ) + transport = factory.create() + assert isinstance(transport, NoopTransport) diff --git a/integration/airflow/openlineage/airflow/plugin.py b/integration/airflow/openlineage/airflow/plugin.py index 078785ab7c..a3840662a1 100644 --- a/integration/airflow/openlineage/airflow/plugin.py +++ b/integration/airflow/openlineage/airflow/plugin.py @@ -1,3 +1,5 @@ +import os + from airflow.plugins_manager import AirflowPlugin from airflow.version import version as AIRFLOW_VERSION from pkg_resources import parse_version @@ -6,7 +8,12 @@ # Provide empty plugin for older version from openlineage.airflow.macros import lineage_parent_id, lineage_run_id -if parse_version(AIRFLOW_VERSION) < parse_version("2.3.0.dev0"): + +def _is_disabled(): + return os.getenv("OPENLINEAGE_DISABLED", None) in [True, 'true', "True"] + + +if parse_version(AIRFLOW_VERSION) < parse_version("2.3.0.dev0") or _is_disabled(): class OpenLineagePlugin(AirflowPlugin): name = "OpenLineagePlugin" macros = [lineage_run_id, lineage_parent_id]