Skip to content

Commit

Permalink
Update insert job function to avoid joining on symlinks for jobs that…
Browse files Browse the repository at this point in the history
… have no symlinks

Signed-off-by: Michael Collado <[email protected]>
  • Loading branch information
collado-mike committed Sep 27, 2022
1 parent 5431dab commit 1fdd728
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,14 @@ SELECT f.uuid,
FROM jobs_fqn f,
jobs j
WHERE j.uuid = f.uuid
AND j.is_hidden IS FALSE;
AND j.is_hidden IS FALSE;


CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table() RETURNS TRIGGER AS
$$
DECLARE
job_uuid uuid;
job_updated_at timestamp with time zone;
new_symlink_target_uuid uuid;
old_symlink_target_uuid uuid;
inserted_job jobs_view%rowtype;
Expand All @@ -53,7 +54,7 @@ BEGIN
COALESCE(NEW.parent_job_uuid::char(36), ''),
false
ON CONFLICT (name, namespace_uuid, parent_job_uuid_string)
DO UPDATE SET updated_at = EXCLUDED.updated_at,
DO UPDATE SET updated_at = now(),
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
Expand All @@ -64,15 +65,48 @@ BEGIN
EXCLUDED.symlink_target_uuid),
is_hidden = false
-- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW
-- version in case of insert
RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid;
-- version in case of insert
RETURNING uuid,
updated_at,
symlink_target_uuid,
(SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid)
INTO job_uuid, job_updated_at, new_symlink_target_uuid, old_symlink_target_uuid;

-- update the jobs_fqn table only when inserting a new record (NEW.uuid will equal the job_uuid
-- when inserting a new record) or when the symlink_target_uuid is being updated.
IF NEW.uuid = job_uuid OR
(new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
RAISE LOG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;

-- update the jobs_fqn table when inserting a new record
-- (NEW.uuid will equal the job_uuid when inserting a new record)
-- AND if the symlink target is null
-- Avoid constructing the symlinks and aliases, as that is expensive
IF TG_OP='INSERT'
AND NEW.uuid = job_uuid
AND NEW.symlink_target_uuid IS NULL
AND NEW.updated_at=job_updated_at THEN
RAISE DEBUG 'Inserting into jobs_fqn for new job % (%)', NEW.name, job_uuid;
WITH fqn AS (SELECT j.uuid,
CASE
WHEN j.parent_job_uuid IS NULL THEN j.name
ELSE jf.job_fqn || '.' || j.name
END AS name,
j.namespace_uuid,
j.namespace_name,
jf.job_fqn AS parent_job_name,
j.parent_job_uuid
FROM jobs j
LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid
WHERE j.uuid=job_uuid)
INSERT
INTO jobs_fqn
SELECT j.uuid,
jf.namespace_uuid,
jf.namespace_name,
jf.parent_job_name,
ARRAY[jf.name]::text[],
jf.name AS job_fqn
FROM jobs j
INNER JOIN fqn jf ON jf.uuid = j.uuid;
-- or when the symlink_target_uuid is being updated.
ELSIF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN
RAISE DEBUG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid;
WITH RECURSIVE
jobs_symlink AS (SELECT j.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs j
Expand Down
5 changes: 3 additions & 2 deletions api/src/test/java/marquez/db/LineageDaoTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.google.common.base.Functions;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
Expand Down Expand Up @@ -220,7 +221,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
.upsertJob(
UUID.randomUUID(),
JobType.valueOf(writeJob.getJob().getType()),
writeJob.getJob().getCreatedAt(),
Instant.now(),
namespaceRow.getUuid(),
writeJob.getJob().getNamespaceName(),
symlinkTargetJobName,
Expand All @@ -233,7 +234,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException {
.upsertJob(
writeJob.getJob().getUuid(),
JobType.valueOf(writeJob.getJob().getType()),
writeJob.getJob().getCreatedAt(),
Instant.now(),
namespaceRow.getUuid(),
writeJob.getJob().getNamespaceName(),
writeJob.getJob().getName(),
Expand Down

0 comments on commit 1fdd728

Please sign in to comment.