From c774003bda28cf3b794202532a293d4c06709820 Mon Sep 17 00:00:00 2001 From: Maciej Obuchowski Date: Fri, 12 Aug 2022 14:51:24 +0200 Subject: [PATCH] add raw event API Signed-off-by: Maciej Obuchowski --- api/build.gradle | 1 + api/src/main/java/marquez/MarquezContext.java | 13 +- .../main/java/marquez/api/BaseResource.java | 3 + .../main/java/marquez/api/EventResource.java | 62 ++++++++ api/src/main/java/marquez/db/BaseDao.java | 3 + api/src/main/java/marquez/db/Columns.java | 3 + api/src/main/java/marquez/db/EventDao.java | 37 +++++ .../db/mappers/RawLineageEventMapper.java | 30 ++++ .../java/marquez/service/DelegatingDaos.java | 6 + .../java/marquez/service/EventService.java | 10 ++ .../java/marquez/service/ServiceFactory.java | 1 + .../V46__add_lineage_event_indexes.sql | 5 + .../marquez/OpenLineageIntegrationTest.java | 137 ++++++++++++++++++ .../test/java/marquez/api/ApiTestUtils.java | 3 + .../MarquezJdbiExternalPostgresExtension.java | 2 + build.gradle | 2 +- .../java/marquez/client/MarquezClient.java | 26 ++++ .../java/marquez/client/MarquezPathV1.java | 8 + .../main/java/marquez/client/MarquezUrl.java | 9 ++ .../client/models/RawLineageEvent.java | 39 +++++ 20 files changed, 398 insertions(+), 2 deletions(-) create mode 100644 api/src/main/java/marquez/api/EventResource.java create mode 100644 api/src/main/java/marquez/db/EventDao.java create mode 100644 api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java create mode 100644 api/src/main/java/marquez/service/EventService.java create mode 100644 api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql create mode 100644 clients/java/src/main/java/marquez/client/models/RawLineageEvent.java diff --git a/api/build.gradle b/api/build.gradle index 9db5e79e84..376fb60f8b 100644 --- a/api/build.gradle +++ b/api/build.gradle @@ -40,6 +40,7 @@ dependencies { implementation "io.prometheus:simpleclient_hotspot:${prometheusVersion}" implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}" implementation "org.jdbi:jdbi3-core:${jdbi3Version}" + implementation "org.jdbi:jdbi3-jackson2:${jdbi3Version}" implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}" implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}" implementation 'com.google.guava:guava:31.1-jre' diff --git a/api/src/main/java/marquez/MarquezContext.java b/api/src/main/java/marquez/MarquezContext.java index 1459f3041e..771f58559c 100644 --- a/api/src/main/java/marquez/MarquezContext.java +++ b/api/src/main/java/marquez/MarquezContext.java @@ -14,6 +14,7 @@ import lombok.Getter; import lombok.NonNull; import marquez.api.DatasetResource; +import marquez.api.EventResource; import marquez.api.JobResource; import marquez.api.NamespaceResource; import marquez.api.OpenLineageResource; @@ -25,6 +26,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; +import marquez.db.EventDao; import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; @@ -42,6 +44,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -71,6 +74,7 @@ public final class MarquezContext { @Getter private final OpenLineageDao openLineageDao; @Getter private final LineageDao lineageDao; @Getter private final SearchDao searchDao; + @Getter private final EventDao eventDao; @Getter private final List runTransitionListeners; @@ -82,6 +86,7 @@ public final class MarquezContext { @Getter private final RunService runService; @Getter private final OpenLineageService openLineageService; @Getter private final LineageService lineageService; + @Getter private final EventService eventService; @Getter private final NamespaceResource namespaceResource; @Getter private final SourceResource sourceResource; @@ -90,6 +95,7 @@ public final class MarquezContext { @Getter private final TagResource tagResource; @Getter private final OpenLineageResource openLineageResource; @Getter private final SearchResource searchResource; + @Getter private final EventResource eventResource; @Getter private final ImmutableList resources; @Getter private final JdbiExceptionExceptionMapper jdbiException; @@ -119,6 +125,7 @@ private MarquezContext( this.openLineageDao = jdbi.onDemand(OpenLineageDao.class); this.lineageDao = jdbi.onDemand(LineageDao.class); this.searchDao = jdbi.onDemand(SearchDao.class); + this.eventDao = jdbi.onDemand(EventDao.class); this.runTransitionListeners = runTransitionListeners; this.namespaceService = new NamespaceService(baseDao); @@ -131,6 +138,7 @@ private MarquezContext( this.tagService.init(tags); this.openLineageService = new OpenLineageService(baseDao, runService); this.lineageService = new LineageService(lineageDao, jobDao); + this.eventService = new EventService(baseDao); this.jdbiException = new JdbiExceptionExceptionMapper(); final ServiceFactory serviceFactory = ServiceFactory.builder() @@ -144,6 +152,7 @@ private MarquezContext( .lineageService(lineageService) .datasetFieldService(new DatasetFieldService(baseDao)) .datasetVersionService(new DatasetVersionService(baseDao)) + .eventService(eventService) .build(); this.namespaceResource = new NamespaceResource(serviceFactory); this.sourceResource = new SourceResource(serviceFactory); @@ -152,6 +161,7 @@ private MarquezContext( this.tagResource = new TagResource(serviceFactory); this.openLineageResource = new OpenLineageResource(serviceFactory); this.searchResource = new SearchResource(searchDao); + this.eventResource = new EventResource(serviceFactory); this.resources = ImmutableList.of( @@ -162,7 +172,8 @@ private MarquezContext( tagResource, jdbiException, openLineageResource, - searchResource); + searchResource, + eventResource); final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder(); this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi)); diff --git a/api/src/main/java/marquez/api/BaseResource.java b/api/src/main/java/marquez/api/BaseResource.java index ce15d31ab3..d499c481bf 100644 --- a/api/src/main/java/marquez/api/BaseResource.java +++ b/api/src/main/java/marquez/api/BaseResource.java @@ -28,6 +28,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -50,6 +51,7 @@ public class BaseResource { protected DatasetVersionService datasetVersionService; protected DatasetFieldService datasetFieldService; protected LineageService lineageService; + protected EventService eventService; public BaseResource(ServiceFactory serviceFactory) { this.serviceFactory = serviceFactory; @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) { this.datasetVersionService = serviceFactory.getDatasetVersionService(); this.datasetFieldService = serviceFactory.getDatasetFieldService(); this.lineageService = serviceFactory.getLineageService(); + this.eventService = serviceFactory.getEventService(); } void throwIfNotExists(@NonNull NamespaceName namespaceName) { diff --git a/api/src/main/java/marquez/api/EventResource.java b/api/src/main/java/marquez/api/EventResource.java new file mode 100644 index 0000000000..b1273b4589 --- /dev/null +++ b/api/src/main/java/marquez/api/EventResource.java @@ -0,0 +1,62 @@ +package marquez.api; + +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; + +import com.codahale.metrics.annotation.ExceptionMetered; +import com.codahale.metrics.annotation.ResponseMetered; +import com.codahale.metrics.annotation.Timed; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import javax.validation.constraints.Min; +import javax.ws.rs.DefaultValue; +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Response; +import lombok.NonNull; +import lombok.Value; +import marquez.service.ServiceFactory; + +@Path("/api/v1") +public class EventResource extends BaseResource { + public EventResource(@NonNull final ServiceFactory serviceFactory) { + super(serviceFactory); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Path("/events") + @Produces(APPLICATION_JSON) + public Response get( + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { + final List events = eventService.getAll(limit, offset); + return Response.ok(new Events(events)).build(); + } + + @Timed + @ResponseMetered + @ExceptionMetered + @GET + @Path("/events/{namespace}") + @Produces(APPLICATION_JSON) + public Response getByNamespace( + @PathParam("namespace") String namespace, + @QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit, + @QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) { + final List event = eventService.getByNamespace(namespace, limit, offset); + return Response.ok(new Events(event)).build(); + } + + @Value + static class Events { + @NonNull + @JsonProperty("events") + List value; + } +} diff --git a/api/src/main/java/marquez/db/BaseDao.java b/api/src/main/java/marquez/db/BaseDao.java index 848372d863..9e8b3d37ac 100644 --- a/api/src/main/java/marquez/db/BaseDao.java +++ b/api/src/main/java/marquez/db/BaseDao.java @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject { @CreateSqlObject OpenLineageDao createOpenLineageDao(); + + @CreateSqlObject + EventDao createEventDao(); } diff --git a/api/src/main/java/marquez/db/Columns.java b/api/src/main/java/marquez/db/Columns.java index 5af8e60079..0cfe4d1e3a 100644 --- a/api/src/main/java/marquez/db/Columns.java +++ b/api/src/main/java/marquez/db/Columns.java @@ -126,6 +126,9 @@ private Columns() {} public static final String RUN_UUID = "run_uuid"; public static final String STATE = "state"; + /* LINEAGE EVENT ROW COLUMNS */ + public static final String EVENT = "event"; + public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException { if (results.getObject(column) == null) { return null; diff --git a/api/src/main/java/marquez/db/EventDao.java b/api/src/main/java/marquez/db/EventDao.java new file mode 100644 index 0000000000..5ab67a5b98 --- /dev/null +++ b/api/src/main/java/marquez/db/EventDao.java @@ -0,0 +1,37 @@ +package marquez.db; + +import com.fasterxml.jackson.databind.JsonNode; +import java.util.List; +import marquez.db.mappers.RawLineageEventMapper; +import org.jdbi.v3.sqlobject.config.RegisterRowMapper; +import org.jdbi.v3.sqlobject.customizer.Bind; +import org.jdbi.v3.sqlobject.statement.SqlQuery; + +@RegisterRowMapper(RawLineageEventMapper.class) +public interface EventDao extends BaseDao { + + @SqlQuery( + """ + SELECT event + FROM lineage_events + ORDER BY event_time DESC + LIMIT :limit + OFFSET :offset""") + List getAll(int limit, int offset); + + /** + * This is a "hack" to get inputs/outputs namespace from jsonb column: explanation + */ + @SqlQuery( + """ + SELECT le.event + FROM lineage_events le, jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds + WHERE le.job_namespace = :namespace + OR ds ->> 'namespace' = :namespace + ORDER BY event_time DESC + LIMIT :limit + OFFSET :offset""") + List getByNamespace( + @Bind("namespace") String namespace, @Bind("limit") int limit, @Bind("offset") int offset); +} diff --git a/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java b/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java new file mode 100644 index 0000000000..5209f034fc --- /dev/null +++ b/api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java @@ -0,0 +1,30 @@ +package marquez.db.mappers; + +import static marquez.db.Columns.stringOrThrow; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import lombok.extern.slf4j.Slf4j; +import marquez.common.Utils; +import marquez.db.Columns; +import org.jdbi.v3.core.mapper.RowMapper; +import org.jdbi.v3.core.statement.StatementContext; + +@Slf4j +public class RawLineageEventMapper implements RowMapper { + @Override + public JsonNode map(ResultSet rs, StatementContext ctx) throws SQLException { + String rawEvent = stringOrThrow(rs, Columns.EVENT); + + try { + ObjectMapper mapper = Utils.getMapper(); + return mapper.readTree(rawEvent); + } catch (JsonProcessingException e) { + log.error("Failed to process json", e); + } + return null; + } +} diff --git a/api/src/main/java/marquez/service/DelegatingDaos.java b/api/src/main/java/marquez/service/DelegatingDaos.java index 8184200ab4..e1f91e7c5c 100644 --- a/api/src/main/java/marquez/service/DelegatingDaos.java +++ b/api/src/main/java/marquez/service/DelegatingDaos.java @@ -10,6 +10,7 @@ import marquez.db.DatasetDao; import marquez.db.DatasetFieldDao; import marquez.db.DatasetVersionDao; +import marquez.db.EventDao; import marquez.db.JobContextDao; import marquez.db.JobDao; import marquez.db.JobVersionDao; @@ -98,4 +99,9 @@ public static class DelegatingTagDao implements TagDao { public static class DelegatingLineageDao implements LineageDao { @Delegate private final LineageDao delegate; } + + @RequiredArgsConstructor + public static class DelegatingEventDao implements EventDao { + @Delegate private final EventDao delegate; + } } diff --git a/api/src/main/java/marquez/service/EventService.java b/api/src/main/java/marquez/service/EventService.java new file mode 100644 index 0000000000..9da1afe672 --- /dev/null +++ b/api/src/main/java/marquez/service/EventService.java @@ -0,0 +1,10 @@ +package marquez.service; + +import lombok.NonNull; +import marquez.db.BaseDao; + +public class EventService extends DelegatingDaos.DelegatingEventDao { + public EventService(@NonNull BaseDao baseDao) { + super(baseDao.createEventDao()); + } +} diff --git a/api/src/main/java/marquez/service/ServiceFactory.java b/api/src/main/java/marquez/service/ServiceFactory.java index 5a4b51465b..4a8ded0a0a 100644 --- a/api/src/main/java/marquez/service/ServiceFactory.java +++ b/api/src/main/java/marquez/service/ServiceFactory.java @@ -22,4 +22,5 @@ public class ServiceFactory { @NonNull DatasetVersionService datasetVersionService; @NonNull DatasetFieldService datasetFieldService; @NonNull LineageService lineageService; + @NonNull EventService eventService; } diff --git a/api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql b/api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql new file mode 100644 index 0000000000..32249c6a84 --- /dev/null +++ b/api/src/main/resources/marquez/db/migration/V46__add_lineage_event_indexes.sql @@ -0,0 +1,5 @@ +CREATE INDEX lineage_events_event_time + on lineage_events(event_time DESC); + +CREATE INDEX lineage_events_namespace_event_time + on lineage_events(job_namespace, event_time DESC); diff --git a/api/src/test/java/marquez/OpenLineageIntegrationTest.java b/api/src/test/java/marquez/OpenLineageIntegrationTest.java index b3783e39ec..bcca459446 100644 --- a/api/src/test/java/marquez/OpenLineageIntegrationTest.java +++ b/api/src/test/java/marquez/OpenLineageIntegrationTest.java @@ -11,6 +11,7 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.dropwizard.util.Resources; import io.openlineage.client.OpenLineage; @@ -39,6 +40,7 @@ import marquez.client.models.DatasetVersion; import marquez.client.models.Job; import marquez.client.models.JobId; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.common.Utils; import marquez.db.LineageTestUtils; @@ -320,6 +322,141 @@ public void testOpenLineageJobHierarchySparkAndAirflow() assertThat(runsList).isNotEmpty().hasSize(1); } + @Test + public void testSendEventAndGetItBack() { + LineageEvent.Run run = + new LineageEvent.Run(UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = + LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder().namespace(NAMESPACE_NAME).name(DB_TABLE_NAME).build(); + + // We're losing zone info on write, so I have to UTC it here to compare later + ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + + final LineageEvent lineageEvent = + LineageEvent.builder() + .producer("testSendEventAndGetItBack") + .eventType("COMPLETE") + .run(run) + .job(job) + .eventTime(time) + .inputs(Collections.emptyList()) + .outputs(Collections.singletonList(dataset)) + .build(); + + final CompletableFuture resp = sendEvent(lineageEvent); + assertThat(resp.join()).isEqualTo(201); + + List events = client.listEvents(); + + assertThat(events.size()).isEqualTo(1); + + ObjectMapper mapper = Utils.getMapper(); + JsonNode prev = mapper.valueToTree(events.get(0)); + assertThat(prev).isEqualTo(mapper.valueToTree(lineageEvent)); + } + + @Test + public void testFindEventByDatasetNamespace() { + LineageEvent.Run run = + new LineageEvent.Run(UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = + LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + + ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + + LineageEvent.LineageEventBuilder builder = + LineageEvent.builder() + .producer("testFindEventByDatasetNamespace") + .eventType("COMPLETE") + .run(run) + .job(job) + .eventTime(time) + .inputs(Collections.emptyList()); + + for (int i = 0; i < 10; i++) { + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder() + .namespace(String.format("namespace%d", i)) + .name(DB_TABLE_NAME) + .build(); + + LineageEvent event = builder.outputs(Collections.singletonList(dataset)).build(); + + final CompletableFuture resp = sendEvent(event); + assertThat(resp.join()).isEqualTo(201); + } + + List rawEvents = client.listEvents("namespace3"); + + LineageEvent thirdEvent = + builder + .outputs( + Collections.singletonList( + LineageEvent.Dataset.builder() + .namespace(String.format("namespace3")) + .name(DB_TABLE_NAME) + .build())) + .build(); + + assertThat(rawEvents.size()).isEqualTo(1); + ObjectMapper mapper = Utils.getMapper(); + assertThat((JsonNode) mapper.valueToTree(thirdEvent)) + .isEqualTo(mapper.valueToTree(rawEvents.get(0))); + } + + @Test + public void testFindEventIsSortedByTime() { + LineageEvent.Run run = + new LineageEvent.Run(UUID.randomUUID().toString(), LineageEvent.RunFacet.builder().build()); + LineageEvent.Job job = + LineageEvent.Job.builder().namespace(NAMESPACE_NAME).name(JOB_NAME).build(); + + ZonedDateTime time = ZonedDateTime.now(ZoneId.of("UTC")); + LineageEvent.Dataset dataset = + LineageEvent.Dataset.builder().namespace(NAMESPACE_NAME).name(DB_TABLE_NAME).build(); + + LineageEvent.LineageEventBuilder builder = + LineageEvent.builder() + .producer("testFindEventIsSortedByTime") + .run(run) + .job(job) + .inputs(Collections.emptyList()) + .outputs(Collections.singletonList(dataset)); + + LineageEvent firstEvent = builder.eventTime(time).eventType("START").build(); + + CompletableFuture resp = sendEvent(firstEvent); + assertThat(resp.join()).isEqualTo(201); + + LineageEvent secondEvent = + builder.eventTime(time.plusSeconds(10)).eventType("COMPLETE").build(); + + resp = sendEvent(secondEvent); + assertThat(resp.join()).isEqualTo(201); + + List rawEvents = client.listEvents(NAMESPACE_NAME); + + assertThat(rawEvents.size()).isEqualTo(2); + ObjectMapper mapper = Utils.getMapper(); + assertThat((JsonNode) mapper.valueToTree(firstEvent)) + .isEqualTo(mapper.valueToTree(rawEvents.get(1))); + assertThat((JsonNode) mapper.valueToTree(secondEvent)) + .isEqualTo(mapper.valueToTree(rawEvents.get(0))); + } + + private CompletableFuture sendEvent(LineageEvent event) { + return this.sendLineage(Utils.toJson(event)) + .thenApply(HttpResponse::statusCode) + .whenComplete( + (val, error) -> { + if (error != null) { + Assertions.fail("Could not complete request"); + } + }); + } + private CompletableFuture sendAllEvents(RunEvent... events) { return Arrays.stream(events) .reduce( diff --git a/api/src/test/java/marquez/api/ApiTestUtils.java b/api/src/test/java/marquez/api/ApiTestUtils.java index f9e181cf79..8cacedbd5b 100644 --- a/api/src/test/java/marquez/api/ApiTestUtils.java +++ b/api/src/test/java/marquez/api/ApiTestUtils.java @@ -11,6 +11,7 @@ import marquez.service.DatasetFieldService; import marquez.service.DatasetService; import marquez.service.DatasetVersionService; +import marquez.service.EventService; import marquez.service.JobService; import marquez.service.LineageService; import marquez.service.NamespaceService; @@ -53,6 +54,8 @@ public static ServiceFactory mockServiceFactory(Map mocks) { (SourceService) mocks.getOrDefault(SourceService.class, (mock(SourceService.class)))) .datasetService( (DatasetService) mocks.getOrDefault(DatasetService.class, (mock(DatasetService.class)))) + .eventService( + (EventService) mocks.getOrDefault(EventService.class, (mock(EventService.class)))) .build(); } } diff --git a/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java b/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java index 3673313444..0a1d4acf0d 100644 --- a/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java +++ b/api/src/test/java/marquez/jdbi/MarquezJdbiExternalPostgresExtension.java @@ -7,6 +7,7 @@ import javax.sql.DataSource; import marquez.PostgresContainer; +import org.jdbi.v3.jackson2.Jackson2Plugin; import org.jdbi.v3.postgres.PostgresPlugin; import org.jdbi.v3.sqlobject.SqlObjectPlugin; import org.postgresql.ds.PGSimpleDataSource; @@ -34,6 +35,7 @@ public class MarquezJdbiExternalPostgresExtension extends JdbiExternalPostgresEx database = POSTGRES.getDatabaseName(); plugins.add(new SqlObjectPlugin()); plugins.add(new PostgresPlugin()); + plugins.add(new Jackson2Plugin()); migration = Migration.before().withPaths("marquez/db/migration", "classpath:marquez/db/migrations"); } diff --git a/build.gradle b/build.gradle index bfcbe65dd1..fee4e13905 100644 --- a/build.gradle +++ b/build.gradle @@ -21,7 +21,7 @@ buildscript { dependencies { classpath 'com.adarshr:gradle-test-logger-plugin:3.2.0' classpath 'gradle.plugin.com.github.johnrengelman:shadow:7.1.2' - classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.6.1' + classpath 'com.diffplug.spotless:spotless-plugin-gradle:6.7.2' } } diff --git a/clients/java/src/main/java/marquez/client/MarquezClient.java b/clients/java/src/main/java/marquez/client/MarquezClient.java index bc617a4f2a..6376ef3f46 100644 --- a/clients/java/src/main/java/marquez/client/MarquezClient.java +++ b/clients/java/src/main/java/marquez/client/MarquezClient.java @@ -40,6 +40,7 @@ import marquez.client.models.JobVersion; import marquez.client.models.Namespace; import marquez.client.models.NamespaceMeta; +import marquez.client.models.RawLineageEvent; import marquez.client.models.Run; import marquez.client.models.RunMeta; import marquez.client.models.RunState; @@ -87,6 +88,16 @@ public MarquezClient(final URL baseUrl, @Nullable final String apiKey) { this.http = http; } + public List listEvents() { + final String bodyAsJson = http.get(url.toEventUrl()); + return Events.fromJson(bodyAsJson).getValue(); + } + + public List listEvents(String namespaceName) { + final String bodyAsJson = http.get(url.toEventUrl(namespaceName)); + return Events.fromJson(bodyAsJson).getValue(); + } + public Namespace createNamespace( @NonNull String namespaceName, @NonNull NamespaceMeta namespaceMeta) { final String bodyAsJson = http.put(url.toNamespaceUrl(namespaceName), namespaceMeta.toJson()); @@ -528,6 +539,21 @@ static Datasets fromJson(final String json) { } } + @Value + @EqualsAndHashCode(callSuper = false) + static class Events extends ResultsPage { + @Getter List value; + + @JsonCreator + Events(@JsonProperty("events") final List value) { + this.value = ImmutableList.copyOf(value); + } + + static Events fromJson(final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } + } + @Value static class DatasetVersions { @Getter List value; diff --git a/clients/java/src/main/java/marquez/client/MarquezPathV1.java b/clients/java/src/main/java/marquez/client/MarquezPathV1.java index a029e9ac4a..ac8fc34d23 100644 --- a/clients/java/src/main/java/marquez/client/MarquezPathV1.java +++ b/clients/java/src/main/java/marquez/client/MarquezPathV1.java @@ -72,6 +72,14 @@ static String namespacePath(String namespaceName) { return path("/namespaces/%s", namespaceName); } + static String eventPath() { + return path("/events"); + } + + static String eventPath(String namespaceName) { + return path("/events/%s", namespaceName); + } + static String sourcePath(String sourceName) { return path("/sources/%s", sourceName); } diff --git a/clients/java/src/main/java/marquez/client/MarquezUrl.java b/clients/java/src/main/java/marquez/client/MarquezUrl.java index 48197c5485..217c9e2b13 100644 --- a/clients/java/src/main/java/marquez/client/MarquezUrl.java +++ b/clients/java/src/main/java/marquez/client/MarquezUrl.java @@ -12,6 +12,7 @@ import static marquez.client.MarquezPathV1.datasetPath; import static marquez.client.MarquezPathV1.datasetTagPath; import static marquez.client.MarquezPathV1.datasetVersionPath; +import static marquez.client.MarquezPathV1.eventPath; import static marquez.client.MarquezPathV1.fieldTagPath; import static marquez.client.MarquezPathV1.jobPath; import static marquez.client.MarquezPathV1.jobVersionPath; @@ -89,6 +90,14 @@ URL toNamespaceUrl(String namespaceName) { return from(namespacePath(namespaceName)); } + URL toEventUrl() { + return from(eventPath()); + } + + URL toEventUrl(String namespaceName) { + return from(eventPath(namespaceName)); + } + URL toSourceUrl(String sourceName) { return from(sourcePath(sourceName)); } diff --git a/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java b/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java new file mode 100644 index 0000000000..480b14835a --- /dev/null +++ b/clients/java/src/main/java/marquez/client/models/RawLineageEvent.java @@ -0,0 +1,39 @@ +/* + * Copyright 2018-2022 contributors to the Marquez project + * SPDX-License-Identifier: Apache-2.0 + */ + +package marquez.client.models; + +import com.fasterxml.jackson.core.type.TypeReference; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.Map; +import lombok.AllArgsConstructor; +import lombok.Builder; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.NonNull; +import lombok.Setter; +import lombok.ToString; +import marquez.client.Utils; + +@Builder +@AllArgsConstructor +@NoArgsConstructor +@Setter +@Getter +@ToString +public class RawLineageEvent { + private String eventType; + private ZonedDateTime eventTime; + private Map run; + private Map job; + private List inputs; + private List outputs; + private String producer; + + public static RawLineageEvent fromJson(@NonNull final String json) { + return Utils.fromJson(json, new TypeReference() {}); + } +}