Skip to content

Commit ae48bd1

Browse files
fix(mae): fix mae standalone platform consumer (#10352)
1 parent b9a34fe commit ae48bd1

File tree

5 files changed

+235
-171
lines changed

5 files changed

+235
-171
lines changed

metadata-io/src/main/java/com/linkedin/metadata/service/BusinessAttributeUpdateHookService.java

Lines changed: 82 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -6,53 +6,53 @@
66

77
import com.google.common.collect.ImmutableSet;
88
import com.linkedin.businessattribute.BusinessAttributes;
9-
import com.linkedin.common.AuditStamp;
109
import com.linkedin.common.urn.Urn;
11-
import com.linkedin.entity.EnvelopedAspect;
10+
import com.linkedin.common.urn.UrnUtils;
11+
import com.linkedin.entity.Aspect;
1212
import com.linkedin.events.metadata.ChangeType;
1313
import com.linkedin.metadata.Constants;
14+
import com.linkedin.metadata.aspect.AspectRetriever;
15+
import com.linkedin.metadata.aspect.GraphRetriever;
16+
import com.linkedin.metadata.aspect.models.graph.Edge;
17+
import com.linkedin.metadata.aspect.models.graph.RelatedEntitiesScrollResult;
1418
import com.linkedin.metadata.aspect.models.graph.RelatedEntity;
15-
import com.linkedin.metadata.entity.EntityService;
16-
import com.linkedin.metadata.graph.GraphService;
17-
import com.linkedin.metadata.graph.RelatedEntitiesResult;
18-
import com.linkedin.metadata.models.AspectSpec;
19-
import com.linkedin.metadata.models.registry.EntityRegistry;
2019
import com.linkedin.metadata.query.filter.RelationshipDirection;
2120
import com.linkedin.metadata.utils.GenericRecordUtils;
21+
import com.linkedin.metadata.utils.PegasusUtils;
2222
import com.linkedin.mxe.PlatformEvent;
2323
import com.linkedin.platform.event.v1.EntityChangeEvent;
2424
import io.datahubproject.metadata.context.OperationContext;
2525
import java.util.Arrays;
26+
import java.util.Map;
2627
import java.util.Set;
28+
import java.util.function.Consumer;
29+
import java.util.stream.Collectors;
2730
import lombok.extern.slf4j.Slf4j;
2831
import org.springframework.beans.factory.annotation.Value;
2932
import org.springframework.lang.NonNull;
33+
import org.springframework.lang.Nullable;
3034
import org.springframework.stereotype.Component;
3135

3236
@Slf4j
3337
@Component
3438
public class BusinessAttributeUpdateHookService {
3539
private static final String BUSINESS_ATTRIBUTE_OF = "BusinessAttributeOf";
3640

37-
private final GraphService graphService;
38-
private final EntityService<?> entityService;
39-
private final EntityRegistry entityRegistry;
40-
41+
private final UpdateIndicesService updateIndicesService;
4142
private final int relatedEntitiesCount;
43+
private final int getRelatedEntitiesBatchSize;
4244

4345
public static final String TAG = "TAG";
4446
public static final String GLOSSARY_TERM = "GLOSSARY_TERM";
4547
public static final String DOCUMENTATION = "DOCUMENTATION";
4648

4749
public BusinessAttributeUpdateHookService(
48-
GraphService graphService,
49-
EntityService<?> entityService,
50-
EntityRegistry entityRegistry,
51-
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesCount}") int relatedEntitiesCount) {
52-
this.graphService = graphService;
53-
this.entityService = entityService;
54-
this.entityRegistry = entityRegistry;
50+
@NonNull UpdateIndicesService updateIndicesService,
51+
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesCount}") int relatedEntitiesCount,
52+
@NonNull @Value("${businessAttribute.fetchRelatedEntitiesBatchSize}") int relatedBatchSize) {
53+
this.updateIndicesService = updateIndicesService;
5554
this.relatedEntitiesCount = relatedEntitiesCount;
55+
this.getRelatedEntitiesBatchSize = relatedBatchSize;
5656
}
5757

5858
public void handleChangeEvent(
@@ -76,58 +76,76 @@ public void handleChangeEvent(
7676
Urn urn = entityChangeEvent.getEntityUrn();
7777
log.info("Business Attribute update hook invoked for urn :" + urn);
7878

79-
RelatedEntitiesResult entityAssociatedWithBusinessAttribute =
80-
graphService.findRelatedEntities(
79+
fetchRelatedEntities(opContext, urn, batch -> processBatch(opContext, batch), null, 0);
80+
}
81+
82+
private void fetchRelatedEntities(
83+
@NonNull final OperationContext opContext,
84+
@NonNull final Urn urn,
85+
@NonNull final Consumer<RelatedEntitiesScrollResult> resultConsumer,
86+
@Nullable String scrollId,
87+
int consumedEntityCount) {
88+
GraphRetriever graph = opContext.getRetrieverContext().get().getGraphRetriever();
89+
90+
RelatedEntitiesScrollResult result =
91+
graph.scrollRelatedEntities(
8192
null,
8293
newFilter("urn", urn.toString()),
8394
null,
8495
EMPTY_FILTER,
8596
Arrays.asList(BUSINESS_ATTRIBUTE_OF),
8697
newRelationshipFilter(EMPTY_FILTER, RelationshipDirection.INCOMING),
87-
0,
88-
relatedEntitiesCount);
89-
90-
for (RelatedEntity relatedEntity : entityAssociatedWithBusinessAttribute.getEntities()) {
91-
String entityUrnStr = relatedEntity.getUrn();
92-
try {
93-
Urn entityUrn = new Urn(entityUrnStr);
94-
final AspectSpec aspectSpec =
95-
entityRegistry
96-
.getEntitySpec(Constants.SCHEMA_FIELD_ENTITY_NAME)
97-
.getAspectSpec(Constants.BUSINESS_ATTRIBUTE_ASPECT);
98-
99-
EnvelopedAspect envelopedAspect =
100-
entityService.getLatestEnvelopedAspect(
101-
opContext,
102-
Constants.SCHEMA_FIELD_ENTITY_NAME,
103-
entityUrn,
104-
Constants.BUSINESS_ATTRIBUTE_ASPECT);
105-
BusinessAttributes businessAttributes =
106-
new BusinessAttributes(envelopedAspect.getValue().data());
107-
108-
final AuditStamp auditStamp =
109-
new AuditStamp()
110-
.setActor(Urn.createFromString(Constants.SYSTEM_ACTOR))
111-
.setTime(System.currentTimeMillis());
112-
113-
entityService
114-
.alwaysProduceMCLAsync(
115-
opContext,
116-
entityUrn,
117-
Constants.SCHEMA_FIELD_ENTITY_NAME,
118-
Constants.BUSINESS_ATTRIBUTE_ASPECT,
119-
aspectSpec,
120-
null,
121-
businessAttributes,
122-
null,
123-
null,
124-
auditStamp,
125-
ChangeType.UPSERT)
126-
.getFirst();
127-
128-
} catch (Exception e) {
129-
throw new RuntimeException(e);
130-
}
98+
Edge.EDGE_SORT_CRITERION,
99+
scrollId,
100+
getRelatedEntitiesBatchSize,
101+
null,
102+
null);
103+
resultConsumer.accept(result);
104+
105+
if (result.getScrollId() != null && consumedEntityCount < relatedEntitiesCount) {
106+
fetchRelatedEntities(
107+
opContext,
108+
urn,
109+
resultConsumer,
110+
result.getScrollId(),
111+
consumedEntityCount + result.getEntities().size());
131112
}
132113
}
114+
115+
private void processBatch(
116+
@NonNull OperationContext opContext, @NonNull RelatedEntitiesScrollResult batch) {
117+
AspectRetriever aspectRetriever = opContext.getRetrieverContext().get().getAspectRetriever();
118+
119+
Set<Urn> entityUrns =
120+
batch.getEntities().stream()
121+
.map(RelatedEntity::getUrn)
122+
.map(UrnUtils::getUrn)
123+
.collect(Collectors.toSet());
124+
125+
Map<Urn, Map<String, Aspect>> entityAspectMap =
126+
aspectRetriever.getLatestAspectObjects(
127+
entityUrns, Set.of(Constants.BUSINESS_ATTRIBUTE_ASPECT));
128+
129+
entityAspectMap.entrySet().stream()
130+
.filter(entry -> entry.getValue().containsKey(Constants.BUSINESS_ATTRIBUTE_ASPECT))
131+
.forEach(
132+
entry -> {
133+
final Urn entityUrn = entry.getKey();
134+
final Aspect aspect = entry.getValue().get(Constants.BUSINESS_ATTRIBUTE_ASPECT);
135+
136+
updateIndicesService.handleChangeEvent(
137+
opContext,
138+
PegasusUtils.constructMCL(
139+
null,
140+
Constants.SCHEMA_FIELD_ENTITY_NAME,
141+
entityUrn,
142+
ChangeType.UPSERT,
143+
Constants.BUSINESS_ATTRIBUTE_ASPECT,
144+
opContext.getAuditStamp(),
145+
new BusinessAttributes(aspect.data()),
146+
null,
147+
null,
148+
null));
149+
});
150+
}
133151
}

metadata-jobs/pe-consumer/build.gradle

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ dependencies {
2525
testRuntimeOnly externalDependency.logbackClassic
2626
testImplementation externalDependency.springBootTest
2727
testImplementation externalDependency.testng
28+
testImplementation project(':metadata-operation-context')
2829
}
2930

3031
task avroSchemaSources(type: Copy) {

metadata-jobs/pe-consumer/src/main/java/com/datahub/event/hook/BusinessAttributeUpdateHook.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,38 @@
11
package com.datahub.event.hook;
22

33
import com.linkedin.gms.factory.common.GraphServiceFactory;
4-
import com.linkedin.gms.factory.entity.EntityServiceFactory;
5-
import com.linkedin.gms.factory.entityregistry.EntityRegistryFactory;
64
import com.linkedin.metadata.service.BusinessAttributeUpdateHookService;
75
import com.linkedin.mxe.PlatformEvent;
86
import io.datahubproject.metadata.context.OperationContext;
97
import javax.annotation.Nonnull;
108
import lombok.extern.slf4j.Slf4j;
9+
import org.springframework.beans.factory.annotation.Value;
1110
import org.springframework.context.annotation.Import;
1211
import org.springframework.stereotype.Component;
1312

1413
@Slf4j
1514
@Component
16-
@Import({EntityServiceFactory.class, EntityRegistryFactory.class, GraphServiceFactory.class})
15+
@Import(GraphServiceFactory.class)
1716
public class BusinessAttributeUpdateHook implements PlatformEventHook {
1817

1918
protected final BusinessAttributeUpdateHookService businessAttributeUpdateHookService;
19+
protected final boolean enabled;
2020

2121
public BusinessAttributeUpdateHook(
22-
BusinessAttributeUpdateHookService businessAttributeUpdateHookService) {
22+
BusinessAttributeUpdateHookService businessAttributeUpdateHookService,
23+
@Value("${featureFlags.businessAttributeEntityEnabled}") boolean enabled) {
2324
this.businessAttributeUpdateHookService = businessAttributeUpdateHookService;
25+
this.enabled = enabled;
26+
}
27+
28+
@Override
29+
public boolean isEnabled() {
30+
return enabled;
31+
}
32+
33+
@Override
34+
public void init() {
35+
log.info("Initialized PlatformEventHook: BusinessAttributeUpdateHook");
2436
}
2537

2638
/**

0 commit comments

Comments
 (0)