Skip to content

Commit

Permalink
sdk(platform-resource): add entity type for ease of use (#11541)
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka authored Oct 8, 2024
1 parent 3426d40 commit f3a348a
Show file tree
Hide file tree
Showing 11 changed files with 786 additions and 0 deletions.
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
from typing import Optional

from pydantic import BaseModel

import datahub.metadata.schema_classes as models
from datahub.emitter.mce_builder import (
make_data_platform_urn,
make_dataplatform_instance_urn,
)


class DataPlatformInstance(BaseModel):
platform: str
platform_instance: Optional[str] = None

@classmethod
def from_data_platform_instance(
cls,
platform_instance: models.DataPlatformInstanceClass,
) -> "DataPlatformInstance":
return cls(
platform=platform_instance.platform,
platform_instance=platform_instance.instance,
)

def to_data_platform_instance(self) -> models.DataPlatformInstanceClass:
return models.DataPlatformInstanceClass(
platform=make_data_platform_urn(self.platform),
instance=(
make_dataplatform_instance_urn(
platform=self.platform, instance=self.platform_instance
)
if self.platform_instance
else None
),
)
141 changes: 141 additions & 0 deletions metadata-ingestion/src/datahub/api/entities/common/serialized_value.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import json
import logging
from typing import Dict, Optional, Type, Union

from avrogen.dict_wrapper import DictWrapper
from pydantic import BaseModel

import datahub.metadata.schema_classes as models
from datahub.metadata.schema_classes import __SCHEMA_TYPES as SCHEMA_TYPES

logger = logging.getLogger(__name__)

_REMAPPED_SCHEMA_TYPES = {
k.replace("pegasus2avro.", ""): v for k, v in SCHEMA_TYPES.items()
}


class SerializedResourceValue(BaseModel):
class Config:
arbitrary_types_allowed = True

content_type: str
blob: bytes
schema_type: Optional[str] = None
schema_ref: Optional[str] = None

def as_raw_json(self) -> Optional[Dict]:
"""
Parse the blob into a Python object based on the schema type and schema
ref.
If the schema type is JSON, the blob is parsed into a Python dict.
If a schema ref is provided, the blob is parsed into a Python object
assuming the schema ref is a Python class that can be instantiated using
the parsed dict.
If the schema type is PEGASUS, the blob is parsed into a DictWrapper
object using the schema ref.
"""
if (
not self.schema_type
or self.schema_type == models.SerializedValueSchemaTypeClass.JSON
):
# default to JSON parsing
json_string = self.blob.decode("utf-8")
object_dict = json.loads(json_string)
# TODO: Add support for schema ref
return object_dict
elif self.schema_type == models.SerializedValueSchemaTypeClass.PEGASUS:
json_string = self.blob.decode("utf-8")
object_dict = json.loads(json_string)
return object_dict
else:
logger.warning(
f"Unsupported schema type {self.schema_type} for parsing value"
)
raise ValueError(
f"Unsupported schema type {self.schema_type} for parsing value"
)

def as_pegasus_object(self) -> DictWrapper:
"""
Parse the blob into a Pegasus-defined Python object based on the schema type and schema
ref.
If the schema type is JSON, the blob is parsed into a Python dict.
If a schema ref is provided, the blob is parsed into a Python object
assuming the schema ref is a Python class that can be instantiated using
the parsed dict.
If the schema type is PEGASUS, the blob is parsed into a DictWrapper
object using the schema ref.
"""
assert (
self.schema_type
and self.schema_type == models.SerializedValueSchemaTypeClass.PEGASUS
)
assert self.schema_ref
object_dict = self.as_raw_json()
model_type = _REMAPPED_SCHEMA_TYPES.get(self.schema_ref)
if model_type:
assert issubclass(model_type, DictWrapper)
return model_type.from_obj(object_dict or {})
else:
raise ValueError(
f"Could not find schema ref {self.schema_ref} for parsing value"
)

def as_pydantic_object(
self, model_type: Type[BaseModel], validate_schema_ref: bool = False
) -> BaseModel:
"""
Parse the blob into a Pydantic-defined Python object based on the schema type and schema
ref.
If the schema type is JSON, the blob is parsed into a Python dict.
If a schema ref is provided, the blob is parsed into a Python object
assuming the schema ref is a Python class that can be instantiated using
the parsed dict.
If the schema type is PEGASUS, the blob is parsed into a DictWrapper
object using the schema ref.
"""
assert (
self.schema_type
and self.schema_type == models.SerializedValueSchemaTypeClass.JSON
)
if validate_schema_ref:
assert self.schema_ref
assert self.schema_ref == model_type.__name__
object_dict = self.as_raw_json()
return model_type.parse_obj(object_dict)

@classmethod
def from_resource_value(
cls, resource_value: models.SerializedValueClass
) -> "SerializedResourceValue":
return cls(
content_type=resource_value.contentType,
blob=resource_value.blob,
schema_type=resource_value.schemaType,
schema_ref=resource_value.schemaRef,
)

@classmethod
def create(
cls, object: Union[DictWrapper, BaseModel, Dict]
) -> "SerializedResourceValue":
if isinstance(object, DictWrapper):
return SerializedResourceValue(
content_type=models.SerializedValueContentTypeClass.JSON,
blob=json.dumps(object.to_obj()).encode("utf-8"),
schema_type=models.SerializedValueSchemaTypeClass.PEGASUS,
schema_ref=object.RECORD_SCHEMA.fullname.replace("pegasus2avro.", ""),
)
elif isinstance(object, BaseModel):
return SerializedResourceValue(
content_type=models.SerializedValueContentTypeClass.JSON,
blob=json.dumps(object.dict()).encode("utf-8"),
schema_type=models.SerializedValueSchemaTypeClass.JSON,
schema_ref=object.__class__.__name__,
)
else:
return SerializedResourceValue(
content_type=models.SerializedValueContentTypeClass.JSON,
blob=json.dumps(object).encode("utf-8"),
)
Empty file.
Loading

0 comments on commit f3a348a

Please sign in to comment.