feat(spark-lineage): coalesce spark jobs (#5077)
MugdhaHardikar-GSLab authored Jun 3, 2022
1 parent b2d957d commit ccf8222
Showing 12 changed files with 349 additions and 10 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -47,6 +47,7 @@ MANIFEST
**/spark-lineage/**/derby.log
**/spark-lineage/**/hive/
**/spark-lineage/**/out*.csv/
**/spark-lineage/coalesce-test/

#VS Code
.vscode
2 changes: 2 additions & 0 deletions metadata-integration/java/spark-lineage/README.md
@@ -59,6 +59,8 @@ spark = SparkSession.builder()
| spark.datahub.metadata.pipeline.platformInstance| | | Pipeline level platform instance |
| spark.datahub.metadata.dataset.platformInstance| | | dataset level platform instance |
| spark.datahub.metadata.dataset.env | | PROD | [Supported values](https://datahubproject.io/docs/graphql/enums#fabrictype). In all other cases, falls back to PROD |
| spark.datahub.coalesce_jobs | | false | Only one datajob (task) will be emitted, containing all input and output datasets for the Spark application |
| spark.datahub.parent.datajob_urn | | | The specified datajob will be set as the upstream datajob of the datajob created. Takes effect only when spark.datahub.coalesce_jobs is set to true |
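
For illustration only (this snippet is not part of the diff), the two new options can be combined with the listener configuration shown earlier in this README; the application name, package version, server address, and the flow/task names in the parent URN are placeholders:

```
spark = SparkSession.builder()
          .appName("coalesce-example")
          .config("spark.jars.packages", "io.acryl:datahub-spark-lineage:<version>")
          .config("spark.extraListeners", "datahub.spark.DatahubSparkListener")
          .config("spark.datahub.rest.server", "http://localhost:8080")
          .config("spark.datahub.coalesce_jobs", "true")
          .config("spark.datahub.parent.datajob_urn",
                  "urn:li:dataJob:(urn:li:dataFlow:(airflow,example_dag,prod),example_task)")
          .enableHiveSupport()
          .getOrCreate()
```

With coalesce_jobs enabled, all SQL executions of the application are folded into that single datajob, and the hypothetical Airflow task above appears as its only upstream job.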


## What to Expect: The Metadata Model
1 change: 1 addition & 0 deletions metadata-integration/java/spark-lineage/build.gradle
@@ -163,6 +163,7 @@ jacocoTestReport {
}

test {
forkEvery = 1
useJUnit()
finalizedBy jacocoTestReport
}
@@ -1,5 +1,6 @@
package datahub.spark;

import datahub.spark.consumer.impl.CoalesceJobsEmitter;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
@@ -56,6 +57,8 @@ public class DatahubSparkListener extends SparkListener {
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";

public static final String COALESCE_KEY = "coalesce_jobs";

private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
@@ -281,7 +284,11 @@ private synchronized void checkOrCreateApplicationSetup(SparkContext ctx) {
AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
ctx.sparkUser(), pipelineConfig);

appEmitters.computeIfAbsent(appId, s -> new McpEmitter(datahubConf)).accept(evt);
appEmitters.computeIfAbsent(appId,
s -> datahubConf.hasPath(COALESCE_KEY) && datahubConf.getBoolean(COALESCE_KEY)
? new CoalesceJobsEmitter(datahubConf)
: new McpEmitter(datahubConf))
.accept(evt);
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
appSqlDetails.put(appId, new ConcurrentHashMap<>());
@@ -0,0 +1,124 @@
package datahub.spark.consumer.impl;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

import com.linkedin.data.template.StringMap;

import com.linkedin.common.DataJobUrnArray;
import com.linkedin.common.DatasetUrnArray;
import com.linkedin.common.urn.DataJobUrn;
import com.linkedin.common.urn.DatasetUrn;
import com.linkedin.datajob.DataJobInfo;
import com.linkedin.datajob.DataJobInputOutput;
import com.linkedin.datajob.JobStatus;
import com.typesafe.config.Config;

import datahub.event.MetadataChangeProposalWrapper;
import datahub.spark.model.AppEndEvent;
import datahub.spark.model.AppStartEvent;
import datahub.spark.model.LineageEvent;
import datahub.spark.model.SQLQueryExecStartEvent;
import lombok.extern.slf4j.Slf4j;

@Slf4j
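// Variant of McpEmitter: buffers SQL execution start events for the whole Spark application and emits a single coalesced datajob when the application ends.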
public class CoalesceJobsEmitter extends McpEmitter {

private static final String PARENT_JOB_KEY = "parent.datajob_urn";
private final String parentJobUrn;
private AppStartEvent appStartEvent = null;
private final ArrayList<SQLQueryExecStartEvent> sqlQueryExecStartEvents = new ArrayList<>();

public CoalesceJobsEmitter(Config datahubConf) {
super(datahubConf);
parentJobUrn = datahubConf.hasPath(PARENT_JOB_KEY) ? datahubConf.getString(PARENT_JOB_KEY) : null;
log.info("CoalesceJobsEmitter initialised with " + PARENT_JOB_KEY + ":" + parentJobUrn);
}

@Override
public void accept(LineageEvent evt) {
if (evt instanceof AppStartEvent) {
this.appStartEvent = (AppStartEvent) evt;
log.debug("AppStartEvent received for processing: " + appStartEvent.getAppId());
emit(this.appStartEvent.asMetadataEvents());
} else if (evt instanceof SQLQueryExecStartEvent) {
SQLQueryExecStartEvent sqlQueryExecStartEvent = (SQLQueryExecStartEvent) evt;
sqlQueryExecStartEvents.add(sqlQueryExecStartEvent);
log.debug("SQLQueryExecStartEvent received for processing for app: " + sqlQueryExecStartEvent.getAppId() + ":"
+ sqlQueryExecStartEvent.getAppName() + " sqlID: " + sqlQueryExecStartEvent.getSqlQueryExecId());
} else if (evt instanceof AppEndEvent) {
AppEndEvent appEndEvent = (AppEndEvent) evt;
if (appStartEvent == null) {
log.error("AppEndEvent received for processing, but no AppStartEvent was received for "
+ appEndEvent.getAppId() + "-" + appEndEvent.getAppName());
return;
}
log.debug("AppEndEvent received for processing for app: " + appEndEvent.getAppId());
emit(appEndEvent.asMetadataEvents());
emit(squashSQLQueryExecStartEvents(appEndEvent));
}
}

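// Merges all buffered SQL execution start events into one datajob: input and output datasets are unioned, and the optional parent datajob (if configured and valid) becomes the only upstream job.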
private List<MetadataChangeProposalWrapper> squashSQLQueryExecStartEvents(AppEndEvent appEndEvent) {

DataJobUrn jobUrn = new DataJobUrn(appStartEvent.getFlowUrn(), appStartEvent.getAppName());

Set<DatasetUrn> inSet = new TreeSet<DatasetUrn>(new DataSetUrnComparator());
sqlQueryExecStartEvents.forEach(x -> inSet.addAll(x.getInputDatasets()));
Set<DatasetUrn> outSet = new TreeSet<DatasetUrn>(new DataSetUrnComparator());
sqlQueryExecStartEvents.forEach(x -> outSet.addAll(x.getOuputDatasets()));
DataJobUrnArray upStreamjobs = new DataJobUrnArray();
try {
if (parentJobUrn != null) {
upStreamjobs = new DataJobUrnArray(DataJobUrn.createFromString(parentJobUrn));
}

} catch (URISyntaxException e) {
log.warn(PARENT_JOB_KEY + " is not a valid URN. Skipping setting up upstream job.");
} catch (ClassCastException e) {
log.warn(PARENT_JOB_KEY + " is not a valid Datajob URN. Skipping setting up upstream job.");
}

DataJobInputOutput jobio = new DataJobInputOutput().setInputDatasets(new DatasetUrnArray(inSet))
.setOutputDatasets(new DatasetUrnArray(outSet)).setInputDatajobs(upStreamjobs);

MetadataChangeProposalWrapper<?> mcpJobIO = MetadataChangeProposalWrapper
.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobio));

StringMap customProps = new StringMap();
customProps.put("startedAt", appStartEvent.timeStr());
customProps.put("appId", appStartEvent.getAppId());
customProps.put("appName", appStartEvent.getAppName());
customProps.put("completedAt", appEndEvent.timeStr());

DataJobInfo jobInfo = new DataJobInfo().setName(appStartEvent.getAppName())
.setType(DataJobInfo.Type.create("sparkJob"));
jobInfo.setCustomProperties(customProps);
jobInfo.setStatus(JobStatus.COMPLETED);
MetadataChangeProposalWrapper<?> mcpJobInfo = MetadataChangeProposalWrapper
.create(b -> b.entityType("dataJob").entityUrn(jobUrn).upsert().aspect(jobInfo));

return Arrays.asList(mcpJobIO, mcpJobInfo);

}

@Override
public void close() throws IOException {
super.close();
}
}

class DataSetUrnComparator implements Comparator<DatasetUrn> {

@Override
public int compare(DatasetUrn urn1, DatasetUrn urn2) {
return urn1.toString().compareTo(urn2.toString());
}

}
@@ -44,7 +44,7 @@ private Optional<Emitter> getEmitter() {
return emitter;
}

private void emit(List<MetadataChangeProposalWrapper> mcpws) {
protected void emit(List<MetadataChangeProposalWrapper> mcpws) {
Optional<Emitter> emitter = getEmitter();
if (emitter.isPresent()) {
mcpws.stream().map(mcpw -> {
@@ -35,17 +35,21 @@ public AppStartEvent(String master, String appName, String appId, long time, Str
this.pipelineConfig = pipelineConfig;
}

public DataFlowUrn getFlowUrn() {
return LineageUtils.flowUrn(getMaster(), getAppName());
}

@Override
public List<MetadataChangeProposalWrapper> asMetadataEvents() {
DataFlowUrn flowUrn = LineageUtils.flowUrn(getMaster(), getAppName());
ArrayList<MetadataChangeProposalWrapper> mcps = new ArrayList<MetadataChangeProposalWrapper>();

if (this.pipelineConfig.hasPath(PLATFORM_INSTANCE_KEY)) {
try {
DataPlatformInstance dpi = new DataPlatformInstance().setPlatform(new DataPlatformUrn(PLATFORM_SPARK)).setInstance(
LineageUtils.dataPlatformInstanceUrn(PLATFORM_SPARK, this.pipelineConfig.getString(PLATFORM_INSTANCE_KEY)));
DataPlatformInstance dpi = new DataPlatformInstance().setPlatform(new DataPlatformUrn(PLATFORM_SPARK))
.setInstance(LineageUtils.dataPlatformInstanceUrn(PLATFORM_SPARK,
this.pipelineConfig.getString(PLATFORM_INSTANCE_KEY)));
mcps.add(MetadataChangeProposalWrapper
.create(b -> b.entityType("dataFlow").entityUrn(flowUrn).upsert().aspect(dpi)));
.create(b -> b.entityType("dataFlow").entityUrn(getFlowUrn()).upsert().aspect(dpi)));
} catch (URISyntaxException e) {
// log error, but don't impact thread
StringWriter s = new StringWriter();
@@ -57,7 +61,7 @@ public List<MetadataChangeProposalWrapper> asMetadataEvents() {
}
DataFlowInfo flowInfo = new DataFlowInfo().setName(getAppName()).setCustomProperties(customProps());
mcps.add(MetadataChangeProposalWrapper
.create(b -> b.entityType("dataFlow").entityUrn(flowUrn).upsert().aspect(flowInfo)));
.create(b -> b.entityType("dataFlow").entityUrn(getFlowUrn()).upsert().aspect(flowInfo)));
return mcps;
}

@@ -15,7 +15,7 @@ public abstract class LineageEvent {

public abstract List<MetadataChangeProposalWrapper> asMetadataEvents();

protected String timeStr() {
public String timeStr() {
return new Date(getTime()).toInstant().toString();
}
}
@@ -82,10 +82,13 @@ StringMap customProps() {
return customProps;
}

private DataJobInputOutput jobIO() {
public DatasetUrnArray getOuputDatasets() {
DatasetUrnArray out = new DatasetUrnArray();
out.add(datasetLineage.getSink().urn());
return out;
}

public DatasetUrnArray getInputDatasets() {
DatasetUrnArray in = new DatasetUrnArray();

Set<SparkDataset> sources = new TreeSet<>(new Comparator<SparkDataset>() {
@@ -99,7 +102,12 @@ public int compare(SparkDataset x, SparkDataset y) {
in.add(source.urn());
}

DataJobInputOutput io = new DataJobInputOutput().setInputDatasets(in).setOutputDatasets(out);
return in;
}

private DataJobInputOutput jobIO() {
DataJobInputOutput io = new DataJobInputOutput().setInputDatasets(getInputDatasets())
.setOutputDatasets(getOuputDatasets());
return io;
}
}
