Skip to content

Commit

Permalink
clients: add OPENLINEAGE_DISABLED environment variable which override…
Browse files Browse the repository at this point in the history
…s config to NoopTransport (OpenLineage#780)

  * rebase
Signed-off-by: Tyler Farris <[email protected]>
  • Loading branch information
mobuchowski authored and Tylerpfarris committed Jun 24, 2022
1 parent 6789430 commit 48ae5f6
Showing 1 changed file with 53 additions and 25 deletions.
78 changes: 53 additions & 25 deletions client/python/openlineage/client/transport/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
from typing import Type, Union, Optional

from openlineage.client.transport.noop import NoopConfig, NoopTransport
from openlineage.client.transport.transport import Config, Transport, TransportFactory
from openlineage.client.transport.transport import (
Config,
Transport,
TransportFactory,
)
from openlineage.client.utils import try_import_from_string


Expand All @@ -27,10 +31,14 @@ def register_transport(self, type: str, clazz: Union[Type[Transport], str]):
self.transports[type] = clazz

def create(self) -> Transport:
if os.environ.get("OPENLINEAGE_DISABLED", False) in [True, "true", "True"]:
if os.environ.get("OPENLINEAGE_DISABLED", False) in [
True,
"true",
"True",
]:
return NoopTransport(NoopConfig())

if 'yaml' in sys.modules:
if "yaml" in sys.modules:
yml_config = self._try_config_from_yaml()
if yml_config:
return self._create_transport(yml_config)
Expand All @@ -39,12 +47,18 @@ def create(self) -> Transport:
if http:
return http
# If there is no HTTP transport, log events to console
from openlineage.client.transport.console import ConsoleTransport, ConsoleConfig
log.warning("Couldn't initialize transport; will print events to console.")
from openlineage.client.transport.console import (
ConsoleTransport,
ConsoleConfig,
)

log.warning(
"Couldn't initialize transport; will print events to console."
)
return ConsoleTransport(ConsoleConfig())

def _create_transport(self, config: dict):
transport_type = config['type']
transport_type = config["type"]

if transport_type in self.transports:
transport_class = self.transports[transport_type]
Expand All @@ -53,7 +67,9 @@ def _create_transport(self, config: dict):

if isinstance(transport_class, str):
transport_class = try_import_from_string(transport_class)
if not inspect.isclass(transport_class) or not issubclass(transport_class, Transport):
if not inspect.isclass(transport_class) or not issubclass(
transport_class, Transport
):
raise TypeError(
f"Transport {transport_class} has to be class, and subclass of Transport"
)
Expand All @@ -62,18 +78,22 @@ def _create_transport(self, config: dict):

if isinstance(config_class, str):
config_class = try_import_from_string(config_class)
if not inspect.isclass(config_class) or not issubclass(config_class, Config):
raise TypeError(f"Config {config_class} has to be class, and subclass of Config")
if not inspect.isclass(config_class) or not issubclass(
config_class, Config
):
raise TypeError(
f"Config {config_class} has to be class, and subclass of Config"
)

return transport_class(config_class.from_dict(config))

def _try_config_from_yaml(self) -> Optional[dict]:
file = self._find_yaml()
if file:
try:
with open(file, 'r') as f:
with open(file, "r") as f:
config = yaml.safe_load(f)
return config['transport']
return config["transport"]
except Exception:
# Just move to read env vars
pass
Expand All @@ -82,7 +102,7 @@ def _try_config_from_yaml(self) -> Optional[dict]:
@staticmethod
def _find_yaml() -> Optional[str]:
# Check OPENLINEAGE_CONFIG env variable
path = os.getenv('OPENLINEAGE_CONFIG', None)
path = os.getenv("OPENLINEAGE_CONFIG", None)
try:
if path and os.path.isfile(path) and os.access(path, os.R_OK):
return path
Expand All @@ -96,36 +116,44 @@ def _find_yaml() -> Optional[str]:
# Check current working directory:
try:
cwd = os.getcwd()
if 'openlineage.yml' in os.listdir(cwd):
return os.path.join(cwd, 'openlineage.yml')
if "openlineage.yml" in os.listdir(cwd):
return os.path.join(cwd, "openlineage.yml")
except Exception:
# We can get different errors depending on system
pass

# Check $HOME/.openlineage dir
try:
path = os.path.expanduser("~/.openlineage")
if 'openlineage.yml' in os.listdir(path):
return os.path.join(path, 'openlineage.yml')
if "openlineage.yml" in os.listdir(path):
return os.path.join(path, "openlineage.yml")
except Exception:
# We can get different errors depending on system
pass
return None

@staticmethod
def _try_http_from_env_config() -> Optional[Transport]:
from openlineage.client.transport.http import HttpTransport, HttpConfig, \
create_token_provider
from openlineage.client.transport.http import (
HttpTransport,
HttpConfig,
create_token_provider,
)

# backwards compatibility: create Transport from
# OPENLINEAGE_URL and OPENLINEAGE_API_KEY
if 'OPENLINEAGE_URL' not in os.environ:
log.error("Did not find openlineage.yml and OPENLINEAGE_URL is not set")
if "OPENLINEAGE_URL" not in os.environ:
log.error(
"Did not find openlineage.yml and OPENLINEAGE_URL is not set"
)
return None
config = HttpConfig(
url=os.environ['OPENLINEAGE_URL'],
auth=create_token_provider({
"type": "api_key",
"api_key": os.environ.get('OPENLINEAGE_API_KEY', None)
})
url=os.environ["OPENLINEAGE_URL"],
auth=create_token_provider(
{
"type": "api_key",
"api_key": os.environ.get("OPENLINEAGE_API_KEY", None),
}
),
)
return HttpTransport(config)

0 comments on commit 48ae5f6

Please sign in to comment.