Skip to content

Commit

Permalink
Remove job context.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dardzinski <[email protected]>

Add call to API in frontend.
Add tests.

Signed-off-by: Jakub Dardzinski <[email protected]>

Update javadocs.

Signed-off-by: Jakub Dardzinski <[email protected]>

Apply spotless fixes.

Signed-off-by: Jakub Dardzinski <[email protected]>

Remove FacetResource.
Split react functions to each component.

Signed-off-by: Jakub Dardzinski <[email protected]>

Add more tests for JobResource.
Update OpenAPI spec.

Signed-off-by: Jakub Dardzinski <[email protected]>
  • Loading branch information
JDarDagran committed Feb 19, 2023
1 parent 0e434d5 commit 6e51401
Show file tree
Hide file tree
Showing 81 changed files with 672 additions and 484 deletions.
3 changes: 0 additions & 3 deletions api/src/main/java/marquez/MarquezContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import marquez.db.DatasetDao;
import marquez.db.DatasetFieldDao;
import marquez.db.DatasetVersionDao;
import marquez.db.JobContextDao;
import marquez.db.JobDao;
import marquez.db.JobVersionDao;
import marquez.db.LineageDao;
Expand Down Expand Up @@ -67,7 +66,6 @@ public final class MarquezContext {
@Getter private final DatasetVersionDao datasetVersionDao;
@Getter private final JobDao jobDao;
@Getter private final JobVersionDao jobVersionDao;
@Getter private final JobContextDao jobContextDao;
@Getter private final RunDao runDao;
@Getter private final RunArgsDao runArgsDao;
@Getter private final RunStateDao runStateDao;
Expand Down Expand Up @@ -116,7 +114,6 @@ private MarquezContext(
this.datasetVersionDao = jdbi.onDemand(DatasetVersionDao.class);
this.jobDao = jdbi.onDemand(JobDao.class);
this.jobVersionDao = jdbi.onDemand(JobVersionDao.class);
this.jobContextDao = jdbi.onDemand(JobContextDao.class);
this.runDao = jdbi.onDemand(RunDao.class);
this.runArgsDao = jdbi.onDemand(RunArgsDao.class);
this.runStateDao = jdbi.onDemand(RunStateDao.class);
Expand Down
28 changes: 28 additions & 0 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import java.util.List;
import javax.validation.Valid;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
Expand All @@ -37,13 +38,15 @@
import marquez.api.exceptions.JobVersionNotFoundException;
import marquez.api.models.JobVersion;
import marquez.api.models.ResultsPage;
import marquez.common.models.FacetType;
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.Version;
import marquez.db.JobVersionDao;
import marquez.db.models.JobRow;
import marquez.service.ServiceFactory;
import marquez.service.models.Facets;
import marquez.service.models.Job;
import marquez.service.models.JobMeta;
import marquez.service.models.Run;
Expand Down Expand Up @@ -236,6 +239,31 @@ public RunResource runResourceRoot(@PathParam("id") RunId runId) {
return new RunResource(runId, runService);
}

@Timed
@ResponseMetered
@ExceptionMetered
@GET
@Produces(APPLICATION_JSON)
@Path("/jobs/runs/{id}/facets")
public Response getRunFacets(
@PathParam("id") RunId runId, @QueryParam("type") @NotNull FacetType type) {
throwIfNotExists(runId);
Facets facets;
switch (type) {
case JOB:
facets = runService.findJobFacetsByRunUuid(runId.getValue());
break;
case RUN:
facets = runService.findRunFacetsByRunUuid(runId.getValue());
break;
default:
facets = null;
break;
}

return Response.ok(facets).build();
}

@Value
static class JobVersions {
@NonNull
Expand Down
4 changes: 0 additions & 4 deletions api/src/main/java/marquez/api/models/JobVersion.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package marquez.api.models;

import com.google.common.collect.ImmutableMap;
import java.net.URL;
import java.time.Instant;
import java.util.List;
Expand Down Expand Up @@ -35,7 +34,6 @@ public final class JobVersion {
@Getter private final Version version;
@Getter private final NamespaceName namespace;
@Nullable private final URL location;
@Getter private final ImmutableMap<String, String> context;
@Getter private final List<DatasetId> inputs;
@Getter private final List<DatasetId> outputs;
@Getter @Nullable private final Run latestRun;
Expand All @@ -46,7 +44,6 @@ public JobVersion(
@NonNull final Instant createdAt,
@NonNull final Version version,
@Nullable final URL location,
@Nullable final ImmutableMap<String, String> context,
List<DatasetId> inputs,
List<DatasetId> outputs,
@Nullable Run latestRun) {
Expand All @@ -56,7 +53,6 @@ public JobVersion(
this.version = version;
this.namespace = id.getNamespace();
this.location = location;
this.context = (context == null) ? ImmutableMap.of() : context;
this.inputs = inputs;
this.outputs = outputs;
this.latestRun = latestRun;
Expand Down
6 changes: 1 addition & 5 deletions api/src/main/java/marquez/common/Utils.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.hash.Hashing;
import io.dropwizard.jackson.Jackson;
Expand Down Expand Up @@ -236,7 +235,6 @@ public static Instant toInstant(@Nullable final String asIso) {
* @param jobName The name of the job.
* @param jobInputIds The input dataset IDs for the job.
* @param jobOutputIds The output dataset IDs for the job.
* @param jobContext The context of the job.
* @param jobLocation The source code location for the job.
* @return A {@link Version} object based on the specified job meta.
*/
Expand All @@ -245,7 +243,6 @@ public static Version newJobVersionFor(
@NonNull final JobName jobName,
@NonNull final ImmutableSet<DatasetId> jobInputIds,
@NonNull final ImmutableSet<DatasetId> jobOutputIds,
@NonNull final ImmutableMap<String, String> jobContext,
@Nullable final String jobLocation) {
final byte[] bytes =
VERSION_JOINER
Expand All @@ -268,8 +265,7 @@ public static Version newJobVersionFor(
jobOutputId.getNamespace().getValue(),
jobOutputId.getName().getValue()))
.collect(joining(VERSION_DELIM)),
jobLocation,
KV_JOINER.join(jobContext))
jobLocation)
.getBytes(UTF_8);
return Version.of(UUID.nameUUIDFromBytes(bytes));
}
Expand Down
5 changes: 2 additions & 3 deletions api/src/main/java/marquez/common/models/DatasetId.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import lombok.EqualsAndHashCode;
import lombok.NonNull;
Expand All @@ -16,8 +15,8 @@

/**
* ID for {@code Dataset}. The class implements {@link Comparable} to ensure job versions generated
* with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet,
* ImmutableMap, String)} are consistent as jobs may contain inputs and outputs out of order.
* with {@link Utils#newJobVersionFor(NamespaceName, JobName, ImmutableSet, ImmutableSet, String)}
* are consistent as jobs may contain inputs and outputs out of order.
*/
@EqualsAndHashCode
@ToString
Expand Down
11 changes: 11 additions & 0 deletions api/src/main/java/marquez/common/models/FacetType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.common.models;

public enum FacetType {
RUN,
JOB;
}
3 changes: 0 additions & 3 deletions api/src/main/java/marquez/db/BaseDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,6 @@ public interface BaseDao extends SqlObject {
@CreateSqlObject
DatasetVersionDao createDatasetVersionDao();

@CreateSqlObject
JobContextDao createJobContextDao();

@CreateSqlObject
JobDao createJobDao();

Expand Down
4 changes: 0 additions & 4 deletions api/src/main/java/marquez/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,13 +103,9 @@ private Columns() {}

/* JOB VERSION ROW COLUMNS */
public static final String JOB_UUID = "job_uuid";
public static final String JOB_CONTEXT_UUID = "job_context_uuid";
public static final String LOCATION = "location";
public static final String LATEST_RUN_UUID = "latest_run_uuid";

/* JOB CONTEXT ROW COLUMNS */
public static final String CONTEXT = "context";

/* RUN ROW COLUMNS */
public static final String EXTERNAL_ID = "external_id";
public static final String RUN_ARGS_UUID = "run_args_uuid";
Expand Down
39 changes: 0 additions & 39 deletions api/src/main/java/marquez/db/JobContextDao.java

This file was deleted.

22 changes: 4 additions & 18 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,13 @@
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;
import marquez.common.Utils;
import marquez.common.models.DatasetId;
import marquez.common.models.DatasetName;
import marquez.common.models.JobName;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.db.mappers.JobMapper;
import marquez.db.mappers.JobRowMapper;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
import marquez.service.models.Job;
Expand Down Expand Up @@ -58,10 +56,9 @@ SELECT EXISTS (

@SqlQuery(
"""
SELECT j.*, jc.context, f.facets
SELECT j.*, f.facets
FROM jobs_view j
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facet) AS facets
FROM (
Expand Down Expand Up @@ -130,10 +127,9 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {

@SqlQuery(
"""
SELECT j.*, jc.context, f.facets
SELECT j.*, f.facets
FROM jobs_view AS j
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
SELECT run_uuid, JSON_AGG(e.facet) AS facets
FROM (
Expand Down Expand Up @@ -208,13 +204,6 @@ default JobRow upsertJobMeta(
createNamespaceDao()
.upsertNamespaceRow(
UUID.randomUUID(), createdAt, namespaceName.getValue(), DEFAULT_NAMESPACE_OWNER);
JobContextRow contextRow =
createJobContextDao()
.upsert(
UUID.randomUUID(),
createdAt,
Utils.toJson(jobMeta.getContext()),
Utils.checksumFor(jobMeta.getContext()));
return upsertJob(
UUID.randomUUID(),
jobMeta.getType(),
Expand All @@ -223,7 +212,6 @@ default JobRow upsertJobMeta(
namespace.getName(),
jobName.getValue(),
jobMeta.getDescription().orElse(null),
contextRow.getUuid(),
toUrlString(jobMeta.getLocation().orElse(null)),
symlinkTargetUuid,
toJson(jobMeta.getInputs(), mapper));
Expand Down Expand Up @@ -276,7 +264,7 @@ INSERT INTO jobs_view AS j (
:namespaceName,
:name,
:description,
:jobContextUuid,
null,
:location,
:inputs,
:symlinkTargetId,
Expand All @@ -291,7 +279,6 @@ JobRow upsertJob(
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
Expand Down Expand Up @@ -328,7 +315,7 @@ INSERT INTO jobs_view AS j (
:namespaceName,
:name,
:description,
:jobContextUuid,
null,
:location,
:inputs,
:symlinkTargetId
Expand All @@ -344,7 +331,6 @@ JobRow upsertJob(
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs);
Expand Down
Loading

0 comments on commit 6e51401

Please sign in to comment.