Skip to content

Commit

Permalink
feat(ingest): lookml - adding support for only emitting reachable vie…
Browse files Browse the repository at this point in the history
…ws from explores (#5333)
  • Loading branch information
shirshanka authored Jul 5, 2022
1 parent afc9842 commit e93e469
Show file tree
Hide file tree
Showing 9 changed files with 917 additions and 33 deletions.
4 changes: 2 additions & 2 deletions metadata-ingestion/src/datahub/ingestion/source/looker.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,8 @@ class LookerDashboardSourceConfig(LookerAPIConfig, LookerCommonConfig):
@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
cls, v: Optional[str], *, values: Dict[str, Any], **kwargs: Dict[str, Any]
) -> str:
return v or values["base_url"]
) -> Optional[str]:
return v or values.get("base_url")

@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
Expand Down
17 changes: 10 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/looker_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -494,20 +494,23 @@ def _get_fields_from_sql_equality(sql_fragment: str) -> List[str]:
return field_match.findall(sql_fragment)

@classmethod
def __from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore":
def from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore":
view_names = set()
joins = None
# always add the explore's name or the name from the from clause as the view on which this explore is built
view_names.add(dict.get("from", dict.get("name")))

if dict.get("joins", {}) != {}:
# additionally for join-based explores, pull in the linked views
assert "joins" in dict
view_names = set()
for join in dict["joins"]:
join_from = join.get("from")
view_names.add(join_from or join["name"])
sql_on = join.get("sql_on", None)
if sql_on is not None:
fields = cls._get_fields_from_sql_equality(sql_on)
joins = fields
for f in fields:
view_names.add(LookerUtil._extract_view_from_field(f))
else:
# non-join explore, get view_name from `from` field if possible, default to explore name
view_names = set(dict.get("from", dict.get("name")))

return LookerExplore(
model_name=model_name,
name=dict["name"],
Expand Down
92 changes: 74 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/source/lookml.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
)
from datahub.ingestion.source.looker_common import (
LookerCommonConfig,
LookerExplore,
LookerUtil,
LookerViewId,
ViewField,
Expand Down Expand Up @@ -177,6 +178,10 @@ class LookMLSourceConfig(LookerCommonConfig):
512000, # 512KB should be plenty
description="When extracting the view definition from a lookml file, the maximum number of characters to extract.",
)
emit_reachable_views_only: bool = Field(
False,
description="When enabled, only views that are reachable from explores defined in the model files are emitted",
)

@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
Expand Down Expand Up @@ -341,7 +346,9 @@ def resolve_includes(
or included_file.endswith(".dashboard.lookml")
or included_file.endswith(".dashboard.lkml")
):
logger.debug(f"include '{inc}' is a dashboard, skipping it")
logger.debug(
f"include '{included_file}' is a dashboard, skipping it"
)
continue

logger.debug(
Expand Down Expand Up @@ -1081,9 +1088,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
str(self.source_config.base_folder), self.reporter
)

# some views can be mentioned by multiple 'include' statements, so this set is used to prevent
# creating duplicate MCE messages
processed_view_files: Set[str] = set()
# some views can be mentioned by multiple 'include' statements and can be included via different connections.
# So this set is used to prevent creating duplicate events
processed_view_map: Dict[str, Set[str]] = {}
view_connection_map: Dict[str, Tuple[str, str]] = {}

# The ** means "this directory and all subdirectories", and hence should
# include all the files we want.
Expand Down Expand Up @@ -1119,20 +1127,43 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
self.reporter.report_models_dropped(model_name)
continue

explore_reachable_views = set()
for explore_dict in model.explores:
explore: LookerExplore = LookerExplore.from_dict(
model_name, explore_dict
)
if explore.upstream_views:
for view_name in explore.upstream_views:
explore_reachable_views.add(view_name)

processed_view_files = processed_view_map.get(model.connection)
if processed_view_files is None:
processed_view_map[model.connection] = set()
processed_view_files = processed_view_map[model.connection]

project_name = self.get_project_name(model_name)
logger.debug(f"Model: {model_name}; Includes: {model.resolved_includes}")

for include in model.resolved_includes:
logger.debug(f"Considering {include} for model {model_name}")
if include in processed_view_files:
logger.debug(f"view '{include}' already processed, skipping it")
continue

logger.debug(f"Attempting to load view file: {include}")
looker_viewfile = viewfile_loader.load_viewfile(
include, connectionDefinition, self.reporter
)
if looker_viewfile is not None:
for raw_view in looker_viewfile.views:
if (
self.source_config.emit_reachable_views_only
and raw_view["name"] not in explore_reachable_views
):
logger.debug(
f"view {raw_view['name']} is not reachable from an explore, skipping.."
)
continue

self.reporter.report_views_scanned()
try:
maybe_looker_view = LookerView.from_looker_dict(
Expand All @@ -1157,24 +1188,49 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
if self.source_config.view_pattern.allowed(
maybe_looker_view.id.view_name
):
mce = self._build_dataset_mce(maybe_looker_view)
workunit = MetadataWorkUnit(
id=f"lookml-view-{maybe_looker_view.id}",
mce=mce,
view_connection_mapping = view_connection_map.get(
maybe_looker_view.id.view_name
)
self.reporter.report_workunit(workunit)
processed_view_files.add(include)
yield workunit

for mcp in self._build_dataset_mcps(maybe_looker_view):
# We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures
if not view_connection_mapping:
view_connection_map[
maybe_looker_view.id.view_name
] = (model_name, model.connection)
# first time we are discovering this view
mce = self._build_dataset_mce(maybe_looker_view)
workunit = MetadataWorkUnit(
id=f"lookml-view-{mcp.aspectName}-{maybe_looker_view.id}",
mcp=mcp,
treat_errors_as_warnings=True,
id=f"lookml-view-{maybe_looker_view.id}",
mce=mce,
)
processed_view_files.add(include)
self.reporter.report_workunit(workunit)
yield workunit
for mcp in self._build_dataset_mcps(
maybe_looker_view
):
# We want to treat mcp aspects as optional, so allowing failures in this aspect to be treated as warnings rather than failures
workunit = MetadataWorkUnit(
id=f"lookml-view-{mcp.aspectName}-{maybe_looker_view.id}",
mcp=mcp,
treat_errors_as_warnings=True,
)
self.reporter.report_workunit(workunit)
yield workunit
else:
(
prev_model_name,
prev_model_connection,
) = view_connection_mapping
if prev_model_connection != model.connection:
# this view has previously been discovered and emitted using a different connection
logger.warning(
f"view {maybe_looker_view.id.view_name} from model {model_name}, connection {model.connection} was previously processed via model {prev_model_name}, connection {prev_model_connection} and will likely lead to incorrect lineage to the underlying tables"
)
if (
not self.source_config.emit_reachable_views_only
):
logger.warning(
"Consider enabling the `emit_reachable_views_only` flag to handle this case."
)
else:
self.reporter.report_views_dropped(
str(maybe_looker_view.id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,4 +30,4 @@ explore: data_model {
value: "day"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
connection: "my_other_connection"
include: "**/*.view.lkml"

explore: aliased_explore2 {
from: my_view2
}

explore: duplicate_explore {
from: my_view
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
view: my_view2 {
derived_table: {
sql:
SELECT
is_latest,
country,
city,
timestamp,
measurement
FROM
my_table ;;
}

dimension: country {
type: string
description: "The country"
sql: ${TABLE}.country ;;
}

dimension: city {
type: string
description: "City"
sql: ${TABLE}.city ;;
}

dimension: is_latest {
type: yesno
description: "Is latest data"
sql: ${TABLE}.is_latest ;;
}

dimension_group: timestamp {
group_label: "Timestamp"
type: time
description: "Timestamp of measurement"
sql: ${TABLE}.timestamp ;;
timeframes: [hour, date, week, day_of_week]
}

measure: average_measurement {
group_label: "Measurement"
type: average
description: "My measurement"
sql: ${TABLE}.measurement ;;
}

}
Loading

0 comments on commit e93e469

Please sign in to comment.