11package com .linkedin .datahub .upgrade .system ;
22
3- import static com .linkedin .metadata .Constants .DATA_HUB_UPGRADE_RESULT_ASPECT_NAME ;
4-
53import com .linkedin .common .urn .Urn ;
64import com .linkedin .datahub .upgrade .UpgradeContext ;
75import com .linkedin .datahub .upgrade .UpgradeStep ;
86import com .linkedin .datahub .upgrade .UpgradeStepResult ;
97import com .linkedin .datahub .upgrade .impl .DefaultUpgradeStepResult ;
108import com .linkedin .events .metadata .ChangeType ;
9+ import com .linkedin .metadata .aspect .SystemAspect ;
1110import com .linkedin .metadata .boot .BootstrapStep ;
1211import com .linkedin .metadata .entity .AspectDao ;
1312import com .linkedin .metadata .entity .EntityService ;
1615import com .linkedin .metadata .entity .ebean .PartitionedStream ;
1716import com .linkedin .metadata .entity .restoreindices .RestoreIndicesArgs ;
1817import com .linkedin .metadata .utils .AuditStampUtils ;
18+ import com .linkedin .upgrade .DataHubUpgradeResult ;
1919import com .linkedin .upgrade .DataHubUpgradeState ;
2020import com .linkedin .util .Pair ;
2121import io .datahubproject .metadata .context .OperationContext ;
2222import java .util .List ;
23+ import java .util .Map ;
24+ import java .util .Optional ;
2325import java .util .concurrent .ExecutionException ;
2426import java .util .concurrent .Future ;
2527import java .util .function .Function ;
3335 */
3436@ Slf4j
3537public abstract class AbstractMCLStep implements UpgradeStep {
38+ public static final String LAST_URN_KEY = "lastUrn" ;
39+
3640 private final OperationContext opContext ;
3741 private final EntityService <?> entityService ;
3842 private final AspectDao aspectDao ;
@@ -70,10 +74,30 @@ protected Urn getUpgradeIdUrn() {
7074 @ Override
7175 public Function <UpgradeContext , UpgradeStepResult > executable () {
7276 return (context ) -> {
77+ // Resume state
78+ Optional <DataHubUpgradeResult > prevResult =
79+ context .upgrade ().getUpgradeResult (opContext , getUpgradeIdUrn (), entityService );
80+ String resumeUrn =
81+ prevResult
82+ .filter (
83+ result ->
84+ DataHubUpgradeState .IN_PROGRESS .equals (result .getState ())
85+ && result .getResult () != null
86+ && result .getResult ().containsKey (LAST_URN_KEY ))
87+ .map (result -> result .getResult ().get (LAST_URN_KEY ))
88+ .orElse (null );
89+ if (resumeUrn != null ) {
90+ log .info ("{}: Resuming from URN: {}" , getUpgradeIdUrn (), resumeUrn );
91+ }
7392
7493 // re-using for configuring the sql scan
7594 RestoreIndicesArgs args =
76- new RestoreIndicesArgs ().aspectName (getAspectName ()).batchSize (batchSize ).limit (limit );
95+ new RestoreIndicesArgs ()
96+ .aspectName (getAspectName ())
97+ .batchSize (batchSize )
98+ .lastUrn (resumeUrn )
99+ .urnBasedPagination (resumeUrn != null )
100+ .limit (limit );
77101
78102 if (getUrnLike () != null ) {
79103 args = args .urnLike (getUrnLike ());
@@ -86,40 +110,62 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
86110 batch -> {
87111 log .info ("Processing batch({}) of size {}." , getAspectName (), batchSize );
88112
89- List <Pair <Future <?>, Boolean >> futures ;
90-
113+ List <Pair <Future <?>, SystemAspect >> futures ;
91114 futures =
92115 EntityUtils .toSystemAspectFromEbeanAspects (
93116 opContext .getRetrieverContext ().get (),
94117 batch .collect (Collectors .toList ()))
95118 .stream ()
96119 .map (
97- systemAspect ->
98- entityService .alwaysProduceMCLAsync (
99- opContext ,
100- systemAspect .getUrn (),
101- systemAspect .getUrn ().getEntityType (),
102- getAspectName (),
103- systemAspect .getAspectSpec (),
104- null ,
105- systemAspect .getRecordTemplate (),
106- null ,
107- systemAspect
108- .getSystemMetadata ()
109- .setRunId (id ())
110- .setLastObserved (System .currentTimeMillis ()),
111- AuditStampUtils .createDefaultAuditStamp (),
112- ChangeType .UPSERT ))
113- .collect (Collectors .toList ());
114-
115- futures .forEach (
116- f -> {
117- try {
118- f .getFirst ().get ();
119- } catch (InterruptedException | ExecutionException e ) {
120- throw new RuntimeException (e );
121- }
122- });
120+ systemAspect -> {
121+ Pair <Future <?>, Boolean > future =
122+ entityService .alwaysProduceMCLAsync (
123+ opContext ,
124+ systemAspect .getUrn (),
125+ systemAspect .getUrn ().getEntityType (),
126+ getAspectName (),
127+ systemAspect .getAspectSpec (),
128+ null ,
129+ systemAspect .getRecordTemplate (),
130+ null ,
131+ systemAspect
132+ .getSystemMetadata ()
133+ .setRunId (id ())
134+ .setLastObserved (System .currentTimeMillis ()),
135+ AuditStampUtils .createDefaultAuditStamp (),
136+ ChangeType .UPSERT );
137+ return Pair .<Future <?>, SystemAspect >of (
138+ future .getFirst (), systemAspect );
139+ })
140+ .toList ();
141+
142+ SystemAspect lastAspect =
143+ futures .stream ()
144+ .map (
145+ f -> {
146+ try {
147+ f .getFirst ().get ();
148+ return f .getSecond ();
149+ } catch (InterruptedException | ExecutionException e ) {
150+ throw new RuntimeException (e );
151+ }
152+ })
153+ .reduce ((a , b ) -> b )
154+ .orElse (null );
155+
156+ // record progress
157+ if (lastAspect != null ) {
158+ log .info (
159+ "{}: Saving state. Last urn:{}" , getUpgradeIdUrn (), lastAspect .getUrn ());
160+ context
161+ .upgrade ()
162+ .setUpgradeResult (
163+ opContext ,
164+ getUpgradeIdUrn (),
165+ entityService ,
166+ DataHubUpgradeState .IN_PROGRESS ,
167+ Map .of (LAST_URN_KEY , lastAspect .getUrn ().toString ()));
168+ }
123169
124170 if (batchDelayMs > 0 ) {
125171 log .info ("Sleeping for {} ms" , batchDelayMs );
@@ -142,12 +188,23 @@ public Function<UpgradeContext, UpgradeStepResult> executable() {
142188 @ Override
143189 /** Returns whether the upgrade should be skipped. */
 // Reading note: this span is a diff. Lines marked `-` are the removed implementation and
 // lines marked `+` are its replacement; unmarked lines are unchanged context.
 //
 // Removed behavior: skip whenever a DATA_HUB_UPGRADE_RESULT aspect exists for
 // getUpgradeIdUrn(), regardless of the state stored in it.
 // New behavior: skip only when the stored result's state is SUCCEEDED or ABORTED. A missing
 // result, or any other state (e.g. IN_PROGRESS), returns false so the step is executed.
144190 public boolean skip (UpgradeContext context ) {
145- boolean previouslyRun =
146- entityService .exists (
147- opContext , getUpgradeIdUrn (), DATA_HUB_UPGRADE_RESULT_ASPECT_NAME , true );
148- if (previouslyRun ) {
149- log .info ("{} was already run. Skipping." , id ());
 // Look up the previously recorded upgrade result for this step's upgrade id URN;
 // the Optional is empty when there is none.
191+ Optional <DataHubUpgradeResult > prevResult =
192+ context .upgrade ().getUpgradeResult (opContext , getUpgradeIdUrn (), entityService );
193+
 // True only if a result is present AND its state is one of the two states treated as final here.
194+ boolean previousRunFinal =
195+ prevResult
196+ .filter (
197+ result ->
198+ DataHubUpgradeState .SUCCEEDED .equals (result .getState ())
199+ || DataHubUpgradeState .ABORTED .equals (result .getState ()))
200+ .isPresent ();
201+
202+ if (previousRunFinal ) {
 // The state argument is an Optional, so the log line renders it via Optional's toString()
 // rather than as the bare state name.
203+ log .info (
204+ "{} was already run. State: {} Skipping." ,
205+ id (),
206+ prevResult .map (DataHubUpgradeResult ::getState ));
150207 }
151- return previouslyRun ;
208+ return previousRunFinal ;
152209 }
153210}
0 commit comments