Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix recursive views perf #2043

Merged
merged 7 commits into from
Jul 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.23.0...HEAD)

### Changed
* Updated `jobs_view` to stop computing FQN on reads and to compute on _writes_ instead [@collado-mike](https://github.com/collado-mike)

## [0.23.0](https://github.com/MarquezProject/marquez/compare/0.22.0...0.23.0) - 2022-06-16

### Added
Expand Down
157 changes: 30 additions & 127 deletions api/src/main/java/marquez/db/JobDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(JobMapper.class)
public interface JobDao extends BaseDao {

@SqlQuery(
"SELECT EXISTS (SELECT 1 FROM jobs_view AS j "
+ "WHERE j.namespace_name= :namespaceName AND "
Expand All @@ -52,18 +53,8 @@ public interface JobDao extends BaseDao {

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, jc.context, f.facets
FROM jobs_view j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
Expand All @@ -78,6 +69,8 @@ SELECT run_uuid, JSON_AGG(e.facets) AS facets
) e
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
AND j.symlink_target_uuid IS NULL
""")
Optional<Job> findJobByName(String namespaceName, String jobName);

Expand All @@ -93,38 +86,22 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.uuid=:jobUuid
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
WHERE j.uuid=:jobUuid
""")
Optional<JobRow> findJobByUuidAsRow(UUID jobUuid);

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
WHERE j.namespace_name=:namespaceName AND
(j.name=:jobName OR :jobName = ANY(j.aliases))
AND j.symlink_target_uuid IS NULL
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

@SqlQuery(
Expand All @@ -150,11 +127,11 @@ WITH RECURSIVE job_ids AS (
+ "LIMIT :limit OFFSET :offset")
List<Job> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM jobs AS j WHERE symlink_target_uuid IS NULL")
@SqlQuery("SELECT count(*) FROM jobs_view AS j WHERE symlink_target_uuid IS NULL")
int count();

@SqlQuery(
"SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName\n"
"SELECT count(*) FROM jobs_view AS j WHERE j.namespace_name = :namespaceName\n"
+ "AND symlink_target_uuid IS NULL")
int countFor(String namespaceName);

Expand Down Expand Up @@ -213,7 +190,6 @@ default JobRow upsertJobMeta(
createdAt,
Utils.toJson(jobMeta.getContext()),
Utils.checksumFor(jobMeta.getContext()));

return upsertJob(
UUID.randomUUID(),
jobMeta.getType(),
Expand Down Expand Up @@ -248,7 +224,7 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {

@SqlQuery(
"""
INSERT INTO jobs AS j (
INSERT INTO jobs_view AS j (
uuid,
type,
created_at,
Expand All @@ -260,7 +236,8 @@ INSERT INTO jobs AS j (
current_job_context_uuid,
current_location,
current_inputs,
symlink_target_uuid
symlink_target_uuid,
parent_job_uuid_string
) VALUES (
:uuid,
:type,
Expand All @@ -273,20 +250,11 @@ INSERT INTO jobs AS j (
:jobContextUuid,
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING uuid
:symlinkTargetId,
''
) RETURNING *
""")
UUID upsertJobNoParent(
JobRow upsertJob(
UUID uuid,
JobType type,
Instant now,
Expand All @@ -299,39 +267,12 @@ UUID upsertJobNoParent(
UUID symlinkTargetId,
PGobject inputs);

default JobRow upsertJob(
UUID uuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs) {
UUID jobUuid =
upsertJobNoParent(
uuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
jobContextUuid,
location,
symlinkTargetId,
inputs);
return findJobByUuidAsRow(jobUuid).get();
}

@SqlQuery(
"""
INSERT INTO jobs AS j (
INSERT INTO jobs_view AS j (
uuid,
parent_job_uuid,
parent_job_uuid_string,
type,
created_at,
updated_at,
Expand All @@ -346,6 +287,7 @@ INSERT INTO jobs AS j (
) VALUES (
:uuid,
:parentJobUuid,
COALESCE(:parentJobUuid::text, ''),
:type,
:now,
:now,
Expand All @@ -357,19 +299,10 @@ INSERT INTO jobs AS j (
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING uuid
)
RETURNING *
""")
UUID upsertJobWithParent(
JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Expand All @@ -382,34 +315,4 @@ UUID upsertJobWithParent(
String location,
UUID symlinkTargetId,
PGobject inputs);

default JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs) {
UUID jobUuid =
upsertJobWithParent(
uuid,
parentJobUuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
jobContextUuid,
location,
symlinkTargetId,
inputs);
return findJobByUuidAsRow(jobUuid).get();
}
}
19 changes: 5 additions & 14 deletions api/src/main/java/marquez/db/RunDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -122,26 +122,16 @@ public interface RunDao extends BaseDao {

@SqlQuery(
"""
WITH RECURSIVE job_names AS (
SELECT uuid, namespace_name, name, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespace AND j.name=:jobName
UNION
SELECT j.uuid, j.namespace_name, j.name, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_names jn ON j.uuid=jn.symlink_target_uuid OR j.symlink_target_uuid=jn.uuid
)
SELECT r.*, ra.args, ctx.context, f.facets,
jv.namespace_name, jv.job_name, jv.version AS job_version,
j.namespace_name, j.name, jv.version AS job_version,
ri.input_versions, ro.output_versions
FROM runs_view AS r
INNER JOIN job_names j ON r.namespace_name=j.namespace_name AND r.job_name=j.name
LEFT OUTER JOIN
INNER JOIN jobs_view j ON r.job_uuid=j.uuid
LEFT JOIN LATERAL
(
SELECT le.run_uuid, JSON_AGG(event->'run'->'facets') AS facets
FROM lineage_events le
INNER JOIN runs_view r2 ON r2.uuid=le.run_uuid
WHERE r2.job_name=:jobName AND r2.namespace_name=:namespace
WHERE le.run_uuid=r.uuid
GROUP BY le.run_uuid
) AS f ON r.uuid=f.run_uuid
LEFT OUTER JOIN run_args AS ra ON ra.uuid = r.run_args_uuid
Expand All @@ -162,6 +152,7 @@ SELECT run_uuid, JSON_AGG(json_build_object('namespace', namespace_name,
FROM dataset_versions
GROUP BY run_uuid
) ro ON ro.run_uuid=r.uuid
WHERE j.namespace_name=:namespace AND (j.name=:jobName OR :jobName = ANY(j.aliases))
ORDER BY STARTED_AT DESC NULLS LAST
LIMIT :limit OFFSET :offset
""")
Expand Down
Loading