fix(ingestion/lookml): resolve CLL issue caused by column name casing. #11876

Status: Open. sid-acryl wants to merge 24 commits into master from cus3139-looker-ingestion.

Commits (24)
7cdff5a  configurable: convert upstream column to lowercase (sid-acryl, Nov 18, 2024)
69b828b  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Nov 23, 2024)
3d39e4a  wip (sid-acryl, Nov 23, 2024)
b33a0a9  existing test-case are working (sid-acryl, Nov 24, 2024)
ff3077e  test case for column resolution from gms (sid-acryl, Nov 24, 2024)
c0c1513  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Nov 24, 2024)
e877cd0  address review comments (sid-acryl, Dec 1, 2024)
79b4f74  Merge branch 'cus3139-looker-ingestion' of https://github.com/sid-acr… (sid-acryl, Dec 1, 2024)
1639ec7  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 1, 2024)
bfa92e0  remove graph=graph (sid-acryl, Dec 2, 2024)
66d825d  fix test-case (sid-acryl, Dec 2, 2024)
7e55a51  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 2, 2024)
9caa198  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 3, 2024)
95b5bb6  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 3, 2024)
8f43c7a  address review comments (sid-acryl, Dec 6, 2024)
f8d723d  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 6, 2024)
a6382f1  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 9, 2024)
bdfbbe4  address review comments (sid-acryl, Dec 9, 2024)
afca825  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 10, 2024)
bc0f119  address review comments (sid-acryl, Dec 10, 2024)
d6f04bf  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 11, 2024)
22ffa40  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 11, 2024)
72169ee  address review comments (sid-acryl, Dec 11, 2024)
493b513  Merge branch 'master' into cus3139-looker-ingestion (sid-acryl, Dec 12, 2024)
@@ -88,8 +88,7 @@ def column_name_in_sql_attribute(self) -> List[str]:
         for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql):
             matched_field = upstream_field_match.group(1)
             # Remove quotes from field names
-            matched_field = matched_field.replace('"', "").replace("`", "").lower()
-            column_names.append(matched_field)
+            column_names.append(matched_field.replace('"', "").replace("`", "").lower())

         return column_names
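For reference, a minimal runnable sketch of what the consolidated line does (the sample dimension SQL is hypothetical):

import re

# Hypothetical dimension SQL; the regex and string handling mirror the hunk above.
sql = 'coalesce(${TABLE}."Customer_ID", 0)'
column_names = []
for upstream_field_match in re.finditer(r"\${TABLE}\.[\"]*([\.\w]+)", sql):
    matched_field = upstream_field_match.group(1)  # 'Customer_ID'
    # Strip any quotes/backticks and lowercase in a single step
    column_names.append(matched_field.replace('"', "").replace("`", "").lower())
print(column_names)  # ['customer_id']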
@@ -25,11 +25,13 @@
     LookMLSourceReport,
 )
 from datahub.ingestion.source.looker.urn_functions import get_qualified_table_name
+from datahub.sql_parsing.schema_resolver import match_columns_to_schema
 from datahub.sql_parsing.sqlglot_lineage import (
     ColumnLineageInfo,
     ColumnRef,
     SqlParsingResult,
     Urn,
+    create_and_cache_schema_resolver,
     create_lineage_sql_parsed_result,
 )

@@ -200,7 +202,7 @@ def _generate_fully_qualified_name(
 class AbstractViewUpstream(ABC):
     """
     Implementation of this interface extracts the view upstream as per the way the view is bound to datasets.
-    For detail explanation please refer lookml_concept_context.LookerViewContext documentation.
+    For a detailed explanation, please refer to the lookml_concept_context.LookerViewContext documentation.
     """

     view_context: LookerViewContext
@@ -236,6 +238,47 @@ def get_upstream_dataset_urn(self) -> List[Urn]:
     def create_fields(self) -> List[ViewField]:
         return []  # it is for the special case

+    def create_upstream_column_refs(
+        self, upstream_urn: str, downstream_looker_columns: List[str]
+    ) -> List[ColumnRef]:
+        """
+        - `upstream_urn`: The URN of the upstream dataset.
+
+        - `downstream_looker_columns`: The columns identified by the Looker connector as belonging to the `upstream_urn` dataset. Because these columns are declared by hand, their casing can drift from the actual platform: a user might declare a column in lowercase while it exists in uppercase on the platform, or vice versa.
+
+        This method keeps column-level lineage consistent by consulting GMS before creating the final `ColumnRef` instances, avoiding such discrepancies.
+        """
+        schema_resolver = create_and_cache_schema_resolver(
+            platform=self.view_context.view_connection.platform,
+            platform_instance=self.view_context.view_connection.platform_instance,
+            env=self.view_context.view_connection.platform_env or self.config.env,
+            graph=self.ctx.graph,
+        )
+
+        urn, schema_info = schema_resolver.resolve_urn(urn=upstream_urn)
+
+        if schema_info:
+            actual_columns = match_columns_to_schema(
+                schema_info, downstream_looker_columns
+            )
+        else:
+            logger.info(
+                f"schema_info not found for dataset {urn} in GMS. Using downstream_looker_columns to form ColumnRef"
+            )
+            actual_columns = [column.lower() for column in downstream_looker_columns]
+
+        upstream_column_refs: List[ColumnRef] = []
+
+        for column in actual_columns:
+            upstream_column_refs.append(
+                ColumnRef(
+                    column=column,
+                    table=upstream_urn,
+                )
+            )
+
+        return upstream_column_refs
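A hedged usage sketch of the new helper (the `view_upstream` instance, URN, and column names are hypothetical; it assumes GMS stores the column as `Customer_ID`):

refs = view_upstream.create_upstream_column_refs(
    upstream_urn="urn:li:dataset:(urn:li:dataPlatform:snowflake,db.public.employee,PROD)",
    downstream_looker_columns=["customer_id"],
)
# When schema_info is found in GMS, the ColumnRef carries the platform casing:
# [ColumnRef(table="urn:li:dataset:(...)", column="Customer_ID")]
# When it is not found, the columns are lowercased as a fallback.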


 class SqlBasedDerivedViewUpstream(AbstractViewUpstream, ABC):
     """
@@ -372,15 +415,12 @@ def get_upstream_column_ref(
         # in-case of "select * from look_ml_view.SQL_TABLE_NAME" or extra field are defined in the looker view which is
         # referring to upstream table
         if self._get_upstream_dataset_urn() and not upstreams_column_refs:
-            upstreams_column_refs = [
-                ColumnRef(
-                    table=self._get_upstream_dataset_urn()[
-                        0
-                    ],  # 0th index has table of from clause
-                    column=column,
-                )
-                for column in field_context.column_name_in_sql_attribute()
-            ]
+            upstreams_column_refs = self.create_upstream_column_refs(
+                upstream_urn=self._get_upstream_dataset_urn()[
+                    0
+                ],  # 0th index has table of from clause
+                downstream_looker_columns=field_context.column_name_in_sql_attribute(),
+            )

         # fix any derived view reference present in urn
         upstreams_column_refs = resolve_derived_view_urn_of_col_ref(
@@ -487,18 +527,18 @@ def get_upstream_column_ref(
             return upstream_column_refs

         explore_urn: str = self._get_upstream_dataset_urn()[0]
+        expected_columns: List[str] = []

         for column in field_context.column_name_in_sql_attribute():
             if column in self._get_explore_column_mapping():
                 explore_column: Dict = self._get_explore_column_mapping()[column]
-                upstream_column_refs.append(
-                    ColumnRef(
-                        column=explore_column.get("field", explore_column[NAME]),
-                        table=explore_urn,
-                    )
-                )
+                expected_columns.append(
+                    explore_column.get("field", explore_column[NAME])
+                )

-        return upstream_column_refs
+        return self.create_upstream_column_refs(
+            upstream_urn=explore_urn, downstream_looker_columns=expected_columns
+        )

     def get_upstream_dataset_urn(self) -> List[Urn]:
         return self._get_upstream_dataset_urn()
@@ -548,14 +588,10 @@ def __get_upstream_dataset_urn(self) -> Urn:
     def get_upstream_column_ref(
         self, field_context: LookerFieldContext
     ) -> List[ColumnRef]:
-        upstream_column_ref: List[ColumnRef] = []
-
-        for column_name in field_context.column_name_in_sql_attribute():
-            upstream_column_ref.append(
-                ColumnRef(table=self._get_upstream_dataset_urn(), column=column_name)
-            )
-
-        return upstream_column_ref
+        return self.create_upstream_column_refs(
+            upstream_urn=self._get_upstream_dataset_urn(),
+            downstream_looker_columns=field_context.column_name_in_sql_attribute(),
+        )

     def get_upstream_dataset_urn(self) -> List[Urn]:
         return [self._get_upstream_dataset_urn()]
@@ -609,15 +645,14 @@ def get_upstream_column_ref(
         self, field_context: LookerFieldContext
     ) -> List[ColumnRef]:
         upstream_column_ref: List[ColumnRef] = []

         if not self._get_upstream_dataset_urn():
             return upstream_column_ref

-        for column_name in field_context.column_name_in_sql_attribute():
-            upstream_column_ref.append(
-                ColumnRef(table=self._get_upstream_dataset_urn()[0], column=column_name)
-            )
-
-        return upstream_column_ref
+        return self.create_upstream_column_refs(
+            upstream_urn=self._get_upstream_dataset_urn()[0],
+            downstream_looker_columns=field_context.column_name_in_sql_attribute(),
+        )

     def get_upstream_dataset_urn(self) -> List[Urn]:
         return self._get_upstream_dataset_urn()
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/sql_parsing/schema_resolver.py
@@ -123,6 +123,13 @@ def get_urn_for_table(
         )
         return urn

+    def resolve_urn(self, urn: str) -> Tuple[str, Optional[SchemaInfo]]:
+        schema_info = self._resolve_schema_info(urn)
+        if schema_info:
+            return urn, schema_info
+
+        return urn, None

     def resolve_table(self, table: _TableName) -> Tuple[str, Optional[SchemaInfo]]:
         urn = self.get_urn_for_table(table)
@@ -293,3 +300,19 @@ def _convert_schema_field_list_to_info(

 def _convert_schema_aspect_to_info(schema_metadata: SchemaMetadataClass) -> SchemaInfo:
     return _convert_schema_field_list_to_info(schema_metadata.fields)


+def match_columns_to_schema(
+    schema_info: SchemaInfo, input_columns: List[str]
+) -> List[str]:
+    columns_from_gms: List[str] = list(schema_info.keys())  # list() to silence the linter
+
+    gms_column_map: Dict[str, str] = {
+        column.lower(): column for column in columns_from_gms
+    }
+
+    output_columns: List[str] = [
+        gms_column_map.get(column.lower(), column) for column in input_columns
+    ]
+
+    return output_columns
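A small sketch of the matching behavior (the SchemaInfo contents are hypothetical; SchemaInfo maps column names to their types, so its keys are the GMS column names):

from datahub.sql_parsing.schema_resolver import match_columns_to_schema

schema_info = {"Customer_ID": "NUMBER", "CUSTOMER_NAME": "VARCHAR"}
result = match_columns_to_schema(schema_info, ["customer_id", "Customer_Name", "missing_col"])
# -> ["Customer_ID", "CUSTOMER_NAME", "missing_col"]
# Columns absent from the GMS schema pass through unchanged.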
61 changes: 48 additions & 13 deletions metadata-ingestion/src/datahub/sql_parsing/sqlglot_lineage.py
@@ -1181,6 +1181,45 @@ def sqlglot_lineage(
     )


+@functools.lru_cache(maxsize=128)
+def create_and_cache_schema_resolver(
+    platform: str,
+    env: str,
+    graph: Optional[DataHubGraph] = None,
+    platform_instance: Optional[str] = None,
+    schema_aware: bool = True,
+) -> SchemaResolver:
+    return create_schema_resolver(
+        platform=platform,
+        env=env,
+        graph=graph,
+        platform_instance=platform_instance,
+        schema_aware=schema_aware,
+    )


+def create_schema_resolver(
+    platform: str,
+    env: str,
+    graph: Optional[DataHubGraph] = None,
+    platform_instance: Optional[str] = None,
+    schema_aware: bool = True,
+) -> SchemaResolver:
+    if graph and schema_aware:
+        return graph._make_schema_resolver(
+            platform=platform,
+            platform_instance=platform_instance,
+            env=env,
+        )
+
+    return SchemaResolver(
+        platform=platform,
+        platform_instance=platform_instance,
+        env=env,
+        graph=None,
+    )
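A brief sketch of why the cached variant matters (assumes a live DataHubGraph instance named `graph`; argument values are illustrative): repeated calls with identical arguments return the same resolver, so its schema cache is shared across all LookML views in a run rather than rebuilt per view.

r1 = create_and_cache_schema_resolver(platform="snowflake", env="PROD", graph=graph)
r2 = create_and_cache_schema_resolver(platform="snowflake", env="PROD", graph=graph)
assert r1 is r2  # served from functools.lru_cache, not constructed again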


 def create_lineage_sql_parsed_result(
     query: str,
     default_db: Optional[str],
@@ -1191,21 +1230,17 @@ def create_lineage_sql_parsed_result(
     graph: Optional[DataHubGraph] = None,
     schema_aware: bool = True,
 ) -> SqlParsingResult:
+    schema_resolver = create_schema_resolver(
+        platform=platform,
+        platform_instance=platform_instance,
+        env=env,
+        schema_aware=schema_aware,
+        graph=graph,
+    )
+
+    needs_close: bool = True
     if graph and schema_aware:
         needs_close = False
-        schema_resolver = graph._make_schema_resolver(
-            platform=platform,
-            platform_instance=platform_instance,
-            env=env,
-        )
-    else:
-        needs_close = True
-        schema_resolver = SchemaResolver(
-            platform=platform,
-            platform_instance=platform_instance,
-            env=env,
-            graph=None,
-        )

     try:
         return sqlglot_lineage(
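For context, a hedged sketch of calling the refactored entry point (values are illustrative; parameters hidden by the collapsed region keep their existing defaults):

result = create_lineage_sql_parsed_result(
    query="SELECT id, name FROM db.public.employee",
    default_db="db",
    platform="postgres",
    platform_instance=None,
    env="PROD",
    graph=graph,  # with a DataHubGraph and schema_aware=True, a schema-aware resolver is used
)
# result.column_lineage then reflects the resolver's schema-aware column casing.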
@@ -0,0 +1,6 @@
+connection: "my_connection"
+
+include: "top_10_employee_income_source.view.lkml"
+
+explore: top_10_employee_income_source {
+}
@@ -0,0 +1,18 @@
+view: top_10_employee_income_source {
+  sql_table_name: "db.public.employee"
+    ;;
+  dimension: id {
+    type: number
+    sql: ${TABLE}.id ;;
+  }
+
+  dimension: name {
+    type: string
+    sql: ${TABLE}.name ;;
+  }
+
+  dimension: source {
+    type: string
+    sql: ${TABLE}.source ;;
+  }
+}