Skip to content

Commit

Permalink
feat(ingest): add support for Looker view built from SQL-based derived tables (#2478)
Browse files Browse the repository at this point in the history
  • Loading branch information
remisalmon authored May 3, 2021
1 parent 062597a commit 7948226
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
import time
import typing
import os
import re

from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

from dataclasses import dataclass, replace

from sql_metadata import get_query_tables

# Configuration
AVSC_PATH = "../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc"
KAFKA_TOPIC = 'MetadataChangeEvent_v4'
Expand Down Expand Up @@ -47,7 +50,7 @@ def _load_viewfile(self, path: str) -> typing.Optional["LookerViewFile"]:
parsed = lkml.load(file)
looker_viewfile = LookerViewFile.from_looker_dict(path, parsed)
self.viewfile_cache[path] = looker_viewfile
return looker_viewfile
return looker_viewfile
except Exception as e:
print(e)
print(f"Error processing view file {path}. Skipping it")
Expand All @@ -63,7 +66,7 @@ def load_viewfile(self, path: str, connection: str):

@dataclass
class LookerModel:
connection: str
connection: str
includes: typing.List[str]
resolved_includes: typing.List[str]

Expand Down Expand Up @@ -107,7 +110,7 @@ class LookerView:
absolute_file_path: str
connection: str
view_name: str
sql_table_names: typing.List[str]
sql_table_names: typing.List[str]

def get_relative_file_path(self):
if LOOKER_DIRECTORY in self.absolute_file_path:
Expand All @@ -120,13 +123,21 @@ def from_looker_dict(looker_view, connection: str, looker_viewfile: LookerViewFi
view_name = looker_view["name"]
sql_table_name = looker_view.get("sql_table_name", None)
# Some sql_table_name fields contain quotes like: optimizely."group", just remove the quotes
sql_table_name = ''.join(c for c in sql_table_name if c != '"') if sql_table_name is not None else None
sql_table_name = sql_table_name.replace('"', '') if sql_table_name is not None else None
derived_table = looker_view.get("derived_table", None)

# We do not support parsing SQL to extract dependencies right now, load the view but ignore sql_table_names
if derived_table is not None:
print(f"Skipping sql_table_names for derived table with view_name: {view_name}. Need to parse SQL in the future")
return LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=[])
# Parse SQL from derived tables to extract dependencies
if derived_table is not None and 'sql' in derived_table:
# Get the list of tables in the query
sql_tables: typing.List[str] = get_query_tables(derived_table['sql'])

# Remove temporary tables from WITH statements
sql_table_names = [t for t in sql_tables if not re.search(f'WITH(.*,)?\s+{t}(\s*\([\w\s,]+\))?\s+AS\s+\(', derived_table['sql'], re.IGNORECASE|re.DOTALL)]

# Remove quotes from tables
sql_table_names = [t.replace('"', '') for t in sql_table_names]

return LookerView(absolute_file_path=looker_viewfile.absolute_file_path, connection=connection, view_name=view_name, sql_table_names=sql_table_names)

# There is a single dependency in the view, on the sql_table_name
if sql_table_name is not None:
Expand All @@ -140,7 +151,7 @@ def from_looker_dict(looker_view, connection: str, looker_viewfile: LookerViewFi
print(f"Skipping malformed with view_name: {view_name}. View should have a sql_table_name if it is not a derived table")
return None

extends_to_looker_view =[]
extends_to_looker_view = []

# The base view could live in the same file
for raw_view in looker_viewfile.views:
Expand All @@ -158,7 +169,7 @@ def from_looker_dict(looker_view, connection: str, looker_viewfile: LookerViewFi
for view in looker_viewfile.views:
maybe_looker_view = LookerView.from_looker_dict(view, connection, looker_viewfile, looker_viewfile_loader)
if maybe_looker_view is None:
continue
continue

if maybe_looker_view is not None and maybe_looker_view.view_name in extends:
extends_to_looker_view.append(maybe_looker_view)
Expand All @@ -175,7 +186,7 @@ def from_looker_dict(looker_view, connection: str, looker_viewfile: LookerViewFi

def get_platform_and_table(view_name: str, connection: str, sql_table_name: str):
"""
This will depend on what database connections you use in Looker
This will depend on what database connections you use in Looker
For SpotHero, we had two database connections in Looker: "redshift_test" (a redshift database) and "presto" (a presto database)
Presto supports querying across multiple catalogs, so we infer which underlying database presto is using based on the presto catalog name
For SpotHero, we have 3 catalogs in presto: "redshift", "hive", and "hive_emr"
Expand Down Expand Up @@ -330,4 +341,3 @@ def main():

if __name__ == "__main__":
main()

Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
lkml==0.2.2
lkml==1.1.0
avro-python3==1.8.2
confluent-kafka[avro]==1.4.0
confluent-kafka[avro]==1.4.0
sql-metadata==1.12.0

0 comments on commit 7948226

Please sign in to comment.