Skip to content

Commit

Permalink
refactor(metadata-io): introduce a storage-independent in-memory entity aspect model (datahub-project#4957)
Browse files Browse the repository at this point in the history
  • Loading branch information
Justin Marozas authored and maggiehays committed Aug 1, 2022
1 parent c82d13c commit 16625c4
Show file tree
Hide file tree
Showing 54 changed files with 2,242 additions and 2,789 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public NoCodeUpgrade(
entityRegistry,
systemAuthentication,
entityClient);
_cleanupSteps = buildCleanupSteps(server);
_cleanupSteps = buildCleanupSteps();
}

@Override
Expand All @@ -56,7 +56,7 @@ public List<UpgradeCleanupStep> cleanupSteps() {
return _cleanupSteps;
}

private List<UpgradeCleanupStep> buildCleanupSteps(final EbeanServer server) {
private List<UpgradeCleanupStep> buildCleanupSteps() {
return Collections.emptyList();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.linkedin.datahub.upgrade.UpgradeContext;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.metadata.entity.AspectStorageValidationUtil;
import com.linkedin.metadata.entity.ebean.AspectStorageValidationUtil;
import io.ebean.EbeanServer;
import java.util.function.Function;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.UpgradeStepResult;
import com.linkedin.datahub.upgrade.impl.DefaultUpgradeStepResult;
import com.linkedin.metadata.entity.AspectStorageValidationUtil;
import com.linkedin.metadata.entity.ebean.AspectStorageValidationUtil;
import io.ebean.EbeanServer;
import java.util.function.Function;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
return (context) -> {

context.report().addLine("Sending MAE from local DB...");
final int rowCount = _server.find(EbeanAspectV2.class).where().eq(EbeanAspectV2.VERSION_COLUMN, 0).findCount();
final int rowCount = _server.find(EbeanAspectV2.class).where().eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION).findCount();
context.report().addLine(String.format("Found %s latest aspects in aspects table", rowCount));

int totalRowsMigrated = 0;
Expand Down Expand Up @@ -141,7 +141,7 @@ private PagedList<EbeanAspectV2> getPagedAspects(final int start, final int page
return _server.find(EbeanAspectV2.class)
.select(EbeanAspectV2.ALL_COLUMNS)
.where()
.eq(EbeanAspectV2.VERSION_COLUMN, 0)
.eq(EbeanAspectV2.VERSION_COLUMN, ASPECT_LATEST_VERSION)
.orderBy()
.asc(EbeanAspectV2.URN_COLUMN)
.orderBy()
Expand Down
105 changes: 104 additions & 1 deletion metadata-io/src/main/java/com/linkedin/metadata/entity/AspectDao.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,108 @@
package com.linkedin.metadata.entity;

import com.linkedin.common.urn.Urn;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.sql.Timestamp;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;

/**
 * An interface specifying create, update, and read operations against metadata entity aspects.
 * This interface is meant to abstract away the storage concerns of these pieces of metadata, permitting any
 * underlying storage system to be used.
 *
 * <p>Requirements for any implementation:
 * <ol>
 *   <li>Being able to map its internal storage representation to {@link EntityAspect};</li>
 *   <li>Honoring the internal versioning semantics: the latest version of any aspect is stored as version 0 for
 *   efficient retrieval. In most cases only the latest state of an aspect will be fetched. See
 *   {@link EntityService} for more details.</li>
 * </ol>
 *
 * <p>TODO: This interface exposes {@link #runInTransactionWithRetry(Supplier, int)} because {@link EntityService}
 * concerns itself with batching multiple commands into a single transaction. It exposes storage concerns somewhat
 * and it'd be worth looking into ways to move this responsibility inside {@link AspectDao} implementations.
 */
public interface AspectDao {

  /**
   * Fetches a single aspect record identified by urn, aspect name and version.
   *
   * @param urn string form of the entity urn
   * @param aspectName name of the aspect
   * @param version aspect version; version 0 addresses the latest state of the aspect
   * @return the matching record; may be {@code null} (presumably when no such record exists — confirm)
   */
  @Nullable
  EntityAspect getAspect(@Nonnull final String urn, @Nonnull final String aspectName, final long version);

  /**
   * Fetches a single aspect record identified by an {@link EntityAspectIdentifier}.
   *
   * @param key urn, aspect name and version of the record
   * @return the matching record; may be {@code null} (presumably when no such record exists — confirm)
   */
  @Nullable
  EntityAspect getAspect(@Nonnull final EntityAspectIdentifier key);

  /**
   * Fetches several aspect records in one call.
   *
   * @param keys identifiers of the records to fetch
   * @return the fetched records keyed by their identifier. NOTE(review): presumably identifiers with no stored
   *     record are absent from the map — confirm against implementations.
   */
  @Nonnull
  Map<EntityAspectIdentifier, EntityAspect> batchGet(@Nonnull final Set<EntityAspectIdentifier> keys);

  /**
   * Lists aspect records of the given entity, restricted to the given aspect names and a time window.
   *
   * <p>NOTE(review): presumably the window is matched against the record's creation time, and the bounds'
   * inclusivity is implementation-defined — confirm.
   *
   * @param urn the entity urn
   * @param aspectNames names of the aspects to include
   * @param startTimeMillis start of the time window, in epoch milliseconds
   * @param endTimeMillis end of the time window, in epoch milliseconds
   * @return the matching records
   */
  @Nonnull
  List<EntityAspect> getAspectsInRange(@Nonnull Urn urn, Set<String> aspectNames, long startTimeMillis, long endTimeMillis);

  /**
   * Fetches the latest state of an aspect.
   *
   * @param urn string form of the entity urn
   * @param aspectName name of the aspect
   * @return the latest record; may be {@code null}
   */
  @Nullable
  EntityAspect getLatestAspect(@Nonnull final String urn, @Nonnull final String aspectName);

  /**
   * Writes a single aspect record at an explicit version.
   *
   * @param urn string form of the entity urn
   * @param aspectName name of the aspect
   * @param aspectMetadata serialized aspect payload
   * @param actor the actor performing the write
   * @param impersonator the impersonator, if any
   * @param timestamp time of the write
   * @param systemMetadata serialized system metadata
   * @param version version to write the record under
   * @param insert NOTE(review): presumably {@code true} to insert a new record and {@code false} to update an
   *     existing one — confirm
   */
  void saveAspect(
      @Nonnull final String urn,
      @Nonnull final String aspectName,
      @Nonnull final String aspectMetadata,
      @Nonnull final String actor,
      @Nullable final String impersonator,
      @Nonnull final Timestamp timestamp,
      @Nonnull final String systemMetadata,
      final long version,
      final boolean insert);

  /**
   * Writes the given aspect record.
   *
   * @param aspect the record to write
   * @param insert NOTE(review): presumably {@code true} to insert and {@code false} to update — confirm
   */
  void saveAspect(@Nonnull final EntityAspect aspect, final boolean insert);

  /**
   * Makes a new record the latest version of an aspect, given the state of the previous latest record.
   * The {@code old*} parameters describe the previous latest record and are all nullable. The {@code new*}
   * parameters describe the record that becomes the latest version.
   *
   * @param nextVersion NOTE(review): boxed {@code Long}, unlike the primitive versions elsewhere in this
   *     interface — confirm whether {@code null} is a meaningful value
   * @return a version number. NOTE(review): its exact meaning is implementation-defined from this view — confirm.
   */
  long saveLatestAspect(
      @Nonnull final String urn,
      @Nonnull final String aspectName,
      @Nullable final String oldAspectMetadata,
      @Nullable final String oldActor,
      @Nullable final String oldImpersonator,
      @Nullable final Timestamp oldTime,
      @Nullable final String oldSystemMetadata,
      @Nonnull final String newAspectMetadata,
      @Nonnull final String newActor,
      @Nullable final String newImpersonator,
      @Nonnull final Timestamp newTime,
      @Nullable final String newSystemMetadata,
      final Long nextVersion);

  /**
   * Deletes the given aspect record.
   *
   * @param aspect the record to delete
   */
  void deleteAspect(@Nonnull final EntityAspect aspect);

  /**
   * Lists, page by page, the urns of entities of the given type that have the given aspect.
   *
   * @param entityName entity type name
   * @param aspectName name of the aspect
   * @param start start offset of the page
   * @param pageSize number of results in the page
   * @return one page of urns
   */
  @Nonnull
  ListResult<String> listUrns(
      @Nonnull final String entityName,
      @Nonnull final String aspectName,
      final int start,
      final int pageSize);

  /**
   * Deletes the records stored for the given urn.
   *
   * @param urn string form of the entity urn
   * @return NOTE(review): presumably the number of deleted records — confirm
   */
  int deleteUrn(@Nonnull final String urn);

  /**
   * Lists, page by page, the latest aspect metadata of the given aspect across entities of the given type.
   *
   * @param entityName entity type name
   * @param aspectName name of the aspect
   * @param start start offset of the page
   * @param pageSize number of results in the page
   * @return one page of serialized aspect metadata
   */
  @Nonnull
  ListResult<String> listLatestAspectMetadata(
      @Nonnull final String entityName,
      @Nonnull final String aspectName,
      final int start,
      final int pageSize);

  /**
   * Lists, page by page, the aspect metadata stored at a specific version.
   *
   * @param entityName entity type name
   * @param aspectName name of the aspect
   * @param version aspect version to list
   * @param start start offset of the page
   * @param pageSize number of results in the page
   * @return one page of serialized aspect metadata
   */
  @Nonnull
  ListResult<String> listAspectMetadata(
      @Nonnull final String entityName,
      @Nonnull final String aspectName,
      final long version,
      final int start,
      final int pageSize);

  /**
   * Returns the version number to use for the next write of an aspect.
   *
   * @param urn string form of the entity urn
   * @param aspectName name of the aspect
   * @return the next version number
   */
  long getNextVersion(@Nonnull final String urn, @Nonnull final String aspectName);

  /**
   * Batch form of {@link #getNextVersion(String, String)} for several aspects of one entity.
   *
   * @param urn string form of the entity urn
   * @param aspectNames names of the aspects
   * @return next version numbers keyed by aspect name
   */
  Map<String, Long> getNextVersions(@Nonnull final String urn, @Nonnull final Set<String> aspectNames);

  /**
   * Returns the highest version number stored for an aspect.
   *
   * @param urn string form of the entity urn
   * @param aspectName name of the aspect
   * @return the maximum version
   */
  long getMaxVersion(@Nonnull final String urn, @Nonnull final String aspectName);

  /**
   * Toggles whether this DAO is allowed to write.
   *
   * @param canWrite NOTE(review): presumably {@code false} makes write operations no-ops or failures — confirm
   */
  void setWritable(boolean canWrite);

  /**
   * Runs {@code block} inside a storage transaction, retrying up to {@code maxTransactionRetry} times.
   * See the TODO on this interface about why transactions are exposed here.
   *
   * @param block the work to execute
   * @param maxTransactionRetry maximum number of retries
   * @param <T> result type of {@code block}
   * @return the value produced by {@code block}
   */
  @Nonnull
  <T> T runInTransactionWithRetry(@Nonnull final Supplier<T> block, final int maxTransactionRetry);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package com.linkedin.metadata.entity;

import javax.annotation.Nonnull;

/**
 * This interface is a split-off from {@link AspectDao} that segregates the methods called only by data migration
 * tasks. The separation is not technically necessary, but leaving entire-table queries mixed in with the rest
 * felt dangerous.
 */
public interface AspectMigrationsDao {

  /**
   * Returns one page of <em>all</em> URNs in the database.
   *
   * @param start start offset of the page
   * @param pageSize number of records in the page
   * @return an iterable of {@code String} URNs
   */
  @Nonnull
  Iterable<String> listAllUrns(final int start, final int pageSize);

  /**
   * Returns the count of entities (unique URNs) in the database.
   *
   * @return count of entities
   */
  long countEntities();

  /**
   * Checks whether any record of the given {@code aspectName} exists in the database.
   *
   * @param aspectName name of an entity aspect to search for
   * @return {@code true} if at least one record of the given {@code aspectName} is found, {@code false} otherwise
   */
  boolean checkIfAspectExists(@Nonnull final String aspectName);
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,46 @@
package com.linkedin.metadata.entity;

public interface EntityAspect {
String getSystemMetadata();
import lombok.AllArgsConstructor;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;

import javax.annotation.Nonnull;
import java.sql.Timestamp;

/**
 * This is the internal representation of an entity aspect record that {@link EntityService} and {@link AspectDao}
 * implementations use. While {@link AspectDao} implementations have their own aspect record types, those carry
 * implementation details that should not leak outside. Therefore, this is the type to use in public
 * {@link AspectDao} methods.
 *
 * <p>Lombok generates getters, setters, a no-args constructor, an all-args constructor (parameters in field
 * declaration order) and value-based {@code equals}/{@code hashCode} over all fields.
 *
 * <p>NOTE(review): {@code urn} and {@code aspect} are annotated {@code @Nonnull}, but the no-args constructor
 * leaves them {@code null} until they are set.
 */
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode
public class EntityAspect {

// String form of the entity urn.
@Nonnull
private String urn;

// Aspect name.
@Nonnull
private String aspect;

// Aspect version; AspectDao stores the latest version of an aspect as version 0.
private long version;

// NOTE(review): presumably the serialized aspect payload — confirm format.
private String metadata;

// NOTE(review): presumably serialized system metadata — confirm format.
private String systemMetadata;

// NOTE(review): presumably the time this record version was written — confirm.
private Timestamp createdOn;

// NOTE(review): presumably the actor that wrote this record version — confirm.
private String createdBy;

// NOTE(review): presumably the impersonator, if any, on whose behalf the record was written — confirm.
private String createdFor;

/**
 * Builds the identifier (urn, aspect name, version) of this record.
 *
 * @return a new {@link EntityAspectIdentifier} for this record
 */
public EntityAspectIdentifier toAspectIdentifier() {
return new EntityAspectIdentifier(getUrn(), getAspect(), getVersion());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.linkedin.metadata.entity;

import lombok.Value;

import javax.annotation.Nonnull;

/**
 * This class holds the values required to construct a unique key identifying an entity aspect record in a
 * database: the urn, the aspect name and the version.
 * It exists mainly for compatibility with {@link com.linkedin.metadata.entity.ebean.EbeanAspectV2.PrimaryKey}.
 *
 * <p>Lombok's {@code @Value} makes this an immutable value type with value-based {@code equals}/{@code hashCode},
 * which lets it serve as a map key (as in {@link AspectDao#batchGet(java.util.Set)}).
 */
@Value
public class EntityAspectIdentifier {
// String form of the entity urn.
@Nonnull String urn;
// Aspect name.
@Nonnull String aspect;
// Aspect version; AspectDao stores the latest version of an aspect as version 0.
long version;
}
Loading

0 comments on commit 16625c4

Please sign in to comment.