Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/s3): Fixing container creation when there is no folder in path #10993

Merged
merged 4 commits into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,23 +160,24 @@ def create_container_hierarchy(
)
return

for folder in parent_folder_path.split("/"):
abs_path = folder
if parent_key:
prefix: str = ""
if isinstance(parent_key, BucketKey):
prefix = parent_key.bucket_name
elif isinstance(parent_key, FolderKey):
prefix = parent_key.folder_abs_path
abs_path = prefix + "/" + folder
folder_key = self.gen_folder_key(abs_path)
yield from self.create_emit_containers(
container_key=folder_key,
name=folder,
sub_types=[DatasetContainerSubTypes.FOLDER],
parent_container_key=parent_key,
)
parent_key = folder_key
if parent_folder_path:
for folder in parent_folder_path.split("/"):
abs_path = folder
if parent_key:
prefix: str = ""
if isinstance(parent_key, BucketKey):
prefix = parent_key.bucket_name
elif isinstance(parent_key, FolderKey):
prefix = parent_key.folder_abs_path
abs_path = prefix + "/" + folder
folder_key = self.gen_folder_key(abs_path)
yield from self.create_emit_containers(
container_key=folder_key,
name=folder,
sub_types=[DatasetContainerSubTypes.FOLDER],
parent_container_key=parent_key,
)
parent_key = folder_key

assert parent_key is not None
yield from add_dataset_to_container(parent_key, dataset_urn)
78 changes: 78 additions & 0 deletions metadata-ingestion/tests/unit/s3/test_s3_source.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
from typing import List

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.data_lake_common.data_lake_utils import ContainerWUCreator
from datahub.ingestion.source.data_lake_common.path_spec import PathSpec
from datahub.ingestion.source.s3.source import partitioned_folder_comparator

Expand Down Expand Up @@ -91,3 +96,76 @@ def test_path_spec_dir_allowed():

path = "s3://my-bucket/my-folder/year=2022/month=10/day=10/"
assert path_spec.dir_allowed(path) is False, f"{path} should be denied"


def test_container_generation_without_folders():
cwu = ContainerWUCreator("s3", None, "PROD")
mcps = cwu.create_container_hierarchy(
"s3://my-bucket/my-file.json.gz", "urn:li:dataset:123"
)

def container_properties_filter(x: MetadataWorkUnit) -> bool:
assert isinstance(x.metadata, MetadataChangeProposalWrapper)
return x.metadata.aspectName == "containerProperties"

container_properties: List = list(filter(container_properties_filter, mcps))
assert len(container_properties) == 1
assert container_properties[0].metadata.aspect.customProperties == {
"bucket_name": "my-bucket",
"env": "PROD",
"platform": "s3",
}


def test_container_generation_with_folder():
cwu = ContainerWUCreator("s3", None, "PROD")
mcps = cwu.create_container_hierarchy(
"s3://my-bucket/my-dir/my-file.json.gz", "urn:li:dataset:123"
)

def container_properties_filter(x: MetadataWorkUnit) -> bool:
assert isinstance(x.metadata, MetadataChangeProposalWrapper)
return x.metadata.aspectName == "containerProperties"

container_properties: List = list(filter(container_properties_filter, mcps))
assert len(container_properties) == 2
assert container_properties[0].metadata.aspect.customProperties == {
"bucket_name": "my-bucket",
"env": "PROD",
"platform": "s3",
}
assert container_properties[1].metadata.aspect.customProperties == {
"env": "PROD",
"folder_abs_path": "my-bucket/my-dir",
"platform": "s3",
}


def test_container_generation_with_multiple_folders():
cwu = ContainerWUCreator("s3", None, "PROD")
mcps = cwu.create_container_hierarchy(
"s3://my-bucket/my-dir/my-dir2/my-file.json.gz", "urn:li:dataset:123"
)

def container_properties_filter(x: MetadataWorkUnit) -> bool:
assert isinstance(x.metadata, MetadataChangeProposalWrapper)
return x.metadata.aspectName == "containerProperties"

container_properties: List = list(filter(container_properties_filter, mcps))

assert len(container_properties) == 3
assert container_properties[0].metadata.aspect.customProperties == {
"bucket_name": "my-bucket",
"env": "PROD",
"platform": "s3",
}
assert container_properties[1].metadata.aspect.customProperties == {
"env": "PROD",
"folder_abs_path": "my-bucket/my-dir",
"platform": "s3",
}
assert container_properties[2].metadata.aspect.customProperties == {
"env": "PROD",
"folder_abs_path": "my-bucket/my-dir/my-dir2",
"platform": "s3",
}
Loading