Commit: action attempt 1
hsheth2 committed Apr 13, 2023
1 parent a8120eb commit 22611b3
Showing 3 changed files with 322 additions and 0 deletions.
66 changes: 66 additions & 0 deletions action.yml
@@ -0,0 +1,66 @@
name: Acryl dbt Impact Analysis
description: Comments on PRs with the impact of dbt changes.
inputs:
  datahub_host:
    description: "DataHub GMS host."
    required: true
  datahub_token:
    description: "DataHub GMS token."
    required: false
  datahub_frontend_url:
    description: "DataHub frontend URL."
    required: true

  dbt_project_folder:
    description: "dbt project folder. Defaults to ."
    default: '.'

runs:
  using: "composite"
  steps:
    # - uses: actions/checkout@v3
    #   with:
    #     # We need git history for the git merge-base.
    #     fetch-depth: 0
    # TODO: For large repos, use this instead:
    # - uses: rmacklin/fetch-through-merge-base@v0

    # TODO: ensure that this is running on a PR

    - name: Check dbt installation and install acryl-datahub
      shell: bash
      run: |
        dbt --version
        pip install https://docs-website-hy1muoklg-acryldata.vercel.app/artifacts/wheels/acryl_datahub-0.0.0.dev1-py3-none-any.whl
        pip cache remove 'acryl*'
    # Generate the previous manifest.
    - name: Generate previous manifest
      shell: bash
      run: |
        git checkout ${{ github.base_ref }}
        dbt ls
        cp -r target target-previous
        git checkout -
    # Run impact analysis script.
    - name: Run impact analysis
      id: impact-analysis
      shell: bash
      run: |
        DBT_ARTIFACT_STATE_PATH=target-previous python ${{ github.action_path }}/impact_analysis.py
        cat impact_analysis.md
        # Output a multiline string to an output parameter.
        # Technique from https://docs.github.com/en/actions/using-workflows/workflow-commands-for-github-actions#multiline-strings
        EOF=$(dd if=/dev/urandom bs=15 count=1 status=none | base64)
        echo "IMPACT_ANALYSIS_MD<<$EOF" >> $GITHUB_OUTPUT
        cat impact_analysis.md >> $GITHUB_OUTPUT
        echo "$EOF" >> $GITHUB_OUTPUT
      env:
        DATAHUB_GMS_HOST: ${{ inputs.datahub_host }}
        DATAHUB_GMS_TOKEN: ${{ inputs.datahub_token }}
        DATAHUB_FRONTEND_URL: ${{ inputs.datahub_frontend_url }}

    # Post a comment on the PR.
    - uses: marocchino/sticky-pull-request-comment@v2
      with:
        header: acryl-impact-analysis
        message: ${{ steps.impact-analysis.outputs.IMPACT_ANALYSIS_MD }}
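
For reference, a consuming repository could wire this composite action into a pull request workflow roughly as follows. This is a minimal sketch, not part of this commit: the action reference, secret names, Python version, and dbt adapter are assumptions.

name: dbt impact analysis
on: pull_request

jobs:
  impact-analysis:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3
        with:
          # Full git history so changed models can be compared against the base branch.
          fetch-depth: 0
      - uses: actions/setup-python@v4
        with:
          python-version: "3.10"
      - run: pip install dbt-snowflake
      - uses: your-org/this-action-repo@main  # placeholder reference to the repo containing action.yml
        with:
          datahub_host: ${{ secrets.DATAHUB_GMS_HOST }}
          datahub_token: ${{ secrets.DATAHUB_GMS_TOKEN }}
          datahub_frontend_url: https://your-company.acryl.io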
18 changes: 18 additions & 0 deletions fake-dbt-profiles.yml
@@ -0,0 +1,18 @@
long_tail_companions:
  target: prod
  outputs:
    prod:
      type: snowflake
      account: foo
      user: foo
      password: foo
      role: foo
      database: foo
      schema: foo
      # account: "{{ env_var('DBT_SNOWFLAKE_ACCOUNT') }}"
      # user: "{{ env_var('DBT_SNOWFLAKE_USER') }}"
      # password: "{{ env_var('DBT_SNOWFLAKE_PASSWORD') }}"
      # role: "{{ env_var('DBT_SNOWFLAKE_ROLE') }}"
      # database: "{{ env_var('DBT_SNOWFLAKE_DATABASE') }}"
      # warehouse: "{{ env_var('DBT_SNOWFLAKE_WAREHOUSE') }}"
      # schema: "{{ env_var('DBT_SCHEMA') }}"
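
dbt only loads credentials from a file named profiles.yml, so this placeholder presumably gets copied into place (or its directory exposed via --profiles-dir / DBT_PROFILES_DIR) before the action invokes dbt. A minimal sketch of such a step, where the step name and target path are assumptions:

- name: Install placeholder dbt profile
  shell: bash
  run: |
    mkdir -p ~/.dbt
    cp fake-dbt-profiles.yml ~/.dbt/profiles.yml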
238 changes: 238 additions & 0 deletions impact_analysis.py
@@ -0,0 +1,238 @@
import json
import os
import pathlib
import subprocess
from typing import Dict, List, Optional, TypedDict

from datahub.ingestion.graph.client import DatahubClientConfig, DataHubGraph
from datahub.metadata.schema_classes import DatasetPropertiesClass
from datahub.utilities.urns.urn import Urn, guess_entity_type

DATAHUB_SERVER = os.environ["DATAHUB_GMS_HOST"]
DATAHUB_TOKEN: Optional[str] = os.getenv("DATAHUB_GMS_TOKEN")
DATAHUB_FRONTEND_URL = os.environ["DATAHUB_FRONTEND_URL"]

DBT_ID_PROP = "dbt_unique_id"

graph = DataHubGraph(DatahubClientConfig(server=DATAHUB_SERVER, token=DATAHUB_TOKEN))


class DbtNodeInfo(TypedDict):
    unique_id: str
    original_file_path: str


def determine_changed_dbt_models() -> List[DbtNodeInfo]:
    if "DBT_ARTIFACT_STATE_PATH" not in os.environ:
        raise ValueError("DBT_ARTIFACT_STATE_PATH environment variable must be set")

    # Running dbt ls also regenerates the manifest file, so it
    # will always produce output that is up-to-date with the latest changes.
    res = subprocess.run(
        [
            "dbt",
            "ls",
            # fmt: off
            # Use the manifest file from the previous run.
            "-s", "state:modified",
            # Limit to desired node types.
            "--resource-type", "model",
            "--resource-type", "snapshot",
            # Output formatting.
            "--output", "json",
            "--output-keys", "unique_id,original_file_path",
            # fmt: on
        ],
        check=True,
        capture_output=True,
        text=True,
    )

    dbt_nodes: List[DbtNodeInfo] = []
    for line in res.stdout.splitlines():
        dbt_info: DbtNodeInfo = json.loads(line)
        dbt_nodes.append(dbt_info)

    return dbt_nodes
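
# Note: with the output settings above, each line that `dbt ls` prints to stdout is a
# standalone JSON object restricted to the requested keys, e.g. (illustrative,
# hypothetical model name):
#   {"unique_id": "model.my_project.my_model", "original_file_path": "models/my_model.sql"}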


def find_datahub_urns(dbt_node_ids: List[str]) -> List[str]:
    filter_conditions = [
        {
            "field": "customProperties",
            "value": f"{DBT_ID_PROP}={dbt_node_id}",
            "condition": "EQUAL",
        }
        for dbt_node_id in dbt_node_ids
    ]

    search_body = {
        "input": "*",
        "entity": "dataset",
        "start": 0,
        "count": 10000,
        "filter": {"or": [{"and": [filter_cond]} for filter_cond in filter_conditions]},
    }
    results: Dict = graph._post_generic(graph._get_search_endpoint(), search_body)

    urns = [res["entity"] for res in results["value"]["entities"]]

    return urns


def get_datahub_info(urn: str):
    return graph.get_aspects_for_entity(
        urn,
        aspects=["datasetProperties"],
        aspect_types=[DatasetPropertiesClass],
    )


IMPACT_ANALYSIS_QUERY = """\
query GetLineage($urn: String!) {
  searchAcrossLineage(
    input: {
      urn: $urn,
      direction: DOWNSTREAM
    }
  ) {
    searchResults {
      entity {
        urn
        type
        ... on Dataset {
          properties {
            name
          }
          platform {
            name
            properties {
              displayName
            }
          }
          subTypes {
            typeNames
          }
          siblings {
            isPrimary
          }
        }
        ... on Chart {
          properties {
            name
          }
          platform {
            name
            properties {
              displayName
            }
          }
        }
        ... on Dashboard {
          properties {
            name
          }
          platform {
            name
            properties {
              displayName
            }
          }
        }
      }
      degree
    }
  }
}
"""


def get_impact_analysis(urn: str):
    result = graph.execute_graphql(IMPACT_ANALYSIS_QUERY, variables={"urn": urn})

    downstreams = result["searchAcrossLineage"]["searchResults"]

    # Filter out the non-primary siblings.
    downstreams = [
        downstream
        for downstream in downstreams
        if (downstream["entity"].get("siblings") or {}).get("isPrimary") is not False
    ]

    # Sort by number of hops from the root node.
    # downstreams.sort(key=lambda x: x["degree"])

    return [downstream["entity"] for downstream in downstreams]


def datahub_url_from_urn(urn: str) -> str:
    entity_type = guess_entity_type(urn)
    # TODO: This won't work for dataJobs / dataFlows.
    return f"{DATAHUB_FRONTEND_URL}/{entity_type}/{Urn.url_encode(urn)}"


def format_entity(downstream: Dict) -> str:
    platform = downstream["platform"]["name"]
    if (downstream["platform"].get("properties") or {}).get("displayName"):
        platform = downstream["platform"]["properties"]["displayName"]

    name = downstream["properties"]["name"]
    url = datahub_url_from_urn(downstream["urn"])

    type: str = downstream["type"].capitalize()
    if downstream.get("subTypes"):
        type = downstream["subTypes"]["typeNames"][0]

    return f"{platform} {type} [{name}]({url})"


def main():
    # Step 1 - determine which dbt nodes are impacted by the changes in a given PR.
    changed_dbt_nodes = determine_changed_dbt_models()
    dbt_id_to_dbt_node = {node["unique_id"]: node for node in changed_dbt_nodes}
    # print(changed_dbt_nodes)

    # Step 2 - map dbt nodes to datahub urns.
    # In an ideal world, the datahub urns for dbt would just be the dbt node ids.
    urns = find_datahub_urns([node["unique_id"] for node in changed_dbt_nodes])
    datahub_nodes = {urn: get_datahub_info(urn) for urn in urns}
    urn_to_dbt_id = {
        urn: node["datasetProperties"].customProperties[DBT_ID_PROP]
        for urn, node in datahub_nodes.items()
    }
    # print(urn_to_dbt_id)

    # Step 3 - generate downstream impact analysis for each datahub urn.
    downstreams_report = {urn: get_impact_analysis(urn) for urn in urns}

    # Step 4 - format the output message as markdown.
    all_impacted_urns = {
        downstream["urn"]
        for downstreams in downstreams_report.values()
        for downstream in downstreams
    }

    output = "# Acryl Impact Analysis\n\n"
    output += f"- **{len(changed_dbt_nodes)}** dbt models changed\n"
    output += (
        f"- **{len(all_impacted_urns)}** downstream entities potentially impacted\n"
    )

    for urn, downstreams in downstreams_report.items():
        dbt_node = dbt_id_to_dbt_node[urn_to_dbt_id[urn]]

        output += (
            f"\n## [{dbt_node['original_file_path']}]({datahub_url_from_urn(urn)})\n\n"
        )
        output += f"May impact **{len(downstreams)}** downstreams:\n"
        for downstream in downstreams:
            output += f"- {format_entity(downstream)}\n"
        # TODO: truncate if there are too many?

    output += "\n\n_If a dbt model is reported as changed even though its file contents have not changed, it's likely because a dbt macro or other metadata has changed._\n\n"

    pathlib.Path("impact_analysis.md").write_text(output)


if __name__ == "__main__":
    main()
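
Because the script is configured entirely through environment variables, it can also be exercised locally from a dbt project checkout, assuming a manifest generated from the base branch has been copied into target-previous (the host, token, and URL values below are placeholders):

export DATAHUB_GMS_HOST='https://your-datahub-instance/gms'
export DATAHUB_GMS_TOKEN='<optional personal access token>'
export DATAHUB_FRONTEND_URL='https://your-datahub-instance'
DBT_ARTIFACT_STATE_PATH=target-previous python impact_analysis.py
cat impact_analysis.md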
