66
77import com .google .common .collect .ImmutableSet ;
88import com .linkedin .businessattribute .BusinessAttributes ;
9- import com .linkedin .common .AuditStamp ;
109import com .linkedin .common .urn .Urn ;
11- import com .linkedin .entity .EnvelopedAspect ;
10+ import com .linkedin .common .urn .UrnUtils ;
11+ import com .linkedin .entity .Aspect ;
1212import com .linkedin .events .metadata .ChangeType ;
1313import com .linkedin .metadata .Constants ;
14+ import com .linkedin .metadata .aspect .AspectRetriever ;
15+ import com .linkedin .metadata .aspect .GraphRetriever ;
16+ import com .linkedin .metadata .aspect .models .graph .Edge ;
17+ import com .linkedin .metadata .aspect .models .graph .RelatedEntitiesScrollResult ;
1418import com .linkedin .metadata .aspect .models .graph .RelatedEntity ;
15- import com .linkedin .metadata .entity .EntityService ;
16- import com .linkedin .metadata .graph .GraphService ;
17- import com .linkedin .metadata .graph .RelatedEntitiesResult ;
18- import com .linkedin .metadata .models .AspectSpec ;
19- import com .linkedin .metadata .models .registry .EntityRegistry ;
2019import com .linkedin .metadata .query .filter .RelationshipDirection ;
2120import com .linkedin .metadata .utils .GenericRecordUtils ;
21+ import com .linkedin .metadata .utils .PegasusUtils ;
2222import com .linkedin .mxe .PlatformEvent ;
2323import com .linkedin .platform .event .v1 .EntityChangeEvent ;
2424import io .datahubproject .metadata .context .OperationContext ;
2525import java .util .Arrays ;
26+ import java .util .Map ;
2627import java .util .Set ;
28+ import java .util .function .Consumer ;
29+ import java .util .stream .Collectors ;
2730import lombok .extern .slf4j .Slf4j ;
2831import org .springframework .beans .factory .annotation .Value ;
2932import org .springframework .lang .NonNull ;
33+ import org .springframework .lang .Nullable ;
3034import org .springframework .stereotype .Component ;
3135
3236@ Slf4j
3337@ Component
3438public class BusinessAttributeUpdateHookService {
3539 private static final String BUSINESS_ATTRIBUTE_OF = "BusinessAttributeOf" ;
3640
37- private final GraphService graphService ;
38- private final EntityService <?> entityService ;
39- private final EntityRegistry entityRegistry ;
40-
41+ private final UpdateIndicesService updateIndicesService ;
4142 private final int relatedEntitiesCount ;
43+ private final int getRelatedEntitiesBatchSize ;
4244
4345 public static final String TAG = "TAG" ;
4446 public static final String GLOSSARY_TERM = "GLOSSARY_TERM" ;
4547 public static final String DOCUMENTATION = "DOCUMENTATION" ;
4648
4749 public BusinessAttributeUpdateHookService (
48- GraphService graphService ,
49- EntityService <?> entityService ,
50- EntityRegistry entityRegistry ,
51- @ NonNull @ Value ("${businessAttribute.fetchRelatedEntitiesCount}" ) int relatedEntitiesCount ) {
52- this .graphService = graphService ;
53- this .entityService = entityService ;
54- this .entityRegistry = entityRegistry ;
50+ @ NonNull UpdateIndicesService updateIndicesService ,
51+ @ NonNull @ Value ("${businessAttribute.fetchRelatedEntitiesCount}" ) int relatedEntitiesCount ,
52+ @ NonNull @ Value ("${businessAttribute.fetchRelatedEntitiesBatchSize}" ) int relatedBatchSize ) {
53+ this .updateIndicesService = updateIndicesService ;
5554 this .relatedEntitiesCount = relatedEntitiesCount ;
55+ this .getRelatedEntitiesBatchSize = relatedBatchSize ;
5656 }
5757
5858 public void handleChangeEvent (
@@ -76,58 +76,76 @@ public void handleChangeEvent(
7676 Urn urn = entityChangeEvent .getEntityUrn ();
7777 log .info ("Business Attribute update hook invoked for urn :" + urn );
7878
79- RelatedEntitiesResult entityAssociatedWithBusinessAttribute =
80- graphService .findRelatedEntities (
79+ fetchRelatedEntities (opContext , urn , batch -> processBatch (opContext , batch ), null , 0 );
80+ }
81+
82+ private void fetchRelatedEntities (
83+ @ NonNull final OperationContext opContext ,
84+ @ NonNull final Urn urn ,
85+ @ NonNull final Consumer <RelatedEntitiesScrollResult > resultConsumer ,
86+ @ Nullable String scrollId ,
87+ int consumedEntityCount ) {
88+ GraphRetriever graph = opContext .getRetrieverContext ().get ().getGraphRetriever ();
89+
90+ RelatedEntitiesScrollResult result =
91+ graph .scrollRelatedEntities (
8192 null ,
8293 newFilter ("urn" , urn .toString ()),
8394 null ,
8495 EMPTY_FILTER ,
8596 Arrays .asList (BUSINESS_ATTRIBUTE_OF ),
8697 newRelationshipFilter (EMPTY_FILTER , RelationshipDirection .INCOMING ),
87- 0 ,
88- relatedEntitiesCount );
89-
90- for (RelatedEntity relatedEntity : entityAssociatedWithBusinessAttribute .getEntities ()) {
91- String entityUrnStr = relatedEntity .getUrn ();
92- try {
93- Urn entityUrn = new Urn (entityUrnStr );
94- final AspectSpec aspectSpec =
95- entityRegistry
96- .getEntitySpec (Constants .SCHEMA_FIELD_ENTITY_NAME )
97- .getAspectSpec (Constants .BUSINESS_ATTRIBUTE_ASPECT );
98-
99- EnvelopedAspect envelopedAspect =
100- entityService .getLatestEnvelopedAspect (
101- opContext ,
102- Constants .SCHEMA_FIELD_ENTITY_NAME ,
103- entityUrn ,
104- Constants .BUSINESS_ATTRIBUTE_ASPECT );
105- BusinessAttributes businessAttributes =
106- new BusinessAttributes (envelopedAspect .getValue ().data ());
107-
108- final AuditStamp auditStamp =
109- new AuditStamp ()
110- .setActor (Urn .createFromString (Constants .SYSTEM_ACTOR ))
111- .setTime (System .currentTimeMillis ());
112-
113- entityService
114- .alwaysProduceMCLAsync (
115- opContext ,
116- entityUrn ,
117- Constants .SCHEMA_FIELD_ENTITY_NAME ,
118- Constants .BUSINESS_ATTRIBUTE_ASPECT ,
119- aspectSpec ,
120- null ,
121- businessAttributes ,
122- null ,
123- null ,
124- auditStamp ,
125- ChangeType .UPSERT )
126- .getFirst ();
127-
128- } catch (Exception e ) {
129- throw new RuntimeException (e );
130- }
98+ Edge .EDGE_SORT_CRITERION ,
99+ scrollId ,
100+ getRelatedEntitiesBatchSize ,
101+ null ,
102+ null );
103+ resultConsumer .accept (result );
104+
105+ if (result .getScrollId () != null && consumedEntityCount < relatedEntitiesCount ) {
106+ fetchRelatedEntities (
107+ opContext ,
108+ urn ,
109+ resultConsumer ,
110+ result .getScrollId (),
111+ consumedEntityCount + result .getEntities ().size ());
131112 }
132113 }
114+
115+ private void processBatch (
116+ @ NonNull OperationContext opContext , @ NonNull RelatedEntitiesScrollResult batch ) {
117+ AspectRetriever aspectRetriever = opContext .getRetrieverContext ().get ().getAspectRetriever ();
118+
119+ Set <Urn > entityUrns =
120+ batch .getEntities ().stream ()
121+ .map (RelatedEntity ::getUrn )
122+ .map (UrnUtils ::getUrn )
123+ .collect (Collectors .toSet ());
124+
125+ Map <Urn , Map <String , Aspect >> entityAspectMap =
126+ aspectRetriever .getLatestAspectObjects (
127+ entityUrns , Set .of (Constants .BUSINESS_ATTRIBUTE_ASPECT ));
128+
129+ entityAspectMap .entrySet ().stream ()
130+ .filter (entry -> entry .getValue ().containsKey (Constants .BUSINESS_ATTRIBUTE_ASPECT ))
131+ .forEach (
132+ entry -> {
133+ final Urn entityUrn = entry .getKey ();
134+ final Aspect aspect = entry .getValue ().get (Constants .BUSINESS_ATTRIBUTE_ASPECT );
135+
136+ updateIndicesService .handleChangeEvent (
137+ opContext ,
138+ PegasusUtils .constructMCL (
139+ null ,
140+ Constants .SCHEMA_FIELD_ENTITY_NAME ,
141+ entityUrn ,
142+ ChangeType .UPSERT ,
143+ Constants .BUSINESS_ATTRIBUTE_ASPECT ,
144+ opContext .getAuditStamp (),
145+ new BusinessAttributes (aspect .data ()),
146+ null ,
147+ null ,
148+ null ));
149+ });
150+ }
133151}
0 commit comments