-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
fix(ingestion/prefect-plugin): Prefect plugin (#10643)
Co-authored-by: shubhamjagtap639 <[email protected]> Co-authored-by: Tamas Nemeth <[email protected]>
- Loading branch information
1 parent
1a051b1
commit 6204cba
Showing
29 changed files
with
2,567 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
name: Prefect Plugin | ||
on: | ||
push: | ||
branches: | ||
- master | ||
paths: | ||
- ".github/workflows/prefect-plugin.yml" | ||
- "metadata-ingestion-modules/prefect-plugin/**" | ||
- "metadata-ingestion/**" | ||
- "metadata-models/**" | ||
pull_request: | ||
branches: | ||
- "**" | ||
paths: | ||
- ".github/workflows/prefect-plugin.yml" | ||
- "metadata-ingestion-modules/prefect-plugin/**" | ||
- "metadata-ingestion/**" | ||
- "metadata-models/**" | ||
release: | ||
types: [published] | ||
|
||
concurrency: | ||
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }} | ||
cancel-in-progress: true | ||
|
||
jobs: | ||
prefect-plugin: | ||
runs-on: ubuntu-latest | ||
env: | ||
SPARK_VERSION: 3.0.3 | ||
DATAHUB_TELEMETRY_ENABLED: false | ||
strategy: | ||
matrix: | ||
python-version: ["3.8", "3.9", "3.10"] | ||
include: | ||
- python-version: "3.8" | ||
- python-version: "3.9" | ||
- python-version: "3.10" | ||
fail-fast: false | ||
steps: | ||
- name: Set up JDK 17 | ||
uses: actions/setup-java@v3 | ||
with: | ||
distribution: "zulu" | ||
java-version: 17 | ||
- uses: gradle/gradle-build-action@v2 | ||
- uses: actions/checkout@v3 | ||
- uses: actions/setup-python@v4 | ||
with: | ||
python-version: ${{ matrix.python-version }} | ||
cache: "pip" | ||
- name: Install dependencies | ||
run: ./metadata-ingestion/scripts/install_deps.sh | ||
- name: Install prefect package | ||
run: ./gradlew :metadata-ingestion-modules:prefect-plugin:lint :metadata-ingestion-modules:prefect-plugin:testQuick | ||
- name: pip freeze show list installed | ||
if: always() | ||
run: source metadata-ingestion-modules/prefect-plugin/venv/bin/activate && pip freeze | ||
- uses: actions/upload-artifact@v3 | ||
if: ${{ always() && matrix.python-version == '3.10'}} | ||
with: | ||
name: Test Results (Prefect Plugin ${{ matrix.python-version}}) | ||
path: | | ||
**/build/reports/tests/test/** | ||
**/build/test-results/test/** | ||
**/junit.*.xml | ||
!**/binary/** | ||
- name: Upload coverage to Codecov | ||
if: always() | ||
uses: codecov/codecov-action@v3 | ||
with: | ||
token: ${{ secrets.CODECOV_TOKEN }} | ||
directory: . | ||
fail_ci_if_error: false | ||
flags: prefect,prefect-${{ matrix.extra_pip_extras }} | ||
name: pytest-prefect-${{ matrix.python-version }} | ||
verbose: true | ||
|
||
event-file: | ||
runs-on: ubuntu-latest | ||
steps: | ||
- name: Upload | ||
uses: actions/upload-artifact@v3 | ||
with: | ||
name: Event File | ||
path: ${{ github.event_path }} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,137 @@ | ||
# Prefect Integration with DataHub | ||
|
||
## Overview | ||
|
||
DataHub supports integration with Prefect, allowing you to ingest: | ||
|
||
- Prefect flow and task metadata | ||
- Flow run and Task run information | ||
- Lineage information (when available) | ||
|
||
This integration enables you to track and monitor your Prefect workflows within DataHub, providing a comprehensive view of your data pipeline activities. | ||
|
||
## Prefect DataHub Block | ||
|
||
### What is a Prefect DataHub Block? | ||
|
||
Blocks in Prefect are primitives that enable the storage of configuration and provide an interface for interacting with external systems. The `prefect-datahub` block uses the [DataHub REST](../../metadata-ingestion/sink_docs/datahub.md#datahub-rest) emitter to send metadata events while running Prefect flows. | ||
|
||
### Prerequisites | ||
|
||
1. Use either Prefect Cloud (recommended) or a self-hosted Prefect server. | ||
2. For Prefect Cloud setup, refer to the [Cloud Quickstart](https://docs.prefect.io/latest/getting-started/quickstart/) guide. | ||
3. For self-hosted Prefect server setup, refer to the [Host Prefect Server](https://docs.prefect.io/latest/guides/host/) guide. | ||
4. Ensure the Prefect API URL is set correctly. Verify using: | ||
|
||
```shell | ||
prefect profile inspect | ||
``` | ||
|
||
5. API URL format: | ||
- Prefect Cloud: `https://api.prefect.cloud/api/accounts/<account_id>/workspaces/<workspace_id>` | ||
- Self-hosted: `http://<host>:<port>/api` | ||
|
||
## Setup Instructions | ||
|
||
### 1. Installation | ||
|
||
Install `prefect-datahub` using pip: | ||
|
||
```shell | ||
pip install 'prefect-datahub' | ||
``` | ||
|
||
Note: Requires Python 3.7+ | ||
|
||
### 2. Saving Configurations to a Block | ||
|
||
Save your configuration to the [Prefect block document store](https://docs.prefect.io/latest/concepts/blocks/#saving-blocks): | ||
|
||
```python | ||
from prefect_datahub.datahub_emitter import DatahubEmitter | ||
|
||
DatahubEmitter( | ||
datahub_rest_url="http://localhost:8080", | ||
env="PROD", | ||
platform_instance="local_prefect" | ||
).save("MY-DATAHUB-BLOCK") | ||
``` | ||
|
||
Configuration options: | ||
|
||
| Config | Type | Default | Description | | ||
|--------|------|---------|-------------| | ||
| datahub_rest_url | `str` | `http://localhost:8080` | DataHub GMS REST URL | | ||
| env | `str` | `PROD` | Environment for assets (see [FabricType](https://datahubproject.io/docs/graphql/enums/#fabrictype)) | | ||
| platform_instance | `str` | `None` | Platform instance for assets (see [Platform Instances](https://datahubproject.io/docs/platform-instances/)) | | ||
|
||
### 3. Using the Block in Prefect Workflows | ||
|
||
Load and use the saved block in your Prefect workflows: | ||
|
||
```python | ||
from prefect import flow, task | ||
from prefect_datahub.dataset import Dataset | ||
from prefect_datahub.datahub_emitter import DatahubEmitter | ||
|
||
datahub_emitter = DatahubEmitter.load("MY-DATAHUB-BLOCK") | ||
|
||
@task(name="Transform", description="Transform the data") | ||
def transform(data): | ||
data = data.split(" ") | ||
datahub_emitter.add_task( | ||
inputs=[Dataset("snowflake", "mydb.schema.tableA")], | ||
outputs=[Dataset("snowflake", "mydb.schema.tableC")], | ||
) | ||
return data | ||
|
||
@flow(name="ETL flow", description="Extract transform load flow") | ||
def etl(): | ||
data = transform("This is data") | ||
datahub_emitter.emit_flow() | ||
``` | ||
|
||
**Note**: To emit tasks, you must call `emit_flow()`. Otherwise, no metadata will be emitted. | ||
|
||
## Concept Mapping | ||
|
||
| Prefect Concept | DataHub Concept | | ||
|-----------------|-----------------| | ||
| [Flow](https://docs.prefect.io/latest/concepts/flows/) | [DataFlow](https://datahubproject.io/docs/generated/metamodel/entities/dataflow/) | | ||
| [Flow Run](https://docs.prefect.io/latest/concepts/flows/#flow-runs) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) | | ||
| [Task](https://docs.prefect.io/latest/concepts/tasks/) | [DataJob](https://datahubproject.io/docs/generated/metamodel/entities/datajob/) | | ||
| [Task Run](https://docs.prefect.io/latest/concepts/tasks/#tasks) | [DataProcessInstance](https://datahubproject.io/docs/generated/metamodel/entities/dataprocessinstance) | | ||
| [Task Tag](https://docs.prefect.io/latest/concepts/tasks/#tags) | [Tag](https://datahubproject.io/docs/generated/metamodel/entities/tag/) | | ||
|
||
## Validation and Troubleshooting | ||
|
||
### Validating the Setup | ||
|
||
1. Check the Prefect UI's Blocks menu for the DataHub emitter. | ||
2. Run a Prefect workflow and look for DataHub-related log messages: | ||
|
||
```text | ||
Emitting flow to datahub... | ||
Emitting tasks to datahub... | ||
``` | ||
|
||
### Debugging Common Issues | ||
|
||
#### Incorrect Prefect API URL | ||
|
||
If the Prefect API URL is incorrect, set it manually: | ||
|
||
```shell | ||
prefect config set PREFECT_API_URL='http://127.0.0.1:4200/api' | ||
``` | ||
|
||
#### DataHub Connection Error | ||
|
||
If you encounter a `ConnectionError: HTTPConnectionPool(host='localhost', port=8080)`, ensure that your DataHub GMS service is running. | ||
|
||
## Additional Resources | ||
|
||
- [Prefect Documentation](https://docs.prefect.io/) | ||
- [DataHub Documentation](https://datahubproject.io/docs/) | ||
|
||
For more information or support, please refer to the official Prefect and DataHub documentation or reach out to their respective communities. |
Oops, something went wrong.