Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add job tagging to API #2774

Merged
merged 25 commits into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
0d44966
add job tagging to API
davidsharp7 Mar 17, 2024
d114c3f
fix various tests that were failing
davidsharp7 Mar 19, 2024
fd76893
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Mar 19, 2024
655bf3a
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Mar 21, 2024
02e7484
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Apr 5, 2024
58f40cf
Merge branch 'main' into api/add_job_tags
davidsharp7 Apr 17, 2024
1d3f819
fix merge commit
davidsharp7 Apr 17, 2024
057bf14
fix liniting
davidsharp7 Apr 17, 2024
4d1550a
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Apr 17, 2024
da04b41
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Apr 18, 2024
3f429f9
Merge branch 'main' into api/add_job_tags
davidsharp7 Apr 23, 2024
911549b
update based on pr feedback
davidsharp7 Apr 25, 2024
2750d66
Merge branch 'api/add_job_tags' of https://github.com/davidsharp7/mar…
davidsharp7 Apr 25, 2024
2b8ae9a
lint tag test code
davidsharp7 Apr 25, 2024
b73c6b3
lint tag code
davidsharp7 Apr 25, 2024
ca332d6
fix log typo
davidsharp7 Apr 25, 2024
322ed34
fix logging
davidsharp7 Apr 26, 2024
d127354
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Apr 26, 2024
a64c2f6
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Apr 28, 2024
4517062
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 Apr 30, 2024
f726170
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 May 2, 2024
4065cb5
Merge branch 'MarquezProject:main' into api/add_job_tags
davidsharp7 May 7, 2024
4c18168
update based on PR feedback.
davidsharp7 May 7, 2024
10703c1
correct db field error
davidsharp7 May 7, 2024
bf6621e
Merge branch 'main' into api/add_job_tags
wslulciuc May 7, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions api/src/main/java/marquez/api/JobResource.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import marquez.common.models.JobName;
import marquez.common.models.NamespaceName;
import marquez.common.models.RunId;
import marquez.common.models.TagName;
import marquez.common.models.Version;
import marquez.db.JobFacetsDao;
import marquez.db.JobVersionDao;
Expand Down Expand Up @@ -273,6 +274,47 @@ public Response getRunFacets(
return Response.ok(facets).build();
}

@Timed
@ResponseMetered
@ExceptionMetered
@POST
@Path("/namespaces/{namespace}/jobs/{job}/tags/{tag}")
@Produces(APPLICATION_JSON)
public Response updatetag(
@PathParam("namespace") NamespaceName namespaceName,
@PathParam("job") JobName jobName,
@PathParam("tag") TagName tagName) {
throwIfNotExists(namespaceName);
throwIfNotExists(namespaceName, jobName);

jobService.updateJobTags(namespaceName.getValue(), jobName.getValue(), tagName.getValue());
Job job =
jobService
.findJobByName(namespaceName.getValue(), jobName.getValue())
.orElseThrow(() -> new JobNotFoundException(jobName));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this should be quite unlikely given updateJobTags would presumably throw if the job didn't exist? A case of, we get an optional back so we should do something beyond an unqualified get() on it?

Copy link
Member Author

@davidsharp7 davidsharp7 Apr 25, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah understand what you mean -it will fall over before it gets to this point so why bother? Mainly as that seems to be the de-facto pattern for a lot of the code i.e

execute something -> retrieve object (job, dataset etc) else throw an error.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for following our code style!

return Response.ok(job).build();
}

@ResponseMetered
@ExceptionMetered
@DELETE
@Path("/namespaces/{namespace}/jobs/{job}/tags/{tag}")
@Produces(APPLICATION_JSON)
public Response deletetag(
@PathParam("namespace") NamespaceName namespaceName,
@PathParam("job") JobName jobName,
@PathParam("tag") TagName tagName) {
throwIfNotExists(namespaceName);
throwIfNotExists(namespaceName, jobName);

jobService.deleteJobTags(namespaceName.getValue(), jobName.getValue(), tagName.getValue());
Job job =
jobService
.findJobByName(namespaceName.getValue(), jobName.getValue())
.orElseThrow(() -> new JobNotFoundException(jobName));
return Response.ok(job).build();
}

@Value
static class JobVersions {
@NonNull
Expand Down
139 changes: 130 additions & 9 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,14 +61,53 @@ SELECT EXISTS (
@SqlQuery(
"""
WITH job_versions_facets AS (
SELECT job_version_uuid, JSON_AGG(facet) as facets
FROM job_facets
GROUP BY job_version_uuid
SELECT
job_version_uuid
, JSON_AGG(facet) as facets
FROM
job_facets
GROUP BY
job_version_uuid
),
job_tags as (
SELECT
j.uuid
, ARRAY_AGG(t.name) as tags
FROM
jobs j
INNER JOIN
jobs_tag_mapping jtm
ON
jtm.job_uuid = j.uuid
AND
j.simple_name = :jobName
AND
j.namespace_name = :namespaceName
INNER JOIN
tags t
ON
jtm.tag_uuid = t.uuid
GROUP BY
j.uuid
)
SELECT j.*, facets
FROM jobs_view j
LEFT OUTER JOIN job_versions_facets f ON j.current_version_uuid = f.job_version_uuid
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
SELECT
j.*
, facets
, jt.tags as tags
FROM
jobs_view j
LEFT OUTER JOIN
job_versions_facets f
ON
j.current_version_uuid = f.job_version_uuid
LEFT OUTER JOIN
job_tags jt
ON
j.uuid = jt.uuid
WHERE
j.namespace_name = :namespaceName
AND
(j.name = :jobName OR :jobName = ANY(j.aliases))
""")
Optional<Job> findJobByName(String namespaceName, String jobName);

Expand Down Expand Up @@ -169,16 +208,38 @@ facets_temp AS (
lineage_event_time ASC
) e
GROUP BY e.run_uuid
)
),
job_tags as (
SELECT
j.uuid
, ARRAY_AGG(t.name) as tags
FROM
jobs j
INNER JOIN
jobs_tag_mapping jtm
ON
jtm.job_uuid = j.uuid
AND
j.namespace_name = :namespaceName
INNER JOIN
tags t
ON
jtm.tag_uuid = t.uuid
GROUP BY
j.uuid
)
SELECT
j.*,
f.facets
f.facets,
COALESCE(jt.tags, ARRAY[]::VARCHAR[]) AS tags
FROM
jobs_view_page AS j
LEFT OUTER JOIN job_versions_temp AS jv
ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN facets_temp AS f
ON f.run_uuid = jv.latest_run_uuid
LEFT OUTER JOIN job_tags jt
ON j.uuid = jt.uuid
ORDER BY
j.name
""")
Expand Down Expand Up @@ -386,4 +447,64 @@ JobRow upsertJob(
String location,
UUID symlinkTargetId,
PGobject inputs);

@SqlUpdate(
"""
WITH new_tag AS (
INSERT INTO tags (uuid, created_at, updated_at, name, description)
SELECT
gen_random_uuid(),
NOW(),
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved
NOW(),
:tagName,
'No Description'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason we would want to hardcode this in the SQL, instead of in our application code to be a bit more flexible?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

description is a nullable column from what I can see, I'm not sure we need to default it at all?

Copy link
Member Author

@davidsharp7 davidsharp7 Apr 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I can leave as null - was just being explicit more than anything but it is inconsistent with dataset/dataset field tagging so makes sense to set to null.

WHERE
NOT EXISTS (SELECT 1 FROM tags WHERE name = :tagName)
RETURNING uuid
),
existing_tag AS (
SELECT uuid FROM tags WHERE name = :tagName
),
job AS (
SELECT
uuid
FROM
jobs
WHERE
simple_name = :jobName
and
namespace_name = :namespaceName
)
INSERT INTO jobs_tag_mapping (job_uuid, tag_uuid, tagged_at)
SELECT
(SELECT uuid FROM job)
, COALESCE((SELECT uuid FROM new_tag), (SELECT uuid FROM existing_tag))
, NOW()
ON CONFLICT DO NOTHING
davidsharp7 marked this conversation as resolved.
Show resolved Hide resolved
;
""")
void updateJobTags(String namespaceName, String jobName, String tagName);

@SqlUpdate(
"""
DELETE FROM jobs_tag_mapping jtm
WHERE EXISTS (
SELECT 1
FROM
jobs j
JOIN
tags t
ON
j.uuid = jtm.job_uuid
AND
t.uuid = jtm.tag_uuid
WHERE
t.name = :tagName
AND
j.simple_name = :jobName
AND
j.namespace_name = :namespaceName
);
""")
void deleteJobTags(String namespaceName, String jobName, String tagName);
}
16 changes: 15 additions & 1 deletion api/src/main/java/marquez/db/mappers/JobMapper.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package marquez.db.mappers;

import static marquez.db.Columns.stringArrayOrThrow;
import static marquez.db.Columns.stringOrNull;
import static marquez.db.Columns.stringOrThrow;
import static marquez.db.Columns.timestampOrThrow;
Expand All @@ -18,9 +19,11 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableList.Builder;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
Expand All @@ -32,6 +35,7 @@
import marquez.common.models.JobName;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.common.models.TagName;
import marquez.db.Columns;
import marquez.service.models.Job;
import org.jdbi.v3.core.mapper.RowMapper;
Expand Down Expand Up @@ -68,7 +72,8 @@ public Job map(@NonNull ResultSet results, @NonNull StatementContext context)
null,
facetsOrNull,
uuidOrNull(results, Columns.CURRENT_VERSION_UUID),
getLabels(facetsOrNull));
getLabels(facetsOrNull),
toTags(results, "tags"));
return job;
}

Expand Down Expand Up @@ -106,6 +111,15 @@ private ImmutableList<String> getLabels(ImmutableMap<String, Object> facetsOrNul
return builder.build();
}

public static ImmutableSet<TagName> toTags(@NonNull ResultSet results, String column)
throws SQLException {
if (results.getObject(column) == null) {
return null;
}
List<String> arr = stringArrayOrThrow(results, column);
return arr.stream().map(TagName::of).collect(ImmutableSet.toImmutableSet());
}

private String getJobTypeFacetField(ImmutableMap<String, Object> facetsOrNull, String field) {
return Optional.ofNullable(facetsOrNull.get(JOB_TYPE_FACET_NAME))
.filter(o -> o instanceof Map)
Expand Down
7 changes: 6 additions & 1 deletion api/src/main/java/marquez/service/models/Job.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.net.URL;
import java.time.Instant;
import java.util.Optional;
Expand All @@ -23,6 +24,7 @@
import marquez.common.models.JobName;
import marquez.common.models.JobType;
import marquez.common.models.NamespaceName;
import marquez.common.models.TagName;

@EqualsAndHashCode
@ToString
Expand All @@ -43,6 +45,7 @@ public final class Job {
@Getter private final ImmutableMap<String, Object> facets;
@Nullable private UUID currentVersion;
@Getter @Nullable private ImmutableList<String> labels;
@Getter private final ImmutableSet<TagName> tags;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mark this @Nullable here? Would be consistent with other fields.


public Job(
@NonNull final JobId id,
Expand All @@ -59,7 +62,8 @@ public Job(
@Nullable final Run latestRun,
@Nullable final ImmutableMap<String, Object> facets,
@Nullable UUID currentVersion,
@Nullable ImmutableList<String> labels) {
@Nullable ImmutableList<String> labels,
@Nullable final ImmutableSet<TagName> tags) {
this.id = id;
this.type = type;
this.name = name;
Expand All @@ -76,6 +80,7 @@ public Job(
this.facets = (facets == null) ? ImmutableMap.of() : facets;
this.currentVersion = currentVersion;
this.labels = (labels == null) ? ImmutableList.of() : labels;
this.tags = (tags == null) ? ImmutableSet.of() : tags;
}

public Optional<URL> getLocation() {
Expand Down
6 changes: 5 additions & 1 deletion api/src/main/java/marquez/service/models/JobMeta.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import marquez.common.models.DatasetId;
import marquez.common.models.JobType;
import marquez.common.models.RunId;
import marquez.common.models.TagName;

@EqualsAndHashCode
@ToString
Expand All @@ -26,20 +27,23 @@ public final class JobMeta {
@Nullable private final URL location;
@Nullable private final String description;
@Nullable private final RunId runId;
@Getter private final ImmutableSet<TagName> tags;

public JobMeta(
@NonNull final JobType type,
@NonNull final ImmutableSet<DatasetId> inputs,
@NonNull final ImmutableSet<DatasetId> outputs,
@Nullable final URL location,
@Nullable final String description,
@Nullable final RunId runId) {
@Nullable final RunId runId,
@Nullable final ImmutableSet<TagName> tags) {
this.type = type;
this.inputs = inputs;
this.outputs = outputs;
this.location = location;
this.description = description;
this.runId = runId;
this.tags = tags;
}

public Optional<URL> getLocation() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
/* SPDX-License-Identifier: Apache-2.0 */
CREATE TABLE jobs_tag_mapping (
job_uuid UUID REFERENCES jobs(uuid),
tag_uuid UUID REFERENCES tags(uuid),
tagged_at TIMESTAMP NOT NULL,
wslulciuc marked this conversation as resolved.
Show resolved Hide resolved
PRIMARY KEY (tag_uuid, job_uuid)
);


Loading
Loading