diff --git a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql index a3a2b488ae..a544c0352e 100644 --- a/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql +++ b/api/src/main/resources/marquez/db/migration/R__1_Jobs_view_and_rewrite_function.sql @@ -21,13 +21,14 @@ SELECT f.uuid, FROM jobs_fqn f, jobs j WHERE j.uuid = f.uuid -AND j.is_hidden IS FALSE; + AND j.is_hidden IS FALSE; CREATE OR REPLACE FUNCTION rewrite_jobs_fqn_table() RETURNS TRIGGER AS $$ DECLARE job_uuid uuid; + job_updated_at timestamp with time zone; new_symlink_target_uuid uuid; old_symlink_target_uuid uuid; inserted_job jobs_view%rowtype; @@ -53,7 +54,7 @@ BEGIN COALESCE(NEW.parent_job_uuid::char(36), ''), false ON CONFLICT (name, namespace_uuid, parent_job_uuid_string) - DO UPDATE SET updated_at = EXCLUDED.updated_at, + DO UPDATE SET updated_at = now(), type = EXCLUDED.type, description = EXCLUDED.description, current_job_context_uuid = EXCLUDED.current_job_context_uuid, @@ -64,15 +65,48 @@ BEGIN EXCLUDED.symlink_target_uuid), is_hidden = false -- the SELECT statement below will get the OLD symlink_target_uuid in case of update and the NEW - -- version in case of insert - RETURNING uuid, symlink_target_uuid, (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid) - INTO job_uuid, new_symlink_target_uuid, old_symlink_target_uuid; + -- version in case of insert + RETURNING uuid, + updated_at, + symlink_target_uuid, + (SELECT symlink_target_uuid FROM jobs j2 WHERE j2.uuid=jobs.uuid) + INTO job_uuid, job_updated_at, new_symlink_target_uuid, old_symlink_target_uuid; - -- update the jobs_fqn table only when inserting a new record (NEW.uuid will equal the job_uuid - -- when inserting a new record) or when the symlink_target_uuid is being updated. - IF NEW.uuid = job_uuid OR - (new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN - RAISE LOG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid; + + -- update the jobs_fqn table when inserting a new record + -- (NEW.uuid will equal the job_uuid when inserting a new record) + -- AND if the symlink target is null + -- Avoid constructing the symlinks and aliases, as that is expensive + IF TG_OP='INSERT' + AND NEW.uuid = job_uuid + AND NEW.symlink_target_uuid IS NULL + AND NEW.updated_at=job_updated_at THEN + RAISE DEBUG 'Inserting into jobs_fqn for new job % (%)', NEW.name, job_uuid; + WITH fqn AS (SELECT j.uuid, + CASE + WHEN j.parent_job_uuid IS NULL THEN j.name + ELSE jf.job_fqn || '.' || j.name + END AS name, + j.namespace_uuid, + j.namespace_name, + jf.job_fqn AS parent_job_name, + j.parent_job_uuid + FROM jobs j + LEFT JOIN jobs_fqn jf ON jf.uuid=j.parent_job_uuid + WHERE j.uuid=job_uuid) + INSERT + INTO jobs_fqn + SELECT j.uuid, + jf.namespace_uuid, + jf.namespace_name, + jf.parent_job_name, + ARRAY[jf.name]::text[], + jf.name AS job_fqn + FROM jobs j + INNER JOIN fqn jf ON jf.uuid = j.uuid; + -- or when the symlink_target_uuid is being updated. + ELSIF (new_symlink_target_uuid IS NOT NULL AND new_symlink_target_uuid IS DISTINCT FROM old_symlink_target_uuid) THEN + RAISE DEBUG 'Updating jobs_fqn due to % to job % (%)', TG_OP, NEW.name, job_uuid; WITH RECURSIVE jobs_symlink AS (SELECT j.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid FROM jobs j diff --git a/api/src/test/java/marquez/db/LineageDaoTest.java b/api/src/test/java/marquez/db/LineageDaoTest.java index 07b2b9ee7c..f0a451bbda 100644 --- a/api/src/test/java/marquez/db/LineageDaoTest.java +++ b/api/src/test/java/marquez/db/LineageDaoTest.java @@ -15,6 +15,7 @@ import com.google.common.base.Functions; import java.sql.SQLException; +import java.time.Instant; import java.util.Arrays; import java.util.Collections; import java.util.HashSet; @@ -220,7 +221,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException { .upsertJob( UUID.randomUUID(), JobType.valueOf(writeJob.getJob().getType()), - writeJob.getJob().getCreatedAt(), + Instant.now(), namespaceRow.getUuid(), writeJob.getJob().getNamespaceName(), symlinkTargetJobName, @@ -233,7 +234,7 @@ public void testGetLineageForSymlinkedJob() throws SQLException { .upsertJob( writeJob.getJob().getUuid(), JobType.valueOf(writeJob.getJob().getType()), - writeJob.getJob().getCreatedAt(), + Instant.now(), namespaceRow.getUuid(), writeJob.getJob().getNamespaceName(), writeJob.getJob().getName(),