[PERF] Refacto run dao sql query #2685

Merged
122 changes: 76 additions & 46 deletions in api/src/main/java/marquez/db/RunDao.java

@@ -140,54 +140,84 @@ LEFT OUTER JOIN (

@SqlQuery(
"""
-- Removed (old query):

SELECT r.*, ra.args, f.facets,
       j.namespace_name, j.name, jv.version AS job_version,
       ri.input_versions, ro.output_versions, df.dataset_facets
FROM runs_view AS r
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
LEFT JOIN LATERAL (
    SELECT rf.run_uuid, JSON_AGG(rf.facet ORDER BY rf.lineage_event_time ASC) AS facets
    FROM run_facets_view rf
    WHERE rf.run_uuid=r.uuid
    GROUP BY rf.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
LEFT OUTER JOIN job_versions jv ON jv.uuid=r.job_version_uuid
LEFT OUTER JOIN (
    SELECT im.run_uuid, JSON_AGG(json_build_object('namespace', dv.namespace_name,
                                                   'name', dv.dataset_name,
                                                   'version', dv.version,
                                                   'dataset_version_uuid', uuid
                                                  )) AS input_versions
    FROM runs_input_mapping im
    INNER JOIN dataset_versions dv ON im.dataset_version_uuid = dv.uuid
    GROUP BY im.run_uuid
) ri ON ri.run_uuid=r.uuid
LEFT OUTER JOIN (
    SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
                                                'name', dataset_name,
                                                'version', version,
                                                'dataset_version_uuid', uuid
                                               )) AS output_versions
    FROM dataset_versions
    GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
LEFT OUTER JOIN (
    SELECT
        run_uuid,
        JSON_AGG(json_build_object(
            'dataset_version_uuid', dataset_version_uuid,
            'name', name,
            'type', type,
            'facet', facet
        ) ORDER BY created_at ASC) as dataset_facets
    FROM dataset_facets_view
    WHERE (type ILIKE 'output' OR type ILIKE 'input')
    GROUP BY run_uuid
) AS df ON r.uuid = df.run_uuid
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases))
ORDER BY STARTED_AT DESC NULLS LAST
LIMIT :limit OFFSET :offset

-- Added (new query):

WITH filtered_jobs AS (
    SELECT
        jv.uuid,
        jv.namespace_name,
        jv.name
    FROM jobs_view jv
    WHERE jv.namespace_name=:namespace AND (jv.name=:jobName OR :jobName = ANY(jv.aliases))
),
run_facets_agg AS (
    SELECT
        run_uuid,
        JSON_AGG(facet ORDER BY lineage_event_time ASC) AS facets
    FROM run_facets_view
    -- This filter is here for performance reasons: we only aggregate the JSON for the run_uuids that matter
    WHERE
        run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
    GROUP BY run_uuid
),
input_versions_agg AS (
    SELECT
        im.run_uuid,
        JSON_AGG(json_build_object('namespace', dv.namespace_name,
                                   'name', dv.dataset_name,
                                   'version', dv.version,
                                   'dataset_version_uuid', dv.uuid
                                  )) AS input_versions
    FROM runs_input_mapping im
    INNER JOIN dataset_versions dv ON im.dataset_version_uuid = dv.uuid
    -- This filter is here for performance reasons: we only aggregate the JSON for the run_uuids that matter
    WHERE
        im.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
    GROUP BY im.run_uuid
),
Inline review thread on the WHERE ... IN (SELECT ...) filter above:

Collaborator: Is this WHERE ... IN (SELECT ...) necessary to gain the performance boost?

Contributor (author): In my opinion, yes: with this condition we only run JSON_AGG (which is the bottleneck here) over the uuids that matter, not over the whole table. For a given namespace and job name, on a db.t4g.medium instance (vCPU: 2, RAM: 4 GB):

- The old query takes 1 minute 14 seconds.
- The new query takes 1.919 seconds.
- The new query without the filter takes more than 10 minutes (probably much more; I stopped it at ten minutes).

Collaborator: OK. Could you then add a comment describing that it is done for this purpose? It may be tempting for anyone in the future to refactor this code and get rid of it, and in that case our tests would not notice the performance degradation.

Contributor (author): Done! ;)

(A minimal sketch of this filter-before-aggregate pattern, and an example of calling findAll, are given after the method below.)
-- Added (new query), continued:

output_versions_agg AS (
    SELECT
        dv.run_uuid,
        JSON_AGG(json_build_object('namespace', namespace_name,
                                   'name', dataset_name,
                                   'version', version,
                                   'dataset_version_uuid', uuid
                                  )) AS output_versions
    FROM dataset_versions dv
    -- This filter is here for performance reasons: we only aggregate the JSON for the run_uuids that matter
    WHERE dv.run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
    GROUP BY dv.run_uuid
),
dataset_facets_agg AS (
    SELECT
        run_uuid,
        JSON_AGG(json_build_object(
            'dataset_version_uuid', dataset_version_uuid,
            'name', name,
            'type', type,
            'facet', facet
        ) ORDER BY created_at ASC) as dataset_facets
    FROM dataset_facets_view
    -- This filter is here for performance reasons: we only aggregate the JSON for the run_uuids that matter
    WHERE run_uuid IN (SELECT uuid FROM runs_view WHERE job_uuid IN (SELECT uuid FROM filtered_jobs))
    AND (type ILIKE 'output' OR type ILIKE 'input')
    GROUP BY run_uuid
)
SELECT
    r.*,
    ra.args,
    f.facets,
    jv.version AS job_version,
    ri.input_versions,
    ro.output_versions,
    df.dataset_facets
FROM runs_view r
INNER JOIN filtered_jobs fj ON r.job_uuid = fj.uuid
LEFT JOIN run_facets_agg f ON r.uuid = f.run_uuid
LEFT JOIN run_args ra ON ra.uuid = r.run_args_uuid
LEFT JOIN job_versions jv ON jv.uuid = r.job_version_uuid
LEFT JOIN input_versions_agg ri ON r.uuid = ri.run_uuid
LEFT JOIN output_versions_agg ro ON r.uuid = ro.run_uuid
LEFT JOIN dataset_facets_agg df ON r.uuid = df.run_uuid
ORDER BY r.started_at DESC NULLS LAST
LIMIT :limit OFFSET :offset
""")
List<Run> findAll(String namespace, String jobName, int limit, int offset);
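To make the point from the review thread concrete, here is a minimal sketch (not part of this PR) of the filter-before-aggregate pattern, written in the same JDBI SqlObject style as RunDao. The interface and method names are hypothetical; only the view and column names (runs_view.uuid, runs_view.job_uuid, run_facets_view.run_uuid, run_facets_view.facet) come from the query above.

import java.util.List;
import java.util.UUID;
import org.jdbi.v3.sqlobject.customizer.Bind;
import org.jdbi.v3.sqlobject.statement.SqlQuery;

// Hypothetical DAO used only to illustrate the optimization; it is not part of Marquez.
public interface FacetAggregationSketchDao {

  // Unfiltered shape (the "new query without filter" case from the thread): JSON_AGG runs over
  // every row of run_facets_view, and most of the aggregated rows are discarded by the outer WHERE.
  @SqlQuery(
      """
      SELECT f.facets::text
      FROM runs_view r
      LEFT JOIN (
          SELECT run_uuid, JSON_AGG(facet) AS facets
          FROM run_facets_view
          GROUP BY run_uuid
      ) f ON f.run_uuid = r.uuid
      WHERE r.job_uuid = :jobUuid
      """)
  List<String> facetsAggregatedOverWholeTable(@Bind("jobUuid") UUID jobUuid);

  // Filtered shape used by the PR: the aggregate is restricted with WHERE run_uuid IN (SELECT ...),
  // so JSON_AGG only touches the runs of the requested job.
  @SqlQuery(
      """
      WITH relevant_runs AS (
          SELECT uuid FROM runs_view WHERE job_uuid = :jobUuid
      ),
      facets_agg AS (
          SELECT run_uuid, JSON_AGG(facet) AS facets
          FROM run_facets_view
          WHERE run_uuid IN (SELECT uuid FROM relevant_runs)
          GROUP BY run_uuid
      )
      SELECT f.facets::text
      FROM runs_view r
      LEFT JOIN facets_agg f ON f.run_uuid = r.uuid
      WHERE r.job_uuid = :jobUuid
      """)
  List<String> facetsAggregatedForRelevantRunsOnly(@Bind("jobUuid") UUID jobUuid);
}

Both methods return the same rows; the second simply avoids aggregating facets for runs of other jobs, which is where the thread's 1m14s-to-1.9s difference comes from.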
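For completeness, a sketch (a code fragment, not a full class) of how the refactored findAll might be called through JDBI. The JDBC URL, credentials, namespace, and job name below are placeholders, and the fragment assumes the Jdbi instance is configured the way Marquez normally configures it (SqlObject plugin, registered row mappers, and parameter-name binding for :namespace and :jobName).

import java.util.List;
import org.jdbi.v3.core.Jdbi;
import org.jdbi.v3.sqlobject.SqlObjectPlugin;

// Hypothetical wiring, not taken from the Marquez codebase.
Jdbi jdbi = Jdbi.create("jdbc:postgresql://localhost:5432/marquez", "marquez", "marquez");
jdbi.installPlugin(new SqlObjectPlugin());
// The real application also registers Marquez's row mappers (e.g. for Run) at this point.

RunDao runDao = jdbi.onDemand(RunDao.class);
// First page (10 runs, offset 0) for job "etl_orders" in namespace "food_delivery":
List<Run> runs = runDao.findAll("food_delivery", "etl_orders", 10, 0);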