Skip to content

Commit dddc744

Browse files
Merge branch 'master' into feat-ingestion-tableau-report-metrics
2 parents e585d14 + 65f44ef commit dddc744

File tree

9 files changed

+212
-13
lines changed

9 files changed

+212
-13
lines changed

.github/workflows/pr-labeler.yml

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,24 +29,24 @@ jobs:
2929
"swaroopjagadish",
3030
"treff7es",
3131
"yoonhyejin",
32-
"eboneil",
3332
"gabe-lyons",
3433
"hsheth2",
3534
"jjoyce0510",
3635
"maggiehays",
3736
"pedro93",
3837
"RyanHolstien",
3938
"sakethvarma397",
40-
"Kunal-kankriya",
4139
"purnimagarg1",
42-
"dushayntAW",
4340
"sagar-salvi-apptware",
4441
"kushagra-apptware",
4542
"Salman-Apptware",
4643
"mayurinehate",
4744
"noggi",
4845
"skrydal",
49-
"kevinkarchacryl"
46+
"kevinkarchacryl",
47+
"sgomezvillamor",
48+
"acrylJonny",
49+
"chakru-r"
5050
]'),
5151
github.actor
5252
)

datahub-web-react/src/app/ingest/source/executions/reporting/StructuredReportItem.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ const StyledCollapse = styled(Collapse)<{ color: string }>`
1616
.ant-collapse-header {
1717
display: flex;
1818
align-items: center;
19+
overflow: auto;
1920
}
2021
2122
.ant-collapse-item {

entity-registry/src/main/java/com/linkedin/metadata/models/StructuredPropertyUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public static String toElasticsearchFieldName(
178178
/**
179179
* Return an elasticsearch type from structured property type
180180
*
181-
* @param fieldName filter or facet field name
181+
* @param fieldName filter or facet field name - must match actual FQN of structured prop
182182
* @param aspectRetriever aspect retriever
183183
* @return elasticsearch type
184184
*/

metadata-ingestion/setup.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,15 @@
142142
# datahub does not depend on traitlets directly but great expectations does.
143143
# https://github.com/ipython/traitlets/issues/741
144144
"traitlets!=5.2.2",
145+
# GE depends on IPython - we have no direct dependency on it.
146+
# IPython 8.22.0 added a dependency on traitlets 5.13.x, but only declared a
147+
# version requirement of traitlets>5.
148+
# See https://github.com/ipython/ipython/issues/14352.
149+
# This issue was fixed by https://github.com/ipython/ipython/pull/14353,
150+
# which first appeared in IPython 8.22.1.
151+
# As such, we just need to avoid that version in order to get the
152+
# dependencies that we need. IPython probably should've yanked 8.22.0.
153+
"IPython!=8.22.0",
145154
"greenlet",
146155
*cachetools_lib,
147156
}

metadata-ingestion/src/datahub/ingestion/source/pulsar.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,17 @@ class PulsarSchema:
7878
def __init__(self, schema):
7979
self.schema_version = schema.get("version")
8080

81-
avro_schema = json.loads(schema.get("data"))
81+
schema_data = schema.get("data")
82+
if not schema_data:
83+
logger.warning("Schema data is empty or None. Using default empty schema.")
84+
schema_data = "{}"
85+
86+
try:
87+
avro_schema = json.loads(schema_data)
88+
except json.JSONDecodeError as e:
89+
logger.error(f"Invalid JSON schema: {schema_data}. Error: {str(e)}")
90+
avro_schema = {}
91+
8292
self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
8393
self.schema_description = avro_schema.get("doc")
8494
self.schema_type = schema.get("type")

metadata-io/src/main/java/com/linkedin/metadata/search/elasticsearch/query/request/AggregationQueryBuilder.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -379,7 +379,7 @@ private void addCriteriaFiltersToAggregationMetadata(
379379
}
380380
}
381381

382-
private void addCriterionFiltersToAggregationMetadata(
382+
public void addCriterionFiltersToAggregationMetadata(
383383
@Nonnull final Criterion criterion,
384384
@Nonnull final List<AggregationMetadata> aggregationMetadata,
385385
@Nullable AspectRetriever aspectRetriever) {
@@ -422,17 +422,32 @@ private void addCriterionFiltersToAggregationMetadata(
422422
value ->
423423
addMissingAggregationValueToAggregationMetadata(value, originalAggMetadata));
424424
}
425+
} else if (aggregationMetadataMap.containsKey(criterion.getField())) {
426+
/*
427+
* If we already have aggregations for the facet field (original field name), simply inject any missing values counts into the set.
428+
* If there are no results for a particular facet value, it will NOT be in the original aggregation set returned by
429+
* Elasticsearch.
430+
*/
431+
AggregationMetadata originalAggMetadata = aggregationMetadataMap.get(criterion.getField());
432+
criterion
433+
.getValues()
434+
.forEach(
435+
value -> addMissingAggregationValueToAggregationMetadata(value, originalAggMetadata));
425436
} else {
426437
/*
427438
* If we do not have ANY aggregation for the facet field, then inject a new aggregation metadata object for the
428439
* facet field.
429440
* If there are no results for a particular facet, it will NOT be in the original aggregation set returned by
430441
* Elasticsearch.
431442
*/
443+
// Simply replace suffix from original field when there are no aggregations for it. Prevents
444+
// bug where ES mappings for field are different from how we map the field back to UI
445+
// (ie. Structured Properties with dots in them)
446+
String facetField = ESUtils.replaceSuffix(criterion.getField());
432447
aggregationMetadata.add(
433448
buildAggregationMetadata(
434-
finalFacetField,
435-
getFacetToDisplayNames().getOrDefault(finalFacetField, finalFacetField),
449+
facetField,
450+
getFacetToDisplayNames().getOrDefault(facetField, facetField),
436451
new LongMap(
437452
criterion.getValues().stream().collect(Collectors.toMap(i -> i, i -> 0L))),
438453
new FilterValueArray(

metadata-io/src/main/java/com/linkedin/metadata/search/utils/ESUtils.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,9 +448,20 @@ public static String toParentField(
448448
urnDefinition.getFirst(), urnDefinition.getSecond()))
449449
.orElse(filterField);
450450

451+
return replaceSuffix(fieldName);
452+
}
453+
454+
/**
455+
* Strip subfields from filter field
456+
*
457+
* @param fieldName name of the field
458+
* @return normalized field name without subfields
459+
*/
460+
@Nonnull
461+
public static String replaceSuffix(@Nonnull final String fieldName) {
451462
for (String subfield : SUBFIELDS) {
452463
String SUFFIX = "." + subfield;
453-
if (filterField.endsWith(SUFFIX)) {
464+
if (fieldName.endsWith(SUFFIX)) {
454465
return fieldName.replace(SUFFIX, "");
455466
}
456467
}
@@ -710,7 +721,8 @@ private static QueryBuilder buildEqualsConditionFromCriterionWithValues(
710721
final Map<String, Set<SearchableAnnotation.FieldType>> searchableFieldTypes,
711722
@Nonnull AspectRetriever aspectRetriever,
712723
boolean enableCaseInsensitiveSearch) {
713-
Set<String> fieldTypes = getFieldTypes(searchableFieldTypes, fieldName, aspectRetriever);
724+
Set<String> fieldTypes =
725+
getFieldTypes(searchableFieldTypes, fieldName, criterion, aspectRetriever);
714726
if (fieldTypes.size() > 1) {
715727
log.warn(
716728
"Multiple field types for field name {}, determining best fit for set: {}",
@@ -753,12 +765,16 @@ private static QueryBuilder buildEqualsConditionFromCriterionWithValues(
753765
private static Set<String> getFieldTypes(
754766
Map<String, Set<SearchableAnnotation.FieldType>> searchableFields,
755767
String fieldName,
768+
@Nonnull final Criterion criterion,
756769
@Nullable AspectRetriever aspectRetriever) {
757770

758771
final Set<String> finalFieldTypes;
759772
if (fieldName.startsWith(STRUCTURED_PROPERTY_MAPPING_FIELD_PREFIX)) {
773+
// use criterion field here for structured props since fieldName has dots replaced with
774+
// underscores
760775
finalFieldTypes =
761-
StructuredPropertyUtils.toElasticsearchFieldType(fieldName, aspectRetriever);
776+
StructuredPropertyUtils.toElasticsearchFieldType(
777+
replaceSuffix(criterion.getField()), aspectRetriever);
762778
} else {
763779
Set<SearchableAnnotation.FieldType> fieldTypes =
764780
searchableFields.getOrDefault(fieldName.split("\\.")[0], Collections.emptySet());
@@ -782,7 +798,8 @@ private static RangeQueryBuilder buildRangeQueryFromCriterion(
782798
Condition condition,
783799
boolean isTimeseries,
784800
AspectRetriever aspectRetriever) {
785-
Set<String> fieldTypes = getFieldTypes(searchableFieldTypes, fieldName, aspectRetriever);
801+
Set<String> fieldTypes =
802+
getFieldTypes(searchableFieldTypes, fieldName, criterion, aspectRetriever);
786803

787804
// Determine criterion value, range query only accepts single value so take first value in
788805
// values if multiple

metadata-io/src/test/java/com/linkedin/metadata/search/query/request/AggregationQueryBuilderTest.java

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import static com.linkedin.metadata.Constants.DATA_TYPE_URN_PREFIX;
44
import static com.linkedin.metadata.Constants.STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME;
55
import static com.linkedin.metadata.utils.SearchUtil.*;
6+
import static org.mockito.ArgumentMatchers.any;
67
import static org.mockito.ArgumentMatchers.anySet;
78
import static org.mockito.ArgumentMatchers.eq;
89
import static org.mockito.Mockito.mock;
@@ -12,23 +13,36 @@
1213
import com.google.common.collect.ImmutableMap;
1314
import com.google.common.collect.ImmutableSet;
1415
import com.linkedin.common.urn.Urn;
16+
import com.linkedin.common.urn.UrnUtils;
17+
import com.linkedin.data.DataMap;
18+
import com.linkedin.data.template.LongMap;
1519
import com.linkedin.data.template.SetMode;
20+
import com.linkedin.data.template.StringArray;
1621
import com.linkedin.entity.Aspect;
1722
import com.linkedin.metadata.aspect.AspectRetriever;
1823
import com.linkedin.metadata.config.search.SearchConfiguration;
1924
import com.linkedin.metadata.models.EntitySpec;
2025
import com.linkedin.metadata.models.annotation.SearchableAnnotation;
26+
import com.linkedin.metadata.query.filter.Condition;
27+
import com.linkedin.metadata.query.filter.Criterion;
28+
import com.linkedin.metadata.search.AggregationMetadata;
29+
import com.linkedin.metadata.search.FilterValue;
30+
import com.linkedin.metadata.search.FilterValueArray;
2131
import com.linkedin.metadata.search.elasticsearch.query.request.AggregationQueryBuilder;
2232
import com.linkedin.r2.RemoteInvocationException;
2333
import com.linkedin.structured.StructuredPropertyDefinition;
2434
import io.datahubproject.test.metadata.context.TestOperationContexts;
2535
import java.net.URISyntaxException;
36+
import java.util.ArrayList;
2637
import java.util.Collections;
38+
import java.util.HashMap;
39+
import java.util.HashSet;
2740
import java.util.List;
2841
import java.util.Map;
2942
import java.util.Optional;
3043
import java.util.Set;
3144
import java.util.stream.Collectors;
45+
import org.mockito.Mockito;
3246
import org.opensearch.search.aggregations.AggregationBuilder;
3347
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
3448
import org.testng.Assert;
@@ -598,4 +612,94 @@ public void testMissingAggregation() {
598612
.equals(
599613
MISSING_SPECIAL_TYPE + AGGREGATION_SPECIAL_TYPE_DELIMITER + "test")));
600614
}
615+
616+
@Test
617+
public void testAddFiltersToMetadataWithStructuredPropsNoResults() {
618+
final Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:test_me.one");
619+
620+
SearchConfiguration config = new SearchConfiguration();
621+
config.setMaxTermBucketSize(25);
622+
623+
AggregationQueryBuilder builder =
624+
new AggregationQueryBuilder(
625+
config, ImmutableMap.of(mock(EntitySpec.class), ImmutableList.of()));
626+
627+
Criterion criterion =
628+
new Criterion()
629+
.setField("structuredProperties.test_me.one")
630+
.setValues(new StringArray("test123"))
631+
.setCondition(Condition.EQUAL);
632+
633+
AspectRetriever mockAspectRetriever = getMockAspectRetriever(propertyUrn);
634+
635+
final List<AggregationMetadata> aggregationMetadataList = new ArrayList<>();
636+
builder.addCriterionFiltersToAggregationMetadata(
637+
criterion, aggregationMetadataList, mockAspectRetriever);
638+
639+
// ensure we add the correct structured prop aggregation here
640+
Assert.assertEquals(aggregationMetadataList.size(), 1);
641+
// Assert.assertEquals(aggregationMetadataList.get(0).getEntity(), propertyUrn);
642+
Assert.assertEquals(
643+
aggregationMetadataList.get(0).getName(), "structuredProperties.test_me.one");
644+
Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().size(), 1);
645+
Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().get("test123"), 0);
646+
}
647+
648+
@Test
649+
public void testAddFiltersToMetadataWithStructuredPropsWithAggregations() {
650+
final Urn propertyUrn = UrnUtils.getUrn("urn:li:structuredProperty:test_me.one");
651+
652+
final AggregationMetadata aggregationMetadata = new AggregationMetadata();
653+
aggregationMetadata.setName("structuredProperties.test_me.one");
654+
FilterValue filterValue =
655+
new FilterValue().setValue("test123").setFiltered(false).setFacetCount(1);
656+
aggregationMetadata.setFilterValues(new FilterValueArray(filterValue));
657+
LongMap aggregations = new LongMap();
658+
aggregations.put("test123", 1L);
659+
aggregationMetadata.setAggregations(aggregations);
660+
661+
SearchConfiguration config = new SearchConfiguration();
662+
config.setMaxTermBucketSize(25);
663+
664+
AggregationQueryBuilder builder =
665+
new AggregationQueryBuilder(
666+
config, ImmutableMap.of(mock(EntitySpec.class), ImmutableList.of()));
667+
668+
Criterion criterion =
669+
new Criterion()
670+
.setField("structuredProperties.test_me.one")
671+
.setValues(new StringArray("test123"))
672+
.setCondition(Condition.EQUAL);
673+
674+
AspectRetriever mockAspectRetriever = getMockAspectRetriever(propertyUrn);
675+
676+
final List<AggregationMetadata> aggregationMetadataList = new ArrayList<>();
677+
aggregationMetadataList.add(aggregationMetadata);
678+
builder.addCriterionFiltersToAggregationMetadata(
679+
criterion, aggregationMetadataList, mockAspectRetriever);
680+
681+
Assert.assertEquals(aggregationMetadataList.size(), 1);
682+
Assert.assertEquals(
683+
aggregationMetadataList.get(0).getName(), "structuredProperties.test_me.one");
684+
Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().size(), 1);
685+
Assert.assertEquals(aggregationMetadataList.get(0).getAggregations().get("test123"), 1);
686+
}
687+
688+
private AspectRetriever getMockAspectRetriever(Urn propertyUrn) {
689+
AspectRetriever mockAspectRetriever = Mockito.mock(AspectRetriever.class);
690+
Map<Urn, Map<String, Aspect>> mockResult = new HashMap<>();
691+
Map<String, Aspect> aspectMap = new HashMap<>();
692+
DataMap definition = new DataMap();
693+
definition.put("qualifiedName", "test_me.one");
694+
definition.put("valueType", "urn:li:dataType:datahub.string");
695+
Aspect definitionAspect = new Aspect(definition);
696+
aspectMap.put(STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME, definitionAspect);
697+
mockResult.put(propertyUrn, aspectMap);
698+
Set<Urn> urns = new HashSet<>();
699+
urns.add(propertyUrn);
700+
Mockito.when(mockAspectRetriever.getLatestAspectObjects(eq(urns), any()))
701+
.thenReturn(mockResult);
702+
703+
return mockAspectRetriever;
704+
}
601705
}

metadata-io/src/test/java/com/linkedin/metadata/search/utils/ESUtilsTest.java

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ public static void setup() throws RemoteInvocationException, URISyntaxException
4545
Urn abFghTenUrn = Urn.createFromString("urn:li:structuredProperty:ab.fgh.ten");
4646
Urn underscoresAndDotsUrn =
4747
Urn.createFromString("urn:li:structuredProperty:under.scores.and.dots_make_a_mess");
48+
Urn dateWithDotsUrn = Urn.createFromString("urn:li:structuredProperty:date_here.with_dot");
4849

4950
// legacy
5051
aspectRetriever = mock(AspectRetriever.class);
@@ -64,6 +65,18 @@ public static void setup() throws RemoteInvocationException, URISyntaxException
6465
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME,
6566
new Aspect(structPropAbFghTenDefinition.data()))));
6667

68+
StructuredPropertyDefinition dateWithDotsDefinition = new StructuredPropertyDefinition();
69+
dateWithDotsDefinition.setVersion(null, SetMode.REMOVE_IF_NULL);
70+
dateWithDotsDefinition.setValueType(Urn.createFromString(DATA_TYPE_URN_PREFIX + "date"));
71+
dateWithDotsDefinition.setQualifiedName("date_here.with_dot");
72+
when(aspectRetriever.getLatestAspectObjects(eq(Set.of(dateWithDotsUrn)), anySet()))
73+
.thenReturn(
74+
Map.of(
75+
dateWithDotsUrn,
76+
Map.of(
77+
STRUCTURED_PROPERTY_DEFINITION_ASPECT_NAME,
78+
new Aspect(dateWithDotsDefinition.data()))));
79+
6780
StructuredPropertyDefinition structPropUnderscoresAndDotsDefinition =
6881
new StructuredPropertyDefinition();
6982
structPropUnderscoresAndDotsDefinition.setVersion(null, SetMode.REMOVE_IF_NULL);
@@ -895,6 +908,36 @@ public void testGetQueryBuilderFromNamespacedStructPropEqualsValueV1() {
895908
Assert.assertEquals(result.toString(), expected);
896909
}
897910

911+
@Test
912+
public void testGetQueryBuilderFromDatesWithDots() {
913+
914+
final Criterion singleValueCriterion =
915+
buildCriterion(
916+
"structuredProperties.date_here.with_dot", Condition.GREATER_THAN, "1731974400000");
917+
918+
OperationContext opContext = mock(OperationContext.class);
919+
when(opContext.getAspectRetriever()).thenReturn(aspectRetriever);
920+
QueryBuilder result =
921+
ESUtils.getQueryBuilderFromCriterion(
922+
singleValueCriterion, false, new HashMap<>(), opContext, QueryFilterRewriteChain.EMPTY);
923+
// structuredProperties.date_here_with_dot should not have .keyword at the end since this field
924+
// type is type long for dates
925+
String expected =
926+
"{\n"
927+
+ " \"range\" : {\n"
928+
+ " \"structuredProperties.date_here_with_dot\" : {\n"
929+
+ " \"from\" : 1731974400000,\n"
930+
+ " \"to\" : null,\n"
931+
+ " \"include_lower\" : false,\n"
932+
+ " \"include_upper\" : true,\n"
933+
+ " \"boost\" : 1.0,\n"
934+
+ " \"_name\" : \"structuredProperties.date_here.with_dot\"\n"
935+
+ " }\n"
936+
+ " }\n"
937+
+ "}";
938+
Assert.assertEquals(result.toString(), expected);
939+
}
940+
898941
@Test
899942
public void testGetQueryBuilderFromStructPropExists() {
900943
final Criterion singleValueCriterion = buildExistsCriterion("structuredProperties.ab.fgh.ten");

0 commit comments

Comments
 (0)