feat(ingest): improve ingest deploy command #10944

Merged · 10 commits · Jul 26, 2024 · Changes from all commits
78 changes: 52 additions & 26 deletions docs/cli.md
@@ -102,6 +102,7 @@ Command Options:
--test-source-connection When set, ingestion will only test the source connection details from the recipe
--no-progress If enabled, mute intermediate progress ingestion reports
```

#### ingest --dry-run

The `--dry-run` option of the `ingest` command performs all of the ingestion steps except writing to the sink. This is useful to validate that the
@@ -133,23 +134,8 @@ By default `--preview` creates 10 workunits. But if you wish to try producing mo
datahub ingest -c ./examples/recipes/example_to_datahub_rest.dhub.yaml -n --preview --preview-workunits=20
```

#### ingest --no-default-report

By default, the CLI sends an ingestion report to DataHub, which allows you to see the result of all CLI-based ingestion runs in the UI. This can be turned off with the `--no-default-report` flag.

@@ -180,6 +166,52 @@
```yml
failure_log:
  filename: ./path/to/failure.json
```

### ingest deploy

The `ingest deploy` command instructs the CLI to upload an ingestion recipe to DataHub to be run by DataHub's [UI Ingestion](./ui-ingestion.md).
This command can also be used to schedule the ingestion while uploading, or even to update existing sources. It will upload to the remote instance the
CLI is connected to, not the sink of the recipe. Use `datahub init` to set the remote if not already set.

Comment on lines +169 to +174

Contributor

Fix grammatical error.

Add a comma after "uploading" for clarity.

```diff
- This command can also be used to schedule the ingestion while uploading or even to update existing sources.
+ This command can also be used to schedule the ingestion while uploading, or even to update existing sources.
```

This command will automatically create a new recipe if it doesn't exist, or update it if it does.
Note that this is a complete update: any options that were previously set but are not specified in the new command will be removed.
For example, not specifying a schedule when updating a recipe will remove the schedule from it.

**Basic example**

To schedule a recipe called "Snowflake Integration" to run at 5 am every day (London time), with the recipe configured in a local `recipe.yaml` file:

```shell
datahub ingest deploy --name "Snowflake Integration" --schedule "0 5 * * *" --time-zone "Europe/London" -c recipe.yaml
```

By default, the ingestion recipe's identifier (URN) is generated by hashing the recipe name.
You can override the generated URN by passing the `--urn` flag to the CLI.
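
For reference, a minimal sketch of that derivation, mirroring the `_make_ingestion_urn` helper added in this PR:

```python
# Sketch of the URN derivation used by `ingest deploy` (see ingest_cli.py in
# this PR). datahub_guid produces a stable hash, so redeploying a recipe with
# the same name targets the same ingestion source.
from datahub.emitter.mce_builder import datahub_guid

def make_ingestion_urn(name: str) -> str:
    guid = datahub_guid({"name": name})
    return f"urn:li:dataHubIngestionSource:deploy-{guid}"

print(make_ingestion_urn("Snowflake Integration"))
```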

**Using `deployment` to avoid CLI args**

As an alternative to configuring settings from the CLI, all of these settings can also be set in the `deployment` field of the recipe.

```yml
# deployment_recipe.yml
deployment:
  name: "Snowflake Integration"
  schedule: "0 5 * * *"
  time_zone: "Europe/London"

source: ...
```

```shell
datahub ingest deploy -c deployment_recipe.yml
```
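
Under the hood (a simplified sketch based on the CLI changes in this PR), the `deployment` block is popped off the recipe before upload, so it stays CLI-side configuration and is never stored as part of the recipe itself:

```python
# Hedged sketch of how `deploy` separates the deployment block from the
# recipe (per ingest_cli.py in this PR); the file name is illustrative.
import yaml

with open("deployment_recipe.yml") as f:
    pipeline_config = yaml.safe_load(f)

# Removed before upload: everything left in pipeline_config is JSON-encoded
# and stored as the ingestion source's recipe on the server.
deploy_options_raw = pipeline_config.pop("deployment", None)
```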

This is particularly useful when you want all recipes to be stored in version control.

```shell
# Deploy every yml recipe in a directory
ls recipe_directory/*.yml | xargs -n 1 -I {} datahub ingest deploy -c {}
```
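
If you would rather drive the same loop from Python (for example, in CI), a rough equivalent:

```python
# Rough Python equivalent of the xargs one-liner above; assumes the datahub
# CLI is installed and on PATH.
import glob
import subprocess

for recipe in sorted(glob.glob("recipe_directory/*.yml")):
    subprocess.run(["datahub", "ingest", "deploy", "-c", recipe], check=True)
```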

### init

The init command tells `datahub` where your DataHub instance is located. By default, the CLI points to a DataHub instance running on localhost.
Expand Down Expand Up @@ -242,8 +274,6 @@ The [metadata deletion guide](./how/delete-metadata.md) covers the various optio

### exists

**🤝 Version compatibility** : `acryl-datahub>=0.10.2.4`

The exists command can be used to check if an entity exists in DataHub.

@@ -253,7 +283,6 @@
```shell
true
false
```
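
The same check is available from the Python SDK. A small sketch (the dataset URN is illustrative, and `DataHubGraph.exists` assumes a reasonably recent SDK):

```python
# Hedged SDK equivalent of `datahub exists`; the URN is illustrative.
from datahub.ingestion.graph.client import get_default_graph

graph = get_default_graph()
urn = "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)"
print(graph.exists(urn))  # prints True or False
```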


### get

The `get` command allows you to easily retrieve metadata from DataHub via the REST API. This works for both versioned aspects and timeseries aspects. For timeseries aspects, it fetches the latest value.
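
For comparison, a hedged sketch of the same kind of lookup through the Python SDK for a single versioned aspect (the URN and aspect class are illustrative):

```python
# Hedged SDK sketch of fetching one versioned aspect; URN and aspect class
# are illustrative.
from datahub.ingestion.graph.client import get_default_graph
from datahub.metadata.schema_classes import DatasetPropertiesClass

graph = get_default_graph()
props = graph.get_aspect(
    "urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)",
    DatasetPropertiesClass,
)
print(props)
```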
@@ -314,6 +343,7 @@ Update succeeded with status 200
```

#### put platform

**🤝 Version Compatibility:** `acryl-datahub>0.8.44.4`

The **put platform** command instructs `datahub` to create or update metadata about a data platform. This is very useful if you are using a custom data platform, as it lets you set up its logo and display name for a native UI experience.
@@ -346,6 +376,7 @@ datahub timeline --urn "urn:li:dataset:(urn:li:dataPlatform:mysql,User.UserAccou
The `dataset` command allows you to interact with the dataset entity.

The `get` operation can be used to read a dataset into a YAML file.

```shell
datahub dataset get --urn "$URN" --to-file "$FILE_NAME"
```
@@ -358,7 +389,6 @@ datahub dataset upsert -f dataset.yaml

An example `dataset.yaml` is available at [dataset.yaml](https://github.com/datahub-project/datahub/blob/master/metadata-ingestion/examples/cli_usage/dataset/dataset.yaml).


### user (User Entity)

The `user` command allows you to interact with the User entity.
@@ -411,7 +441,6 @@ members:
display_name: "Joe's Hub"
```


### dataproduct (Data Product Entity)

**🤝 Version Compatibility:** `acryl-datahub>=0.10.2.4`
@@ -566,14 +595,12 @@ Use this to delete a Data Product from DataHub. Default to `--soft` which preser
# > datahub dataproduct delete --urn "urn:li:dataProduct:pet_of_the_week" --hard
```


## Miscellaneous Admin Commands

### lite (experimental)

The lite group of commands allows you to run an embedded, lightweight DataHub instance for command-line exploration of your metadata. This is intended more for developer-tool-oriented usage than as a production server instance for DataHub. See [DataHub Lite](./datahub_lite.md) for more information about how you can ingest metadata into DataHub Lite and explore your metadata easily.


### telemetry

To help us understand how people are using DataHub, we collect anonymous usage statistics on actions such as command invocations via Mixpanel.
@@ -640,7 +667,6 @@ External Entities Affected: None
Old Entities Migrated = {'urn:li:dataset:(urn:li:dataPlatform:hive,logging_events,PROD)', 'urn:li:dataset:(urn:li:dataPlatform:hive,SampleHiveDataset,PROD)', 'urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_deleted,PROD)', 'urn:li:dataset:(urn:li:dataPlatform:hive,fct_users_created,PROD)'}
```


## Alternate Installation Options

### Using docker
@@ -673,7 +699,7 @@ We use a plugin architecture so that you can install only the dependencies you a
Please see our [Integrations page](https://datahubproject.io/integrations) if you want to filter on the features offered by each source.

| Plugin Name | Install Command | Provides |
| ---------------------------------------------------------------------------------------------- | ---------------------------------------------------------- | --------------------------------------- |
| [metadata-file](./generated/ingestion/sources/metadata-file.md) | _included by default_ | File source and sink |
| [athena](./generated/ingestion/sources/athena.md) | `pip install 'acryl-datahub[athena]'` | AWS Athena source |
| [bigquery](./generated/ingestion/sources/bigquery.md) | `pip install 'acryl-datahub[bigquery]'` | BigQuery source |
@@ -715,7 +741,7 @@ Please see our [Integrations page](https://datahubproject.io/integrations) if yo
### Sinks

| Plugin Name | Install Command | Provides |
| ----------------------------------------------------------------- | -------------------------------------------- | -------------------------- |
| [metadata-file](../metadata-ingestion/sink_docs/metadata-file.md) | _included by default_ | File source and sink |
| [console](../metadata-ingestion/sink_docs/console.md) | _included by default_ | Console sink |
| [datahub-rest](../metadata-ingestion/sink_docs/datahub.md) | `pip install 'acryl-datahub[datahub-rest]'` | DataHub sink over REST API |
150 changes: 89 additions & 61 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -16,7 +16,9 @@
import datahub as datahub_package
from datahub.cli import cli_utils
from datahub.cli.config_utils import CONDENSED_DATAHUB_CONFIG_PATH
from datahub.configuration.common import ConfigModel
from datahub.configuration.config_loader import load_config_file
from datahub.emitter.mce_builder import datahub_guid
from datahub.ingestion.graph.client import get_default_graph
from datahub.ingestion.run.connection import ConnectionManager
from datahub.ingestion.run.pipeline import Pipeline
@@ -204,6 +206,23 @@ async def run_ingestion_and_check_upgrade() -> int:
    # don't raise SystemExit if there's no error


def _make_ingestion_urn(name: str) -> str:
    # Hash only the name, so repeated deploys of the same recipe resolve to
    # the same ingestion source URN.
    guid = datahub_guid(
        {
            "name": name,
        }
    )
    return f"urn:li:dataHubIngestionSource:deploy-{guid}"


class DeployOptions(ConfigModel):
    name: str
    schedule: Optional[str] = None
    time_zone: str = "UTC"
    cli_version: Optional[str] = None
    executor_id: str = "default"
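

# Illustrative usage (not part of this file): a recipe's `deployment` block
# parses into DeployOptions via pydantic (ConfigModel is a pydantic model), e.g.
#
#   opts = DeployOptions.parse_obj({"name": "Snowflake Integration",
#                                   "schedule": "0 5 * * *"})
#   assert opts.time_zone == "UTC" and opts.executor_id == "default"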


@ingest.command()
@upgrade.check_upgrade
@telemetry.with_telemetry()
@@ -212,7 +231,6 @@ async def run_ingestion_and_check_upgrade() -> int:
    "--name",
    type=str,
    help="Recipe Name",
)
@click.option(
    "-c",
Expand All @@ -224,7 +242,7 @@ async def run_ingestion_and_check_upgrade() -> int:
@click.option(
    "--urn",
    type=str,
    help="Urn of recipe to update. If not specified here or in the recipe's pipeline_name, this will create a new ingestion source.",
    required=False,
)
@click.option(
@@ -256,7 +274,7 @@ async def run_ingestion_and_check_upgrade() -> int:
    default="UTC",
)
def deploy(
    name: Optional[str],
    config: str,
    urn: Optional[str],
    executor_id: str,
@@ -280,74 +298,84 @@ def deploy(
        resolve_env_vars=False,
    )

    deploy_options_raw = pipeline_config.pop("deployment", None)
    if deploy_options_raw is not None:
        deploy_options = DeployOptions.parse_obj(deploy_options_raw)

        if name:
            logger.info(f"Overriding deployment name {deploy_options.name} with {name}")
            deploy_options.name = name
    else:
        if not name:
            raise click.UsageError(
                "Either --name must be set or deployment_name specified in the config"
            )
        deploy_options = DeployOptions(name=name)

    # Use remaining CLI args to override deploy_options
    if schedule:
        deploy_options.schedule = schedule
    if time_zone:
        deploy_options.time_zone = time_zone
    if cli_version:
        deploy_options.cli_version = cli_version
    if executor_id:
        deploy_options.executor_id = executor_id

    logger.info(f"Using {repr(deploy_options)}")

    if not urn:
        # When urn/name is not specified, we will generate a unique urn based on the deployment name.
        urn = _make_ingestion_urn(deploy_options.name)
        logger.info(f"Using recipe urn: {urn}")

    # Invariant - at this point, both urn and deploy_options are set.

    variables: dict = {
        "urn": urn,
        "name": deploy_options.name,
        "type": pipeline_config["source"]["type"],
        "recipe": json.dumps(pipeline_config),
        "executorId": deploy_options.executor_id,
        "version": deploy_options.cli_version,
    }

    if deploy_options.schedule is not None:
        variables["schedule"] = {
            "interval": deploy_options.schedule,
            "timezone": deploy_options.time_zone,
        }

    # The updateIngestionSource endpoint can actually do upserts as well.
    graphql_query: str = textwrap.dedent(
        """
        mutation updateIngestionSource(
            $urn: String!,
            $name: String!,
            $type: String!,
            $schedule: UpdateIngestionSourceScheduleInput,
            $recipe: String!,
            $executorId: String!,
            $version: String) {

            updateIngestionSource(urn: $urn, input: {
                name: $name,
                type: $type,
                schedule: $schedule,
                config: {
                    recipe: $recipe,
                    executorId: $executorId,
                    version: $version,
                }
            })
        }
        """
    )

    response = datahub_graph.execute_graphql(graphql_query, variables=variables)

    click.echo(
        f"✅ Successfully wrote data ingestion source metadata for recipe {deploy_options.name}:"
    )
    click.echo(response)
