Skip to content

Commit

Permalink
feat(rest-emitter): adding async flag to rest emitter (#10902)
Browse files Browse the repository at this point in the history
Co-authored-by: Gabe Lyons <[email protected]>
  • Loading branch information
gabe-lyons and gabe-lyons authored Jul 12, 2024
1 parent 542f6c1 commit 423af83
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 6 deletions.
26 changes: 21 additions & 5 deletions metadata-ingestion/src/datahub/emitter/rest_emitter.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,14 +206,15 @@ def emit(
UsageAggregation,
],
callback: Optional[Callable[[Exception, str], None]] = None,
async_flag: Optional[bool] = None,
) -> None:
try:
if isinstance(item, UsageAggregation):
self.emit_usage(item)
elif isinstance(
item, (MetadataChangeProposal, MetadataChangeProposalWrapper)
):
self.emit_mcp(item)
self.emit_mcp(item, async_flag=async_flag)
else:
self.emit_mce(item)
except Exception as e:
Expand Down Expand Up @@ -245,24 +246,39 @@ def emit_mce(self, mce: MetadataChangeEvent) -> None:
self._emit_generic(url, payload)

def emit_mcp(
self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper]
self,
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
async_flag: Optional[bool] = None,
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposal"
ensure_has_system_metadata(mcp)

mcp_obj = pre_json_transform(mcp.to_obj())
payload = json.dumps({"proposal": mcp_obj})
payload_dict = {"proposal": mcp_obj}

if async_flag is not None:
payload_dict["async"] = "true" if async_flag else "false"

payload = json.dumps(payload_dict)

self._emit_generic(url, payload)

def emit_mcps(
self, mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]]
self,
mcps: List[Union[MetadataChangeProposal, MetadataChangeProposalWrapper]],
async_flag: Optional[bool] = None,
) -> None:
url = f"{self._gms_server}/aspects?action=ingestProposalBatch"
for mcp in mcps:
ensure_has_system_metadata(mcp)

mcp_objs = [pre_json_transform(mcp.to_obj()) for mcp in mcps]
payload = json.dumps({"proposals": mcp_objs})
payload_dict: dict = {"proposals": mcp_objs}

if async_flag is not None:
payload_dict["async"] = "true" if async_flag else "false"

payload = json.dumps(payload_dict)

self._emit_generic(url, payload)

Expand Down
5 changes: 4 additions & 1 deletion metadata-ingestion/tests/test_helpers/graph_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,14 +111,17 @@ def emit(
UsageAggregationClass,
],
callback: Union[Callable[[Exception, str], None], None] = None,
async_flag: Optional[bool] = None,
) -> None:
self.emitted.append(item) # type: ignore

def emit_mce(self, mce: MetadataChangeEvent) -> None:
self.emitted.append(mce)

def emit_mcp(
self, mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper]
self,
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
async_flag: Optional[bool] = None,
) -> None:
self.emitted.append(mcp)

Expand Down

0 comments on commit 423af83

Please sign in to comment.