Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cli): Adds ability to upload recipes to DataHub's UI #8317

Merged
merged 24 commits into from
Aug 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
c8c2d49
feat(cli): Adds ability to upload recipes to DataHub
pedro93 Jun 27, 2023
f0f67bc
WIP change to graphQL
pedro93 Jun 28, 2023
a4ff7e6
Apply review comments
pedro93 Jun 29, 2023
6958c90
Remove unused IngestionSourceUrn class
pedro93 Jun 29, 2023
96ce993
Optimize imports
pedro93 Jun 29, 2023
d9d38e1
Apply review comments
pedro93 Jun 30, 2023
aad5c27
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 11, 2023
564ef13
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 12, 2023
e1b3535
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 14, 2023
c145720
Fix test
pedro93 Jul 14, 2023
ca49921
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 19, 2023
2b6deba
fix lint
pedro93 Jul 20, 2023
1eedabf
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 20, 2023
5c5a77b
fix another lint
pedro93 Jul 20, 2023
42b3462
more lint
pedro93 Jul 20, 2023
6b56b22
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 21, 2023
010129b
more lint fixes
pedro93 Jul 21, 2023
e7cc374
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 22, 2023
c41684a
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 24, 2023
b79af22
Add missing mypy stub
pedro93 Jul 24, 2023
fe12d62
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 25, 2023
f538ade
Merge branch 'master' into ps-add-recipe-deploy
pedro93 Jul 27, 2023
b2fb924
Merge branch 'master' into ps-add-recipe-deploy
iprentic Jul 31, 2023
13a314d
Merge branch 'master' into ps-add-recipe-deploy
iprentic Aug 1, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesInput;
import com.linkedin.datahub.graphql.generated.ListIngestionSourcesResult;
import com.linkedin.datahub.graphql.resolvers.ingest.IngestionAuthUtils;
Expand All @@ -20,6 +21,7 @@
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -51,14 +53,16 @@ public CompletableFuture<ListIngestionSourcesResult> get(final DataFetchingEnvir
final Integer start = input.getStart() == null ? DEFAULT_START : input.getStart();
final Integer count = input.getCount() == null ? DEFAULT_COUNT : input.getCount();
final String query = input.getQuery() == null ? DEFAULT_QUERY : input.getQuery();
final List<FacetFilterInput> filters = input.getFilters() == null ? Collections.emptyList() : input.getFilters();

return CompletableFuture.supplyAsync(() -> {
try {
// First, get all ingestion sources Urns.
final SearchResult gmsResult = _entityClient.search(
Constants.INGESTION_SOURCE_ENTITY_NAME,
query,
Collections.emptyMap(),
buildFilter(filters, Collections.emptyList()),
null,
start,
count,
context.getAuthentication(),
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -428,6 +428,11 @@ input ListIngestionSourcesInput {
An optional search query
"""
query: String

"""
Optional Facet filters to apply to the result set
"""
filters: [FacetFilterInput!]
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.r2.RemoteInvocationException;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collections;
import java.util.HashSet;
import org.mockito.Mockito;
import org.testng.annotations.Test;
Expand All @@ -30,7 +29,7 @@

public class ListIngestionSourceResolverTest {

private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null);
private static final ListIngestionSourcesInput TEST_INPUT = new ListIngestionSourcesInput(0, 20, null, null);

@Test
public void testGetSuccess() throws Exception {
Expand All @@ -44,7 +43,8 @@ public void testGetSuccess() throws Exception {
Mockito.when(mockClient.search(
Mockito.eq(Constants.INGESTION_SOURCE_ENTITY_NAME),
Mockito.eq(""),
Mockito.eq(Collections.emptyMap()),
Mockito.any(),
Mockito.any(),
Mockito.eq(0),
Mockito.eq(20),
Mockito.any(Authentication.class),
Expand Down
26 changes: 21 additions & 5 deletions docs/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,29 @@ Source specific crawlers are provided by plugins and might sometimes need additi
Usage: datahub [datahub-options] ingest [command-options]

Command Options:
-c / --config Config file in .toml or .yaml format
-n / --dry-run Perform a dry run of the ingestion, essentially skipping writing to sink
--preview Perform limited ingestion from the source to the sink to get a quick preview
--preview-workunits The number of workunits to produce for preview
--strict-warnings If enabled, ingestion runs with warnings will yield a non-zero error code
-c / --config Config file in .toml or .yaml format
-n / --dry-run Perform a dry run of the ingestion, essentially skipping writing to sink
--preview Perform limited ingestion from the source to the sink to get a quick preview
--preview-workunits The number of workunits to produce for preview
--strict-warnings If enabled, ingestion runs with warnings will yield a non-zero error code
--test-source-connection When set, ingestion will only test the source connection details from the recipe
pedro93 marked this conversation as resolved.
Show resolved Hide resolved
```

#### ingest deploy

The `ingest deploy` command instructs the cli to upload an ingestion recipe to DataHub to be run by DataHub's [UI Ingestion](./ui-ingestion.md).
This command can also be used to schedule the ingestion while uploading or even to update existing sources.

To schedule a recipe called "test", to run at 5am everyday, London time with the recipe configured in a local `recipe.yaml` file:
````shell
datahub ingest deploy --name "test" --schedule "5 * * * *" --time-zone "Europe/London" -c recipe.yaml
````

To update an existing recipe please use the `--urn` parameter to specify the id of the recipe to update.

**Note:** Updating a recipe will result in a replacement of the existing options with what was specified in the cli command.
I.e: Not specifying a schedule in the cli update command will remove the schedule from the recipe to be updated.

### init

The init command is used to tell `datahub` about where your DataHub instance is located. The CLI will point to localhost DataHub by default.
Expand Down
42 changes: 42 additions & 0 deletions docs/ui-ingestion.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ your first **Ingestion Source**.

### Creating an Ingestion Source

<Tabs>
<TabItem value="ui" label="UI" default>

Before ingesting any metadata, you need to create a new Ingestion Source. Start by clicking **+ Create new source**.

![](./imgs/create-new-ingestion-source-button.png)
Expand Down Expand Up @@ -151,6 +154,45 @@ _Pinning the CLI version to version `0.8.23.2`_

Once you're happy with your changes, simply click 'Done' to save.

</TabItem>
<TabItem value="cli" label="CLI" default>

You can upload and even update recipes using the cli as mentioned in the [cli documentation for uploading ingestion recipes](./cli.md#ingest-deploy).
An example execution would look something like:

```bash
datahub ingest deploy --name "My Test Ingestion Source" --schedule "5 * * * *" --time-zone "UTC" -c recipe.yaml
```

This would create a new recipe with the name `My Test Ingestion Source`. Note that to update an existing recipe, it's `urn` id must be passed as a parameter.
DataHub supports having multiple recipes with the same name so to distinguish them we use the urn for unique identification.

</TabItem>
<TabItem value="graphql" label="GraphQL" default>

Create ingestion sources using [DataHub's GraphQL API](./api/graphql/overview.md) using the **createIngestionSource** mutation endpoint.
```graphql
mutation {
createIngestionSource(input: {
name: "My Test Ingestion Source",
type: "mysql",
description: "My ingestion source description",
schedule: {interval: "*/5 * * * *", timezone: "UTC"},
config: {
recipe: "{\"source\":{\"type\":\"mysql\",\"config\":{\"include_tables\":true,\"database\":null,\"password\":\"${MYSQL_PASSWORD}\",\"profiling\":{\"enabled\":false},\"host_port\":null,\"include_views\":true,\"username\":\"${MYSQL_USERNAME}\"}},\"pipeline_name\":\"urn:li:dataHubIngestionSource:f38bd060-4ea8-459c-8f24-a773286a2927\"}",
version: "0.8.18",
executorId: "mytestexecutor",
}
})
}
```

To update sources, please use the `updateIngestionSource` endpoint. It is almost identical to the create endpoint, only requiring the urn of the source to be updated in addition to the same input as the create endpoint.

**Note**: Recipe must be double quotes escaped

</TabItem>
</Tabs>

### Running an Ingestion Source

Expand Down
13 changes: 13 additions & 0 deletions metadata-ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,19 @@ reporting:
report_recipe: false
```

#### Deploying and scheduling ingestion to the UI

The `deploy` subcommand of the `ingest` command tree allows users to upload their recipes and schedule them in the server.

```shell
datahub ingest deploy -n <user friendly name for ingestion> -c recipe.yaml
```

By default, no schedule is done unless explicitly configured with the `--schedule` parameter. Timezones are inferred from the system time, can be overriden with `--time-zone` flag.
```shell
datahub ingest deploy -n test --schedule "0 * * * *" --time-zone "Europe/London" -c recipe.yaml
```

## Transformations

If you'd like to modify data before it reaches the ingestion sinks – for instance, adding additional owners or tags – you can use a transformer to write your own module and integrate it with DataHub. Transformers require extending the recipe with a new section to describe the transformers that you want to run.
Expand Down
1 change: 1 addition & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def get_long_description():
"types-termcolor>=1.0.0",
"types-Deprecated",
"types-protobuf>=4.21.0.1",
"types-tzlocal",
}


Expand Down
153 changes: 153 additions & 0 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@
import logging
import os
import sys
import textwrap
from datetime import datetime
from typing import Optional

import click
import click_spinner
import tzlocal
from click_default_group import DefaultGroup
from tabulate import tabulate

Expand All @@ -21,6 +23,7 @@
post_rollback_endpoint,
)
from datahub.configuration.config_loader import load_config_file
from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.run.connection import ConnectionManager
from datahub.ingestion.run.pipeline import Pipeline
from datahub.telemetry import telemetry
Expand Down Expand Up @@ -198,6 +201,156 @@ async def run_ingestion_and_check_upgrade() -> int:
# don't raise SystemExit if there's no error


@ingest.command()
@upgrade.check_upgrade
@telemetry.with_telemetry()
@click.option(
"-n",
"--name",
type=str,
help="Recipe Name",
required=True,
)
@click.option(
"-c",
"--config",
type=click.Path(dir_okay=False),
help="Config file in .toml or .yaml format.",
required=True,
)
@click.option(
"--urn",
type=str,
help="Urn of recipe to update",
required=False,
)
@click.option(
"--executor-id",
type=str,
default="default",
help="Executor id to route execution requests to. Do not use this unless you have configured a custom executor.",
required=False,
)
@click.option(
"--cli-version",
type=str,
help="Provide a custom CLI version to use for ingestion. By default will use server default.",
required=False,
default=None,
)
@click.option(
"--schedule",
type=str,
help="Cron definition for schedule. If none is provided, ingestion recipe will not be scheduled",
required=False,
default=None,
)
@click.option(
"--time-zone",
type=str,
help=f"Timezone for the schedule. By default uses the timezone of the current system: {tzlocal.get_localzone_name()}.",
required=False,
default=tzlocal.get_localzone_name(),
)
def deploy(
name: str,
config: str,
urn: str,
executor_id: str,
cli_version: str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

for the non-required ones with no defaults, let's fix the type annotations to be Optional[str]

schedule: str,
time_zone: str,
) -> None:
"""
Deploy an ingestion recipe to your DataHub instance.

The urn of the ingestion source will be based on the name parameter in the format:
urn:li:dataHubIngestionSource:<name>
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is this name -> urn logic? is that in the backend?

"""

datahub_graph = get_default_graph()

pipeline_config = load_config_file(
config,
allow_stdin=True,
resolve_env_vars=False,
)

graphql_query: str

variables: dict = {
"urn": urn,
"name": name,
"type": pipeline_config["source"]["type"],
"schedule": {"interval": schedule, "timezone": time_zone},
"recipe": json.dumps(pipeline_config),
"executorId": executor_id,
"version": cli_version,
}

if urn:
if not datahub_graph.exists(urn):
logger.error(f"Could not find recipe for provided urn: {urn}")
exit()
logger.info("Found recipe URN, will update recipe.")

graphql_query = textwrap.dedent(
"""
mutation updateIngestionSource(
$urn: String!,
$name: String!,
$type: String!,
$schedule: UpdateIngestionSourceScheduleInput,
$recipe: String!,
$executorId: String!
$version: String) {

updateIngestionSource(urn: $urn, input: {
name: $name,
type: $type,
schedule: $schedule,
config: {
recipe: $recipe,
executorId: $executorId,
version: $version,
}
})
}
"""
)
else:
logger.info("No URN specified recipe urn, will create a new recipe.")
graphql_query = textwrap.dedent(
"""
mutation createIngestionSource(
$name: String!,
$type: String!,
$schedule: UpdateIngestionSourceScheduleInput,
$recipe: String!,
$executorId: String!,
$version: String) {

createIngestionSource(input: {
type: $type,
schedule: $schedule,
config: {
recipe: $recipe,
executorId: $executorId,
version: $version,
}
})
}
"""
)

response = datahub_graph.execute_graphql(graphql_query, variables=variables)

click.echo(
f"✅ Successfully wrote data ingestion source metadata for recipe {name}:"
)
click.echo(response)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does this show the urn of the newly created ingestion recipe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not the urn, the user visible name

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if it's not too much work, can we should the urn too?



def _test_source_connection(report_to: Optional[str], pipeline_config: dict) -> None:
connection_report = None
try:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ def load_config_file(
squirrel_original_config: bool = False,
squirrel_field: str = "__orig_config",
allow_stdin: bool = False,
resolve_env_vars: bool = True,
) -> dict:
config_mech: ConfigurationMechanism
if allow_stdin and config_file == "-":
Expand Down Expand Up @@ -104,7 +105,10 @@ def load_config_file(

config_fp = io.StringIO(raw_config_file)
raw_config = config_mech.load_config(config_fp)
config = resolve_env_variables(raw_config)
if resolve_env_vars:
config = resolve_env_variables(raw_config)
else:
config = raw_config
if squirrel_original_config:
config[squirrel_field] = raw_config
return config
Loading