Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update lineage query to only look at jobs with inputs or outputs #2068

Merged
merged 2 commits into from
Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 33 additions & 29 deletions api/src/main/java/marquez/db/LineageDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,35 +37,39 @@ public interface LineageDao {
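* @return the set of {@code JobData} rows produced by the lineage query (job columns plus input_uuids, output_uuids and context)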
*/
// Recursive lineage query bound to getLineage(jobIds, depth).
//
// NOTE(review): this span shows two consecutive forms of the same query -- a concatenated
// string literal followed by a text block. It looks like a rendered diff (earlier form, then
// later form); confirm against the real file which one is live.
//
// Shape of the SQL, common to both forms:
//   job_io  - one row per job uuid carrying two arrays: the distinct dataset uuids from
//             job_versions_io_mapping with io_type='INPUT' and with io_type='OUTPUT',
//             reached through the job's current version.
//   lineage - seeded at depth 0 with the jobs named by <jobIds>; every recursive step adds
//             each *other* job whose inputs+outputs overlap (&&) the inputs+outputs of a job
//             already in lineage, as long as depth < :depth.
//   result  - DISTINCT ON (j.uuid) rows of jobs_view, plus input_uuids, output_uuids and the
//             context column from job_contexts.
//
// Where the two forms differ:
//   * The concatenated form builds job_io FROM jobs_view with LEFT JOINs, so every job gets a
//     row, including jobs with no io mapping rows.
//   * The text-block form builds job_io FROM job_versions_io_mapping with INNER JOINs, so only
//     jobs that have at least one io mapping row appear. Rows are keyed by
//     COALESCE(symlink_target_uuid, uuid), and rows whose symlink target has its own
//     current_version_uuid are filtered out.
//   * Because job_io may then lack a requested job, the text-block seed selects FROM jobs_view
//     with a LEFT JOIN onto job_io and COALESCEs missing arrays to empty uuid[] arrays.
//
// <jobIds> is expanded from the @BindList parameter; :depth is presumably bound from the
// `depth` argument by name -- TODO confirm how unannotated parameters are bound in this project.
@SqlQuery(
// dataset_ids: all the input and output datasets of the current version of the specified jobs
"WITH RECURSIVE\n"
+ " job_io AS (\n"
+ " SELECT j.uuid AS job_uuid,\n"
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,\n"
+ " ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs\n"
+ " FROM jobs_view j\n"
+ " LEFT JOIN jobs_view s ON s.symlink_target_uuid=j.uuid\n"
+ " LEFT JOIN job_versions v on COALESCE(j.current_version_uuid, s.current_version_uuid) = v.uuid\n"
+ " LEFT JOIN job_versions_io_mapping io on v.uuid = io.job_version_uuid\n"
+ " GROUP BY j.uuid\n"
+ " ),\n"
+ " lineage(job_uuid, inputs, outputs) AS (\n"
+ " SELECT job_uuid, inputs, outputs, 0 AS depth\n"
+ " FROM job_io\n"
+ " WHERE job_uuid IN (<jobIds>)\n"
+ " UNION\n"
+ " SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1\n"
+ " FROM job_io io,\n"
+ " lineage l\n"
+ " WHERE io.job_uuid != l.job_uuid AND\n"
+ " array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)\n"
+ " AND depth < :depth"
+ " )\n"
+ "SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context\n"
+ "FROM lineage l2\n"
+ "INNER JOIN jobs_view s ON s.uuid=l2.job_uuid\n"
+ "INNER JOIN jobs_view j ON j.uuid=COALESCE(s.symlink_target_uuid, s.uuid)\n"
+ "LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid")
"""
WITH RECURSIVE
job_io AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='INPUT') AS inputs,
ARRAY_AGG(DISTINCT io.dataset_uuid) FILTER (WHERE io_type='OUTPUT') AS outputs
FROM job_versions_io_mapping io
INNER JOIN job_versions v ON io.job_version_uuid=v.uuid
INNER JOIN jobs_view j on j.current_version_uuid = v.uuid
LEFT JOIN jobs_view s ON s.uuid=j.symlink_target_uuid
WHERE s.current_version_uuid IS NULL
GROUP BY COALESCE(j.symlink_target_uuid, j.uuid)
),
lineage(job_uuid, inputs, outputs) AS (
SELECT COALESCE(j.symlink_target_uuid, j.uuid) AS job_uuid,
COALESCE(inputs, Array[]::uuid[]) AS inputs,
COALESCE(outputs, Array[]::uuid[]) AS outputs,
0 AS depth
FROM jobs_view j
LEFT JOIN job_io io ON io.job_uuid=j.uuid OR j.symlink_target_uuid=io.job_uuid
WHERE j.uuid IN (<jobIds>)
UNION
SELECT io.job_uuid, io.inputs, io.outputs, l.depth + 1
FROM job_io io,
lineage l
WHERE io.job_uuid != l.job_uuid AND
array_cat(io.inputs, io.outputs) && array_cat(l.inputs, l.outputs)
AND depth < :depth)
SELECT DISTINCT ON (j.uuid) j.*, inputs AS input_uuids, outputs AS output_uuids, jc.context
FROM lineage l2
INNER JOIN jobs_view j ON j.uuid=l2.job_uuid
LEFT JOIN job_contexts jc on jc.uuid = j.current_job_context_uuid;
""")
Set<JobData> getLineage(@BindList Set<UUID> jobIds, int depth);

@SqlQuery(
Expand Down
32 changes: 32 additions & 0 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,38 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
.map(JobData::getUuid)
.collect(Collectors.toSet());
assertThat(lineageForOriginalJob).isEqualTo(jobIds);

UpdateLineageRow updatedTargetJob =
LineageTestUtils.createLineageRow(
openLineageDao,
symlinkTargetJobName,
"COMPLETE",
jobFacet,
Arrays.asList(),
Arrays.asList(
new Dataset(
NAMESPACE,
"a_new_dataset",
newDatasetFacet(new SchemaField("firstname", "string", "the first name")))));
assertThat(updatedTargetJob.getJob().getUuid()).isEqualTo(targetJob.getUuid());

// get lineage for original job - the old datasets/jobs should no longer be present
assertThat(
lineageDao
.getLineage(new HashSet<>(Arrays.asList(writeJob.getJob().getUuid())), 2)
.stream()
.map(JobData::getUuid)
.collect(Collectors.toSet()))
.hasSize(1)
.containsExactlyInAnyOrder(targetJob.getUuid());

// fetching lineage for target job should yield the same results
assertThat(
lineageDao.getLineage(new HashSet<>(Arrays.asList(targetJob.getUuid())), 2).stream()
.map(JobData::getUuid)
.collect(Collectors.toSet()))
.hasSize(1)
.containsExactlyInAnyOrder(targetJob.getUuid());
}

@Test
Expand Down