Skip to content

Commit

Permalink
add raw event API
Browse files Browse the repository at this point in the history
Signed-off-by: Maciej Obuchowski <[email protected]>
  • Loading branch information
mobuchowski committed Sep 12, 2022
1 parent 5da6ee6 commit dec48e1
Show file tree
Hide file tree
Showing 24 changed files with 525 additions and 1 deletion.
1 change: 1 addition & 0 deletions api/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ dependencies {
implementation "io.prometheus:simpleclient_hotspot:${prometheusVersion}"
implementation "io.prometheus:simpleclient_servlet:${prometheusVersion}"
implementation "org.jdbi:jdbi3-core:${jdbi3Version}"
implementation "org.jdbi:jdbi3-jackson2:${jdbi3Version}"
implementation "org.jdbi:jdbi3-postgres:${jdbi3Version}"
implementation "org.jdbi:jdbi3-sqlobject:${jdbi3Version}"
implementation 'com.google.guava:guava:31.1-jre'
Expand Down
13 changes: 12 additions & 1 deletion api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import lombok.Getter;
import lombok.NonNull;
import marquez.api.DatasetResource;
import marquez.api.EventResource;
import marquez.api.JobResource;
import marquez.api.NamespaceResource;
import marquez.api.OpenLineageResource;
Expand All @@ -25,6 +26,7 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.EventDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
Expand All @@ -42,6 +44,7 @@
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
import marquez.service.EventService;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.NamespaceService;
Expand Down Expand Up @@ -71,6 +74,7 @@ public final class MarquezContext {
@Getter private final OpenLineageDao openLineageDao;
@Getter private final LineageDao lineageDao;
@Getter private final SearchDao searchDao;
@Getter private final EventDao eventDao;

@Getter private final List<RunTransitionListener> runTransitionListeners;

Expand All @@ -82,6 +86,7 @@ public final class MarquezContext {
@Getter private final RunService runService;
@Getter private final OpenLineageService openLineageService;
@Getter private final LineageService lineageService;
@Getter private final EventService eventService;

@Getter private final NamespaceResource namespaceResource;
@Getter private final SourceResource sourceResource;
Expand All @@ -90,6 +95,7 @@ public final class MarquezContext {
@Getter private final TagResource tagResource;
@Getter private final OpenLineageResource openLineageResource;
@Getter private final SearchResource searchResource;
@Getter private final EventResource eventResource;

@Getter private final ImmutableList<Object> resources;
@Getter private final JdbiExceptionExceptionMapper jdbiException;
Expand Down Expand Up @@ -119,6 +125,7 @@ private MarquezContext(
this.openLineageDao = jdbi.onDemand(OpenLineageDao.class);
this.lineageDao = jdbi.onDemand(LineageDao.class);
this.searchDao = jdbi.onDemand(SearchDao.class);
this.eventDao = jdbi.onDemand(EventDao.class);
this.runTransitionListeners = runTransitionListeners;

this.namespaceService = new NamespaceService(baseDao);
Expand All @@ -131,6 +138,7 @@ private MarquezContext(
this.tagService.init(tags);
this.openLineageService = new OpenLineageService(baseDao, runService);
this.lineageService = new LineageService(lineageDao, jobDao);
this.eventService = new EventService(baseDao);
this.jdbiException = new JdbiExceptionExceptionMapper();
final ServiceFactory serviceFactory =
ServiceFactory.builder()
Expand All @@ -144,6 +152,7 @@ private MarquezContext(
.lineageService(lineageService)
.datasetFieldService(new DatasetFieldService(baseDao))
.datasetVersionService(new DatasetVersionService(baseDao))
.eventService(eventService)
.build();
this.namespaceResource = new NamespaceResource(serviceFactory);
this.sourceResource = new SourceResource(serviceFactory);
Expand All @@ -152,6 +161,7 @@ private MarquezContext(
this.tagResource = new TagResource(serviceFactory);
this.openLineageResource = new OpenLineageResource(serviceFactory);
this.searchResource = new SearchResource(searchDao);
this.eventResource = new EventResource(serviceFactory);

this.resources =
ImmutableList.of(
Expand All @@ -162,7 +172,8 @@ private MarquezContext(
tagResource,
jdbiException,
openLineageResource,
searchResource);
searchResource,
eventResource);

final MarquezGraphqlServletBuilder servlet = new MarquezGraphqlServletBuilder();
this.graphqlServlet = servlet.getServlet(new GraphqlSchemaBuilder(jdbi));
Expand Down
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/api/BaseResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import marquez.service.DatasetFieldService;
import marquez.service.DatasetService;
import marquez.service.DatasetVersionService;
import marquez.service.EventService;
import marquez.service.JobService;
import marquez.service.LineageService;
import marquez.service.NamespaceService;
Expand All @@ -50,6 +51,7 @@ public class BaseResource {
protected DatasetVersionService datasetVersionService;
protected DatasetFieldService datasetFieldService;
protected LineageService lineageService;
protected EventService eventService;

public BaseResource(ServiceFactory serviceFactory) {
this.serviceFactory = serviceFactory;
Expand All @@ -63,6 +65,7 @@ public BaseResource(ServiceFactory serviceFactory) {
this.datasetVersionService = serviceFactory.getDatasetVersionService();
this.datasetFieldService = serviceFactory.getDatasetFieldService();
this.lineageService = serviceFactory.getLineageService();
this.eventService = serviceFactory.getEventService();
}

void throwIfNotExists(@NonNull NamespaceName namespaceName) {
Expand Down
62 changes: 62 additions & 0 deletions api/src/main/java/marquez/api/EventResource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package marquez.api;

import static javax.ws.rs.core.MediaType.APPLICATION_JSON;

import com.codahale.metrics.annotation.ExceptionMetered;
import com.codahale.metrics.annotation.ResponseMetered;
import com.codahale.metrics.annotation.Timed;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import javax.validation.constraints.Min;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Response;
import lombok.NonNull;
import lombok.Value;
import marquez.service.ServiceFactory;

@Path("/api/v1")
public class EventResource extends BaseResource {
public EventResource(@NonNull final ServiceFactory serviceFactory) {
super(serviceFactory);
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/events")
@Produces(APPLICATION_JSON)
public Response get(
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit,
@QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) {
final List<JsonNode> events = eventService.getAll(limit, offset);
return Response.ok(new Events(events)).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Path("/events/{namespace}")
@Produces(APPLICATION_JSON)
public Response getByNamespace(
@PathParam("namespace") String namespace,
@QueryParam("limit") @DefaultValue("100") @Min(value = 0) int limit,
@QueryParam("offset") @DefaultValue("0") @Min(value = 0) int offset) {
final List<JsonNode> event = eventService.getByNamespace(namespace, limit, offset);
return Response.ok(new Events(event)).build();
}

@Value
static class Events {
@NonNull
@JsonProperty("events")
List<JsonNode> value;
}
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,7 @@ public interface BaseDao extends SqlObject {

@CreateSqlObject
OpenLineageDao createOpenLineageDao();

@CreateSqlObject
EventDao createEventDao();
}
3 changes: 3 additions & 0 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,9 @@ private Columns() {}
public static final String RUN_UUID = "run_uuid";
public static final String STATE = "state";

/* LINEAGE EVENT ROW COLUMNS */
public static final String EVENT = "event";

public static UUID uuidOrNull(final ResultSet results, final String column) throws SQLException {
if (results.getObject(column) == null) {
return null;
Expand Down
37 changes: 37 additions & 0 deletions api/src/main/java/marquez/db/EventDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package marquez.db;

import com.fasterxml.jackson.databind.JsonNode;
import java.util.List;
import marquez.db.mappers.RawLineageEventMapper;
import org.jdbi.v3.sqlobject.config.RegisterRowMapper;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

@RegisterRowMapper(RawLineageEventMapper.class)
public interface EventDao extends BaseDao {

@SqlQuery(
"""
SELECT event
FROM lineage_events
ORDER BY event_time DESC
LIMIT :limit
OFFSET :offset""")
List<JsonNode> getAll(int limit, int offset);

/**
* This is a "hack" to get inputs/outputs namespace from jsonb column: <a
* href="https://github.com/jdbi/jdbi/issues/1510#issuecomment-485423083">explanation</a>
*/
@SqlQuery(
"""
SELECT le.event
FROM lineage_events le, jsonb_array_elements(coalesce(le.event -> 'inputs', '[]'::jsonb) || coalesce(le.event -> 'outputs', '[]'::jsonb)) AS ds
WHERE le.job_namespace = :namespace
OR ds ->> 'namespace' = :namespace
ORDER BY event_time DESC
LIMIT :limit
OFFSET :offset""")
List<JsonNode> getByNamespace(
@Bind("namespace") String namespace, @Bind("limit") int limit, @Bind("offset") int offset);
}
30 changes: 30 additions & 0 deletions api/src/main/java/marquez/db/mappers/RawLineageEventMapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package marquez.db.mappers;

import static marquez.db.Columns.stringOrThrow;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.sql.ResultSet;
import java.sql.SQLException;
import lombok.extern.slf4j.Slf4j;
import marquez.common.Utils;
import marquez.db.Columns;
import org.jdbi.v3.core.mapper.RowMapper;
import org.jdbi.v3.core.statement.StatementContext;

@Slf4j
public class RawLineageEventMapper implements RowMapper<JsonNode> {
@Override
public JsonNode map(ResultSet rs, StatementContext ctx) throws SQLException {
String rawEvent = stringOrThrow(rs, Columns.EVENT);

try {
ObjectMapper mapper = Utils.getMapper();
return mapper.readTree(rawEvent);
} catch (JsonProcessingException e) {
log.error("Failed to process json", e);
}
return null;
}
}
6 changes: 6 additions & 0 deletions api/src/main/java/marquez/service/DelegatingDaos.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.EventDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
Expand Down Expand Up @@ -98,4 +99,9 @@ public static class DelegatingTagDao implements TagDao {
public static class DelegatingLineageDao implements LineageDao {
@Delegate private final LineageDao delegate;
}

@RequiredArgsConstructor
public static class DelegatingEventDao implements EventDao {
@Delegate private final EventDao delegate;
}
}
10 changes: 10 additions & 0 deletions api/src/main/java/marquez/service/EventService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
package marquez.service;

import lombok.NonNull;
import marquez.db.BaseDao;

public class EventService extends DelegatingDaos.DelegatingEventDao {
public EventService(@NonNull BaseDao baseDao) {
super(baseDao.createEventDao());
}
}
1 change: 1 addition & 0 deletions api/src/main/java/marquez/service/ServiceFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,5 @@ public class ServiceFactory {
@NonNull DatasetVersionService datasetVersionService;
@NonNull DatasetFieldService datasetFieldService;
@NonNull LineageService lineageService;
@NonNull EventService eventService;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE INDEX lineage_events_event_time
on lineage_events(event_time DESC);

CREATE INDEX lineage_events_namespace_event_time
on lineage_events(job_namespace, event_time DESC);
Loading

0 comments on commit dec48e1

Please sign in to comment.