Skip to content

Commit 76b5783

Browse files
authored
feat(ingest/airflow): support disabling iolet materialization (#10305)
1 parent 529710a commit 76b5783

File tree

5 files changed

+24
-7
lines changed

5 files changed

+24
-7
lines changed

docs/lineage/airflow.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ enabled = True # default
7171
| capture_ownership_info | true | Extract DAG ownership. |
7272
| capture_tags_info | true | Extract DAG tags. |
7373
| capture_executions | true | Extract task runs and success/failure statuses. This will show up in DataHub "Runs" tab. |
74+
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
7475
| enable_extractors | true | Enable automatic lineage extraction. |
7576
| disable_openlineage_plugin | true | Disable the OpenLineage plugin to avoid duplicative processing. |
7677
| log_level | _no change_ | [debug] Set the log level for the plugin. |
@@ -135,8 +136,9 @@ conn_id = datahub_rest_default # or datahub_kafka_default
135136
| capture_ownership_info | true | If true, the owners field of the DAG will be capture as a DataHub corpuser. |
136137
| capture_tags_info | true | If true, the tags field of the DAG will be captured as DataHub tags. |
137138
| capture_executions | true | If true, we'll capture task runs in DataHub in addition to DAG definitions. |
138-
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid.
139-
|
139+
| materialize_iolets | true | Create or un-soft-delete all entities referenced in lineage. |
140+
| datajob_url_link | taskinstance | If taskinstance, the datajob url will be taskinstance link on airflow. It can also be grid. |
141+
| |
140142
| graceful_exceptions | true | If set to true, most runtime errors in the lineage backend will be suppressed and will not cause the overall task to fail. Note that configuration issues will still throw exceptions. |
141143

142144
#### Validate that the plugin is working

metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,10 @@ class DatahubLineageConfig(ConfigModel):
3434
# If true, the tags field of the DAG will be captured as DataHub tags.
3535
capture_tags_info: bool = True
3636

37+
# If true (default), we'll materialize and un-soft-delete any urns
38+
# referenced by inlets or outlets.
39+
materialize_iolets: bool = True
40+
3741
capture_executions: bool = False
3842

3943
enable_extractors: bool = True
@@ -67,6 +71,7 @@ def get_lineage_config() -> DatahubLineageConfig:
6771
"datahub", "capture_ownership_info", fallback=True
6872
)
6973
capture_executions = conf.get("datahub", "capture_executions", fallback=True)
74+
materialize_iolets = conf.get("datahub", "materialize_iolets", fallback=True)
7075
enable_extractors = conf.get("datahub", "enable_extractors", fallback=True)
7176
log_level = conf.get("datahub", "log_level", fallback=None)
7277
debug_emitter = conf.get("datahub", "debug_emitter", fallback=False)
@@ -84,6 +89,7 @@ def get_lineage_config() -> DatahubLineageConfig:
8489
capture_ownership_info=capture_ownership_info,
8590
capture_tags_info=capture_tags_info,
8691
capture_executions=capture_executions,
92+
materialize_iolets=materialize_iolets,
8793
enable_extractors=enable_extractors,
8894
log_level=log_level,
8995
debug_emitter=debug_emitter,

metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -389,7 +389,10 @@ def on_task_instance_running(
389389

390390
# TODO: Add handling for Airflow mapped tasks using task_instance.map_index
391391

392-
datajob.emit(self.emitter, callback=self._make_emit_callback())
392+
for mcp in datajob.generate_mcp(
393+
materialize_iolets=self.config.materialize_iolets
394+
):
395+
self.emitter.emit(mcp, self._make_emit_callback())
393396
logger.debug(f"Emitted DataHub Datajob start: {datajob}")
394397

395398
if self.config.capture_executions:
@@ -430,7 +433,10 @@ def on_task_instance_finish(
430433
# Add lineage info.
431434
self._extract_lineage(datajob, dagrun, task, task_instance, complete=True)
432435

433-
datajob.emit(self.emitter, callback=self._make_emit_callback())
436+
for mcp in datajob.generate_mcp(
437+
materialize_iolets=self.config.materialize_iolets
438+
):
439+
self.emitter.emit(mcp, self._make_emit_callback())
434440
logger.debug(f"Emitted DataHub Datajob finish w/ status {status}: {datajob}")
435441

436442
if self.config.capture_executions:

metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin_v22.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,8 @@ def datahub_task_status_callback(context, status):
133133
)
134134

135135
task.log.info(f"Emitting Datahub Datajob: {datajob}")
136-
datajob.emit(emitter, callback=_make_emit_callback(task.log))
136+
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
137+
emitter.emit(mcp, _make_emit_callback(task.log))
137138

138139
if config.capture_executions:
139140
dpi = AirflowGenerator.run_datajob(
@@ -200,7 +201,8 @@ def datahub_pre_execution(context):
200201
)
201202

202203
task.log.info(f"Emitting Datahub dataJob {datajob}")
203-
datajob.emit(emitter, callback=_make_emit_callback(task.log))
204+
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
205+
emitter.emit(mcp, _make_emit_callback(task.log))
204206

205207
if config.capture_executions:
206208
dpi = AirflowGenerator.run_datajob(

metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/lineage/_lineage_core.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ def send_lineage_to_datahub(
5959
entities_to_datajob_urn_list([let.urn for let in inlets])
6060
)
6161

62-
datajob.emit(emitter)
62+
for mcp in datajob.generate_mcp(materialize_iolets=config.materialize_iolets):
63+
emitter.emit(mcp)
6364
operator.log.info(f"Emitted from Lineage: {datajob}")
6465

6566
if config.capture_executions:

0 commit comments

Comments
 (0)