Fix recursive views perf #2043

Merged on Jul 27, 2022 (7 commits)
Changes from 5 commits
3 changes: 3 additions & 0 deletions CHANGELOG.md
@@ -2,6 +2,9 @@

## [Unreleased](https://github.com/MarquezProject/marquez/compare/0.23.0...HEAD)

### Changed
* Updated `jobs_view` to stop computing FQN on reads and to compute on _writes_ instead [@collado-mike](https://github.com/collado-mike)
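
A condensed before/after of the lookup pattern this entry describes, taken from the `JobDao.findJobByName` diff below (the `:namespaceName`/`:jobName` tokens are JDBI bind parameters; the job context and facet joins are trimmed here). The old query resolved symlink chains with a recursive CTE on every read, while the updated `jobs_view` exposes precomputed `aliases`, so the lookup reduces to a flat filter:

```sql
-- Before: resolve the symlink chain with a recursive CTE on every lookup.
WITH RECURSIVE job_ids AS (
  SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
    FROM jobs_view j
   WHERE j.namespace_name = :namespaceName AND j.name = :jobName
  UNION
  SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
    FROM jobs_view j
   INNER JOIN job_ids jn ON j.uuid = jn.symlink_target_uuid
)
SELECT j.*
  FROM jobs_view j
 INNER JOIN job_ids jn
    ON jn.link_target_uuid = j.uuid AND jn.symlink_target_uuid IS NULL;

-- After: the view carries precomputed aliases, so the lookup is a plain filter.
SELECT j.*
  FROM jobs_view j
 WHERE j.namespace_name = :namespaceName
   AND (j.name = :jobName OR :jobName = ANY(j.aliases))
   AND j.symlink_target_uuid IS NULL;
```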

## [0.23.0](https://github.com/MarquezProject/marquez/compare/0.22.0...0.23.0) - 2022-06-16

### Added
157 changes: 30 additions & 127 deletions api/src/main/java/marquez/db/JobDao.java
@@ -37,6 +37,7 @@
@RegisterRowMapper(JobRowMapper.class)
@RegisterRowMapper(JobMapper.class)
public interface JobDao extends BaseDao {

@SqlQuery(
"SELECT EXISTS (SELECT 1 FROM jobs_view AS j "
+ "WHERE j.namespace_name= :namespaceName AND "
@@ -52,18 +53,8 @@ public interface JobDao extends BaseDao {

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, jc.context, f.facets
FROM jobs_view j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
LEFT OUTER JOIN job_versions AS jv ON jv.uuid = j.current_version_uuid
LEFT OUTER JOIN job_contexts jc ON jc.uuid = j.current_job_context_uuid
LEFT OUTER JOIN (
@@ -78,6 +69,8 @@ SELECT run_uuid, JSON_AGG(e.facets) AS facets
) e
GROUP BY e.run_uuid
) f ON f.run_uuid=jv.latest_run_uuid
WHERE j.namespace_name=:namespaceName AND (j.name=:jobName OR :jobName = ANY(j.aliases))
AND j.symlink_target_uuid IS NULL
""")
Optional<Job> findJobByName(String namespaceName, String jobName);

@@ -93,38 +86,22 @@ default Optional<Job> findWithRun(String namespaceName, String jobName) {

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.uuid=:jobUuid
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
WHERE j.uuid=:jobUuid
""")
Optional<JobRow> findJobByUuidAsRow(UUID jobUuid);

@SqlQuery(
"""
WITH RECURSIVE job_ids AS (
SELECT uuid, uuid AS link_target_uuid, symlink_target_uuid
FROM jobs_view j
WHERE j.namespace_name=:namespaceName AND j.name=:jobName
UNION
SELECT jn.uuid, j.uuid AS link_target_uuid, j.symlink_target_uuid
FROM jobs_view j
INNER JOIN job_ids jn ON j.uuid=jn.symlink_target_uuid
)
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN job_ids jn ON jn.link_target_uuid=j.uuid AND jn.symlink_target_uuid IS NULL
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
""")
SELECT j.*, n.name AS namespace_name
FROM jobs_view AS j
INNER JOIN namespaces AS n ON j.namespace_uuid = n.uuid
WHERE j.namespace_name=:namespaceName AND
(j.name=:jobName OR :jobName = ANY(j.aliases))
AND j.symlink_target_uuid IS NULL
""")
Optional<JobRow> findJobByNameAsRow(String namespaceName, String jobName);

@SqlQuery(
@@ -150,11 +127,11 @@ WITH RECURSIVE job_ids AS (
+ "LIMIT :limit OFFSET :offset")
List<Job> findAll(String namespaceName, int limit, int offset);

@SqlQuery("SELECT count(*) FROM jobs AS j WHERE symlink_target_uuid IS NULL")
@SqlQuery("SELECT count(*) FROM jobs_view AS j WHERE symlink_target_uuid IS NULL")
int count();

@SqlQuery(
"SELECT count(*) FROM jobs AS j WHERE j.namespace_name = :namespaceName\n"
"SELECT count(*) FROM jobs_view AS j WHERE j.namespace_name = :namespaceName\n"
+ "AND symlink_target_uuid IS NULL")
int countFor(String namespaceName);

@@ -213,7 +190,6 @@ default JobRow upsertJobMeta(
createdAt,
Utils.toJson(jobMeta.getContext()),
Utils.checksumFor(jobMeta.getContext()));

return upsertJob(
UUID.randomUUID(),
jobMeta.getType(),
@@ -248,7 +224,7 @@ default PGobject toJson(Set<DatasetId> dataset, ObjectMapper mapper) {

@SqlQuery(
"""
INSERT INTO jobs AS j (
INSERT INTO jobs_view AS j (
uuid,
type,
created_at,
@@ -260,7 +236,8 @@ INSERT INTO jobs AS j (
current_job_context_uuid,
current_location,
current_inputs,
symlink_target_uuid
symlink_target_uuid,
parent_job_id_string
) VALUES (
:uuid,
:type,
@@ -273,20 +250,11 @@ INSERT INTO jobs AS j (
:jobContextUuid,
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING uuid
:symlinkTargetId,
''
) RETURNING *
""")
UUID upsertJobNoParent(
JobRow upsertJob(
UUID uuid,
JobType type,
Instant now,
@@ -299,39 +267,12 @@ UUID upsertJobNoParent(
UUID symlinkTargetId,
PGobject inputs);

default JobRow upsertJob(
UUID uuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs) {
UUID jobUuid =
upsertJobNoParent(
uuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
jobContextUuid,
location,
symlinkTargetId,
inputs);
return findJobByUuidAsRow(jobUuid).get();
}

@SqlQuery(
"""
INSERT INTO jobs AS j (
INSERT INTO jobs_view AS j (
uuid,
parent_job_uuid,
parent_job_id_string,
type,
created_at,
updated_at,
@@ -346,6 +287,7 @@ INSERT INTO jobs AS j (
) VALUES (
:uuid,
:parentJobUuid,
COALESCE(:parentJobUuid::text, ''),
:type,
:now,
:now,
@@ -357,19 +299,10 @@ INSERT INTO jobs AS j (
:location,
:inputs,
:symlinkTargetId
) ON CONFLICT (name, namespace_uuid, parent_job_uuid) DO
UPDATE SET
updated_at = EXCLUDED.updated_at,
type = EXCLUDED.type,
description = EXCLUDED.description,
current_job_context_uuid = EXCLUDED.current_job_context_uuid,
current_location = EXCLUDED.current_location,
current_inputs = EXCLUDED.current_inputs,
-- update the symlink target if not null. otherwise, keep the old value
symlink_target_uuid = COALESCE(EXCLUDED.symlink_target_uuid, j.symlink_target_uuid)
RETURNING uuid
)
RETURNING *
""")
UUID upsertJobWithParent(
JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
@@ -382,34 +315,4 @@ UUID upsertJobWithParent(
String location,
UUID symlinkTargetId,
PGobject inputs);

default JobRow upsertJob(
UUID uuid,
UUID parentJobUuid,
JobType type,
Instant now,
UUID namespaceUuid,
String namespaceName,
String name,
String description,
UUID jobContextUuid,
String location,
UUID symlinkTargetId,
PGobject inputs) {
UUID jobUuid =
upsertJobWithParent(
uuid,
parentJobUuid,
type,
now,
namespaceUuid,
namespaceName,
name,
description,
jobContextUuid,
location,
symlinkTargetId,
inputs);
return findJobByUuidAsRow(jobUuid).get();
}
}
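
The other half of the `JobDao` change, condensed from the diff above with column lists trimmed: the old `upsertJobNoParent`/`upsertJobWithParent` queries returned only the new row's `uuid`, and the default-method wrappers immediately re-read the row with `findJobByUuidAsRow`; the replacement queries return the whole row in a single statement. The new statements target `jobs_view` directly and show no `ON CONFLICT` clause in this excerpt, so conflict handling presumably moves into the view's insert handling defined in a migration that is not part of this diff.

```sql
-- Before (sketch, columns trimmed): the insert returns only the uuid,
-- and the Java wrapper issues a second read to build the JobRow.
INSERT INTO jobs AS j (uuid, name, namespace_uuid /* , ... */)
VALUES (:uuid, :name, :namespaceUuid /* , ... */)
ON CONFLICT (name, namespace_uuid) WHERE parent_job_uuid IS NULL
DO UPDATE SET updated_at = EXCLUDED.updated_at /* , ... */
RETURNING uuid;
-- ...followed by: SELECT ... FROM jobs_view WHERE uuid = <returned uuid>

-- After (sketch, columns trimmed): one statement, mapped straight to a JobRow.
INSERT INTO jobs_view AS j (uuid, name, namespace_uuid /* , ... */)
VALUES (:uuid, :name, :namespaceUuid /* , ... */)
RETURNING *;
```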
7 changes: 2 additions & 5 deletions api/src/main/java/marquez/db/JobVersionDao.java
@@ -286,22 +286,19 @@ default List<UUID> findOutputDatasetsFor(UUID jobVersionUuid) {
* code location, and context. A version for a given job is created <i>only</i> when a {@link Run}
* transitions into a {@code COMPLETED}, {@code ABORTED}, or {@code FAILED} state.
*
* @param namespaceName The namespace for the job version.
* @param jobName The name of the job.
* @param jobRow The job.
* @param runUuid The unique ID of the run associated with the job version.
* @param runState The current run state.
* @param transitionedAt The timestamp of the run state transition.
* @return A {@link BagOfJobVersionInfo} object.
*/
default BagOfJobVersionInfo upsertJobVersionOnRunTransition(
@NonNull String namespaceName,
@NonNull String jobName,
@NonNull JobRow jobRow,
@NonNull UUID runUuid,
@NonNull RunState runState,
@NonNull Instant transitionedAt) {
// Get the job.
final JobDao jobDao = createJobDao();
final JobRow jobRow = jobDao.findJobByNameAsRow(namespaceName, jobName).get();

// Get the job context.
final UUID jobContextUuid = jobRow.getJobContextUuid().get();
61 changes: 30 additions & 31 deletions api/src/main/java/marquez/db/OpenLineageDao.java
Expand Up @@ -32,7 +32,6 @@
import marquez.db.models.DatasetFieldRow;
import marquez.db.models.DatasetRow;
import marquez.db.models.DatasetVersionRow;
import marquez.db.models.ExtendedRunRow;
import marquez.db.models.JobContextRow;
import marquez.db.models.JobRow;
import marquez.db.models.NamespaceRow;
@@ -161,10 +160,9 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
PGobject inputs = new PGobject();
inputs.setType("json");
inputs.setValue("[]");
Optional<ExtendedRunRow> parentRunRow = runDao.findRunByUuidAsRow(uuid);
JobRow parentJobRow =
parentRunRow
.flatMap(run -> jobDao.findJobByUuidAsRow(run.getJobUuid()))
runDao
.findJobRowByRunUuid(uuid)
.orElseGet(
() -> {
JobRow newParentJobRow =
@@ -181,34 +179,36 @@ default UpdateLineageRow updateBaseMarquezModel(LineageEvent event, ObjectMapper
null,
inputs);
log.info("Created new parent job record {}", newParentJobRow);

RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(),
now,
"{}",
Utils.checksumFor(ImmutableMap.of()));
RunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
newParentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
newParentJobRow.getName(),
newParentJobRow.getLocation(),
newParentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
return newParentJobRow;
});
log.debug("Found parent job record {}", parentJobRow);
if (parentRunRow.isEmpty()) {
RunArgsRow argsRow =
runArgsDao.upsertRunArgs(
UUID.randomUUID(), now, "{}", Utils.checksumFor(ImmutableMap.of()));
ExtendedRunRow newRow =
runDao.upsert(
uuid,
null,
facet.getRun().getRunId(),
now,
parentJobRow.getUuid(),
null,
argsRow.getUuid(),
nominalStartTime,
nominalEndTime,
Optional.ofNullable(event.getEventType())
.map(this::getRunState)
.orElse(null),
now,
namespace.getName(),
parentJobRow.getName(),
parentJobRow.getLocation(),
parentJobRow.getJobContextUuid().orElse(null));
log.info("Created new parent run record {}", newRow);
}
return parentJobRow;
} catch (Exception e) {
throw new RuntimeException("Unable to insert parent run", e);
@@ -390,8 +390,7 @@ default void updateMarquezOnComplete(
BagOfJobVersionInfo bagOfJobVersionInfo =
createJobVersionDao()
.upsertJobVersionOnRunTransition(
updateLineageRow.getRun().getNamespaceName(),
updateLineageRow.getRun().getJobName(),
updateLineageRow.getJob(),
updateLineageRow.getRun().getUuid(),
runState,
event.getEventTime().toInstant());