Skip to content

Commit

Permalink
clients: add OPENLINEAGE_DISABLED environment variable which override…
Browse files Browse the repository at this point in the history
…s config to NoopTransport

Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski committed May 26, 2022
1 parent 4873a11 commit 5531fde
Show file tree
Hide file tree
Showing 11 changed files with 118 additions and 2 deletions.
1 change: 1 addition & 0 deletions client/java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ dependencies {
testImplementation "org.assertj:assertj-core:${assertjVersion}"
testImplementation "org.junit.jupiter:junit-jupiter:${junit5Version}"
testImplementation "org.mockito:mockito-core:${mockitoVersion}"
testImplementation "org.mockito:mockito-inline:${mockitoVersion}"
testImplementation "org.mockito:mockito-junit-jupiter:${mockitoVersion}"
testImplementation "org.projectlombok:lombok:${lombokVersion}"
testAnnotationProcessor "org.projectlombok:lombok:${lombokVersion}"
Expand Down
5 changes: 5 additions & 0 deletions client/java/src/main/java/io/openlineage/client/Clients.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.openlineage.client;

import io.openlineage.client.transports.NoopTransport;
import io.openlineage.client.transports.Transport;
import io.openlineage.client.transports.TransportFactory;

Expand All @@ -13,6 +14,10 @@ public static OpenLineageClient newClient() {
}

public static OpenLineageClient newClient(ConfigPathProvider configPathProvider) {
String isDisabled = Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED");
if (isDisabled != null && (isDisabled.equals("true") || isDisabled.equals("True"))) {
return OpenLineageClient.builder().transport(new NoopTransport()).build();
}
final OpenLineageYaml openLineageYaml = Utils.loadOpenLineageYaml(configPathProvider);
final TransportFactory factory = new TransportFactory(openLineageYaml.getTransportConfig());
final Transport transport = factory.build();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package io.openlineage.client;

// This class exists because it's not possible to mock System
public class Environment {
public static String getEnvironmentVariable(String key) {
return System.getenv(key);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package io.openlineage.client.transports;

import io.openlineage.client.OpenLineage;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class NoopTransport extends Transport {
public NoopTransport() {
super(Type.NOOP);
log.info("OpenLineage client is disabled");
}

@Override
public void emit(OpenLineage.RunEvent runEvent) {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ public abstract class Transport {
enum Type {
CONSOLE,
HTTP,
KAFKA
KAFKA,
NOOP
};

private final Type type;
Expand Down
28 changes: 28 additions & 0 deletions client/java/src/test/java/io/openlineage/client/ConfigTest.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package io.openlineage.client;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.when;

import io.openlineage.client.transports.HttpTransport;
import io.openlineage.client.transports.NoopTransport;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;

public class ConfigTest {
@Test
Expand All @@ -19,6 +23,30 @@ void testLoadConfigFromYaml() throws URISyntaxException {
assertThat(client.transport).isInstanceOf(HttpTransport.class);
}

@Test
void testDisableOverridesConfigFromYaml() throws URISyntaxException {
try (MockedStatic mocked = mockStatic(Environment.class)) {
when(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED")).thenReturn("true");

Path configPath =
Paths.get(this.getClass().getClassLoader().getResource("config/http.yaml").toURI());
OpenLineageClient client = Clients.newClient(new TestConfigPathProvider(configPath));
assertThat(client.transport).isInstanceOf(NoopTransport.class);
}
}

@Test
void testWrongDoesNotDisableConfigFromYaml() throws URISyntaxException {
try (MockedStatic mocked = mockStatic(Environment.class)) {
when(Environment.getEnvironmentVariable("OPENLINEAGE_DISABLED")).thenReturn("anything_else");

Path configPath =
Paths.get(this.getClass().getClassLoader().getResource("config/http.yaml").toURI());
OpenLineageClient client = Clients.newClient(new TestConfigPathProvider(configPath));
assertThat(client.transport).isInstanceOf(HttpTransport.class);
}
}

static class TestConfigPathProvider implements ConfigPathProvider {
private final Path path;

Expand Down
2 changes: 2 additions & 0 deletions client/python/openlineage/client/transport/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# flake8: noqa
from typing import Type

from openlineage.client.transport.noop import NoopTransport
from openlineage.client.transport.transport import Transport, Config, TransportFactory
from openlineage.client.transport.factory import DefaultTransportFactory
from openlineage.client.transport.http import HttpTransport, HttpConfig
Expand All @@ -12,6 +13,7 @@
_factory.register_transport(HttpTransport.kind, HttpTransport)
_factory.register_transport(KafkaTransport.kind, KafkaTransport)
_factory.register_transport(ConsoleTransport.kind, ConsoleTransport)
_factory.register_transport(NoopTransport.kind, NoopTransport)


def get_default_factory():
Expand Down
4 changes: 4 additions & 0 deletions client/python/openlineage/client/transport/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import logging
from typing import Type, Union, Optional

from openlineage.client.transport.noop import NoopConfig, NoopTransport
from openlineage.client.transport.transport import Config, Transport, TransportFactory
from openlineage.client.utils import try_import_from_string

Expand All @@ -25,6 +26,9 @@ def register_transport(self, type: str, clazz: Union[Type[Transport], str]):
self.transports[type] = clazz

def create(self) -> Transport:
if os.environ.get("OPENLINEAGE_DISABLED", False) in [True, "true", "True"]:
return NoopTransport(NoopConfig())

if yaml:
yml_config = self._try_config_from_yaml()
if yml_config:
Expand Down
24 changes: 24 additions & 0 deletions client/python/openlineage/client/transport/noop.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# SPDX-License-Identifier: Apache-2.0.
import logging

from openlineage.client.run import RunEvent
from openlineage.client.serde import Serde
from openlineage.client.transport.transport import Transport, Config


log = logging.getLogger(__name__)


class NoopConfig(Config):
pass


class NoopTransport(Transport):
kind = 'noop'
config = NoopConfig

def __init__(self, config: NoopConfig):
log.debug("OpenLineage client is disabled. NoopTransport.")

def emit(self, event: RunEvent):
pass
21 changes: 21 additions & 0 deletions client/python/tests/test_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from openlineage.client.transport import DefaultTransportFactory, \
get_default_factory, KafkaTransport
from openlineage.client.transport.http import HttpTransport
from openlineage.client.transport.noop import NoopTransport
from tests.transport import AccumulatingTransport, FakeTransport

current_path = os.path.join(os.getcwd(), "tests")
Expand Down Expand Up @@ -87,3 +88,23 @@ def test_transport_decorator_registers(join, listdir, yaml):

transport = get_default_factory().create()
assert isinstance(transport, FakeTransport)


@patch.dict(os.environ, {"OPENLINEAGE_DISABLED": "true"})
def test_env_disables_client():
transport = get_default_factory().create()
assert isinstance(transport, NoopTransport)


@patch.dict(os.environ, {
"OPENLINEAGE_DISABLED": "true",
"OPENLINEAGE_CONFIG": "tests/config/config.yml"
})
def test_env_disabled_ignores_config():
factory = DefaultTransportFactory()
factory.register_transport(
"fake",
clazz="tests.transport.FakeTransport"
)
transport = factory.create()
assert isinstance(transport, NoopTransport)
9 changes: 8 additions & 1 deletion integration/airflow/openlineage/airflow/plugin.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os

from airflow.plugins_manager import AirflowPlugin
from airflow.version import version as AIRFLOW_VERSION
from pkg_resources import parse_version
Expand All @@ -6,7 +8,12 @@
# Provide empty plugin for older version
from openlineage.airflow.macros import lineage_parent_id, lineage_run_id

if parse_version(AIRFLOW_VERSION) < parse_version("2.3.0.dev0"):

def _is_disabled():
return os.getenv("OPENLINEAGE_DISABLED", None) in [True, 'true', "True"]


if parse_version(AIRFLOW_VERSION) < parse_version("2.3.0.dev0") or _is_disabled():
class OpenLineagePlugin(AirflowPlugin):
name = "OpenLineagePlugin"
macros = [lineage_run_id, lineage_parent_id]
Expand Down

0 comments on commit 5531fde

Please sign in to comment.