feat(cli) Changes rollback behaviour to apply soft deletes by default (#4358)

* Changes rollback behaviour to apply soft deletes by default

Summary:
Addresses the feature request "Flag in delete command to only delete aspects touched by an ingestion run; add flag to nuke everything" by changing the default behaviour of a rollback operation: it no longer deletes an entity when its key aspect is rolled back.

Instead, the key aspect is kept and a Status aspect is upserted with removed=true, which effectively soft-deletes the entity.
Another PR will follow to garbage-collect these soft-deleted entities.

To keep the old behaviour, a new --hard-delete parameter was added to the CLI ingest rollback command.

* Adds restli specs

* Fixes deleteAspect endpoint & adds support for nested transactions

* Enable regression test & fix docker-compose for local development

* Add generated quickstart

* Fix quickstart generation script

* Adds missing var env to docker-compose-without-neo4j

* Sets status removed=true when ingesting resources

* Adds soft deletes for ElasticSearch + soft delete flags across ingestion sub-commands

* Makes elastic search consistent

* Update tests with new behaviour

* apply review comments

* apply review comment

* Forces Elastic search to add documents with status removed false when ingesting

* Reset gradle properties to default

* Fix tests
pedro93 authored Mar 15, 2022
1 parent ab0b516 commit e8f6c4c
Showing 28 changed files with 536 additions and 102 deletions.
3 changes: 3 additions & 0 deletions docker/docker-compose-without-neo4j.yml
@@ -57,6 +57,9 @@ services:
 hostname: elasticsearch
 ports:
 - "9200:9200"
+environment:
+- discovery.type=single-node
+- xpack.security.enabled=false
 volumes:
 - esdata:/usr/share/elasticsearch/data
 healthcheck:
3 changes: 3 additions & 0 deletions docker/docker-compose.yml
@@ -59,6 +59,9 @@ services:
 hostname: elasticsearch
 ports:
 - "9200:9200"
+environment:
+- discovery.type=single-node
+- xpack.security.enabled=false
 volumes:
 - esdata:/usr/share/elasticsearch/data
 healthcheck:
4 changes: 1 addition & 3 deletions docker/elasticsearch/env/docker.env
@@ -1,3 +1 @@
-discovery.type=single-node
-xpack.security.enabled=false
-ES_JAVA_OPTS=-Xms256m -Xmx256m -Dlog4j2.formatMsgNoLookups=true
+ES_JAVA_OPTS="-Xms256m -Xmx256m -Dlog4j2.formatMsgNoLookups=true"
23 changes: 14 additions & 9 deletions docker/quickstart/generate_docker_quickstart.py
@@ -4,6 +4,8 @@
 from collections.abc import Mapping
 from dotenv import dotenv_values
 from yaml import Loader
+from collections import OrderedDict
+

 # Generates a merged docker-compose file with env variables inlined.
 # Usage: python3 docker_compose_cli_gen.py ../docker-compose.yml ../docker-compose.override.yml ../docker-compose-gen.yml
@@ -46,23 +48,26 @@ def modify_docker_config(base_path, docker_yaml_config):
 # 3. Resolve the .env values
 env_vars = dotenv_values(env_file_path)

-# 4. Add an "environment" block to YAML
-service["environment"] = list(
-f"{key}={value}" for key, value in env_vars.items()
-)
+# 4. Create an "environment" block if it does not exist
+if "environment" not in service:
+service["environment"] = list()
+
+# 5. Append to an "environment" block to YAML
+for key, value in env_vars.items():
+service["environment"].append(f"{key}={value}")

-# 5. Delete the "env_file" value
+# 6. Delete the "env_file" value
 del service["env_file"]

-# 6. Delete build instructions
+# 7. Delete build instructions
 if "build" in service:
 del service["build"]

-# 7. Set memory limits
+# 8. Set memory limits
 if name in mem_limits:
 service["mem_limit"] = mem_limits[name]

-# 8. Correct relative paths for volume mounts
+# 9. Correct relative paths for volume mounts
 if "volumes" in service:
 volumes = service["volumes"]
 for i in range(len(volumes)):
@@ -72,7 +77,7 @@ def modify_docker_config(base_path, docker_yaml_config):
 elif volumes[i].startswith("./"):
 volumes[i] = "." + volumes[i]

-# 8. Set docker compose version to 2.
+# 9. Set docker compose version to 2.
 # We need at least this version, since we use features like start_period for
 # healthchecks and shell-like variable interpolation.
 docker_yaml_config["version"] = "2.3"
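The change to step 4 above matters because the compose files now declare their own `environment` entries for Elasticsearch, so the generator must append the `.env` values rather than overwrite the list. A minimal sketch of that merge, assuming a service is a plain dict; `merge_env` is a hypothetical helper name, not a function in the script:

```python
from typing import Any, Dict


def merge_env(service: Dict[str, Any], env_vars: Dict[str, str]) -> None:
    """Append KEY=VALUE entries to the service's environment list in place."""
    # Create the list only when the compose file did not define one.
    if "environment" not in service:
        service["environment"] = []
    # Existing entries (from docker-compose.yml) are kept; .env values follow.
    for key, value in env_vars.items():
        service["environment"].append(f"{key}={value}")


# Hypothetical service definition mirroring the elasticsearch block above.
service = {
    "hostname": "elasticsearch",
    "environment": ["discovery.type=single-node", "xpack.security.enabled=false"],
}
merge_env(service, {"ES_JAVA_OPTS": "-Xms256m -Xmx256m"})
```

With the previous assignment-based version, the two entries declared in the compose file would have been replaced by the `.env` values alone.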
9 changes: 9 additions & 0 deletions docs/how/delete-metadata.md
@@ -99,3 +99,12 @@ datahub ingest rollback --run-id <run-id>
 ```

 to rollback all aspects added with this run and all entities created by this run.
+
+:::note
+
+Since datahub v0.8.29, the `rollback` endpoint will now perform a *soft delete* of the entities ingested by a given run `<run-id>`.
+This was done to preserve potential changes that were made directly via DataHub's UI and not part of the ingestion run itself. Such that this information can be retrieved later on if a re-ingestion for the same deleted entity is done.
+
+If you wish to keep old behaviour (hard delete), please use the `--hard-delete` flag (short-hand: `-d`).
+
+:::
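The difference between the two modes described in the note can be illustrated with a toy in-memory store. This is only a sketch under simplifying assumptions: the store layout, the `rollback_entity` and `reingest` helpers, and the example URN are all hypothetical, and the real rollback also removes the aspects written by the run itself. It further assumes that re-ingestion resets `removed` to false, as the `Status` handling added in this commit suggests.

```python
from typing import Dict

# Hypothetical in-memory store: urn -> {aspect name -> aspect value}.
Store = Dict[str, Dict[str, dict]]


def rollback_entity(store: Store, urn: str, hard_delete: bool = False) -> None:
    """Soft delete by default; drop the whole entity only on hard delete."""
    if urn not in store:
        return
    if hard_delete:
        del store[urn]
    else:
        # Keep every aspect and only mark the entity as removed.
        store[urn]["status"] = {"removed": True}


def reingest(store: Store, urn: str, aspects: Dict[str, dict]) -> None:
    """Write aspects again and clear the removed flag (assumed behaviour)."""
    entity = store.setdefault(urn, {})
    entity.update(aspects)
    entity["status"] = {"removed": False}


urn = "urn:li:dataset:example"  # hypothetical URN
store: Store = {
    urn: {
        "datasetKey": {"name": "example"},
        "ownership": {"owners": ["alice"]},  # imagine this was edited in the UI
    }
}

rollback_entity(store, urn)  # soft delete
soft_deleted = store[urn]["status"]["removed"]

reingest(store, urn, {"datasetKey": {"name": "example"}})
owners_after_reingest = store[urn]["ownership"]["owners"]
```

After the soft delete and re-ingestion, the ownership aspect is still present; calling `rollback_entity(store, urn, hard_delete=True)` would remove the entity and everything attached to it.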
3 changes: 3 additions & 0 deletions gradle.properties
@@ -3,6 +3,9 @@ org.gradle.configureondemand=true
 org.gradle.parallel=true
 org.gradle.caching=false

+# Increase gradle JVM memory to 2GB to allow tests to run locally
+#org.gradle.jvmargs=-Xmx2000m
+
 # Increase retries to 5 (from default of 3) and increase interval from 125ms to 1s.
 org.gradle.internal.repository.max.retries=5
 org.gradle.internal.repository.max.tentatives=5
16 changes: 12 additions & 4 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -122,8 +122,14 @@ def parse_restli_response(response):
 @ingest.command()
 @click.argument("page_offset", type=int, default=0)
 @click.argument("page_size", type=int, default=100)
+@click.option(
+"--include-soft-deletes",
+is_flag=True,
+default=False,
+help="If enabled, will list ingestion runs which have been soft deleted",
+)
 @telemetry.with_telemetry
-def list_runs(page_offset: int, page_size: int) -> None:
+def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> None:
 """List recent ingestion runs to datahub"""

 session, gms_host = get_session_and_host()
@@ -133,6 +139,7 @@ def list_runs(page_offset: int, page_size: int) -> None:
 payload_obj = {
 "pageOffset": page_offset,
 "pageSize": page_size,
+"includeSoft": include_soft_deletes,
 }

 payload = json.dumps(payload_obj)
@@ -163,7 +170,7 @@ def list_runs(page_offset: int, page_size: int) -> None:
 def show(run_id: str) -> None:
 """Describe a provided ingestion run to datahub"""

-payload_obj = {"runId": run_id, "dryRun": True}
+payload_obj = {"runId": run_id, "dryRun": True, "hardDelete": True}
 structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
 payload_obj, "/runs?action=rollback"
 )
@@ -190,8 +197,9 @@ def show(run_id: str) -> None:
 @click.option("--run-id", required=True, type=str)
 @click.option("-f", "--force", required=False, is_flag=True)
 @click.option("--dry-run", "-n", required=False, is_flag=True, default=False)
+@click.option("--hard-delete", "-d", required=False, is_flag=True, default=False)
 @telemetry.with_telemetry
-def rollback(run_id: str, force: bool, dry_run: bool) -> None:
+def rollback(run_id: str, force: bool, dry_run: bool, hard_delete: bool) -> None:
 """Rollback a provided ingestion run to datahub"""

 cli_utils.test_connectivity_complain_exit("ingest")
@@ -202,7 +210,7 @@ def rollback(run_id: str, force: bool, dry_run: bool) -> None:
 abort=True,
 )

-payload_obj = {"runId": run_id, "dryRun": dry_run}
+payload_obj = {"runId": run_id, "dryRun": dry_run, "hardDelete": hard_delete}
 structured_rows, entities_affected, aspects_affected = post_rollback_endpoint(
 payload_obj, "/runs?action=rollback"
 )
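The request bodies that the CLI sends after this change can be sketched as two small builders. The field names (`runId`, `dryRun`, `hardDelete`, `pageOffset`, `pageSize`, `includeSoft`) come from the diff above; the function names and the run id are hypothetical and only serve the illustration.

```python
import json


def rollback_payload(run_id: str, dry_run: bool = False, hard_delete: bool = False) -> str:
    """JSON body for the rollback action; soft delete unless hard_delete is set."""
    return json.dumps({"runId": run_id, "dryRun": dry_run, "hardDelete": hard_delete})


def list_runs_payload(
    page_offset: int = 0, page_size: int = 100, include_soft_deletes: bool = False
) -> str:
    """JSON body for listing runs; soft-deleted runs are hidden by default."""
    return json.dumps(
        {
            "pageOffset": page_offset,
            "pageSize": page_size,
            "includeSoft": include_soft_deletes,
        }
    )


# "my-run-id" is a placeholder run id.
default_body = rollback_payload("my-run-id")                # plain rollback
hard_body = rollback_payload("my-run-id", hard_delete=True)  # with --hard-delete
list_body = list_runs_payload(include_soft_deletes=True)
```

Note that `show` always sends `"dryRun": True` together with `"hardDelete": True`, so it previews a full rollback without modifying anything.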
@@ -7,6 +7,7 @@
 import com.google.common.collect.Streams;
 import com.linkedin.common.AuditStamp;
 import com.linkedin.common.BrowsePaths;
+import com.linkedin.common.Status;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.data.schema.RecordDataSchema;
 import com.linkedin.data.schema.TyperefDataSchema;
@@ -104,6 +105,7 @@ public abstract class EntityService {
 public static final String DEFAULT_RUN_ID = "no-run-id-provided";
 public static final String BROWSE_PATHS = "browsePaths";
 public static final String DATA_PLATFORM_INSTANCE = "dataPlatformInstance";
+public static final String STATUS = "status";

 protected EntityService(@Nonnull final EntityEventProducer producer, @Nonnull final EntityRegistry entityRegistry) {
 _producer = producer;
@@ -658,6 +660,11 @@ public List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissing(@Nonnu
 aspectsToGet.add(DATA_PLATFORM_INSTANCE);
 }

+boolean shouldHaveStatusSet = isAspectProvided(entityType, STATUS, includedAspects);
+if (shouldHaveStatusSet) {
+aspectsToGet.add(STATUS);
+}
+
 List<Pair<String, RecordTemplate>> aspects = new ArrayList<>();
 final String keyAspectName = getKeyAspectName(urn);
 aspectsToGet.add(keyAspectName);
@@ -686,6 +693,12 @@ public List<Pair<String, RecordTemplate>> generateDefaultAspectsIfMissing(@Nonnu
 .ifPresent(aspect -> aspects.add(Pair.of(DATA_PLATFORM_INSTANCE, aspect)));
 }

+if (shouldHaveStatusSet && latestAspects.get(STATUS) != null) {
+Status status = new Status();
+status.setRemoved(false);
+aspects.add(Pair.of(STATUS, status));
+}
+
 return aspects;
 }

@@ -821,12 +834,12 @@ public Set<String> getEntityAspectNames(final String entityName) {

 public abstract void setWritable(boolean canWrite);

-public RollbackRunResult rollbackRun(List<AspectRowSummary> aspectRows, String runId) {
-return rollbackWithConditions(aspectRows, Collections.singletonMap("runId", runId));
+public RollbackRunResult rollbackRun(List<AspectRowSummary> aspectRows, String runId, boolean hardDelete) {
+return rollbackWithConditions(aspectRows, Collections.singletonMap("runId", runId), hardDelete);
 }

 public abstract RollbackRunResult rollbackWithConditions(List<AspectRowSummary> aspectRows,
-Map<String, String> conditions);
+Map<String, String> conditions, boolean hardDelete);

 public abstract RollbackRunResult deleteUrn(Urn urn);

@@ -19,6 +19,7 @@
 import io.ebean.RawSql;
 import io.ebean.RawSqlBuilder;
 import io.ebean.Transaction;
+import io.ebean.TxScope;
 import io.ebean.annotation.TxIsolation;
 import java.net.URISyntaxException;
 import java.sql.Timestamp;
@@ -443,7 +444,7 @@ public <T> T runInTransactionWithRetry(@Nonnull final Supplier<T> block, final i

 T result = null;
 do {
-try (Transaction transaction = _server.beginTransaction(TxIsolation.REPEATABLE_READ)) {
+try (Transaction transaction = _server.beginTransaction(TxScope.requiresNew().setIsolation(TxIsolation.REPEATABLE_READ))) {
 transaction.setBatchMode(true);
 result = block.get();
 transaction.commit();
@@ -6,6 +6,7 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterators;
 import com.linkedin.common.AuditStamp;
+import com.linkedin.common.Status;
 import com.linkedin.common.UrnArray;
 import com.linkedin.common.urn.Urn;
 import com.linkedin.data.schema.RecordDataSchema;
@@ -14,6 +15,7 @@
 import com.linkedin.data.template.RecordTemplate;
 import com.linkedin.entity.EnvelopedAspect;
 import com.linkedin.events.metadata.ChangeType;
+import com.linkedin.metadata.Constants;
 import com.linkedin.metadata.aspect.Aspect;
 import com.linkedin.metadata.aspect.VersionedAspect;
 import com.linkedin.metadata.entity.EntityService;
@@ -26,11 +28,17 @@
 import com.linkedin.metadata.models.registry.EntityRegistry;
 import com.linkedin.metadata.query.ListUrnsResult;
 import com.linkedin.metadata.run.AspectRowSummary;
+import com.linkedin.metadata.utils.AuditStampUtils;
 import com.linkedin.metadata.utils.EntityKeyUtils;
 import com.linkedin.metadata.utils.PegasusUtils;
+import com.linkedin.metadata.utils.SystemMetadataUtils;
 import com.linkedin.mxe.MetadataAuditOperation;
 import com.linkedin.mxe.SystemMetadata;
 import com.linkedin.util.Pair;
+import lombok.extern.slf4j.Slf4j;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 import java.net.URISyntaxException;
 import java.sql.Timestamp;
 import java.util.ArrayList;
@@ -45,13 +53,11 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Function;
 import java.util.stream.Collectors;
-import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
-import lombok.extern.slf4j.Slf4j;

-import static com.linkedin.metadata.Constants.*;
+import static com.linkedin.metadata.Constants.ASPECT_LATEST_VERSION;
+import static com.linkedin.metadata.Constants.SYSTEM_ACTOR;
 import static com.linkedin.metadata.entity.ebean.EbeanUtils.*;
-import static com.linkedin.metadata.utils.PegasusUtils.*;
+import static com.linkedin.metadata.utils.PegasusUtils.urnToEntityName;


 /**
@@ -72,8 +78,9 @@ public EbeanEntityService(@Nonnull final EbeanAspectDao entityDao, @Nonnull fina
 _entityDao = entityDao;
 }

-@Nonnull Map<String, EbeanAspectV2> getLatestEbeanAspectForUrn(@Nonnull final Urn urn,
-@Nonnull final Set<String> aspectNames) {
+@Nonnull
+Map<String, EbeanAspectV2> getLatestEbeanAspectForUrn(@Nonnull final Urn urn,
+@Nonnull final Set<String> aspectNames) {
 Set<Urn> urns = new HashSet<>();
 urns.add(urn);

@@ -469,11 +476,14 @@ private boolean filterMatch(SystemMetadata systemMetadata, Map<String, String> c
 }

 @Nullable
-public RollbackResult deleteAspect(String urn, String aspectName, Map<String, String> conditions) {
+public RollbackResult deleteAspect(String urn, String aspectName, Map<String, String> conditions, boolean hardDelete) {
 // Validate pre-conditions before running queries
+Urn entityUrn;
+EntitySpec entitySpec;
 try {
-String entityName = PegasusUtils.urnToEntityName(Urn.createFromString(urn));
-EntitySpec entitySpec = getEntityRegistry().getEntitySpec(entityName);
+entityUrn = Urn.createFromString(urn);
+String entityName = PegasusUtils.urnToEntityName(entityUrn);
+entitySpec = getEntityRegistry().getEntitySpec(entityName);
 Preconditions.checkState(entitySpec != null, String.format("Could not find entity definition for %s", entityName));
 Preconditions.checkState(entitySpec.hasAspect(aspectName), String.format("Could not find aspect %s in definition for %s", aspectName, entityName));
 } catch (URISyntaxException uriSyntaxException) {
@@ -543,8 +553,18 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St
 _entityDao.deleteAspect(survivingAspect);
 } else {
 if (isKeyAspect) {
-// If this is the key aspect, delete the entity entirely.
-additionalRowsDeleted = _entityDao.deleteUrn(urn);
+if (hardDelete) {
+// If this is the key aspect, delete the entity entirely.
+additionalRowsDeleted = _entityDao.deleteUrn(urn);
+} else if (entitySpec.hasAspect(Constants.STATUS_ASPECT_NAME)) {
+// soft delete by setting status.removed=true (if applicable)
+final Status statusAspect = new Status();
+statusAspect.setRemoved(true);
+final SystemMetadata systemMetadata = SystemMetadataUtils.createDefaultSystemMetadata();
+final AuditStamp auditStamp = AuditStampUtils.createDefaultAuditStamp();
+
+this.ingestAspect(entityUrn, Constants.STATUS_ASPECT_NAME, statusAspect, auditStamp, systemMetadata);
+}
 } else {
 // Else, only delete the specific aspect.
 _entityDao.deleteAspect(latest);
@@ -562,6 +582,10 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St
 survivingAspect.getKey().getAspect(), previousMetadata, getEntityRegistry());

 final Urn urnObj = Urn.createFromString(urn);
+// We are not deleting key aspect if hardDelete has not been set so do not return a rollback result
+if (isKeyAspect && !hardDelete) {
+return null;
+}
 return new RollbackResult(urnObj, urnObj.getEntityType(), latest.getAspect(), latestValue,
 previousValue == null ? latestValue : previousValue, latestSystemMetadata,
 previousValue == null ? null : parseSystemMetadata(survivingAspect.getSystemMetadata()),
@@ -575,19 +599,19 @@ public RollbackResult deleteAspect(String urn, String aspectName, Map<String, St
 }

 @Override
-public RollbackRunResult rollbackRun(List<AspectRowSummary> aspectRows, String runId) {
-return rollbackWithConditions(aspectRows, Collections.singletonMap("runId", runId));
+public RollbackRunResult rollbackRun(List<AspectRowSummary> aspectRows, String runId, boolean hardDelete) {
+return rollbackWithConditions(aspectRows, Collections.singletonMap("runId", runId), hardDelete);
 }

 @Override
-public RollbackRunResult rollbackWithConditions(List<AspectRowSummary> aspectRows, Map<String, String> conditions) {
+public RollbackRunResult rollbackWithConditions(List<AspectRowSummary> aspectRows, Map<String, String> conditions, boolean hardDelete) {
 List<AspectRowSummary> removedAspects = new ArrayList<>();
 AtomicInteger rowsDeletedFromEntityDeletion = new AtomicInteger(0);

 aspectRows.forEach(aspectToRemove -> {

 RollbackResult result = deleteAspect(aspectToRemove.getUrn(), aspectToRemove.getAspectName(),
-conditions);
+conditions, hardDelete);
 if (result != null) {
 Optional<AspectSpec> aspectSpec = getAspectSpec(result.entityName, result.aspectName);
 if (!aspectSpec.isPresent()) {
@@ -621,7 +645,8 @@ public RollbackRunResult deleteUrn(Urn urn) {
 }

 SystemMetadata latestKeySystemMetadata = parseSystemMetadata(latestKey.getSystemMetadata());
-RollbackResult result = deleteAspect(urn.toString(), keyAspectName, Collections.singletonMap("runId", latestKeySystemMetadata.getRunId()));
+RollbackResult result = deleteAspect(urn.toString(), keyAspectName, Collections.singletonMap("runId",
+latestKeySystemMetadata.getRunId()), true);

 if (result != null) {
 AspectRowSummary summary = new AspectRowSummary();
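The control flow that `deleteAspect` and `rollbackWithConditions` follow after this change can be condensed into a short Python model. This is a simplified sketch, not the Java implementation: aspect versioning, run-id conditions and transactions are left out, the store is a plain dict, and every name below (`delete_aspect`, `rollback_rows`, `has_status`, the example URN and aspect names) is an assumption made for illustration.

```python
from typing import Dict, List, Optional, Tuple

Store = Dict[str, Dict[str, dict]]  # urn -> {aspect name -> value}
Row = Tuple[str, str]               # (urn, aspect name)


def delete_aspect(
    store: Store,
    urn: str,
    aspect_name: str,
    key_aspect: str,
    hard_delete: bool,
    has_status: bool = True,
) -> Optional[Row]:
    """Return the removed row, or None when nothing was actually deleted."""
    entity = store.get(urn)
    if entity is None or aspect_name not in entity:
        return None
    if aspect_name != key_aspect:
        # Non-key aspects are always deleted.
        del entity[aspect_name]
        return (urn, aspect_name)
    if hard_delete:
        # Key aspect + hard delete: remove the entity entirely.
        del store[urn]
        return (urn, aspect_name)
    if has_status:
        # Key aspect + soft delete: keep the key, mark the entity as removed.
        entity["status"] = {"removed": True}
    return None  # the key aspect was not deleted, so no rollback result


def rollback_rows(store: Store, rows: List[Row], key_aspect: str, hard_delete: bool) -> List[Row]:
    """Apply delete_aspect to each row and collect only the real deletions."""
    removed = []
    for urn, aspect_name in rows:
        result = delete_aspect(store, urn, aspect_name, key_aspect, hard_delete)
        if result is not None:
            removed.append(result)
    return removed


urn = "urn:li:dataset:example"  # hypothetical URN
rows = [(urn, "schemaMetadata"), (urn, "datasetKey")]

soft_store: Store = {urn: {"datasetKey": {}, "schemaMetadata": {}}}
soft_removed = rollback_rows(soft_store, rows, "datasetKey", hard_delete=False)

hard_store: Store = {urn: {"datasetKey": {}, "schemaMetadata": {}}}
hard_removed = rollback_rows(hard_store, rows, "datasetKey", hard_delete=True)
```

In the soft case only the `schemaMetadata` row is reported as removed and the entity survives with `status.removed` set to true; in the hard case both rows are reported and the entity is gone. `deleteUrn` in the diff always passes `true`, so it keeps its hard-delete semantics.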