Skip to content

Error importing DAGs with params from airflow to Datahub #4546

@edulauer

Description

@edulauer

Describe the bug
I've seen that some of my running DAGs from Airflow are not getting imported in Datahub. It looks like this error is caused by when the DAGs is configured with a template of the Param Class (https://airflow.apache.org/docs/apache-airflow/2.2.1/concepts/params.html#:~:text=Params%20are%20Airflow's%20concept%20of,depends%20on%20the%20flag%20core.) I don't know exactly why this error occours, but it could be relate to type of params, which is a dictionary type.

**Screenshots**
Here is an example of a DAG definition of ours:
dag = DAG(
    'dag_test',
    default_args=default_args,
    schedule_interval="0 4 15 * *",
    catchup=False,
    description=__doc__,
    params={"reference_date": "YYYY-MM-DD"},
    tags=['pgd', 'datamart', 'datalake']
)

And here is the stack trace (task log in airflow), after turned graceful_exceptions to false:


Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1332, in _run_raw_task
    self._execute_task_with_callbacks(context)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 1461, in _execute_task_with_callbacks
    self.task.post_execute(context=context, result=result)
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/lineage/__init__.py", line 124, in wrapper
    _backend.send_lineage(operator=self, inlets=self.inlets, outlets=self.outlets, context=context)
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub_provider/lineage/datahub.py", line 117, in send_lineage
    send_lineage_to_datahub(
  File "/home/airflow/.local/lib/python3.9/site-packages/datahub_provider/_lineage_core.py", line 144, in send_lineage_to_datahub
    key: repr(value) for (key, value) in SerializedDAG.serialize_dag(dag).items()
  File "/home/airflow/.local/lib/python3.9/site-packages/airflow/serialization/serialized_objects.py", line 821, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'dag_test': 'str' object has no attribute '__module__'

Expected behavior
Dags and tasks sucessfully imported in Datahub as a new pipeline

Version

  • Airflow: v2.2.1
  • Datahub: 0.8.31
  • acryl-datahub[airflow]:0.8.31

Metadata

Metadata

Assignees

Labels

bugBug reportingestionPR or Issue related to the ingestion of metadata

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions