-
Notifications
You must be signed in to change notification settings - Fork 3k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
sdk(platform-resource): add entity type for ease of use (#11541)
- Loading branch information
1 parent
3426d40
commit f3a348a
Showing
11 changed files
with
786 additions
and
0 deletions.
There are no files selected for viewing
Empty file.
36 changes: 36 additions & 0 deletions
36
metadata-ingestion/src/datahub/api/entities/common/data_platform_instance.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from typing import Optional | ||
|
||
from pydantic import BaseModel | ||
|
||
import datahub.metadata.schema_classes as models | ||
from datahub.emitter.mce_builder import ( | ||
make_data_platform_urn, | ||
make_dataplatform_instance_urn, | ||
) | ||
|
||
|
||
class DataPlatformInstance(BaseModel): | ||
platform: str | ||
platform_instance: Optional[str] = None | ||
|
||
@classmethod | ||
def from_data_platform_instance( | ||
cls, | ||
platform_instance: models.DataPlatformInstanceClass, | ||
) -> "DataPlatformInstance": | ||
return cls( | ||
platform=platform_instance.platform, | ||
platform_instance=platform_instance.instance, | ||
) | ||
|
||
def to_data_platform_instance(self) -> models.DataPlatformInstanceClass: | ||
return models.DataPlatformInstanceClass( | ||
platform=make_data_platform_urn(self.platform), | ||
instance=( | ||
make_dataplatform_instance_urn( | ||
platform=self.platform, instance=self.platform_instance | ||
) | ||
if self.platform_instance | ||
else None | ||
), | ||
) |
141 changes: 141 additions & 0 deletions
141
metadata-ingestion/src/datahub/api/entities/common/serialized_value.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,141 @@ | ||
import json | ||
import logging | ||
from typing import Dict, Optional, Type, Union | ||
|
||
from avrogen.dict_wrapper import DictWrapper | ||
from pydantic import BaseModel | ||
|
||
import datahub.metadata.schema_classes as models | ||
from datahub.metadata.schema_classes import __SCHEMA_TYPES as SCHEMA_TYPES | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
_REMAPPED_SCHEMA_TYPES = { | ||
k.replace("pegasus2avro.", ""): v for k, v in SCHEMA_TYPES.items() | ||
} | ||
|
||
|
||
class SerializedResourceValue(BaseModel): | ||
class Config: | ||
arbitrary_types_allowed = True | ||
|
||
content_type: str | ||
blob: bytes | ||
schema_type: Optional[str] = None | ||
schema_ref: Optional[str] = None | ||
|
||
def as_raw_json(self) -> Optional[Dict]: | ||
""" | ||
Parse the blob into a Python object based on the schema type and schema | ||
ref. | ||
If the schema type is JSON, the blob is parsed into a Python dict. | ||
If a schema ref is provided, the blob is parsed into a Python object | ||
assuming the schema ref is a Python class that can be instantiated using | ||
the parsed dict. | ||
If the schema type is PEGASUS, the blob is parsed into a DictWrapper | ||
object using the schema ref. | ||
""" | ||
if ( | ||
not self.schema_type | ||
or self.schema_type == models.SerializedValueSchemaTypeClass.JSON | ||
): | ||
# default to JSON parsing | ||
json_string = self.blob.decode("utf-8") | ||
object_dict = json.loads(json_string) | ||
# TODO: Add support for schema ref | ||
return object_dict | ||
elif self.schema_type == models.SerializedValueSchemaTypeClass.PEGASUS: | ||
json_string = self.blob.decode("utf-8") | ||
object_dict = json.loads(json_string) | ||
return object_dict | ||
else: | ||
logger.warning( | ||
f"Unsupported schema type {self.schema_type} for parsing value" | ||
) | ||
raise ValueError( | ||
f"Unsupported schema type {self.schema_type} for parsing value" | ||
) | ||
|
||
def as_pegasus_object(self) -> DictWrapper: | ||
""" | ||
Parse the blob into a Pegasus-defined Python object based on the schema type and schema | ||
ref. | ||
If the schema type is JSON, the blob is parsed into a Python dict. | ||
If a schema ref is provided, the blob is parsed into a Python object | ||
assuming the schema ref is a Python class that can be instantiated using | ||
the parsed dict. | ||
If the schema type is PEGASUS, the blob is parsed into a DictWrapper | ||
object using the schema ref. | ||
""" | ||
assert ( | ||
self.schema_type | ||
and self.schema_type == models.SerializedValueSchemaTypeClass.PEGASUS | ||
) | ||
assert self.schema_ref | ||
object_dict = self.as_raw_json() | ||
model_type = _REMAPPED_SCHEMA_TYPES.get(self.schema_ref) | ||
if model_type: | ||
assert issubclass(model_type, DictWrapper) | ||
return model_type.from_obj(object_dict or {}) | ||
else: | ||
raise ValueError( | ||
f"Could not find schema ref {self.schema_ref} for parsing value" | ||
) | ||
|
||
def as_pydantic_object( | ||
self, model_type: Type[BaseModel], validate_schema_ref: bool = False | ||
) -> BaseModel: | ||
""" | ||
Parse the blob into a Pydantic-defined Python object based on the schema type and schema | ||
ref. | ||
If the schema type is JSON, the blob is parsed into a Python dict. | ||
If a schema ref is provided, the blob is parsed into a Python object | ||
assuming the schema ref is a Python class that can be instantiated using | ||
the parsed dict. | ||
If the schema type is PEGASUS, the blob is parsed into a DictWrapper | ||
object using the schema ref. | ||
""" | ||
assert ( | ||
self.schema_type | ||
and self.schema_type == models.SerializedValueSchemaTypeClass.JSON | ||
) | ||
if validate_schema_ref: | ||
assert self.schema_ref | ||
assert self.schema_ref == model_type.__name__ | ||
object_dict = self.as_raw_json() | ||
return model_type.parse_obj(object_dict) | ||
|
||
@classmethod | ||
def from_resource_value( | ||
cls, resource_value: models.SerializedValueClass | ||
) -> "SerializedResourceValue": | ||
return cls( | ||
content_type=resource_value.contentType, | ||
blob=resource_value.blob, | ||
schema_type=resource_value.schemaType, | ||
schema_ref=resource_value.schemaRef, | ||
) | ||
|
||
@classmethod | ||
def create( | ||
cls, object: Union[DictWrapper, BaseModel, Dict] | ||
) -> "SerializedResourceValue": | ||
if isinstance(object, DictWrapper): | ||
return SerializedResourceValue( | ||
content_type=models.SerializedValueContentTypeClass.JSON, | ||
blob=json.dumps(object.to_obj()).encode("utf-8"), | ||
schema_type=models.SerializedValueSchemaTypeClass.PEGASUS, | ||
schema_ref=object.RECORD_SCHEMA.fullname.replace("pegasus2avro.", ""), | ||
) | ||
elif isinstance(object, BaseModel): | ||
return SerializedResourceValue( | ||
content_type=models.SerializedValueContentTypeClass.JSON, | ||
blob=json.dumps(object.dict()).encode("utf-8"), | ||
schema_type=models.SerializedValueSchemaTypeClass.JSON, | ||
schema_ref=object.__class__.__name__, | ||
) | ||
else: | ||
return SerializedResourceValue( | ||
content_type=models.SerializedValueContentTypeClass.JSON, | ||
blob=json.dumps(object).encode("utf-8"), | ||
) |
Empty file.
Oops, something went wrong.