Skip to content

Commit

Permalink
Presto testo
Browse files Browse the repository at this point in the history
  • Loading branch information
Jacob Beck committed Jan 16, 2019
1 parent 16d7524 commit 85389af
Show file tree
Hide file tree
Showing 17 changed files with 433 additions and 55 deletions.
4 changes: 4 additions & 0 deletions core/dbt/adapters/base/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ def close(cls, connection):
if connection.state in {'closed', 'init'}:
return connection

if connection.transaction_open and connection.handle:
connection.handle.rollback()
connection.transaction_open = False

# On windows, sometimes connection handles don't have a close() attr.
if hasattr(connection.handle, 'close'):
connection.handle.close()
Expand Down
4 changes: 1 addition & 3 deletions plugins/presto/dbt/adapters/presto/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,9 +118,7 @@ def open(cls, connection):
catalog=credentials.database,
schema=credentials.schema,
auth=auth,
# we just need it to be anything but 'autocommit', this is what
# snowflake is. TODO: should this be toggle-able?
isolation_level=IsolationLevel.REPEATABLE_READ,
isolation_level=IsolationLevel.SERIALIZABLE,
)
connection.state = 'open'
connection.handle = handle
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
-e ./plugins/redshift
-e ./plugins/snowflake
-e ./plugins/bigquery
-e ./plugins/presto
30 changes: 27 additions & 3 deletions test/integration/029_docs_generate_tests/test_docs_generate.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,10 @@ def get_role(self):
profile = self.get_profile(self.adapter_type)
target_name = profile['test']['target']
return profile['test']['outputs'][target_name]['user']
elif self.adapter_type == 'bigquery':
return None
else: # snowflake
elif self.adapter_type == 'snowflake':
return self.run_sql('select current_role()', fetch='one')[0]
else: # bigquery, presto, other dbs that have no 'role'
return None

def expected_postgres_references_catalog(self):
model_database = self.default_database
Expand Down Expand Up @@ -474,6 +474,19 @@ def expected_bigquery_catalog(self):
model_database=self.alternative_database
)

def expected_presto_catalog(self):
    """Return the catalog expected after generating docs against Presto.

    Delegates to ``self._expected_catalog`` with the column type names and
    relation type names this test expects for Presto, the result of
    ``self._no_stats()`` for both model and seed stats, and the default
    database as the model database.
    """
    catalog_kwargs = {
        'id_type': 'integer',
        'text_type': 'varchar',
        'time_type': 'timestamp',
        'view_type': 'VIEW',
        'table_type': 'BASE TABLE',
        # Two separate calls, so models and seeds never share one object.
        'model_stats': self._no_stats(),
        'seed_stats': self._no_stats(),
        'model_database': self.default_database,
    }
    return self._expected_catalog(**catalog_kwargs)


@staticmethod
def _clustered_bigquery_columns(update_type):
return {
Expand Down Expand Up @@ -2304,3 +2317,14 @@ def test__redshift__incremental_view(self):
)
self.verify_catalog(self.expected_redshift_incremental_catalog())
self.verify_manifest(self.expected_redshift_incremental_view_manifest())

@use_profile('presto')
def test__presto__run_and_generate(self):
    """Run and generate docs on Presto, then verify each produced artifact.

    The default database is passed as the alternate database and as the
    model database for the expected manifest and run results.
    """
    self.run_and_generate(alternate_db=self.default_database)

    # Catalog
    expected_catalog = self.expected_presto_catalog()
    self.verify_catalog(expected_catalog)

    # Manifest
    expected_manifest = self.expected_seeded_manifest(
        model_database=self.default_database
    )
    self.verify_manifest(expected_manifest)

    # Run results
    expected_results = self.expected_run_results(
        model_database=self.default_database
    )
    self.verify_run_results(expected_results)
10 changes: 10 additions & 0 deletions test/integration/030_statement_test/test_statements.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ def test_snowflake_statements(self):

self.assertManyTablesEqual(["STATEMENT_ACTUAL", "STATEMENT_EXPECTED"])

@use_profile("presto")
def test_presto_statements(self):
    """Seed and run the statement project on Presto, then compare tables.

    Expects two seed results and one model result, and requires
    ``statement_actual`` to equal ``statement_expected``.
    """
    seed_dir = self.dir("seed")
    self.use_default_project({"data-paths": [seed_dir]})

    seed_results = self.run_dbt(["seed"])
    self.assertEqual(len(seed_results), 2)

    run_results = self.run_dbt()
    self.assertEqual(len(run_results), 1)

    self.assertTablesEqual("statement_actual", "statement_expected")

class TestStatementsBigquery(DBTIntegrationTest):

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,19 @@ def test__bigquery__switch_materialization(self):
results = self.run_dbt(['run', '--vars', 'materialized: view', "--full-refresh"])
self.assertEquals(results[0].node['config']['materialized'], 'view')
self.assertEqual(len(results), 1)

@use_profile('presto')
def test__presto__switch_materialization(self):
    """Switch one model between view and table materializations on Presto.

    Runs dbt three times (view -> table -> view). After each run it checks
    that exactly one result came back and that the node's configured
    materialization matches the one requested through ``--vars``.
    """
    # presto can't do incremental materializations so there's less to this
    for materialization in ('view', 'table', 'view'):
        results = self.run_dbt(
            ['run', '--vars', 'materialized: {}'.format(materialization)]
        )
        # Check the count first so an empty result list fails with a clear
        # assertion message instead of an IndexError on results[0].
        self.assertEqual(len(results), 1)
        # assertEqual, not the deprecated assertEquals alias (removed in
        # Python 3.12).
        self.assertEqual(
            results[0].node['config']['materialized'], materialization
        )
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,50 @@ def test__snowflake__changed_table_schema_for_downstream_view_changed_to_table(s
for result in results:
node_name = result.node.name
self.assertEqual(result.node.config['materialized'], expected_types[node_name])

@use_profile('presto')
def test__presto__changed_table_schema_for_downstream_view(self):
    """Change base_table's schema while dependent_model stays a view.

    Seeds, runs once, then runs again with ``add_table_field`` enabled and
    checks that both models are still built and the tables still match.
    """
    seed_results = self.run_dbt(["seed"])
    self.assertEqual(len(seed_results), 1)

    initial_run = self.run_dbt(["run"])
    self.assertEqual(len(initial_run), 2)
    self.assertManyTablesEqual(["people", "base_table", "dependent_model"])

    # Change the schema of base_table; dependent_model must not fail.
    schema_change_vars = "{add_table_field: true, dependent_type: view}"
    changed_run = self.run_dbt(["run", "--vars", schema_change_vars])
    self.assertEqual(len(changed_run), 2)
    self.assertManyTablesEqual(["base_table", "dependent_model"])

@use_profile('presto')
def test__presto__changed_table_schema_for_downstream_view_changed_to_table(self):
    """Change base_table's schema while dependent_model becomes a table.

    After the first run dependent_model is expected to be a view; after the
    second run (with ``dependent_type: table``) it is expected to be a
    table. base_table is expected to be a table both times.
    """
    def check_materializations(run_results, expected_types):
        # Each result's configured materialization must match the mapping.
        for run_result in run_results:
            node_name = run_result.node.name
            self.assertEqual(
                run_result.node.config['materialized'],
                expected_types[node_name],
            )

    seed_results = self.run_dbt(["seed"])
    self.assertEqual(len(seed_results), 1)

    first_run = self.run_dbt(["run"])
    self.assertEqual(len(first_run), 2)
    self.assertManyTablesEqual(["people", "base_table", "dependent_model"])

    # Initial state: the dependent model is still a view.
    check_materializations(first_run, {
        'base_table': 'table',
        'dependent_model': 'view',
    })

    changed_vars = "{add_table_field: true, dependent_type: table}"
    second_run = self.run_dbt(["run", "--vars", changed_vars])
    self.assertEqual(len(second_run), 2)
    self.assertManyTablesEqual(["base_table", "dependent_model"])

    # After the change: the dependent model was materialized as a table.
    check_materializations(second_run, {
        'base_table': 'table',
        'dependent_model': 'table',
    })
5 changes: 5 additions & 0 deletions test/integration/041_presto_test/data/data_seed.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
id,dupe
1,a
2,a
3,a
4,a
18 changes: 18 additions & 0 deletions test/integration/041_presto_test/macros/test_creation.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@

{#
  Test macro: checks that the relation for `model` exists and has the
  expected relation type.

  Arguments:
    model - object providing `database`, `schema` and `name`, which are
            used to look the relation up via `adapter.get_relation`.
    name  - accepted but not referenced anywhere in this macro body.
    type  - relation type that `table.type` is compared against.

  Renders `select 0 as success` when a relation is found and its type
  equals `type`; otherwise renders `select 1 as error`.
#}
{% macro test_was_materialized(model, name, type) %}

{#-- don't run this query in the parsing step #}
{%- if model -%}
{%- set table = adapter.get_relation(database=model.database, schema=model.schema,
identifier=model.name) -%}
{%- else -%}
{#-- no model: use an empty (falsy) value so the error branch below renders #}
{%- set table = {} -%}
{%- endif -%}

{% if table and table.type == type %}
select 0 as success
{% else %}
select 1 as error
{% endif %}

{% endmacro %}
4 changes: 4 additions & 0 deletions test/integration/041_presto_test/models/ephemeral_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

{# Ephemeral model: passes every column of view_model through unchanged. #}
{{ config(materialized = "ephemeral") }}

select * from {{ ref('view_model') }}
21 changes: 21 additions & 0 deletions test/integration/041_presto_test/models/schema.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@

# Tests declared for the models in this directory.
view_model:
constraints:
not_null:
- id
- updated_at

unique:
- id
# every row of data_seed.csv has the same `dupe` value, so this check is
# the one expected to report failures
- dupe # fails

# presumably resolved to the test_was_materialized macro in
# macros/test_creation.sql -- confirm against dbt's test naming rules
was_materialized:
- {name: view_model, type: view}

table_model:
constraints:
not_null:
- id

was_materialized:
- {name: table_model, type: table}
4 changes: 4 additions & 0 deletions test/integration/041_presto_test/models/table_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@

{# Table model: materializes every column of ephemeral_model as a table. #}
{{ config(materialized = "table") }}

select * from {{ ref('ephemeral_model') }}
13 changes: 13 additions & 0 deletions test/integration/041_presto_test/models/view_model.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
{#
  View model over the data_seed seed: keeps `id` and `dupe` and adds
  `updated_at`, set to the current date at query time.
#}
{{
config(
materialized = "view"
)
}}


select
id,
current_date as updated_at,
dupe

from {{ ref('data_seed') }}
73 changes: 73 additions & 0 deletions test/integration/041_presto_test/test_simple_presto_view.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
from test.integration.base import DBTIntegrationTest, FakeArgs, use_profile
import random
import time


class TestBasePrestoRun(DBTIntegrationTest):
    """Shared project settings and assertions for the Presto run tests."""

    @property
    def schema(self):
        """Schema name used by these tests."""
        return "presto_test_41"

    @property
    def models(self):
        """Path to the model directory of this test project."""
        return "test/integration/041_presto_test/models"

    @property
    def project_config(self):
        """Project settings pointing at this test's seed data and macros."""
        data_paths = ['test/integration/041_presto_test/data']
        macro_paths = ['test/integration/041_presto_test/macros']
        return {
            'data-paths': data_paths,
            'macro-paths': macro_paths,
        }

    @property
    def profile_config(self):
        """Profile returned by ``self.presto_profile()``."""
        return self.presto_profile()

    def assert_nondupes_pass(self):
        """Run ``dbt test`` and check that only the 'dupe' tests fail.

        The invocation as a whole is expected not to pass. No individual
        result may be errored or skipped; results whose node name contains
        'dupe' must report a positive status, all others a status of 0.
        """
        outcomes = self.run_dbt(['test'], expect_pass=False)

        for outcome in outcomes:
            is_dupe_test = 'dupe' in outcome.node.get('name')

            # A failing test is neither an error nor a skip.
            self.assertFalse(outcome.errored)
            self.assertFalse(outcome.skipped)

            # status = # of failing rows
            if is_dupe_test:
                self.assertTrue(outcome.status > 0)
            else:
                self.assertEqual(outcome.status, 0)


class TestSimplePrestoRun(TestBasePrestoRun):
    """Seed twice, run the models, and execute the tests on Presto.

    ``setUp`` is inherited unchanged from the base class.
    """

    @use_profile('presto')
    def test__presto_simple_run(self):
        """Seed (plain and with --full-refresh), run, then check the tests."""
        # make sure seed works twice. Full-refresh is a no-op
        self.run_dbt(['seed'])
        self.run_dbt(['seed', '--full-refresh'])

        results = self.run_dbt()
        self.assertEqual(len(results), 2)

        self.assert_nondupes_pass()


class TestUnderscorePrestoRun(TestBasePrestoRun):
    """Run the Presto project twice in a row with an underscore-led prefix.

    ``prefix`` is computed once, when the class body is executed, from the
    current time and a random four-digit number.
    """
    # NOTE(review): presumably consumed by the base class when naming the
    # test schema -- confirm against DBTIntegrationTest.
    prefix = "_test{}{:04}".format(
        int(time.time()),
        random.randint(0, 9999),
    )

    @use_profile('presto')
    def test_presto_run_twice(self):
        """Seed once, then run and verify the models two times."""
        self.run_dbt(['seed'])

        for _ in range(2):
            results = self.run_dbt()
            self.assertEqual(len(results), 2)
            self.assert_nondupes_pass()

Loading

0 comments on commit 85389af

Please sign in to comment.