feat(spark-lineage): support for persist API (#4980)
MugdhaHardikar-GSLab authored May 23, 2022
1 parent a9ff203 commit 96ef7e5
Showing 16 changed files with 304 additions and 155 deletions.
4 changes: 2 additions & 2 deletions .gitignore
@@ -46,7 +46,7 @@ MANIFEST
**/spark-lineage/metastore_db/
**/spark-lineage/**/derby.log
**/spark-lineage/**/hive/
**/spark-lineage/**/out.csv/
**/spark-lineage/**/out*.csv/
.vscode

#spark smoke test
@@ -67,4 +67,4 @@ tmp*
temp*

# frontend assets
datahub-frontend/public/**
datahub-frontend/public/**
7 changes: 7 additions & 0 deletions metadata-integration/java/spark-lineage/README.md
@@ -93,6 +93,7 @@ This initial release has been tested with the following environments:
Note that testing for other environments such as Databricks is planned in the near future.

### Spark commands supported

Below is a list of Spark commands that are parsed currently:
- InsertIntoHadoopFsRelationCommand
- SaveIntoDataSourceCommand (jdbc)
@@ -101,6 +102,12 @@ Below is a list of Spark commands that are parsed currently:

Effectively, these support data sources/sinks corresponding to Hive, HDFS and JDBC.
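
For illustration, here is a minimal Java sketch of a job whose writes map onto these commands. The class name, paths, and JDBC connection details are hypothetical, and it assumes the DataHub listener has already been registered through the Spark configuration (`spark.extraListeners` and the `spark.datahub.*` properties) covered elsewhere in this README:

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class LineageExampleJob {
  public static void main(String[] args) {
    // Assumes the DataHub listener and emitter are configured externally,
    // e.g. via spark.extraListeners and spark.datahub.* properties.
    SparkSession spark = SparkSession.builder()
        .appName("lineage-example")
        .getOrCreate();

    Dataset<Row> df = spark.read().option("header", "true").csv("hdfs:///data/in.csv");

    // HDFS/file sink -> InsertIntoHadoopFsRelationCommand
    df.write().mode(SaveMode.Overwrite).parquet("hdfs:///data/out.parquet");

    // JDBC sink -> SaveIntoDataSourceCommand (jdbc); URL and table are hypothetical
    df.write().format("jdbc")
        .option("url", "jdbc:postgresql://localhost:5432/demo")
        .option("dbtable", "public.out_table")
        .option("user", "demo")
        .option("password", "demo")
        .mode(SaveMode.Append)
        .save();

    spark.stop();
  }
}
```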

The DataFrame.persist command is supported for the following LeafExecNodes (a usage sketch follows this list):
- FileSourceScanExec
- HiveTableScanExec
- RowDataSourceScanExec
- InMemoryTableScanExec
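
As a rough usage sketch (class name, paths, and column names are hypothetical, and the listener is again assumed to be configured elsewhere), persisting an intermediate DataFrame and reusing it across several writes is the pattern this support targets: the initial file read surfaces as FileSourceScanExec, while downstream reads of the persisted result surface as InMemoryTableScanExec.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.storage.StorageLevel;

public class PersistLineageExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("persist-lineage-example")
        .getOrCreate();

    // File read -> FileSourceScanExec
    Dataset<Row> raw = spark.read().option("header", "true").csv("hdfs:///data/in1.csv");

    // Persisted intermediate; downstream scans of it appear as InMemoryTableScanExec
    Dataset<Row> cleaned = raw.filter("id is not null").persist(StorageLevel.MEMORY_AND_DISK());

    // Both writes reuse the persisted DataFrame; lineage should still trace back to in1.csv
    cleaned.write().mode(SaveMode.Overwrite).parquet("hdfs:///data/out1.parquet");
    cleaned.groupBy("id").count().write().mode(SaveMode.Overwrite).parquet("hdfs:///data/out2.parquet");

    spark.stop();
  }
}
```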

### Spark commands not yet supported
- View related commands
- Cache commands and implications on lineage
DatahubSparkListener.java
@@ -49,24 +49,22 @@
import scala.runtime.AbstractFunction1;
import scala.runtime.AbstractPartialFunction;



@Slf4j
public class DatahubSparkListener extends SparkListener {

private static final int THREAD_CNT = 16;
public static final String CONSUMER_TYPE_KEY = "spark.datahub.lineage.consumerTypes";
public static final String DATAHUB_EMITTER = "mcpEmitter";
public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String DATABRICKS_CLUSTER_KEY = "databricks.cluster";
public static final String PIPELINE_KEY = "metadata.pipeline";
public static final String PIPELINE_PLATFORM_INSTANCE_KEY = PIPELINE_KEY + ".platformInstance";

private final Map<String, AppStartEvent> appDetails = new ConcurrentHashMap<>();
private final Map<String, Map<Long, SQLQueryExecStartEvent>> appSqlDetails = new ConcurrentHashMap<>();
private final Map<String, ExecutorService> appPoolDetails = new ConcurrentHashMap<>();
private final Map<String, McpEmitter> appEmitters = new ConcurrentHashMap<>();
private final Map<String, Config> appConfig = new ConcurrentHashMap<>();

public DatahubSparkListener() {
log.info("DatahubSparkListener initialised.");
}
@@ -85,30 +83,30 @@ public SqlStartTask(SparkListenerSQLExecutionStart sqlStart, LogicalPlan plan, S

@Override
public void run() {
appSqlDetails.get(ctx.applicationId())
.put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(),
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), null));
log.debug("PLAN for execution id: " + getPipelineName(ctx) + ":" + sqlStart.executionId() + "\n");
log.debug(plan.toString());

Optional<? extends SparkDataset> outputDS = DatasetExtractor.asDataset(plan, ctx, true);
if (!outputDS.isPresent()) {
Optional<? extends Collection<SparkDataset>> outputDS = DatasetExtractor.asDataset(plan, ctx, true);
if (!outputDS.isPresent() || outputDS.get().isEmpty()) {
log.debug("Skipping execution as no output dataset present for execution id: " + ctx.applicationId() + ":"
+ sqlStart.executionId());
return;
}

DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(), outputDS.get());
// Here the assumption is that there is only a single target for a single SQL query
DatasetLineage lineage = new DatasetLineage(sqlStart.description(), plan.toString(),
outputDS.get().iterator().next());
Collection<QueryPlan<?>> allInners = new ArrayList<>();

plan.collect(new AbstractPartialFunction<LogicalPlan, Void>() {

@Override
public Void apply(LogicalPlan plan) {
log.debug("CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> lineage.addSource(x));
Optional<? extends Collection<SparkDataset>> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(x -> x.forEach(y -> lineage.addSource(y)));
allInners.addAll(JavaConversions.asJavaCollection(plan.innerChildren()));
return null;
}
@@ -130,10 +128,10 @@ public boolean isDefinedAt(LogicalPlan x) {
@Override
public Void apply(LogicalPlan plan) {
log.debug("INNER CHILD " + plan.getClass() + "\n" + plan + "\n-------------\n");
Optional<? extends SparkDataset> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
Optional<? extends Collection<SparkDataset>> inputDS = DatasetExtractor.asDataset(plan, ctx, false);
inputDS.ifPresent(
x -> log.debug("source added for " + ctx.appName() + "/" + sqlStart.executionId() + ": " + x));
inputDS.ifPresent(x -> lineage.addSource(x));
inputDS.ifPresent(x -> x.forEach(y -> lineage.addSource(y)));
return null;
}

@@ -144,9 +142,8 @@ public boolean isDefinedAt(LogicalPlan x) {
});
}

SQLQueryExecStartEvent evt =
new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx), ctx.applicationId(),
sqlStart.time(), sqlStart.executionId(), lineage);
SQLQueryExecStartEvent evt = new SQLQueryExecStartEvent(ctx.conf().get("spark.master"), getPipelineName(ctx),
ctx.applicationId(), sqlStart.time(), sqlStart.executionId(), lineage);

appSqlDetails.get(ctx.applicationId()).put(sqlStart.executionId(), evt);

@@ -160,7 +157,7 @@ public boolean isDefinedAt(LogicalPlan x) {
log.debug("Parsed execution id {}:{}", ctx.appName(), sqlStart.executionId());
}
}

@Override
public void onApplicationStart(SparkListenerApplicationStart applicationStart) {
try {
@@ -212,13 +209,13 @@ public Void apply(SparkContext sc) {
}
}
consumers().forEach(x -> {
x.accept(evt);
try {
x.close();
} catch (IOException e) {
log.warn("Failed to close lineage consumer", e);
}
});
x.accept(evt);
try {
x.close();
} catch (IOException e) {
log.warn("Failed to close lineage consumer", e);
}
});
}
return null;
}
@@ -263,13 +260,11 @@ public void processExecutionEnd(SparkListenerSQLExecutionEnd sqlEnd) {
public Void apply(SparkContext sc) {
SQLQueryExecStartEvent start = appSqlDetails.get(sc.applicationId()).remove(sqlEnd.executionId());
if (start == null) {
log.error(
"Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId() + ":"
+ sqlEnd.executionId());
log.error("Execution end event received, but start event missing for appId/sql exec Id " + sc.applicationId()
+ ":" + sqlEnd.executionId());
} else if (start.getDatasetLineage() != null) {
SQLQueryExecEndEvent evt =
new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(), sc.applicationId(), sqlEnd.time(),
sqlEnd.executionId(), start);
SQLQueryExecEndEvent evt = new SQLQueryExecEndEvent(LineageUtils.getMaster(sc), sc.appName(),
sc.applicationId(), sqlEnd.time(), sqlEnd.executionId(), start);
McpEmitter emitter = appEmitters.get(sc.applicationId());
if (emitter != null) {
emitter.accept(evt);
Expand All @@ -279,7 +274,7 @@ public Void apply(SparkContext sc) {
}
});
}

private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ctx) {

ExecutorService pool = null;
@@ -288,10 +283,11 @@ private synchronized ExecutorService getOrCreateApplicationSetup(SparkContext ct
if (datahubConfig == null) {
Config datahubConf = LineageUtils.parseSparkConfig();
appConfig.put(appId, datahubConf);
Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY) : com.typesafe.config.ConfigFactory.empty();
Config pipelineConfig = datahubConf.hasPath(PIPELINE_KEY) ? datahubConf.getConfig(PIPELINE_KEY)
: com.typesafe.config.ConfigFactory.empty();
AppStartEvent evt = new AppStartEvent(LineageUtils.getMaster(ctx), getPipelineName(ctx), appId, ctx.startTime(),
ctx.sparkUser(), pipelineConfig);

appEmitters.computeIfAbsent(appId, s -> new McpEmitter(datahubConf)).accept(evt);
consumers().forEach(c -> c.accept(evt));
appDetails.put(appId, evt);
@@ -315,14 +311,14 @@ private String getPipelineName(SparkContext cx) {
name = datahubConfig.getString(DATABRICKS_CLUSTER_KEY) + "_" + cx.applicationId();
}
name = cx.appName();
//TODO: appending of platform instance needs to be done at central location like adding constructor to dataflowurl
// TODO: appending of platform instance needs to be done at central location
// like adding constructor to dataflowurl
if (datahubConfig.hasPath(PIPELINE_PLATFORM_INSTANCE_KEY)) {
name = datahubConfig.getString(PIPELINE_PLATFORM_INSTANCE_KEY) + "." + name;
}
return name;
}


private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
QueryExecution queryExec = SQLExecution.getQueryExecution(sqlStart.executionId());
if (queryExec == null) {
@@ -336,15 +332,16 @@ private void processExecution(SparkListenerSQLExecutionStart sqlStart) {
ExecutorService pool = getOrCreateApplicationSetup(ctx);
pool.execute(new SqlStartTask(sqlStart, plan, ctx));
}
private List<LineageConsumer> consumers() {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(CONSUMER_TYPE_KEY)) {
String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
.map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
} else {
return Collections.emptyList();
}

private List<LineageConsumer> consumers() {
SparkConf conf = SparkEnv.get().conf();
if (conf.contains(CONSUMER_TYPE_KEY)) {
String consumerTypes = conf.get(CONSUMER_TYPE_KEY);
return StreamSupport.stream(Splitter.on(",").trimResults().split(consumerTypes).spliterator(), false)
.map(x -> LineageUtils.getConsumer(x)).filter(Objects::nonNull).collect(Collectors.toList());
} else {
return Collections.emptyList();
}

}
}