Skip to content

Commit

Permalink
Add schemaURL to run event (#1917)
Browse files Browse the repository at this point in the history
* Add schemaURL to run event

Signed-off-by: Bernát Gábor <[email protected]>

* PR feedback

Signed-off-by: Bernát Gábor <[email protected]>

---------

Signed-off-by: Bernát Gábor <[email protected]>
  • Loading branch information
gaborbernat authored Jun 7, 2023
1 parent bf4f219 commit a37d5e4
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 78 deletions.
5 changes: 4 additions & 1 deletion client/python/openlineage/client/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
from openlineage.client.facet import NominalTimeRunFacet, ParentRunFacet
from openlineage.client.utils import RedactMixin

SCHEMA_URL = "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"


class RunState(Enum):
START = "START"
Expand Down Expand Up @@ -97,8 +99,9 @@ class RunEvent(RedactMixin):
producer: str = attr.ib()
inputs: Optional[List[Dataset]] = attr.ib(factory=list) # type: ignore[assignment]
outputs: Optional[List[Dataset]] = attr.ib(factory=list) # type: ignore[assignment]
schemaURL: str = attr.ib(default=SCHEMA_URL) # noqa: N815

_skip_redact: List[str] = ["eventType", "eventTime", "producer"]
_skip_redact: List[str] = ["eventType", "eventTime", "producer", "schemaURL"]

@eventTime.validator
def check(self, attribute: str, value: str) -> None: # noqa: ARG002
Expand Down
8 changes: 4 additions & 4 deletions client/python/pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[build-system]
build-backend = "hatchling.build"
requires = [
"hatchling>=1.15",
"hatchling>=1.17",
]

[project]
Expand Down Expand Up @@ -29,16 +29,16 @@ dependencies = [
"attrs>=23.1",
"python-dateutil>=2.8.2",
"pyyaml>=6",
"requests>=2.30",
"requests>=2.31",
]
optional-dependencies.kafka = [
"confluent-kafka>=2.1.1",
]
optional-dependencies.test = [
"covdefaults>=2.3",
"pytest>=7.3.1",
"pytest-cov>=4",
"pytest-mock>=3.6.1",
"pytest-cov>=4.1",
"pytest-mock>=3.10",
"pyyaml>=6",
]

Expand Down
19 changes: 19 additions & 0 deletions client/python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
# Copyright 2018-2023 contributors to the OpenLineage project
# SPDX-License-Identifier: Apache-2.0
from __future__ import annotations

from pathlib import Path

import pytest

from openlineage.client import set_producer


@pytest.fixture(scope="session")
def root() -> Path:
    """Return the directory that contains this conftest module.

    Session-scoped, so the path is computed once and shared by every test
    that requests the ``root`` fixture.
    """
    tests_dir = Path(__file__).parent
    return tests_dir


@pytest.fixture(scope="session", autouse=True)
def _setup_producer() -> None:
    """Call ``set_producer`` with the fixed 0.0.1 Python client URL.

    Declared ``autouse`` with session scope, so pytest runs it once per test
    session without tests having to request it explicitly.
    """
    producer_url = "https://github.com/OpenLineage/OpenLineage/tree/0.0.1/client/python"
    set_producer(producer_url)
3 changes: 2 additions & 1 deletion client/python/tests/serde_example_run_event.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@
},
"inputs": [],
"outputs": [],
"producer": "https://github.com/OpenLineage/OpenLineage/tree/0.0.1/client/python"
"producer": "https://github.com/OpenLineage/OpenLineage/tree/0.0.1/client/python",
"schemaURL": "https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent"
}
44 changes: 30 additions & 14 deletions client/python/tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,25 @@
import os
import re
import uuid
from typing import TYPE_CHECKING
from unittest.mock import MagicMock, patch

import pytest

from openlineage.client.client import OpenLineageClient
from openlineage.client.run import Dataset, DatasetEvent, Job, JobEvent, Run, RunEvent, RunState
from openlineage.client.run import (
SCHEMA_URL,
Dataset,
DatasetEvent,
Job,
JobEvent,
Run,
RunEvent,
RunState,
)

if TYPE_CHECKING:
from pathlib import Path


def test_client_fails_with_wrong_event_type() -> None:
Expand Down Expand Up @@ -73,7 +86,7 @@ def test_client_sends_proper_json_with_minimal_run_event() -> None:
'{"eventTime": "2021-11-03T10:53:52.427343", "eventType": "START", "inputs": [], "job": '
'{"facets": {}, "name": "job", "namespace": "openlineage"}, "outputs": [], '
'"producer": "producer", "run": {"facets": {}, "runId": '
'"69f4acab-b87d-4fc0-b27b-8ea950370ff3"}}',
f'"69f4acab-b87d-4fc0-b27b-8ea950370ff3"}}, "schemaURL": "{SCHEMA_URL}"}}',
timeout=5.0,
verify=True,
)
Expand Down Expand Up @@ -139,6 +152,7 @@ def test_client_uses_passed_transport() -> None:
Run("69f4acab-b87d-4fc0-b27b-8ea950370ff3"),
Job("openlineage", "job"),
"producer",
"schemaURL",
),
)
client.transport.emit.assert_called_once()
Expand All @@ -147,26 +161,27 @@ def test_client_uses_passed_transport() -> None:
@pytest.mark.parametrize(
("name", "config_path", "should_emit"),
[
("job", "tests/config/exact_filter.yml", False),
("wrong", "tests/config/exact_filter.yml", False),
("job1", "tests/config/exact_filter.yml", True),
("1wrong", "tests/config/exact_filter.yml", True),
("asdf", "tests/config/exact_filter.yml", True),
("", "tests/config/exact_filter.yml", True),
("whatever", "tests/config/regex_filter.yml", False),
("something_whatever_asdf", "tests/config/regex_filter.yml", False),
("$$$.whatever", "tests/config/regex_filter.yml", False),
("asdf", "tests/config/regex_filter.yml", True),
("", "tests/config/regex_filter.yml", True),
("job", "exact_filter.yml", False),
("wrong", "exact_filter.yml", False),
("job1", "exact_filter.yml", True),
("1wrong", "exact_filter.yml", True),
("asdf", "exact_filter.yml", True),
("", "exact_filter.yml", True),
("whatever", "regex_filter.yml", False),
("something_whatever_asdf", "regex_filter.yml", False),
("$$$.whatever", "regex_filter.yml", False),
("asdf", "regex_filter.yml", True),
("", "regex_filter.yml", True),
],
)
def test_client_filters_exact_job_name_events(
name: str,
config_path: str,
root: Path,
*,
should_emit: bool,
) -> None:
with patch.dict(os.environ, {"OPENLINEAGE_CONFIG": config_path}):
with patch.dict(os.environ, {"OPENLINEAGE_CONFIG": str(root / "config" / config_path)}):
factory = MagicMock()
transport = MagicMock()
factory.create.return_value = transport
Expand All @@ -179,6 +194,7 @@ def test_client_filters_exact_job_name_events(
run=run,
job=Job(name=name, namespace=""),
producer="",
schemaURL="",
)

client.emit(event)
Expand Down
11 changes: 5 additions & 6 deletions client/python/tests/test_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,11 @@
import attr
import pytest

from openlineage.client import facet, run, set_producer
from openlineage.client import facet, run
from openlineage.client.run import RunState
from openlineage.client.serde import Serde


@pytest.fixture(scope="session", autouse=True)
def _setup_producer() -> None:
set_producer("https://github.com/OpenLineage/OpenLineage/tree/0.0.1/client/python")


def get_sorted_json(file_name: str) -> str:
dirpath = os.path.dirname(os.path.realpath(__file__))
with open(os.path.join(dirpath, file_name)) as f:
Expand Down Expand Up @@ -46,6 +41,7 @@ def test_full_core_event_serializes_properly() -> None:
inputs=[],
outputs=[],
producer="https://github.com/OpenLineage/OpenLineage/tree/0.0.1/client/python",
schemaURL="https://openlineage.io/spec/1-0-5/OpenLineage.json#/definitions/RunEvent",
)

assert Serde.to_json(run_event) == get_sorted_json("serde_example_run_event.json")
Expand All @@ -66,6 +62,7 @@ def test_run_event_type_validated() -> None:
run.Run("69f4acab-b87d-4fc0-b27b-8ea950370ff3", {}),
run.Job("default", "name"),
"producer",
"schemaURL",
)
with pytest.raises(ValueError, match="'eventType' must be in <enum"):
run.RunEvent(
Expand All @@ -74,6 +71,7 @@ def test_run_event_type_validated() -> None:
valid_event.run,
valid_event.job,
valid_event.producer,
valid_event.schemaURL,
)

with pytest.raises(ValueError, match="Parsed date-time has to contain time: 2021-11-03"):
Expand All @@ -83,6 +81,7 @@ def test_run_event_type_validated() -> None:
valid_event.run,
valid_event.job,
valid_event.producer,
valid_event.schemaURL,
)


Expand Down
Loading

0 comments on commit a37d5e4

Please sign in to comment.