Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ingestion) Ingest Tags from s3 bucket on an AWS Glue job and S3 Data Lake Ingest Job #4689

Merged
merged 37 commits into from
Apr 29, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
e1af525
begin work on tags
Jiafi Apr 18, 2022
e576aa0
working ingest from s3 and updated unit tests
Jiafi Apr 19, 2022
585d4ea
working s3 data lake ingest bucket tags
Jiafi Apr 19, 2022
9956022
cleanup
Jiafi Apr 19, 2022
cc00805
updated source docs for glue and s3_data_lake
Jiafi Apr 19, 2022
06e7750
Merge branch 'master' into jordan/tags
Jiafi Apr 19, 2022
ae629df
isort and fix test failure caused by config validator
Jiafi Apr 19, 2022
6761c77
add flake8 ignore for complexity and fix unit tests
Jiafi Apr 20, 2022
a0dd95f
Merge branch 'master' into jordan/tags
Jiafi Apr 20, 2022
fe98c4d
add object tagging functionality
Jiafi Apr 20, 2022
90e07d4
update unit tests to test for object tags
Jiafi Apr 20, 2022
3e13e6b
test cleanup
Jiafi Apr 20, 2022
b3a716d
Merge branch 'master' into jordan/tags
Jiafi Apr 21, 2022
6c31b37
fix bug in s3 ingest. Needed to use make_tag_urn
Jiafi Apr 21, 2022
bddf131
Merge branch 'jordan/tags' of github.com:Jiafi/datahub into jordan/tags
Jiafi Apr 21, 2022
7fa7bee
forgot make_tag_urn on the object
Jiafi Apr 21, 2022
f6dadb9
fix bug with pushing two GlobalTags at the same time. update tests f…
Jiafi Apr 21, 2022
b833ccb
Merge branch 'master' into jordan/tags
Jiafi Apr 21, 2022
3d060b5
fix typo
Jiafi Apr 25, 2022
eab89e3
Merge branch 'jordan/tags' of github.com:Jiafi/datahub into jordan/tags
Jiafi Apr 25, 2022
ae9e432
Merge branch 'master' into jordan/tags
Jiafi Apr 25, 2022
13b6f50
Merge branch 'master' into jordan/tags
Jiafi Apr 25, 2022
3262cf5
Merge branch 'master' into jordan/tags
Jiafi Apr 25, 2022
5ad5835
add some errors for poor configuration. Do not push tags as an aspec…
Jiafi Apr 26, 2022
f76e226
Merge branch 'jordan/tags' of github.com:Jiafi/datahub into jordan/tags
Jiafi Apr 26, 2022
c788f62
Merge branch 'master' into jordan/tags
Jiafi Apr 26, 2022
79376f2
lintFIx
Jiafi Apr 26, 2022
784e64d
instead of raising an exception, log that it could not grab the curre…
Jiafi Apr 26, 2022
6090ca1
Merge branch 'master' into jordan/tags
Jiafi Apr 26, 2022
541cdd6
remove similar exception from aws glue
Jiafi Apr 26, 2022
ee19426
Merge branch 'master' into jordan/tags
Jiafi Apr 27, 2022
de063bf
make key optional and only apply bucket tags when key is not None
Jiafi Apr 27, 2022
683d5e4
lintFix
Jiafi Apr 27, 2022
6096d75
change 2 logs when cant connect to datahub api to warns from debug
Jiafi Apr 27, 2022
85d2ebf
add error handling and warns for when no tags exist on a bucket and o…
Jiafi Apr 28, 2022
7668a6f
Merge branch 'master' into jordan/tags
Jiafi Apr 28, 2022
544ea97
Merge branch 'master' into jordan/tags
Jiafi Apr 28, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 42 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dataclasses import field as dataclass_field
from typing import Any, Dict, Iterable, Iterator, List, Optional, Set, Tuple, Union
from urllib.parse import urlparse
from datahub.ingestion.source.aws import s3_util

import yaml
from pydantic import validator
Expand All @@ -17,6 +18,7 @@
make_dataplatform_instance_urn,
make_dataset_urn_with_platform_instance,
make_domain_urn,
make_tag_urn,
)
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.mcp_builder import (
Expand Down Expand Up @@ -48,10 +50,12 @@
DataPlatformInstanceClass,
DatasetLineageTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
MetadataChangeEventClass,
OwnerClass,
OwnershipClass,
OwnershipTypeClass,
TagAssociationClass,
UpstreamClass,
UpstreamLineageClass,
)
Expand All @@ -75,6 +79,8 @@ class GlueSourceConfig(AwsSourceConfig, PlatformSourceConfigBase):
domain: Dict[str, AllowDenyPattern] = dict()
catalog_id: Optional[str] = None

use_s3_bucket_tags: Optional[bool] = False

@property
def glue_client(self):
return self.get_glue_client()
Expand Down Expand Up @@ -787,6 +793,33 @@ def get_dataset_properties() -> DatasetPropertiesClass:
tags=[],
)

def get_s3_bucket_tags() -> GlobalTagsClass:
    """Build a GlobalTags aspect from the tags on the table's backing S3 bucket.

    Reads the bucket name out of the table's ``StorageDescriptor.Location``,
    fetches its TagSet via the S3 client, and converts each Key/Value pair
    into a DataHub tag URN of the form ``"key:value"``. If a DataHub graph
    connection is available, tags already present on the dataset are merged
    in so that previously-applied tags are not dropped.

    Returns:
        GlobalTagsClass: bucket tags plus any pre-existing dataset tags.
    """
    bucket_name = s3_util.get_bucket_name(
        table["StorageDescriptor"]["Location"]
    )
    s3_tags = self.s3_client.get_bucket_tagging(Bucket=bucket_name)
    new_tags = GlobalTagsClass(
        tags=[
            TagAssociationClass(make_tag_urn(f'{tag["Key"]}:{tag["Value"]}'))
            for tag in s3_tags["TagSet"]
        ]
    )
    if self.ctx.graph is not None:
        current_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_aspect_v2(
            entity_urn=dataset_urn,
            aspect="globalTags",
            aspect_type=GlobalTagsClass,
        )
        if current_tags:
            # Compare URN strings, not TagAssociationClass objects: the
            # original object-vs-string membership test was always True,
            # which appended every existing tag as a duplicate.
            known_urns = {assoc.tag for assoc in new_tags.tags}
            for existing_assoc in current_tags.tags:
                if existing_assoc.tag not in known_urns:
                    # Preserve tags set in DataHub that are not on the bucket.
                    new_tags.tags.append(existing_assoc)
    return new_tags

def get_schema_metadata(glue_source: GlueSource) -> SchemaMetadata:
schema = table["StorageDescriptor"]["Columns"]
fields: List[SchemaField] = []
Expand Down Expand Up @@ -829,13 +862,14 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:
else None,
)

dataset_urn = make_dataset_urn_with_platform_instance(
platform=self.platform,
name=table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
)
dataset_snapshot = DatasetSnapshot(
urn=make_dataset_urn_with_platform_instance(
platform=self.platform,
name=table_name,
env=self.env,
platform_instance=self.source_config.platform_instance,
),
urn=dataset_urn,
aspects=[],
)

Expand All @@ -849,6 +883,8 @@ def get_data_platform_instance() -> DataPlatformInstanceClass:
dataset_snapshot.aspects.append(get_dataset_properties())
dataset_snapshot.aspects.append(get_schema_metadata(self))
dataset_snapshot.aspects.append(get_data_platform_instance())
if self.source_config.use_s3_bucket_tags:
dataset_snapshot.aspects.append(get_s3_bucket_tags())

metadata_record = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
return metadata_record
Expand Down
26 changes: 25 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/s3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
from datahub.metadata.schema_classes import (
ChangeTypeClass,
DatasetPropertiesClass,
GlobalTagsClass,
MapTypeClass,
OtherSchemaClass,
)
Expand Down Expand Up @@ -316,7 +317,7 @@ def read_file_spark(self, file: str, ext: str) -> Optional[DataFrame]:
df = self.spark.read.json(file)
elif ext.endswith(".avro"):
try:
df = self.spark.read.format("avro").load(file)
df = self.sparkeread.format("avro").load(file)
except AnalysisException:
self.report.report_warning(
file,
Expand Down Expand Up @@ -569,6 +570,19 @@ def ingest_table(self, table_data: TableData) -> Iterable[MetadataWorkUnit]:
if self.source_config.profiling.enabled:
yield from self.get_table_profile(table_data, dataset_urn)

if (
self.source_config.use_s3_bucket_tags
or self.source_config.use_s3_object_tags
) and self.ctx.graph is not None:
current_tags: Optional[GlobalTagsClass] = self.ctx.graph.get_aspect_v2(
entity_urn=dataset_urn,
aspect="globalTags",
aspect_type=GlobalTagsClass,
)
tags_to_add = [make_tag_urn(tag) for tag in current_tag]

pass

def gen_bucket_key(self, name):
return S3BucketKey(
platform="s3",
Expand All @@ -578,6 +592,16 @@ def gen_bucket_key(self, name):
bucket_name=name,
)

def create_new_tags(self, tags: set[str], dataset_urn: str):
    # TODO(review): unimplemented stub — presumably intended to emit the
    # given "Key:Value" tag strings as tag associations on *dataset_urn*;
    # confirm intended behavior before wiring this into ingest_table.
    pass

def get_bucket_tags(self, name: str) -> set[str]:
    """Return the ``"Key:Value"`` tag strings attached to S3 bucket *name*.

    Raises:
        ValueError: if the source config has no AWS configuration.
    """
    aws_config = self.source_config.aws_config
    if aws_config is None:
        raise ValueError("aws_config not set. Cannot browse s3")
    s3_resource = aws_config.get_s3_resource()
    tag_set = s3_resource.Bucket(name).Tagging().tag_set
    return {f'{entry["Key"]}:{entry["Value"]}' for entry in tag_set}

def gen_folder_key(self, abs_path):
return FolderKey(
platform=self.source_config.platform,
Expand Down
12 changes: 12 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/s3/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,11 @@ class DataLakeSourceConfig(ConfigModel):

aws_config: Optional[AwsSourceConfig] = None

# Whether or not to create tags in DataHub from the S3 bucket's tags
use_s3_bucket_tags: Optional[bool] = None
# Whether or not to create tags in DataHub from the S3 object's tags
use_s3_object_tags: Optional[bool] = None

profile_patterns: AllowDenyPattern = AllowDenyPattern.allow_all()
profiling: DataLakeProfilerConfig = DataLakeProfilerConfig()

Expand All @@ -158,6 +163,13 @@ def validate_platform(cls, values: Dict) -> Dict:
if values["path_spec"].is_s3():
values["platform"] = "s3"
else:
if (
values["us3_s3_object_tags"] is not None
or values["us3_s3_bucket_tags"] is not None
):
raise ValueError(
"cannot grab s3 tags for platform=file. Remove the flag or use s3."
)
values["platform"] = "file"
logger.debug(f'Setting config "platform": {values.get("platform")}')
return values
Expand Down
29 changes: 27 additions & 2 deletions metadata-ingestion/tests/unit/glue/glue_mces_golden.json
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,13 @@
"platform": "urn:li:dataPlatform:glue",
"instance": null
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{"tag": "urn:li:tag:foo:bar"}
]
}
}
]
}
Expand Down Expand Up @@ -496,7 +503,16 @@
"platform": "urn:li:dataPlatform:glue",
"instance": null
}
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:foo:bar"
}
]
}
}
]
}
},
Expand Down Expand Up @@ -695,6 +711,15 @@
"platform": "urn:li:dataPlatform:glue",
"instance": null
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:foo:bar"
}
]
}
}
]
}
Expand Down Expand Up @@ -1254,4 +1279,4 @@
"proposedDelta": null,
"systemMetadata": null
}
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,15 @@
"platform": "urn:li:dataPlatform:glue",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:foo:bar"
}
]
}
}
]
}
Expand Down Expand Up @@ -496,6 +505,15 @@
"platform": "urn:li:dataPlatform:glue",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:foo:bar"
}
]
}
}
]
}
Expand Down Expand Up @@ -695,6 +713,15 @@
"platform": "urn:li:dataPlatform:glue",
"instance": "urn:li:dataPlatformInstance:(urn:li:dataPlatform:glue,some_instance_name)"
}
},
{
"com.linkedin.pegasus2avro.common.GlobalTags": {
"tags": [
{
"tag": "urn:li:tag:foo:bar"
}
]
}
}
]
}
Expand Down Expand Up @@ -1254,4 +1281,4 @@
"proposedDelta": null,
"systemMetadata": null
}
]
]
11 changes: 11 additions & 0 deletions metadata-ingestion/tests/unit/test_glue_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from tests.test_helpers import mce_helpers
from tests.test_helpers.type_helpers import PytestConfig
from tests.unit.test_glue_source_stubs import (
get_bucket_tagging,
get_databases_response,
get_dataflow_graph_response_1,
get_dataflow_graph_response_2,
Expand All @@ -42,6 +43,7 @@ def glue_source(platform_instance: Optional[str] = None) -> GlueSource:
aws_region="us-west-2",
extract_transforms=True,
platform_instance=platform_instance,
use_s3_bucket_tags=True,
),
)

Expand Down Expand Up @@ -117,6 +119,15 @@ def test_glue_ingest(

with Stubber(glue_source_instance.s3_client) as s3_stubber:

for _ in range(
len(get_tables_response_1["TableList"])
+ len(get_tables_response_2["TableList"])
):
s3_stubber.add_response(
"get_bucket_tagging",
get_bucket_tagging(),
)

s3_stubber.add_response(
"get_object",
get_object_response_1(),
Expand Down
4 changes: 4 additions & 0 deletions metadata-ingestion/tests/unit/test_glue_source_stubs.py
Original file line number Diff line number Diff line change
Expand Up @@ -765,3 +765,7 @@ def get_object_response_1() -> Dict[str, Any]:

def get_object_response_2() -> Dict[str, Any]:
return mock_get_object_response(get_object_body_2)


def get_bucket_tagging() -> Dict[str, Any]:
    """Stubbed boto3 ``get_bucket_tagging`` response containing one foo:bar tag."""
    single_tag = {"Key": "foo", "Value": "bar"}
    return {"TagSet": [single_tag]}