Closed
Labels: bug (Bug report), ingestion (PR or Issue related to the ingestion of metadata)
**Describe the bug**
I've seen that some of my running DAGs from Airflow are not getting imported into DataHub. It looks like the error is triggered when a DAG is configured with `params` (Airflow's Param concept: https://airflow.apache.org/docs/apache-airflow/2.2.1/concepts/params.html). I don't know exactly why the error occurs, but it could be related to the type of `params`, which is a dictionary.
**Screenshots**
Here is an example of a DAG definition of ours:
```python
from airflow import DAG

# default_args is defined earlier in the DAG file (omitted here)
dag = DAG(
    'dag_test',
    default_args=default_args,
    schedule_interval="0 4 15 * *",
    catchup=False,
    description=__doc__,
    params={"reference_date": "YYYY-MM-DD"},
    tags=['pgd', 'datamart', 'datalake'],
)
```
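If the problem really is the plain string value in `params`, one workaround worth trying (a sketch only, not verified against this bug) is to wrap the value in Airflow's `Param` class (`airflow.models.param.Param`, available since Airflow 2.2.0), so that serialization sees a `Param` object instead of a bare string:

```python
from airflow import DAG
from airflow.models.param import Param  # Airflow >= 2.2.0

# Same DAG as above, but with the params value wrapped in Param;
# default_args is assumed to be defined as in the original file.
dag = DAG(
    'dag_test',
    default_args=default_args,
    schedule_interval="0 4 15 * *",
    catchup=False,
    description=__doc__,
    params={"reference_date": Param("YYYY-MM-DD", type="string")},
    tags=['pgd', 'datamart', 'datalake'],
)
```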
And here is the stack trace (the task log in Airflow), after setting `graceful_exceptions` to false:
```
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1461, in _execute_task_with_callbacks
    self.task.post_execute(context=context, result=result)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/lineage/__init__.py", line 124, in wrapper
    _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub_provider/lineage/datahub.py", line 117, in send_lineage
    send_lineage_to_datahub(
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub_provider/_lineage_core.py", line 144, in send_lineage_to_datahub
    key: repr(value) for (key, value) in SerializedDAG.serialize_dag(dag).items()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 821, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'dag_test': 'str' object has no attribute '__module__'
```
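Since the failing call in `_lineage_core.py` is just `SerializedDAG.serialize_dag(dag)` followed by a `repr()` over the resulting dict, the serialization error can probably be reproduced without the DataHub lineage backend at all. A minimal sketch (the module path `dags.dag_test` is hypothetical; adjust it to wherever the DAG above lives):

```python
from airflow.serialization.serialized_objects import SerializedDAG

from dags.dag_test import dag  # hypothetical import path for the DAG above

try:
    # This mirrors what datahub_provider/_lineage_core.py does before sending
    # lineage: serialize the DAG, then repr() every value in the result.
    serialized = SerializedDAG.serialize_dag(dag)
    flattened = {key: repr(value) for key, value in serialized.items()}
    print("serialized OK, keys:", sorted(flattened))
except Exception as exc:  # expect SerializationError if params is the culprit
    print("serialization failed:", exc)
```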
**Expected behavior**
DAGs and tasks successfully imported into DataHub as a new pipeline.
**Version**
- Airflow: v2.2.1
- DataHub: 0.8.31
- acryl-datahub[airflow]: 0.8.31