Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(api/timeline): fix corner cases missed, add tests #11288

Merged
merged 3 commits into from
Sep 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,7 @@
import com.linkedin.schema.SchemaMetadata;
import jakarta.json.JsonPatch;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.*;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
Expand Down Expand Up @@ -208,160 +203,168 @@ private static List<ChangeEvent> getFieldPropertyChangeEvents(
return propChangeEvents;
}

// TODO: This could use some cleanup, lots of repeated logic and tenuous conditionals
/**
 * Indexes the given schema fields by their field path.
 *
 * <p>Each field is stored with {@code put}, so if several fields share a field path the one
 * visited last replaces the earlier ones; no duplicate check is performed here.
 *
 * @param fieldArray the schema fields to index
 * @return a new {@link HashMap} from field path to the corresponding schema field
 */
private static Map<String, SchemaField> getSchemaFieldMap(SchemaFieldArray fieldArray) {
Map<String, SchemaField> fieldMap = new HashMap<>();
fieldArray.forEach(schemaField -> fieldMap.put(schemaField.getFieldPath(), schemaField));
return fieldMap;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a check that Map.keys().size is that same as the array size?
In other words confirm that there are no duplicate fieldPaths?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why would that be necessary in a change event generator? Seems more like a validation to be done during ingestion time.

}

/**
 * Emits change events for a field path that exists in both the base and the target schema.
 *
 * <p>When the target schema has no field at {@code baseFieldPath} nothing is recorded and the
 * path is left unprocessed. Otherwise the path is marked as processed on both sides, a MAJOR
 * {@code MODIFY} event is added if the native data type differs (TECHNICAL_SCHEMA category
 * only), and the events returned by {@code getFieldPropertyChangeEvents} are appended once.
 *
 * @param baseFieldPath field path to look up in both maps
 * @param datasetUrn urn of the dataset owning the schema
 * @param changeCategory category of change events being computed
 * @param auditStamp audit stamp attached to generated events
 * @param baseFieldMap base schema fields keyed by field path
 * @param targetFieldMap target schema fields keyed by field path
 * @param processedBaseFields base field paths handled so far; updated by this method
 * @param processedTargetFields target field paths handled so far; updated by this method
 * @param changeEvents accumulator that receives the generated events
 */
private static void processFieldPathDataTypeChange(
    String baseFieldPath,
    Urn datasetUrn,
    ChangeCategory changeCategory,
    AuditStamp auditStamp,
    Map<String, SchemaField> baseFieldMap,
    Map<String, SchemaField> targetFieldMap,
    Set<String> processedBaseFields,
    Set<String> processedTargetFields,
    List<ChangeEvent> changeEvents) {
  SchemaField curBaseField = baseFieldMap.get(baseFieldPath);
  if (!targetFieldMap.containsKey(baseFieldPath)) {
    // No field with the same path in the target schema: leave the path unprocessed.
    return;
  }
  processedBaseFields.add(baseFieldPath);
  processedTargetFields.add(baseFieldPath);
  SchemaField curTargetField = targetFieldMap.get(baseFieldPath);
  if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())
      && ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
    // Non-backward compatible change + Major version bump
    changeEvents.add(
        DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
            .category(ChangeCategory.TECHNICAL_SCHEMA)
            .modifier(getSchemaFieldUrn(datasetUrn, curBaseField).toString())
            .entityUrn(datasetUrn.toString())
            .operation(ChangeOperation.MODIFY)
            .semVerChange(SemanticChangeType.MAJOR)
            .description(
                String.format(
                    "%s native datatype of the field '%s' changed from '%s' to '%s'.",
                    BACKWARDS_INCOMPATIBLE_DESC,
                    getFieldPathV1(curTargetField),
                    curBaseField.getNativeDataType(),
                    curTargetField.getNativeDataType()))
            .fieldPath(curBaseField.getFieldPath())
            .fieldUrn(getSchemaFieldUrn(datasetUrn, curBaseField))
            .nullable(curBaseField.isNullable())
            .auditStamp(auditStamp)
            .build());
  }
  // Property changes are collected exactly once, whether or not the data type changed;
  // collecting them inside the type-change branch as well would duplicate every event.
  List<ChangeEvent> propChangeEvents =
      getFieldPropertyChangeEvents(
          curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
  changeEvents.addAll(propChangeEvents);
}

/**
 * Resolves a base field as either a rename or a removal.
 *
 * <p>The target fields whose paths are not yet in {@code processedTargetFields} are offered to
 * {@code findRenamedField} as rename candidates. The base path is always marked as processed.
 * Without a match the field goes through {@code processRemoval}. With a match, and only for the
 * TECHNICAL_SCHEMA category, the matched target path is marked as processed and a rename event
 * plus the property change events between the two fields are appended.
 *
 * @param baseFieldPath path of the base field being resolved
 * @param datasetUrn urn of the dataset owning the schema
 * @param changeCategory category of change events being computed
 * @param auditStamp audit stamp attached to generated events
 * @param baseFieldMap base schema fields keyed by field path
 * @param targetFieldMap target schema fields keyed by field path
 * @param processedBaseFields base field paths handled so far; updated by this method
 * @param processedTargetFields target field paths handled so far; updated by this method
 * @param changeEvents accumulator that receives the generated events
 * @param renamedFields set handed through to {@code findRenamedField}
 */
private static void processFieldPathRename(
    String baseFieldPath,
    Urn datasetUrn,
    ChangeCategory changeCategory,
    AuditStamp auditStamp,
    Map<String, SchemaField> baseFieldMap,
    Map<String, SchemaField> targetFieldMap,
    Set<String> processedBaseFields,
    Set<String> processedTargetFields,
    List<ChangeEvent> changeEvents,
    Set<SchemaField> renamedFields) {

  // Only target fields that have not been claimed yet can be the new name of this field.
  List<SchemaField> renameCandidates = new ArrayList<>();
  for (Map.Entry<String, SchemaField> targetEntry : targetFieldMap.entrySet()) {
    if (!processedTargetFields.contains(targetEntry.getKey())) {
      renameCandidates.add(targetEntry.getValue());
    }
  }

  SchemaField baseField = baseFieldMap.get(baseFieldPath);
  SchemaField renameMatch = findRenamedField(baseField, renameCandidates, renamedFields);
  processedBaseFields.add(baseFieldPath);

  if (renameMatch == null) {
    processRemoval(changeCategory, changeEvents, datasetUrn, baseField, auditStamp);
    return;
  }
  if (!ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
    // For other categories the match is neither recorded nor reported.
    return;
  }

  processedTargetFields.add(renameMatch.getFieldPath());
  changeEvents.add(generateRenameEvent(datasetUrn, baseField, renameMatch, auditStamp));
  changeEvents.addAll(
      getFieldPropertyChangeEvents(
          baseField, renameMatch, datasetUrn, changeCategory, auditStamp));
}

/**
 * Returns the field paths of {@code fieldMap} that are not contained in {@code processedFields}.
 *
 * @param fieldMap schema fields keyed by field path
 * @param processedFields field paths that have already been handled
 * @return a new mutable set; neither argument is modified
 */
private static Set<String> getNonProcessedFields(
    Map<String, SchemaField> fieldMap, Set<String> processedFields) {
  // Work on a copy so the map's key set stays untouched.
  final Set<String> remainingPaths = new HashSet<>(fieldMap.keySet());
  remainingPaths.removeIf(processedFields::contains);
  return remainingPaths;
}

/**
 * Computes the schema field change events between a base and a target schema and returns them
 * as a list. Either schema may be null, in which case an empty field array is used for it.
 *
 * <p>NOTE(review): this span comes from a rendered diff and appears to interleave removed lines
 * (the index-based merge loops) with added lines (the map-based passes); braces do not balance
 * as shown. Confirm against the merged file before relying on the exact statement order.
 */
private static List<ChangeEvent> computeDiffs(
SchemaMetadata baseSchema,
SchemaMetadata targetSchema,
Urn datasetUrn,
ChangeCategory changeCategory,
AuditStamp auditStamp) {
// Sort the fields by their field path.
if (baseSchema != null) {
sortFieldsByPath(baseSchema);
}
if (targetSchema != null) {
sortFieldsByPath(targetSchema);
}

// Performs ordinal based diff, primarily based on fixed field ordinals and their types.
SchemaFieldArray baseFields =
(baseSchema != null ? baseSchema.getFields() : new SchemaFieldArray());
SchemaFieldArray targetFields =
targetSchema != null ? targetSchema.getFields() : new SchemaFieldArray();
int baseFieldIdx = 0;
int targetFieldIdx = 0;

// Field-path indexes of both schemas, used by the map-based passes further down.
Map<String, SchemaField> baseFieldMap = getSchemaFieldMap(baseFields);
Map<String, SchemaField> targetFieldMap = getSchemaFieldMap(targetFields);

// Field paths already accounted for on each side.
Set<String> processedBaseFields = new HashSet<>();
Set<String> processedTargetFields = new HashSet<>();

List<ChangeEvent> changeEvents = new ArrayList<>();
Set<SchemaField> renamedFields = new HashSet<>();
// NOTE(review): the index-based merge loop below looks like the pre-change implementation
// shown as removed lines in the diff -- confirm.
while (baseFieldIdx < baseFields.size() && targetFieldIdx < targetFields.size()) {
SchemaField curBaseField = baseFields.get(baseFieldIdx);
SchemaField curTargetField = targetFields.get(targetFieldIdx);
// TODO: Re-evaluate ordinal processing?
int comparison = curBaseField.getFieldPath().compareTo(curTargetField.getFieldPath());
if (renamedFields.contains(curBaseField)) {
baseFieldIdx++;
} else if (renamedFields.contains(curTargetField)) {
targetFieldIdx++;
} else if (comparison == 0) {
// This is the same field. Check for change events from property changes.
if (!curBaseField.getNativeDataType().equals(curTargetField.getNativeDataType())) {
// Non-backward compatible change + Major version bump
if (ChangeCategory.TECHNICAL_SCHEMA.equals(changeCategory)) {
changeEvents.add(
DatasetSchemaFieldChangeEvent.schemaFieldChangeEventBuilder()
.category(ChangeCategory.TECHNICAL_SCHEMA)
.modifier(getSchemaFieldUrn(datasetUrn, curBaseField).toString())
.entityUrn(datasetUrn.toString())
.operation(ChangeOperation.MODIFY)
.semVerChange(SemanticChangeType.MAJOR)
.description(
String.format(
"%s native datatype of the field '%s' changed from '%s' to '%s'.",
BACKWARDS_INCOMPATIBLE_DESC,
getFieldPathV1(curTargetField),
curBaseField.getNativeDataType(),
curTargetField.getNativeDataType()))
.fieldPath(curBaseField.getFieldPath())
.fieldUrn(getSchemaFieldUrn(datasetUrn, curBaseField))
.nullable(curBaseField.isNullable())
.auditStamp(auditStamp)
.build());
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
}
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
++targetFieldIdx;
} else if (comparison < 0) {
// Base Field was removed or was renamed. Non-backward compatible change + Major version
// bump
// Check for rename, if rename coincides with other modifications we assume drop/add.
// Assumes that two different fields on the same schema would not have the same description,
// terms,
// or tags and share the same type
SchemaField renamedField =
findRenamedField(
curBaseField,
targetFields.subList(targetFieldIdx, targetFields.size()),
renamedFields);
if (renamedField == null) {
processRemoval(changeCategory, changeEvents, datasetUrn, curBaseField, auditStamp);
++baseFieldIdx;
} else {
changeEvents.add(generateRenameEvent(datasetUrn, curBaseField, renamedField, auditStamp));
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++baseFieldIdx;
renamedFields.add(renamedField);
}
} else {
// The targetField got added or a renaming occurred. Forward & backwards compatible change +
// minor version bump.
SchemaField renamedField =
findRenamedField(
curTargetField, baseFields.subList(baseFieldIdx, baseFields.size()), renamedFields);
if (renamedField == null) {
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp);
++targetFieldIdx;
} else {
changeEvents.add(
generateRenameEvent(datasetUrn, renamedField, curTargetField, auditStamp));
List<ChangeEvent> propChangeEvents =
getFieldPropertyChangeEvents(
curBaseField, curTargetField, datasetUrn, changeCategory, auditStamp);
changeEvents.addAll(propChangeEvents);
++targetFieldIdx;
renamedFields.add(renamedField);
}
}
}
while (baseFieldIdx < baseFields.size()) {
// Handle removed fields. Non-backward compatible change + major version bump
SchemaField baseField = baseFields.get(baseFieldIdx);
if (!renamedFields.contains(baseField)) {
processRemoval(changeCategory, changeEvents, datasetUrn, baseField, auditStamp);
}
++baseFieldIdx;

// Map-based pass: handle every base field path via processFieldPathDataTypeChange, which
// only acts on paths that also exist in the target schema.
for (String baseFieldPath : baseFieldMap.keySet()) {
processFieldPathDataTypeChange(
baseFieldPath,
datasetUrn,
changeCategory,
auditStamp,
baseFieldMap,
targetFieldMap,
processedBaseFields,
processedTargetFields,
changeEvents);
}
while (targetFieldIdx < targetFields.size()) {
// Newly added fields. Forwards & backwards compatible change + minor version bump.
SchemaField targetField = targetFields.get(targetFieldIdx);
if (!renamedFields.contains(targetField)) {
processAdd(changeCategory, changeEvents, datasetUrn, targetField, auditStamp);
}
targetFieldIdx++;
// Map-based pass: base fields still unprocessed are resolved as renames or removals.
Set<String> nonProcessedBaseFields = getNonProcessedFields(baseFieldMap, processedBaseFields);
for (String baseFieldPath : nonProcessedBaseFields) {
processFieldPathRename(
baseFieldPath,
datasetUrn,
changeCategory,
auditStamp,
baseFieldMap,
targetFieldMap,
processedBaseFields,
processedTargetFields,
changeEvents,
renamedFields);
}

// Handle primary key constraint change events.
List<ChangeEvent> primaryKeyChangeEvents =
getPrimaryKeyChangeEvents(baseSchema, targetSchema, datasetUrn, auditStamp);
changeEvents.addAll(primaryKeyChangeEvents);
// Map-based pass: target fields still unprocessed are reported through processAdd below.
Set<String> nonProcessedTargetFields =
getNonProcessedFields(targetFieldMap, processedTargetFields);

// Handle foreign key constraint change events.
List<ChangeEvent> foreignKeyChangeEvents = getForeignKeyChangeEvents();
changeEvents.addAll(foreignKeyChangeEvents);
nonProcessedTargetFields.forEach(
fieldPath -> {
SchemaField curTargetField = targetFieldMap.get(fieldPath);
processAdd(changeCategory, changeEvents, datasetUrn, curTargetField, auditStamp);
});

return changeEvents;
}

/**
 * Replaces the fields of the given schema with a copy ordered ascending by field path.
 *
 * @param schemaMetadata schema whose field array is re-ordered; it is modified via
 *     {@code setFields}
 * @throws IllegalArgumentException if {@code schemaMetadata} is null
 */
private static void sortFieldsByPath(SchemaMetadata schemaMetadata) {
  if (schemaMetadata == null) {
    throw new IllegalArgumentException("SchemaMetadata should not be null");
  }
  // Sort a detached copy, then swap it in as the schema's new field array.
  final List<SchemaField> orderedFields = new ArrayList<>(schemaMetadata.getFields());
  Collections.sort(
      orderedFields, (left, right) -> left.getFieldPath().compareTo(right.getFieldPath()));
  schemaMetadata.setFields(new SchemaFieldArray(orderedFields));
}

private static SchemaField findRenamedField(
SchemaField curField, List<SchemaField> targetFields, Set<SchemaField> renamedFields) {
return targetFields.stream()
Expand Down Expand Up @@ -391,7 +394,14 @@ private static boolean parentFieldsMatch(SchemaField curField, SchemaField schem
}

private static boolean descriptionsMatch(SchemaField curField, SchemaField schemaField) {
return StringUtils.isNotBlank(curField.getDescription())
if (StringUtils.isBlank(curField.getDescription())
&& StringUtils.isBlank(schemaField.getDescription())) {
return true;
}
return !(StringUtils.isBlank(curField.getDescription())
&& StringUtils.isNotBlank(schemaField.getDescription()))
&& !(StringUtils.isNotBlank(curField.getDescription())
&& StringUtils.isBlank(schemaField.getDescription()))
&& curField.getDescription().equals(schemaField.getDescription());
}

Expand Down
Loading
Loading