Skip to content

Commit

Permalink
feat(cli): delete - add --only-soft-deleted option, perf improvements (
Browse files Browse the repository at this point in the history
  • Loading branch information
anshbansal authored Jul 27, 2022
1 parent d407904 commit 9b5afcc
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 40 deletions.
1 change: 1 addition & 0 deletions docs/how/delete-metadata.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ For now, this behaviour must be opted into by a prompt that will appear for you

You can optionally add `-n` or `--dry-run` to execute a dry run before issuing the final delete command.
You can optionally add `-f` or `--force` to skip confirmations.
You can optionally add the `--only-soft-deleted` flag to remove only soft-deleted items.

:::note

Expand Down
13 changes: 11 additions & 2 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,10 +366,11 @@ def post_delete_endpoint_with_session_and_url(

def get_urns_by_filter(
platform: Optional[str],
env: Optional[str],
env: Optional[str] = None,
entity_type: str = "dataset",
search_query: str = "*",
include_removed: bool = False,
only_soft_deleted: Optional[bool] = None,
) -> Iterable[str]:
session, gms_host = get_session_and_host()
endpoint: str = "/entities?action=search"
Expand Down Expand Up @@ -401,7 +402,15 @@ def get_urns_by_filter(
}
)

if include_removed:
if only_soft_deleted:
filter_criteria.append(
{
"field": "removed",
"value": "true",
"condition": "EQUAL",
}
)
elif include_removed:
filter_criteria.append(
{
"field": "removed",
Expand Down
108 changes: 70 additions & 38 deletions metadata-ingestion/src/datahub/cli/delete_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def delete_for_registry(
@click.option("--query", required=False, type=str)
@click.option("--registry-id", required=False, type=str)
@click.option("-n", "--dry-run", required=False, is_flag=True)
@click.option("--include-removed", required=False, is_flag=True)
@click.option("--only-soft-deleted", required=False, is_flag=True, default=False)
@upgrade.check_upgrade
@telemetry.with_telemetry
def delete(
Expand All @@ -107,7 +107,7 @@ def delete(
query: str,
registry_id: str,
dry_run: bool,
include_removed: bool,
only_soft_deleted: bool,
) -> None:
"""Delete metadata from datahub using a single urn or a combination of filters"""

Expand All @@ -118,7 +118,12 @@ def delete(
"You must provide either an urn or a platform or an env or a query for me to delete anything"
)

if not soft:
include_removed: bool
if soft:
# For soft-delete include-removed does not make any sense
include_removed = False
else:
# For hard-delete we always include the soft-deleted items
include_removed = True

# default query is set to "*" if not provided
Expand Down Expand Up @@ -183,11 +188,6 @@ def delete(
registry_id=registry_id, soft=soft, dry_run=dry_run
)
else:
# log warn include_removed + hard is the only way to work
if include_removed and soft:
logger.warning(
"A filtered delete including soft deleted entities is redundant, because it is a soft delete by default. Please use --include-removed in conjunction with --hard"
)
# Filter based delete
deletion_result = delete_with_filters(
env=env,
Expand All @@ -198,6 +198,7 @@ def delete(
search_query=query,
force=force,
include_removed=include_removed,
only_soft_deleted=only_soft_deleted,
)

if not dry_run:
Expand Down Expand Up @@ -229,6 +230,7 @@ def delete_with_filters(
entity_type: str = "dataset",
env: Optional[str] = None,
platform: Optional[str] = None,
only_soft_deleted: Optional[bool] = False,
) -> DeletionResult:

session, gms_host = cli_utils.get_session_and_host()
Expand All @@ -237,20 +239,43 @@ def delete_with_filters(
logger.info(f"datahub configured with {gms_host}")
emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token)
batch_deletion_result = DeletionResult()
urns = list(
cli_utils.get_urns_by_filter(
env=env,
platform=platform,
search_query=search_query,
entity_type=entity_type,
include_removed=include_removed,

urns: List[str] = []
if not only_soft_deleted:
urns = list(
cli_utils.get_urns_by_filter(
env=env,
platform=platform,
search_query=search_query,
entity_type=entity_type,
include_removed=False,
)
)
)

soft_deleted_urns: List[str] = []
if include_removed or only_soft_deleted:
soft_deleted_urns = list(
cli_utils.get_urns_by_filter(
env=env,
platform=platform,
search_query=search_query,
entity_type=entity_type,
only_soft_deleted=True,
)
)

final_message = ""
if len(urns) > 0:
final_message = f"{len(urns)} "
if len(urns) > 0 and len(soft_deleted_urns) > 0:
final_message += "and "
if len(soft_deleted_urns) > 0:
final_message = f"{len(soft_deleted_urns)} (soft-deleted) "

logger.info(
f"Filter matched {len(urns)} {entity_type} entities of {platform}. Sample: {choices(urns, k=min(5, len(urns)))}"
f"Filter matched {final_message} {entity_type} entities of {platform}. Sample: {choices(urns, k=min(5, len(urns)))}"
)
if len(urns) == 0:
if len(urns) == 0 and len(soft_deleted_urns) == 0:
click.echo(
f"No urns to delete. Maybe you want to change entity_type={entity_type} or platform={platform} to be something different?"
)
Expand All @@ -263,30 +288,36 @@ def delete_with_filters(
abort=True,
)

for urn in progressbar.progressbar(urns, redirect_stdout=True):
one_result = _delete_one_urn(
urn,
soft=soft,
entity_type=entity_type,
dry_run=dry_run,
cached_session_host=(session, gms_host),
cached_emitter=emitter,
)
batch_deletion_result.merge(one_result)
if len(urns) > 0:
for urn in progressbar.progressbar(urns, redirect_stdout=True):
one_result = _delete_one_urn(
urn,
soft=soft,
entity_type=entity_type,
dry_run=dry_run,
cached_session_host=(session, gms_host),
cached_emitter=emitter,
)
batch_deletion_result.merge(one_result)

if len(soft_deleted_urns) > 0 and not soft:
click.echo("Starting to delete soft-deleted URNs")
for urn in progressbar.progressbar(soft_deleted_urns, redirect_stdout=True):
one_result = _delete_one_urn(
urn,
soft=soft,
entity_type=entity_type,
dry_run=dry_run,
cached_session_host=(session, gms_host),
cached_emitter=emitter,
is_soft_deleted=True,
)
batch_deletion_result.merge(one_result)
batch_deletion_result.end()

return batch_deletion_result


def is_soft_deleted(urn: str) -> bool:
    """Report whether the ``status`` aspect of ``urn`` has its ``removed`` flag set.

    Only the ``status`` aspect is requested for the entity. If any key on the
    path ``aspects -> status -> value -> removed`` is missing, the entity is
    treated as not soft-deleted and ``False`` is returned.
    """
    try:
        # The fetch stays inside the ``try`` so a KeyError raised while
        # retrieving the entity is handled the same way as a missing key.
        entity = cli_utils.get_entity(urn=urn, aspect=["status"])
        status_aspect = entity["aspects"]["status"]
        return status_aspect["value"]["removed"]
    except KeyError:
        return False


def _delete_one_urn(
urn: str,
soft: bool = False,
Expand All @@ -296,10 +327,11 @@ def _delete_one_urn(
cached_emitter: Optional[rest_emitter.DatahubRestEmitter] = None,
run_id: str = "delete-run-id",
deletion_timestamp: int = _get_current_time(),
is_soft_deleted: Optional[bool] = None,
) -> DeletionResult:

soft_delete_msg: str = ""
if dry_run and is_soft_deleted(urn):
if dry_run and is_soft_deleted:
soft_delete_msg = "(soft-deleted)"

deletion_result = DeletionResult()
Expand Down

0 comments on commit 9b5afcc

Please sign in to comment.