Supporting Job grouping and hierarchy in Marquez #1928

Closed
collado-mike opened this issue Mar 29, 2022 · 0 comments · Fixed by #1992

collado-mike commented Mar 29, 2022

Problem

In https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md#parent-job-run-a-nested-hierarchy-of-jobs we use parent jobs and job naming to describe an extended DAG of related jobs. However, Marquez does not currently track parent/child job relationships.

As a concrete example, consider a data pipeline that calculates metrics for a website experimentation service:

Each hour:

- Collect clickstream data from the website and copy it to cloud storage
- Analyze clickstream data and determine samples for bucket A and bucket B
- Read current and past sample population buckets and calculate per-customer metrics for each running experiment
- Aggregate hourly customer metrics for each running experiment

Each day:

- Aggregate daily customer metrics for each running experiment
- Aggregate cumulative experiment metrics for each running experiment

The workflow consists of two DAGs, one hourly and one daily. The lineage would look like the following:

[Image: experiment_metrics_workflows lineage diagram]

Customer metrics are generated hourly, and experiment-level metrics are aggregated from them so that major regressions in high-volume metrics (clicks, page latency) can be caught right away. Hourly customer metrics are also aggregated to generate daily customer-level metrics. Once daily customer metrics are available, aggregate experiment-level metrics are calculated for all active days of the experiment. The code for aggregating hourly customer-level metrics and daily customer-level metrics is the same. Only the input (hourly vs. daily customer metrics) and output (experiment-level metrics) differ.

To complicate things, assume the experiment metrics workflow executes once per marketplace region, e.g., once for the U.S., once for the U.K., once for JP, etc., each in its local timezone (a day in JP ends at midnight JST).

Grouping and Naming by Parent Jobs

In https://github.com/OpenLineage/OpenLineage/blob/main/spec/Naming.md#parent-job-run-a-nested-hierarchy-of-jobs, we state:

> Since what we care about is identifying the job as rooted in a recurring schedule, we want to [...] make sure that we treat the same application logic triggered at different schedules as different jobs. For example: if an Airflow DAG runs individual tasks per partition (for example market segments) using the same underlying job logic, they will be tracked as separate jobs.

By creating a schedule for each marketplace, we would effectively create a new job node per marketplace. I think this is less than ideal. From an operational perspective, it’s important to know whether the last JP job or the last U.K. job failed, but when visualizing the lineage graph, I don’t want to see the exact same graph duplicated n times. Instead, the ability to attach partitions or dimensions to jobs and to reference specific dataset partitions would enhance the usability of the graph without adding duplication or clutter. Attaching partitions to jobs/runs is considered outside the scope of this doc.

However, it’s important to distinguish between the CalculateHourlyExperimentMetrics job and the CalculateAggregateExperimentMetrics job even though they do the same thing. Ideally, the application name would be different for each job. One way of guaranteeing that is to prefix each job with the job name of the schedule that created it; in OpenLineage parlance, this would be the parent job name. However, it is possible to have a lengthy chain of parent jobs; e.g., if the workflow is an Airflow DAG that kicks off a Spark job, the resulting job name would be something like

hourly_experiment_metrics_workflow.calculate_current_hourly_customer_experiment_metrics.customer_experiment_metrics_job.execute_insert_into_datasource 

If the Airflow task is in a TaskGroup, the task group name could conceivably be added, making the name even longer. If a DAG is a subdag, it would inherit both its parent DAG’s name and the name of the task that started the subdag. This naming scheme also requires the UI to rely on convention to determine how to group jobs and datasets together.

Job runs always report the run id of the parent that invoked them. In this way, we can derive the grouping of all jobs based on their parent job run. So the CalculateAggregateExperimentMetrics job will have a different parent from the CalculateHourlyExperimentMetrics job, since the workflows that trigger them are different. Deriving the name of a job from its parent requires that a job’s lineage event is always received after its parent’s lineage event.
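For example, with the proposed parent_run_id column in place, the parent job of any run can be derived with a self-join on the runs table. A minimal sketch, assuming the parent_run_id column proposed below and the denormalized job_name column that already exists on runs:

-- Find the job that owns the run which triggered :run_uuid;
-- grouping all runs by parent_run_id yields the job grouping.
SELECT parent.job_name
FROM runs child
INNER JOIN runs parent ON parent.uuid = child.parent_run_id
WHERE child.uuid = :run_uuid;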

Event Ordering

In distributed systems, it is rarely possible to guarantee that events arrive in the expected order. For example, the default Airflow OpenLineage implementation does not trigger a complete event for any task in the DAG until all of the tasks have completed. If one of those tasks was a Spark job, the Spark job’s start and complete events will both fire before the complete event of the task that preceded it in the Airflow DAG.

However, this deficiency is actually a fundamental limitation of the way Marquez handles OpenLineage events. We rely on events arriving in order to determine the correct relationship between jobs and datasets; e.g., we have to know that Job A completed writing Dataset X before Job B consumed it in order to correctly construct lineage. Otherwise, we might imply that Job B consumed a version of Dataset X prior to Job A’s modification of that dataset. Until we have a robust strategy for handling out-of-order events, we should assume that parent job runs will start prior to their children and that producer runs will complete prior to the start of consumer runs.

Proposal

Parent Job IDs

I propose that a parent_job_id column be added to the jobs table and a parent_run_id column be added to the runs table, both of type uuid. This enables the job grouping to be constructed at query time by traversing the job hierarchy. The namespace of a job should automatically be inherited from its parent job.

Groups can be nested, with one or many jobs belonging to a given group: the Airflow DAG will have a group which contains its tasks. A task may itself have a group that contains other jobs, e.g., if that task triggers a subdag or a Spark job that contains multiple actions.

The uniqueness constraint on the jobs table must be changed to include name, namespace_name, and parent_job_id. Thus two jobs may have the same name and namespace but different parent jobs, and are therefore two distinct jobs.

ALTER TABLE jobs ADD COLUMN parent_job_id uuid CONSTRAINT jobs_parent_fk_jobs REFERENCES jobs (uuid);
ALTER TABLE runs ADD COLUMN parent_run_id uuid CONSTRAINT runs_parent_fk_runs REFERENCES runs (uuid);
DROP INDEX jobs_name_index;
CREATE UNIQUE INDEX jobs_name_parent ON jobs (name, namespace_name, parent_job_id) INCLUDE (uuid);
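One caveat worth noting: PostgreSQL treats NULLs as distinct in a unique index, so two top-level jobs (parent_job_id IS NULL) with the same name and namespace would not be rejected by this index alone; a partial unique index on (name, namespace_name) WHERE parent_job_id IS NULL would close that gap.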

API Changes

Requests

The APIs in marquez.api.JobResource all follow the pattern /namespaces/{namespace}/jobs/{job}, requiring the namespace and the job name. Currently, the namespace here is always the namespace of the job as recorded in the original OpenLineage call. Since the proposed implementation requires a job to inherit the namespace of its parent, there’s ambiguity in how we deal with a discrepancy between the given namespace of a job and that of its parent job.

It’s noteworthy that in all the data Datakin has collected from beta customers, no jobs are invoked by more than one parent job. Additionally, only dbt jobs differ in their namespace from the parent job. This suggests that the dbt implementation itself was likely responsible for the change in namespace between parent and child job (this has since been fixed in OpenLineage).

Given this information, a reasonable migration path includes the following changes:

  1. All job-related APIs should be changed to use the parent job’s namespace
  2. API calls that use the original namespace should 301 to the parent job’s namespace if the two differ
  3. If the same job is invoked from two or more parents with different namespaces, we can return a 300 with the list of valid URLs (again, this never happens in the Datakin customer data)
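For example, under rule 2, a request that uses a child job’s original namespace would redirect to the parent’s namespace (hypothetical namespace and job names):

GET /api/v1/namespaces/dbt_namespace/jobs/orders_model HTTP/1.1

HTTP/1.1 301 Moved Permanently
Location: /api/v1/namespaces/airflow_namespace/jobs/orders_model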

Job APIs should use the FQN (Fully Qualified Name) of the job, which is its parent job hierarchy, names separated by dots (.). Since many job names already use dots to distinguish between different tasks in the same process (e.g., different tasks in the same Airflow DAG or different actions in the same Spark job), the FQN in the URL is never parsed, but simply matched against jobs in the database by querying a recursive common table expression (CTE) that determines the FQN of every job. The following query is readily cached by the database and can respond very quickly:

WITH RECURSIVE job_fqn AS (
   -- Base case: top-level jobs, whose FQN is simply their own name
   SELECT uuid, name, parent_job_id
   FROM jobs
   WHERE parent_job_id IS NULL
   UNION ALL
   -- Recursive case: prefix each child's name with its parent's FQN
   SELECT j1.uuid, j2.name || '.' || j1.name AS name, j1.parent_job_id
   FROM jobs j1
   INNER JOIN job_fqn j2 ON j2.uuid = j1.parent_job_id
)
SELECT * FROM job_fqn
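Resolving the job named in a request URL is then a simple equality match on the computed name. Substituting for the final SELECT above, the lookup for the earlier example FQN would be:

SELECT uuid
FROM job_fqn
WHERE name = 'hourly_experiment_metrics_workflow.calculate_current_hourly_customer_experiment_metrics.customer_experiment_metrics_job.execute_insert_into_datasource';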

A new view makes the job name easily accessible by existing queries:

CREATE OR REPLACE VIEW jobs_view
AS
    WITH RECURSIVE job_fqn AS (
        -- Base case: top-level jobs
        SELECT uuid, name, namespace_name, parent_job_id
        FROM jobs
        WHERE parent_job_id IS NULL
        UNION ALL
        -- Recursive case: prefix the child's name with the parent's FQN
        -- and inherit the parent's namespace
        SELECT j1.uuid,
               j2.name || '.' || j1.name AS name,
               j2.namespace_name,
               j1.parent_job_id
        FROM jobs j1
        INNER JOIN job_fqn j2 ON j2.uuid=j1.parent_job_id
    )
    SELECT f.uuid,
           f.name,
           f.namespace_name,
           j.name AS simple_name,
           j.parent_job_id,
           j.type,
           j.created_at,
           j.updated_at,
           j.namespace_uuid,
           j.description,
           j.current_version_uuid,
           j.current_job_context_uuid,
           j.current_location,
           j.current_inputs
    FROM job_fqn f
    INNER JOIN jobs j ON j.uuid = f.uuid
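Existing queries can then read the FQN as if it were the job name, e.g. (hypothetical namespace and job names):

SELECT uuid, name, simple_name
FROM jobs_view
WHERE namespace_name = 'example_namespace'
  AND name = 'example_dag.example_task';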

Backwards compatibility will be achieved with the same set of rules used for namespaces:

  1. All job-related APIs should be changed to use the job’s FQN
  2. API calls that use the original name should 301 to the fully-qualified URL
  3. If the same job is invoked from two or more parent hierarchies, we can return a 300 with the list of valid URLs (again, this never happens in the Datakin customer data)

Responses

The response structure for Job entities should include the parent hierarchy, as well as the simple name and the FQN. The parent hierarchy can be represented as a simple list of strings (order matters, and all parents share the same namespace, so there’s no need to duplicate it). The parent hierarchy is a new field and thus requires no backwards-compatibility consideration.

The distinction between the simple name and the FQN should be made with one additional field. Since the name field is typically used to construct the URL of a job, and the new URL should always be the FQN, the name field in the response should be changed to the FQN. The simple name can then be added as a new simpleName field. This requires the least change in the Marquez UI and other programmatic clients of the API that construct job URLs from API responses.
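A trimmed response might then look like the following sketch (field values are hypothetical, and parentHierarchy is only a placeholder name for the new field):

{
  "namespace": "example_namespace",
  "name": "example_dag.example_task",
  "simpleName": "example_task",
  "parentHierarchy": ["example_dag"]
}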

Backfilling

In order for the API to work with the new FQN, we have to consider the option of backfilling existing data. While backfilling the jobs table itself is an easy undertaking (even large databases will have something on the order of thousands of jobs to update), the existing structure of the Marquez database frequently repeats the job name in other tables in order to avoid joins in many of the most common SELECT queries. The notable tables here are job_versions, runs, and lineage_events. job_versions is similar in scale to the jobs table and can therefore be considered negligible in difficulty to migrate. runs and lineage_events, however, can scale to very large volumes in some installations. For example, an organization that runs 100 hourly jobs and keeps data for 30 days will have 72,000 records in runs for a month (100 jobs × 24 runs/day × 30 days). The lineage_events table will generally have at least twice that number (one record for START and one for COMPLETE). These numbers scale linearly with the number of jobs and the frequency of their execution.

Backfilling should be reversible, as a migration could fail and the developers may need to roll back for any number of reasons. Thus, any strategy that may fail partway through and make rollback impossible should be avoided.

There are three potential strategies we can consider for backfills.

  1. Update the job_name and name fields in all three of the jobs, job_versions, and runs tables to reflect the FQN. This strategy has the highest cost during the migration step, but there is virtually no development cost and no cost at runtime: all SQL queries can remain exactly as they are, and their performance will remain exactly as it is. Rollback is possible since all tables will reference the same job_name value. However, there are multiple downsides. If the migration fails partway through the update, the database will be in an inconsistent state. Concurrent queries will return a mix of correct and incorrect records: some runs will have the new job name, some will have the old one. In the case of a rollback, only the runs whose job_name field matches the name field of the jobs table will be returned in queries. Executing the migration in a transaction addresses the potential for partial failures, but requires locking the tables for the full duration of the migration (up to two minutes for a large table with ~750,000 runs).
  2. Do not backfill the jobs or runs tables. Jobs and runs can continue to use the simple names in tables, and those names will be translated at runtime when the API response is constructed. This has zero migration cost, but high development and runtime cost. Rollbacks are simple to implement with no chance of partial failure. Every API that returns a job name has to be updated to fetch the FQN (either by updating the SQL directly or by adding Java code). Failure to store the FQN could lead to ambiguous references, as the same job triggered by different parents can’t be disambiguated if the run doesn’t store which FQN it should reference.
  3. A mix of the above strategies. We can update the runs table to reference the uuid of the jobs table. The existing SQL queries can be updated to reference a view that joins the runs table to the jobs table based on the uuid. The returned job_name field will be the constructed FQN of the job. As per the existing database standards guide, the job_name field in the runs table will continue to exist, but will not be read (new records will continue to write the simple name of the job). This provides an easy mechanism for rollback: the job_name column continues to exist and reflects the name column of the jobs table. The migration cost here is significant: adding the job_uuid column takes about as long as updating the job_name field in the runs table. However, because it is a new column that nothing yet reads, we can populate the table values without locking the entire table or fearing partial failure. In addition to the migration cost, there is some dev cost, as we need to create the new view and update the queries to use it.

Of the three choices outlined, the third offers the most stability and resilience in the case of partial failure and/or rollback. During deployment, a single pod can perform the migration while the old pods are still in service, meaning no downtime for the application. Overall development time is pretty low, and it’s easily reversible.
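A minimal sketch of that migration, assuming the job_uuid column name used in the view below (batching details elided):

-- New column; nothing reads it yet, so a partial backfill is harmless
-- and the UPDATE can simply be re-run.
ALTER TABLE runs ADD COLUMN job_uuid uuid REFERENCES jobs (uuid);

-- Populate from the denormalized job_name already stored on runs
-- (if runs also stores the namespace, the join should include it).
UPDATE runs r
SET job_uuid = j.uuid
FROM jobs j
WHERE j.name = r.job_name
  AND r.job_uuid IS NULL;

The new view then joins runs to jobs_view: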

CREATE VIEW runs_view
AS
SELECT r.uuid,
      r.created_at,
      r.updated_at,
      job_version_uuid,
      run_args_uuid,
      nominal_start_time,
      nominal_end_time,
      current_run_state,
      start_run_state_uuid,
      end_run_state_uuid,
      external_id,
      location,
      transitioned_at,
      started_at,
      ended_at,
      job_context_uuid,
      job_uuid,
      j.name AS job_name,
      j.namespace_name
FROM runs r
INNER JOIN jobs_view j ON j.uuid = r.job_uuid;

Note: querying the view performs surprisingly similarly to querying the table directly, using the following query as a benchmark:

SELECT * FROM runs_view WHERE namespace_name='barren-spectroscope-7582'
                         AND job_name='k8pod_simple_longrunning_dag.long_sleep'
ORDER BY created_at DESC LIMIT 20

While the cost estimated by EXPLAIN is considerably higher for the view, the practical impact is negligible. For both queries, the response time was 250-275 ms on a table with 750,000 records.

@collado-mike collado-mike self-assigned this Mar 30, 2022
@wslulciuc wslulciuc added this to the Roadmap milestone Mar 31, 2022
@wslulciuc wslulciuc modified the milestones: Roadmap, 0.23.0 May 19, 2022
jonathanschoeller added a commit to jonathanschoeller/marquez that referenced this issue Sep 9, 2022
> It looks like something in MarquezProject#1928 broke the example. Specifically, the SQL query for findJobByNameAsRow can return multiple rows, which causes an exception since only 0 or 1 row is expected.