Skip to content

Commit

Permalink
fix(ingest/transformers): Use set to store tags in AddDatasetTags (#1…
Browse files Browse the repository at this point in the history
  • Loading branch information
asikowitz authored Apr 18, 2024
1 parent 77f1a0c commit a041a2e
Showing 1 changed file with 10 additions and 19 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import logging
from typing import Callable, List, Optional, Union, cast
from typing import Callable, Dict, List, Optional, Union, cast

import datahub.emitter.mce_builder as builder
from datahub.configuration.common import (
KeyValuePattern,
TransformerSemanticsConfigModel,
Expand All @@ -15,7 +14,6 @@
GlobalTagsClass,
MetadataChangeProposalClass,
TagAssociationClass,
TagKeyClass,
)
from datahub.utilities.urns.tag_urn import TagUrn

Expand All @@ -33,13 +31,13 @@ class AddDatasetTags(DatasetTagsTransformer):

ctx: PipelineContext
config: AddDatasetTagsConfig
processed_tags: List[TagAssociationClass]
processed_tags: Dict[str, TagAssociationClass]

def __init__(self, config: AddDatasetTagsConfig, ctx: PipelineContext):
super().__init__()
self.ctx = ctx
self.config = config
self.processed_tags = []
self.processed_tags = {}

@classmethod
def create(cls, config_dict: dict, ctx: PipelineContext) -> "AddDatasetTags":
Expand All @@ -58,9 +56,9 @@ def transform_aspect(
tags_to_add = self.config.get_tags_to_add(entity_urn)
if tags_to_add is not None:
out_global_tags_aspect.tags.extend(tags_to_add)
self.processed_tags.extend(
tags_to_add
) # Keep track of tags added so that we can create them in handle_end_of_stream
# Keep track of tags added so that we can create them in handle_end_of_stream
for tag in tags_to_add:
self.processed_tags.setdefault(tag.tag, tag)

return self.get_result_semantics(
self.config, self.ctx.graph, entity_urn, out_global_tags_aspect
Expand All @@ -76,19 +74,12 @@ def handle_end_of_stream(

logger.debug("Generating tags")

for tag_association in self.processed_tags:
ids: List[str] = TagUrn.create_from_string(
tag_association.tag
).get_entity_id()

assert len(ids) == 1, "Invalid Tag Urn"

tag_name: str = ids[0]

for tag_association in self.processed_tags.values():
tag_urn = TagUrn.create_from_string(tag_association.tag)
mcps.append(
MetadataChangeProposalWrapper(
entityUrn=builder.make_tag_urn(tag=tag_name),
aspect=TagKeyClass(name=tag_name),
entityUrn=tag_urn.urn(),
aspect=tag_urn.to_key_aspect(),
)
)

Expand Down

0 comments on commit a041a2e

Please sign in to comment.