Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion): Add execution request cleanup job #11765

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,13 @@
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
execution_request_cleanup:
keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
extraArgs: {}
debugMode: false
executorId: default
headers: {}
headers: {}
11 changes: 11 additions & 0 deletions metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,14 @@ source:
soft_deleted_entities_cleanup:
# Delete soft deleted entities which were deleted 10 days ago
retention_days: 10
execution_request_cleanup:
# Minimum number of execution requests to keep, per ingestion source
keep_history_min_count: 10
# Maximum number of execution requests to keep, per ingestion source
keep_history_max_count: 1000
# Maximum number of days to keep execution requests for, per ingestion source
keep_history_max_days: 30
# Number of records per read operation
batch_read_size: 100
# Global switch for this cleanup task
enabled: true
25 changes: 24 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@
DataProcessCleanupConfig,
DataProcessCleanupReport,
)
from datahub.ingestion.source.gc.execution_request_cleanup import (
DatahubExecutionRequestCleanup,
DatahubExecutionRequestCleanupConfig,
DatahubExecutionRequestCleanupReport,
)
from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
SoftDeletedEntitiesCleanup,
SoftDeletedEntitiesCleanupConfig,
Expand Down Expand Up @@ -70,9 +75,18 @@ class DataHubGcSourceConfig(ConfigModel):
description="Configuration for soft deleted entities cleanup",
)

execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
default=None,
description="Configuration for execution request cleanup",
)


@dataclass
class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
class DataHubGcSourceReport(
DataProcessCleanupReport,
SoftDeletedEntitiesReport,
DatahubExecutionRequestCleanupReport,
):
expired_tokens_revoked: int = 0


Expand All @@ -97,6 +111,7 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.graph = ctx.require_graph("The DataHubGc source")
self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None

if self.config.dataprocess_cleanup:
self.dataprocess_cleanup = DataProcessCleanup(
Expand All @@ -109,6 +124,12 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
self.report,
self.config.dry_run,
)
if self.config.execution_request_cleanup:
self.execution_request_cleanup = DatahubExecutionRequestCleanup(
config=self.config.execution_request_cleanup,
graph=self.graph,
report=self.report,
)

@classmethod
def create(cls, config_dict, ctx):
Expand All @@ -130,6 +151,8 @@ def get_workunits_internal(
yield from self.dataprocess_cleanup.get_workunits_internal()
if self.soft_deleted_entities_cleanup:
self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
if self.execution_request_cleanup:
self.execution_request_cleanup.run()
yield from []

def truncate_indices(self) -> None:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
import logging
import time
from typing import Any, Dict, Iterator, Optional

from pydantic import BaseModel, Field

from datahub.configuration.common import ConfigModel
from datahub.ingestion.api.source import SourceReport
from datahub.ingestion.graph.client import DataHubGraph

logger = logging.getLogger(__name__)

DATAHUB_EXECUTION_REQUEST_ENTITY_NAME = "dataHubExecutionRequest"
DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME = "dataHubExecutionRequestKey"
DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME = "dataHubExecutionRequestInput"
DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME = "dataHubExecutionRequestResult"


class DatahubExecutionRequestCleanupConfig(ConfigModel):
keep_history_min_count: int = Field(
10,
description="Minimum number of execution requests to keep, per ingestion source",
)

keep_history_max_count: int = Field(
1000,
description="Maximum number of execution requests to keep, per ingestion source",
)

keep_history_max_days: int = Field(
30,
description="Maximum number of days to keep execution requests for, per ingestion source",
)

batch_read_size: int = Field(
100,
description="Number of records per read operation",
)

enabled: bool = Field(
True,
description="Global switch for this cleanup task",
)

def keep_history_max_milliseconds(self):
return self.keep_history_max_days * 24 * 3600 * 1000


class DatahubExecutionRequestCleanupReport(SourceReport):
execution_request_cleanup_records_read: int = 0
execution_request_cleanup_records_preserved: int = 0
execution_request_cleanup_records_deleted: int = 0
execution_request_cleanup_read_errors: int = 0
execution_request_cleanup_delete_errors: int = 0


class CleanupRecord(BaseModel):
urn: str
request_id: str
status: str
ingestion_source: str
requested_at: int


class DatahubExecutionRequestCleanup:
def __init__(
self,
graph: DataHubGraph,
report: DatahubExecutionRequestCleanupReport,
config: Optional[DatahubExecutionRequestCleanupConfig] = None,
) -> None:

self.graph = graph
self.report = report
self.instance_id = int(time.time())

if config is not None:
self.config = config
else:
self.config = DatahubExecutionRequestCleanupConfig()

def _to_cleanup_record(self, entry: Dict) -> CleanupRecord:
input_aspect = (
entry.get("aspects", {})
.get(DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME, {})
.get("value", {})
)
result_aspect = (
entry.get("aspects", {})
.get(DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME, {})
.get("value", {})
)
key_aspect = (
entry.get("aspects", {})
.get(DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME, {})
.get("value", {})
)
return CleanupRecord(
urn=entry.get("urn"),
request_id=key_aspect.get("id"),
requested_at=input_aspect.get("requestedAt", 0),
status=result_aspect.get("status", "PENDING"),
ingestion_source=input_aspect.get("source", {}).get("ingestionSource", ""),
)

def _scroll_execution_requests(
self, overrides: Dict[str, Any] = {}
) -> Iterator[CleanupRecord]:
headers: Dict[str, Any] = {
"Accept": "application/json",
"Content-Type": "application/json",
}
params = {
"aspectNames": [
DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME,
DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME,
DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME,
],
"count": str(self.config.batch_read_size),
"sort": "requestTimeMs",
"sortOrder": "DESCENDING",
"systemMetadata": "false",
"skipCache": "true",
}
params.update(overrides)

while True:
try:
url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}"
response = self.graph._session.get(url, headers=headers, params=params)
response.raise_for_status()
document = response.json()

entries = document.get("results", [])
for entry in entries:
yield self._to_cleanup_record(entry)

if "scrollId" not in document:
break
params["scrollId"] = document["scrollId"]
except Exception as e:
logger.error(
f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}"
)
self.report.execution_request_cleanup_read_errors += 1

def _scroll_garbage_records(self):
state: Dict[str, Dict] = {}

now_ms = int(time.time()) * 1000
running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000

for entry in self._scroll_execution_requests():
self.report.execution_request_cleanup_records_read += 1
key = entry.ingestion_source

# Always delete corrupted records
if not key:
logger.warning(
f"ergc({self.instance_id}): will delete corrupted entry with missing source key: {entry}"
)
yield entry
continue

if key not in state:
state[key] = {}
state[key]["cutoffTimestamp"] = (
entry.requested_at - self.config.keep_history_max_milliseconds()
)

state[key]["count"] = state[key].get("count", 0) + 1

# Do not delete if number of requests is below minimum
if state[key]["count"] < self.config.keep_history_min_count:
self.report.execution_request_cleanup_records_preserved += 1
continue

# Do not delete if number of requests do not exceed allowed maximum,
# or the cutoff date.
if (state[key]["count"] < self.config.keep_history_max_count) and (
entry.requested_at > state[key]["cutoffTimestamp"]
):
self.report.execution_request_cleanup_records_preserved += 1
continue

# Do not delete if status is RUNNING or PENDING and created within last month. If the record is >month old and it did not
# transition to a final state within that timeframe, it likely has no value.
if entry.requested_at > running_guard_timeout and entry.status in [
"RUNNING",
"PENDING",
]:
self.report.execution_request_cleanup_records_preserved += 1
continue

# Otherwise delete current record
logger.info(
(
f"ergc({self.instance_id}): going to delete {entry.request_id} in source {key}; "
f"source count: {state[key]['count']}; "
f"source cutoff: {state[key]['cutoffTimestamp']}; "
f"record timestamp: {entry.requested_at}."
)
)
self.report.execution_request_cleanup_records_deleted += 1
yield entry

def _delete_entry(self, entry: CleanupRecord) -> None:
try:
logger.info(
f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}"
)
self.graph.delete_entity(entry.urn, True)
except Exception as e:
self.report.execution_request_cleanup_delete_errors += 1
logger.error(
f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
)

def run(self) -> None:
if not self.config.enabled:
logger.info(
f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
)
return

logger.info(
(
f"ergc({self.instance_id}): Starting cleanup of ExecutionRequest records; "
f"max days: {self.config.keep_history_max_days}, "
f"min records: {self.config.keep_history_min_count}, "
f"max records: {self.config.keep_history_max_count}."
)
)

for entry in self._scroll_garbage_records():
self._delete_entry(entry)

logger.info(
f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
)
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ bootstrap:

# Ingestion Recipes
- name: ingestion-datahub-gc
version: v1
version: v2
optional: true
mcps_location: "bootstrap_mcps/ingestion-datahub-gc.yaml"
values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
soft_deleted_entities_cleanup:
retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
extraArgs: {}
execution_request_cleanup:
keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
extraArgs: {}
debugMode: false
executorId: default
source:
Expand Down
Loading