Skip to content

Commit

Permalink
feat(ingest/abs): Adding azure blob storage ingestion source (#10813)
Browse files Browse the repository at this point in the history
  • Loading branch information
joelmataKPN authored Jul 17, 2024
1 parent 298c299 commit 13b6feb
Show file tree
Hide file tree
Showing 17 changed files with 2,138 additions and 11 deletions.
40 changes: 40 additions & 0 deletions metadata-ingestion/docs/sources/abs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
This connector ingests Azure Blob Storage (abbreviated to abs) datasets into DataHub. It allows mapping an individual
file or a folder of files to a dataset in DataHub.
To specify the group of files that form a dataset, use `path_specs` configuration in ingestion recipe. Refer
section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details.

### Concept Mapping

This ingestion source maps the following Source System Concepts to DataHub Concepts:

| Source Concept | DataHub Concept | Notes |
|----------------------------------------|--------------------------------------------------------------------------------------------|------------------|
| `"abs"` | [Data Platform](https://datahubproject.io/docs/generated/metamodel/entities/dataplatform/) | |
| abs blob / Folder containing abs blobs | [Dataset](https://datahubproject.io/docs/generated/metamodel/entities/dataset/) | |
| abs container | [Container](https://datahubproject.io/docs/generated/metamodel/entities/container/) | Subtype `Folder` |

This connector supports both local files and those stored on Azure Blob Storage (which must be identified using the
prefix `http(s)://<account>.blob.core.windows.net/` or `azure://`).

### Supported file types

Supported file types are as follows:

- CSV (*.csv)
- TSV (*.tsv)
- JSONL (*.jsonl)
- JSON (*.json)
- Parquet (*.parquet)
- Apache Avro (*.avro)

Schemas for Parquet and Avro files are extracted as provided.

Schemas for schemaless formats (CSV, TSV, JSONL, JSON) are inferred. For CSV, TSV and JSONL files, we consider the first
100 rows by default, which can be controlled via the `max_rows` recipe parameter (see [below](#config-details))
JSON file schemas are inferred on the basis of the entire file (given the difficulty in extracting only the first few
objects of the file), which may impact performance.
We are working on using iterator-based JSON parsers to avoid reading in the entire JSON object.

### Profiling

Profiling is not available in the current release.
204 changes: 204 additions & 0 deletions metadata-ingestion/docs/sources/abs/abs.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@

### Path Specs

Path Specs (`path_specs`) is a list of Path Spec (`path_spec`) objects, where each individual `path_spec` represents one or more datasets. The include path (`path_spec.include`) represents a formatted path to the dataset. This path must end with `*.*` or `*.[ext]` to represent the leaf level. If `*.[ext]` is provided, then only files with the specified extension type will be scanned. "`.[ext]`" can be any of the [supported file types](#supported-file-types). Refer to [example 1](#example-1---individual-file-as-dataset) below for more details.

All folder levels need to be specified in the include path. You can use `/*/` to represent a folder level and avoid specifying the exact folder name. To map a folder as a dataset, use the `{table}` placeholder to represent the folder level for which the dataset is to be created. For a partitioned dataset, you can use the placeholder `{partition_key[i]}` to represent the name of the `i`th partition and `{partition[i]}` to represent the value of the `i`th partition. During ingestion, `i` will be used to match the partition_key to the partition. Refer to [examples 2 and 3](#example-2---folder-of-files-as-dataset-without-partitions) below for more details.

Exclude paths (`path_spec.exclude`) can be used to ignore paths that are not relevant to the current `path_spec`. This path cannot have named variables (`{}`). The exclude path can have `**` to represent multiple folder levels. Refer to [example 4](#example-4---folder-of-files-as-dataset-with-partitions-and-exclude-filter) below for more details.

Refer to [example 5](#example-5---advanced---either-individual-file-or-folder-of-files-as-dataset) if your container has a more complex dataset representation.

**Additional points to note**
- Folder names should not contain {, }, *, / in their names.
- Named variable {folder} is reserved for internal working. please do not use in named variables.


### Path Specs - Examples
#### Example 1 - Individual file as Dataset

Container structure:

```
test-container
├── employees.csv
├── departments.json
└── food_items.csv
```

Path specs config to ingest `employees.csv` and `food_items.csv` as datasets:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/*.csv
```
This will automatically ignore `departments.json` file. To include it, use `*.*` instead of `*.csv`.

#### Example 2 - Folder of files as Dataset (without Partitions)

Container structure:
```
test-container
└── offers
├── 1.avro
└── 2.avro
```

Path specs config to ingest folder `offers` as dataset:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*.avro
```

`{table}` represents folder for which dataset will be created.

#### Example 3 - Folder of files as Dataset (with Partitions)

Container structure:
```
test-container
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── returns
└── year=2021
└── month=2
└── 1.parquet
```

Path specs config to ingest folders `orders` and `returns` as datasets:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```

One can also use `include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*/*/*.parquet` here however above format is preferred as it allows declaring partitions explicitly.

#### Example 4 - Folder of files as Dataset (with Partitions), and Exclude Filter

Container structure:
```
test-container
├── orders
│ └── year=2022
│ └── month=2
│ ├── 1.parquet
│ └── 2.parquet
└── tmp_orders
└── year=2021
└── month=2
└── 1.parquet
```

Path specs config to ingest folder `orders` as dataset but not folder `tmp_orders`:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
exclude:
- **/tmp_orders/**
```


#### Example 5 - Advanced - Either Individual file OR Folder of files as Dataset

Container structure:
```
test-container
├── customers
│ ├── part1.json
│ ├── part2.json
│ ├── part3.json
│ └── part4.json
├── employees.csv
├── food_items.csv
├── tmp_10101000.csv
└── orders
└── year=2022
└── month=2
├── 1.parquet
├── 2.parquet
└── 3.parquet
```

Path specs config:
```
path_specs:
- include: https://storageaccountname.blob.core.windows.net/test-container/*.csv
exclude:
- **/tmp_10101000.csv
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/*.json
- include: https://storageaccountname.blob.core.windows.net/test-container/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.parquet
```

Above config has 3 path_specs and will ingest following datasets
- `employees.csv` - Single File as Dataset
- `food_items.csv` - Single File as Dataset
- `customers` - Folder as Dataset
- `orders` - Folder as Dataset
and will ignore file `tmp_10101000.csv`

**Valid path_specs.include**

```python
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/bar.avro # single file table
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/*.* # mulitple file level tables
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*.avro #table without partition
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*/*.avro #table where partitions are not specified
https://storageaccountname.blob.core.windows.net/my-container/foo/tests/{table}/*.* # table where no partitions as well as data type specified
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/*.avro # specifying keywords to be used in display name
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition_key[0]}={partition[0]}/{partition_key[1]}={partition[1]}/*.avro # specify partition key and value format
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.avro # specify partition value only format
https://storageaccountname.blob.core.windows.net/my-container/{dept}/tests/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # for all extensions
https://storageaccountname.blob.core.windows.net/my-container/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 2 levels down in container
https://storageaccountname.blob.core.windows.net/my-container/*/*/{table}/{partition[0]}/{partition[1]}/{partition[2]}/*.* # table is present at 3 levels down in container
```

**Valid path_specs.exclude**
- \**/tests/**
- https://storageaccountname.blob.core.windows.net/my-container/hr/**
- **/tests/*.csv
- https://storageaccountname.blob.core.windows.net/my-container/foo/*/my_table/**



If you would like to write a more complicated function for resolving file names, then a {transformer} would be a good fit.

:::caution

Specify as long fixed prefix ( with out /*/ ) as possible in `path_specs.include`. This will reduce the scanning time and cost, specifically on AWS S3

:::

:::caution

Running profiling against many tables or over many rows can run up significant costs.
While we've done our best to limit the expensiveness of the queries the profiler runs, you
should be prudent about the set of tables profiling is enabled on or the frequency
of the profiling runs.

:::

:::caution

If you are ingesting datasets from AWS S3, we recommend running the ingestion on a server in the same region to avoid high egress costs.

:::

### Compatibility

Profiles are computed with PyDeequ, which relies on PySpark. Therefore, for computing profiles, we currently require Spark 3.0.3 with Hadoop 3.2 to be installed and the `SPARK_HOME` and `SPARK_VERSION` environment variables to be set. The Spark+Hadoop binary can be downloaded [here](https://www.apache.org/dyn/closer.lua/spark/spark-3.0.3/spark-3.0.3-bin-hadoop3.2.tgz).

For an example guide on setting up PyDeequ on AWS, see [this guide](https://aws.amazon.com/blogs/big-data/testing-data-quality-at-scale-with-pydeequ/).

:::caution

From Spark 3.2.0+, Avro reader fails on column names that don't start with a letter and contains other character than letters, number, and underscore. [https://github.com/apache/spark/blob/72c62b6596d21e975c5597f8fff84b1a9d070a02/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroFileFormat.scala#L158]
Avro files that contain such columns won't be profiled.
:::
13 changes: 13 additions & 0 deletions metadata-ingestion/docs/sources/abs/abs_recipe.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
source:
type: abs
config:
path_specs:
- include: "https://storageaccountname.blob.core.windows.net/covid19-lake/covid_knowledge_graph/csv/nodes/*.*"

azure_config:
account_name: "*****"
sas_token: "*****"
container_name: "covid_knowledge_graph"
env: "PROD"

# sink configs
2 changes: 1 addition & 1 deletion metadata-ingestion/docs/sources/s3/README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
This connector ingests AWS S3 datasets into DataHub. It allows mapping an individual file or a folder of files to a dataset in DataHub.
To specify the group of files that form a dataset, use `path_specs` configuration in ingestion recipe. Refer section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details.
Refer to the section [Path Specs](https://datahubproject.io/docs/generated/ingestion/sources/s3/#path-specs) for more details.

:::tip
This connector can also be used to ingest local files.
Expand Down
10 changes: 10 additions & 0 deletions metadata-ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,13 +258,21 @@
*path_spec_common,
}

abs_base = {
"azure-core==1.29.4",
"azure-identity>=1.14.0",
"azure-storage-blob>=12.19.0",
"azure-storage-file-datalake>=12.14.0",
}

data_lake_profiling = {
"pydeequ~=1.1.0",
"pyspark~=3.3.0",
}

delta_lake = {
*s3_base,
*abs_base,
# Version 0.18.0 broken on ARM Macs: https://github.com/delta-io/delta-rs/issues/2577
"deltalake>=0.6.3, != 0.6.4, < 0.18.0; platform_system == 'Darwin' and platform_machine == 'arm64'",
"deltalake>=0.6.3, != 0.6.4; platform_system != 'Darwin' or platform_machine != 'arm64'",
Expand Down Expand Up @@ -407,6 +415,7 @@
| {"cachetools"},
"s3": {*s3_base, *data_lake_profiling},
"gcs": {*s3_base, *data_lake_profiling},
"abs": {*abs_base},
"sagemaker": aws_common,
"salesforce": {"simple-salesforce"},
"snowflake": snowflake_common | usage_common | sqlglot_lib,
Expand Down Expand Up @@ -686,6 +695,7 @@
"demo-data = datahub.ingestion.source.demo_data.DemoDataSource",
"unity-catalog = datahub.ingestion.source.unity.source:UnityCatalogSource",
"gcs = datahub.ingestion.source.gcs.gcs_source:GCSSource",
"abs = datahub.ingestion.source.abs.source:ABSSource",
"sql-queries = datahub.ingestion.source.sql_queries:SqlQueriesSource",
"fivetran = datahub.ingestion.source.fivetran.fivetran:FivetranSource",
"qlik-sense = datahub.ingestion.source.qlik_sense.qlik_sense:QlikSenseSource",
Expand Down
Empty file.
Loading

0 comments on commit 13b6feb

Please sign in to comment.