Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/snowflake): fix test connection #10927

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -277,10 +277,12 @@ class SnowflakePrivilege:
capabilities: List[SourceCapability] = [c.capability for c in SnowflakeV2Source.get_capabilities() if c.capability not in (SourceCapability.PLATFORM_INSTANCE, SourceCapability.DOMAINS, SourceCapability.DELETION_DETECTION)] # type: ignore

cur = conn.query("select current_role()")
current_role = [row[0] for row in cur][0]
current_role = [row["CURRENT_ROLE()"] for row in cur][0]

cur = conn.query("select current_secondary_roles()")
secondary_roles_str = json.loads([row[0] for row in cur][0])["roles"]
secondary_roles_str = json.loads(
[row["CURRENT_SECONDARY_ROLES()"] for row in cur][0]
)["roles"]
secondary_roles = (
[] if secondary_roles_str == "" else secondary_roles_str.split(",")
)
Expand All @@ -299,7 +301,9 @@ class SnowflakePrivilege:
cur = conn.query(f'show grants to role "{role}"')
for row in cur:
privilege = SnowflakePrivilege(
privilege=row[1], object_type=row[2], object_name=row[3]
privilege=row["privilege"],
object_type=row["granted_on"],
object_name=row["name"],
)
privileges.append(privilege)

Expand Down Expand Up @@ -362,7 +366,7 @@ class SnowflakePrivilege:
roles.append(privilege.object_name)

cur = conn.query("select current_warehouse()")
current_warehouse = [row[0] for row in cur][0]
current_warehouse = [row["CURRENT_WAREHOUSE()"] for row in cur][0]

default_failure_messages = {
SourceCapability.SCHEMA_METADATA: "Either no tables exist or current role does not have permissions to access them",
Expand Down
128 changes: 62 additions & 66 deletions metadata-ingestion/tests/unit/test_snowflake_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -274,43 +274,43 @@ def test_test_connection_basic_success(mock_connect):
test_connection_helpers.assert_basic_connectivity_success(report)


def setup_mock_connect(mock_connect, query_results=None):
def default_query_results(query):
class MissingQueryMock(Exception):
pass


def setup_mock_connect(mock_connect, extra_query_results=None):
def query_results(query):
if extra_query_results is not None:
try:
return extra_query_results(query)
except MissingQueryMock:
pass

if query == "select current_role()":
return [("TEST_ROLE",)]
return [{"CURRENT_ROLE()": "TEST_ROLE"}]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
return [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE")]
raise ValueError(f"Unexpected query: {query}")
return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}]
elif query == 'show grants to role "PUBLIC"':
return []
raise MissingQueryMock(f"Unexpected query: {query}")
Comment on lines 289 to +297
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Refactor Suggestion: Use Dictionary for Query Results

To improve efficiency and maintainability, consider using a dictionary for handling query results instead of consecutive if statements.

-        if query == "select current_role()":
-            return [{"CURRENT_ROLE()": "TEST_ROLE"}]
-        elif query == "select current_secondary_roles()":
-            return [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}]
-        elif query == "select current_warehouse()":
-            return [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}]
-        elif query == 'show grants to role "PUBLIC"':
-            return []
+        query_map = {
+            "select current_role()": [{"CURRENT_ROLE()": "TEST_ROLE"}],
+            "select current_secondary_roles()": [{"CURRENT_SECONDARY_ROLES()": '{"roles":"","value":""}'}],
+            "select current_warehouse()": [{"CURRENT_WAREHOUSE()": "TEST_WAREHOUSE"}],
+            'show grants to role "PUBLIC"': []
+        }
+        return query_map.get(query, raise MissingQueryMock(f"Unexpected query: {query}"))

Committable suggestion was skipped due to low confidence.

Tools
Ruff

289-296: Use a dictionary instead of consecutive if statements

(SIM116)


connection_mock = MagicMock()
cursor_mock = MagicMock()
cursor_mock.execute.side_effect = (
query_results if query_results is not None else default_query_results
)
cursor_mock.execute.side_effect = query_results
connection_mock.cursor.return_value = cursor_mock
mock_connect.return_value = connection_mock


@patch("snowflake.connector.connect")
def test_test_connection_no_warehouse(mock_connect):
def query_results(query):
if query == "select current_role()":
return [("TEST_ROLE",)]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [(None,)]
if query == "select current_warehouse()":
return [{"CURRENT_WAREHOUSE()": None}]
elif query == 'show grants to role "TEST_ROLE"':
return [
("", "USAGE", "DATABASE", "DB1"),
("", "USAGE", "SCHEMA", "DB1.SCHEMA1"),
("", "REFERENCES", "TABLE", "DB1.SCHEMA1.TABLE1"),
]
elif query == 'show grants to role "PUBLIC"':
return []
raise ValueError(f"Unexpected query: {query}")
return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
raise MissingQueryMock(f"Unexpected query: {query}")

setup_mock_connect(mock_connect, query_results)
report = test_connection_helpers.run_test_connection(
Expand All @@ -330,17 +330,9 @@ def query_results(query):
@patch("snowflake.connector.connect")
def test_test_connection_capability_schema_failure(mock_connect):
def query_results(query):
if query == "select current_role()":
return [("TEST_ROLE",)]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE",)]
elif query == 'show grants to role "TEST_ROLE"':
return [("", "USAGE", "DATABASE", "DB1")]
elif query == 'show grants to role "PUBLIC"':
return []
raise ValueError(f"Unexpected query: {query}")
if query == 'show grants to role "TEST_ROLE"':
return [{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"}]
raise MissingQueryMock(f"Unexpected query: {query}")

setup_mock_connect(mock_connect, query_results)

Expand All @@ -361,21 +353,17 @@ def query_results(query):
@patch("snowflake.connector.connect")
def test_test_connection_capability_schema_success(mock_connect):
def query_results(query):
if query == "select current_role()":
return [("TEST_ROLE",)]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE")]
elif query == 'show grants to role "TEST_ROLE"':
if query == 'show grants to role "TEST_ROLE"':
return [
["", "USAGE", "DATABASE", "DB1"],
["", "USAGE", "SCHEMA", "DB1.SCHEMA1"],
["", "REFERENCES", "TABLE", "DB1.SCHEMA1.TABLE1"],
{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
{"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
{
"privilege": "REFERENCES",
"granted_on": "TABLE",
"name": "DB1.SCHEMA1.TABLE1",
},
]
elif query == 'show grants to role "PUBLIC"':
return []
raise ValueError(f"Unexpected query: {query}")
raise MissingQueryMock(f"Unexpected query: {query}")

setup_mock_connect(mock_connect, query_results)

Expand All @@ -397,30 +385,38 @@ def query_results(query):
@patch("snowflake.connector.connect")
def test_test_connection_capability_all_success(mock_connect):
def query_results(query):
if query == "select current_role()":
return [("TEST_ROLE",)]
elif query == "select current_secondary_roles()":
return [('{"roles":"","value":""}',)]
elif query == "select current_warehouse()":
return [("TEST_WAREHOUSE")]
elif query == 'show grants to role "TEST_ROLE"':
if query == 'show grants to role "TEST_ROLE"':
return [
("", "USAGE", "DATABASE", "DB1"),
("", "USAGE", "SCHEMA", "DB1.SCHEMA1"),
("", "SELECT", "TABLE", "DB1.SCHEMA1.TABLE1"),
("", "USAGE", "ROLE", "TEST_USAGE_ROLE"),
{"privilege": "USAGE", "granted_on": "DATABASE", "name": "DB1"},
{"privilege": "USAGE", "granted_on": "SCHEMA", "name": "DB1.SCHEMA1"},
{
"privilege": "SELECT",
"granted_on": "TABLE",
"name": "DB1.SCHEMA1.TABLE1",
},
{"privilege": "USAGE", "granted_on": "ROLE", "name": "TEST_USAGE_ROLE"},
]
elif query == 'show grants to role "PUBLIC"':
return []
elif query == 'show grants to role "TEST_USAGE_ROLE"':
return [
["", "USAGE", "DATABASE", "SNOWFLAKE"],
["", "USAGE", "SCHEMA", "ACCOUNT_USAGE"],
["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY"],
["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY"],
["", "USAGE", "VIEW", "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES"],
{"privilege": "USAGE", "granted_on": "DATABASE", "name": "SNOWFLAKE"},
{"privilege": "USAGE", "granted_on": "SCHEMA", "name": "ACCOUNT_USAGE"},
{
"privilege": "USAGE",
"granted_on": "VIEW",
"name": "SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY",
},
{
"privilege": "USAGE",
"granted_on": "VIEW",
"name": "SNOWFLAKE.ACCOUNT_USAGE.ACCESS_HISTORY",
},
{
"privilege": "USAGE",
"granted_on": "VIEW",
"name": "SNOWFLAKE.ACCOUNT_USAGE.OBJECT_DEPENDENCIES",
},
]
raise ValueError(f"Unexpected query: {query}")
raise MissingQueryMock(f"Unexpected query: {query}")

setup_mock_connect(mock_connect, query_results)

Expand Down
Loading