Skip to content

Commit

Permalink
fix(ingestion/airflow-plugin): bumping up the openlineage-airflow ver…
Browse files Browse the repository at this point in the history
…sion (#10457)
  • Loading branch information
dushayntAW authored May 9, 2024
1 parent 7adf726 commit a86ec7b
Show file tree
Hide file tree
Showing 4 changed files with 9 additions and 1 deletion.
3 changes: 3 additions & 0 deletions .github/workflows/airflow-plugin.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ jobs:
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow==2.8.1 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.8.1/constraints-3.10.txt'
extra_pip_extras: plugin-v2
- python-version: "3.10"
extra_pip_requirements: 'apache-airflow==2.9.0 -c https://raw.githubusercontent.com/apache/airflow/constraints-2.9.0/constraints-3.10.txt'
extra_pip_extras: plugin-v2
fail-fast: false
steps:
- name: Set up JDK 17
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion-modules/airflow-plugin/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ def get_long_description():
# We remain restrictive on the versions allowed here to prevent
# us from being broken by backwards-incompatible changes in the
# underlying package.
"openlineage-airflow>=1.2.0,<=1.7.0",
"openlineage-airflow>=1.2.0,<=1.12.0",
},
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,7 @@ def run_datajob(
config: Optional[DatahubLineageConfig] = None,
) -> DataProcessInstance:
if datajob is None:
assert ti.task is not None
datajob = AirflowGenerator.generate_datajob(
cluster, ti.task, dag, config=config
)
Expand Down Expand Up @@ -509,6 +510,7 @@ def complete_datajob(
:return: DataProcessInstance
"""
if datajob is None:
assert ti.task is not None
datajob = AirflowGenerator.generate_datajob(
cluster, ti.task, dag, config=config
)
Expand All @@ -530,6 +532,7 @@ def complete_datajob(
f"Result should be either success or failure and it was {ti.state}"
)

assert datajob is not None
dpi = DataProcessInstance.from_datajob(
datajob=datajob,
id=f"{dag.dag_id}_{ti.task_id}_{dag_run.run_id}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ def on_task_instance_running(
# The type ignore is to placate mypy on Airflow 2.1.x.
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
task = task_instance.task
assert task is not None
dag: "DAG" = task.dag # type: ignore[assignment]

self._task_holder.set_task(task_instance)
Expand Down Expand Up @@ -447,6 +448,7 @@ def on_task_instance_finish(
) -> None:
dagrun: "DagRun" = task_instance.dag_run # type: ignore[attr-defined]
task = self._task_holder.get_task(task_instance) or task_instance.task
assert task is not None
dag: "DAG" = task.dag # type: ignore[assignment]

datajob = AirflowGenerator.generate_datajob(
Expand Down

0 comments on commit a86ec7b

Please sign in to comment.