Skip to content

Instantly share code, notes, and snippets.

@ananis25
Created August 23, 2024 11:13
Show Gist options
  • Save ananis25/cb52e4fe6e2e88c9c155d0baee827728 to your computer and use it in GitHub Desktop.
Save ananis25/cb52e4fe6e2e88c9c155d0baee827728 to your computer and use it in GitHub Desktop.
sqlmesh-repro-duckdb-normalization
import os
import duckdb
from sqlmesh.core.config import (
Config,
GatewayConfig,
ModelDefaultsConfig,
DuckDBConnectionConfig,
)
from sqlmesh.core.context import Context
from sqlmesh.core.config.format import FormatConfig
from sqlmesh import configure_logging
# Force debug-level logging for the whole run.
configure_logging(force_debug=True)

# Scratch directory that holds both the DuckDB file and the SQLMesh project.
tmp_path = "/tmp/repro-sqlmesh"
os.makedirs(tmp_path, exist_ok=True)

# Start every run from a fresh database file. Removing it directly and
# tolerating "not found" avoids the check-then-act race of testing for
# existence first.
db_path = f"{tmp_path}/repro_db.db"
try:
    os.remove(db_path)
except FileNotFoundError:
    # First run (or already cleaned up): nothing to delete.
    pass
# Create some sample data: a CUSTOMERS table whose ADDRESSES column stores a
# JSON array of address objects. Every identifier and JSON key is upper-case.
insert_query = """
INSERT INTO CUSTOMERS (ID, NAME, ADDRESSES)
VALUES
(1, 'Alice', '[
{"STREET": "123 Main St", "CITY": "New York", "ZIP": "10001", "DEFAULT": true},
{"STREET": "456 Elm St", "CITY": "Boston", "ZIP": "02101", "DEFAULT": false}
]'),
(2, 'Bob', '[
{"STREET": "789 Oak Ave", "CITY": "Chicago", "ZIP": "60601", "DEFAULT": false},
{"STREET": "321 Pine Rd", "CITY": "San Francisco", "ZIP": "94102", "DEFAULT": true}
]'),
(3, 'Charlie', '[
{"STREET": "159 Maple Ln", "CITY": "Seattle", "ZIP": "98101", "DEFAULT": true}
]');
"""

# Use the connection as a context manager so it is closed even when one of
# the statements below raises; the same database file is opened again later
# in this script.
with duckdb.connect(database=db_path) as conn:
    conn.execute(
        "CREATE OR REPLACE TABLE CUSTOMERS (ID INTEGER, NAME TEXT, ADDRESSES JSON)"
    )
    conn.sql(insert_query)
    conn.commit()
# Query under test: one row per (customer, address) pair, carrying the ZIP of
# the address flagged DEFAULT for that customer as DEFAULT_ZIP_CODE.
test_query = """
SELECT
c.ID AS ID,
c.NAME AS NAME,
MAX(CASE WHEN JSON_EXTRACT(a.VALUE, 'DEFAULT') = true THEN a.VALUE.ZIP ELSE NULL END) OVER (PARTITION BY c.ID) AS DEFAULT_ZIP_CODE
FROM
CUSTOMERS c,
UNNEST(FROM_JSON(c.ADDRESSES, '["JSON"]')) AS a(VALUE);
"""

# MODEL header of the SQLMesh model file.
model_header = """
MODEL (
name FEATURES.FIRST,
kind FULL,
grain ID,
);
"""

# Full model file contents: the header, then the query, then a final newline.
model_definition = model_header + test_query + "\n"
# Write the model definition to models/first.sql inside the directory that is
# later handed to Context(paths=...).
models_dir = f"{tmp_path}/models"
os.makedirs(models_dir, exist_ok=True)
model_path = f"{models_dir}/first.sql"
# Explicit encoding so the bytes on disk do not depend on the platform's
# locale default.
with open(model_path, "w", encoding="utf-8") as f:
    f.write(model_definition)
# A single gateway named "main", backed by the DuckDB file at db_path.
duckdb_connection = DuckDBConnectionConfig(database=db_path)
gateways = {"main": GatewayConfig(connection=duckdb_connection)}

# Models default to the duckdb dialect; the formatter is configured with
# normalize=False.
config = Config(
    gateways=gateways,
    model_defaults=ModelDefaultsConfig(dialect="duckdb"),
    format=FormatConfig(normalize=False),
)

# Build a context over the project at tmp_path and apply the plan without
# interactive prompts.
context = Context(paths=tmp_path, config=config)
context.plan(auto_apply=True, no_prompts=True)
# Print the contents of the FEATURES.FIRST table next to the output of the
# raw query run directly in DuckDB, so the two can be compared by eye.
# The context manager closes the connection once both results are printed.
with duckdb.connect(database=db_path) as conn:
    print(conn.sql("SELECT * FROM FEATURES.FIRST"))
    print(conn.sql(test_query))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment