From ab4f42d2251275aae57c173f15a0c5e471765570 Mon Sep 17 00:00:00 2001 From: sophiely Date: Thu, 16 Nov 2023 11:17:24 +0100 Subject: [PATCH 1/2] refacto run dao sql query Signed-off-by: sophiely --- api/src/main/java/marquez/db/RunDao.java | 118 ++++++++++++++--------- 1 file changed, 72 insertions(+), 46 deletions(-) diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 32c39cf4a2..15cdb52349 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -140,54 +140,80 @@ LEFT OUTER JOIN ( @SqlQuery( """ - SELECT r.*, ra.args, f.facets, - j.namespace_name, j.name, jv.version AS job_version, - ri.input_versions, ro.output_versions, df.dataset_facets - FROM runs_view AS r - INNER JOIN jobs_view j ON r.job_uuid=j.uuid - LEFT JOIN LATERAL - ( - SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets - FROM run_facets_view rf - WHERE rf.run_uuid=r.uuid - GROUP BY rf.run_uuid - ) AS f ON r.uuid=f.run_uuid - LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid - LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid - LEFT OUTER JOIN ( - SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name, - 'name', dv.dataset_name, - 'version', dv.version, - 'dataset_version_uuid', uuid - )) AS input_versions - FROM runs_input_mapping im - INNER JOIN dataset_versions dv on im.dataset_version_uuid = dv.uuid - GROUP BY im.run_uuid - ) ri ON ri.run_uuid=r.uuid - LEFT OUTER JOIN ( - SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name, - 'name', dataset_name, - 'version', version, - 'dataset_version_uuid', uuid - )) AS output_versions - FROM dataset_versions + WITH filtered_jobs AS ( + SELECT + jv.uuid, + jv.namespace_name, + jv.name + FROM jobs_view jv + WHERE jv.namespace_name=:namespace AND (jv.name=:jobName OR :jobName = ANY(jv.aliases)) + ), + run_facets_agg AS ( + SELECT + run_uuid, + JSON_AGG(facet ORDER BY lineage_event_time ASC) AS facets + FROM run_facets_view + WHERE + run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) GROUP BY run_uuid - ) ro ON ro.run_uuid=r.uuid - LEFT OUTER JOIN ( + ), + input_versions_agg AS ( + SELECT + im.run_uuid, + JSON_AGG(json_build_object('namespace', dv.namespace_name, + 'name', dv.dataset_name, + 'version', dv.version, + 'dataset_version_uuid', dv.uuid + )) AS input_versions + FROM runs_input_mapping im + INNER JOIN dataset_versions dv ON im.dataset_version_uuid = dv.uuid + WHERE + im.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) + GROUP BY im.run_uuid + ), + output_versions_agg AS ( + SELECT + dv.run_uuid, + JSON_AGG(json_build_object('namespace', namespace_name, + 'name', dataset_name, + 'version', version, + 'dataset_version_uuid', uuid + )) AS output_versions + FROM dataset_versions dv + WHERE dv.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) + GROUP BY dv.run_uuid + ), + dataset_facets_agg AS ( + SELECT + run_uuid, + JSON_AGG(json_build_object( + 'dataset_version_uuid', dataset_version_uuid, + 'name', name, + 'type', type, + 'facet', facet + ) ORDER BY created_at ASC) as dataset_facets + FROM dataset_facets_view + WHERE run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) + AND (type ILIKE 'output' OR type ILIKE 'input') + GROUP BY run_uuid + ) SELECT - run_uuid, - JSON_AGG(json_build_object( - 'dataset_version_uuid', dataset_version_uuid, - 'name', name, - 'type', type, - 'facet', facet - ) ORDER BY created_at ASC) as dataset_facets - FROM dataset_facets_view - WHERE (type ILIKE 'output' OR type ILIKE 'input') - GROUP BY run_uuid - ) AS df ON r.uuid = df.run_uuid - WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases)) - ORDER BY STARTED_AT DESC NULLS LAST + r.*, + ra.args, + f.facets, + jv.version AS job_version, + ri.input_versions, + ro.output_versions, + df.dataset_facets + FROM runs_view r + INNER JOIN filtered_jobs fj ON r.job_uuid = fj.uuid + LEFT JOIN run_facets_agg f ON r.uuid = f.run_uuid + LEFT JOIN run_args ra ON ra.uuid = r.run_args_uuid + LEFT JOIN job_versions jv ON jv.uuid = r.job_version_uuid + LEFT JOIN input_versions_agg ri ON r.uuid = ri.run_uuid + LEFT JOIN output_versions_agg ro ON r.uuid = ro.run_uuid + LEFT JOIN dataset_facets_agg df ON r.uuid = df.run_uuid + ORDER BY r.started_at DESC NULLS LAST LIMIT :limit OFFSET :offset """) List findAll(String namespace, String jobName, int limit, int offset); From dc74376d1bc045606af2d78effc771d9226ccc7d Mon Sep 17 00:00:00 2001 From: sophiely Date: Fri, 17 Nov 2023 16:56:22 +0100 Subject: [PATCH 2/2] add comments on filters Signed-off-by: sophiely --- api/src/main/java/marquez/db/RunDao.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/api/src/main/java/marquez/db/RunDao.java b/api/src/main/java/marquez/db/RunDao.java index 15cdb52349..1ba6e2a79d 100644 --- a/api/src/main/java/marquez/db/RunDao.java +++ b/api/src/main/java/marquez/db/RunDao.java @@ -153,6 +153,7 @@ run_facets_agg AS ( run_uuid, JSON_AGG(facet ORDER BY lineage_event_time ASC) AS facets FROM run_facets_view + -- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters WHERE run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) GROUP BY run_uuid @@ -167,6 +168,7 @@ input_versions_agg AS ( )) AS input_versions FROM runs_input_mapping im INNER JOIN dataset_versions dv ON im.dataset_version_uuid = dv.uuid + -- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters WHERE im.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) GROUP BY im.run_uuid @@ -180,6 +182,7 @@ output_versions_agg AS ( 'dataset_version_uuid', uuid )) AS output_versions FROM dataset_versions dv + -- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters WHERE dv.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) GROUP BY dv.run_uuid ), @@ -193,6 +196,7 @@ dataset_facets_agg AS ( 'facet', facet ) ORDER BY created_at ASC) as dataset_facets FROM dataset_facets_view + -- This filter here is used for performance purpose: we only aggregate the json of run_uuid that matters WHERE run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs)) AND (type ILIKE 'output' OR type ILIKE 'input') GROUP BY run_uuid