Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
284 changes: 284 additions & 0 deletions examples/basic/functional_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,17 @@
# Import SDK components (assumes installation is already validated)
from PowerPlatform.Dataverse.client import DataverseClient
from PowerPlatform.Dataverse.core.errors import HttpError, MetadataError
from PowerPlatform.Dataverse.models.relationship import (
LookupAttributeMetadata,
OneToManyRelationshipMetadata,
ManyToManyRelationshipMetadata,
CascadeConfiguration,
)
from PowerPlatform.Dataverse.models.labels import Label, LocalizedLabel
from PowerPlatform.Dataverse.common.constants import (
CASCADE_BEHAVIOR_NO_CASCADE,
CASCADE_BEHAVIOR_REMOVE_LINK,
)
from azure.identity import InteractiveBrowserCredential


Expand Down Expand Up @@ -380,6 +391,274 @@ def cleanup_test_data(client: DataverseClient, table_info: Dict[str, Any], recor
print("Test table kept for future testing")


def backoff(op, *, delays=(0, 2, 5, 10, 20, 20)):
"""Retry helper with exponential backoff for metadata propagation delays."""
last = None
total_delay = 0
attempts = 0
for d in delays:
if d:
time.sleep(d)
total_delay += d
attempts += 1
try:
result = op()
if attempts > 1:
print(f" * Backoff succeeded after {attempts - 1} retry(s); waited {total_delay}s total.")
return result
except Exception as ex:
last = ex
continue
if last:
if attempts:
print(f" [WARN] Backoff exhausted after {max(attempts - 1, 0)} retry(s); waited {total_delay}s total.")
raise last


def test_relationships(client: DataverseClient) -> None:
"""Test relationship lifecycle: create tables, 1:N, N:N, query, delete."""
print("\n-> Relationship Tests")
print("=" * 50)

rel_parent_schema = "test_RelParent"
rel_child_schema = "test_RelChild"
rel_m2m_schema = "test_RelProject"

# Track IDs for cleanup
rel_id_1n = None
rel_id_lookup = None
rel_id_nn = None
created_tables = []

try:
# --- Cleanup any leftover resources from previous run ---
print("Checking for leftover relationship test resources...")
found_leftovers = False
for rel_name in [
"test_RelParent_RelChild",
"contact_test_relchild_test_ManagerId",
"test_relchild_relproject",
]:
try:
rel = client.tables.get_relationship(rel_name)
if rel:
found_leftovers = True
break
except Exception:
pass

if not found_leftovers:
for tbl in [rel_child_schema, rel_parent_schema, rel_m2m_schema]:
try:
if client.tables.get(tbl):
found_leftovers = True
break
except Exception:
pass

if found_leftovers:
cleanup_ok = input("Found leftover test resources. Clean up? (y/N): ").strip().lower() in ["y", "yes"]
if cleanup_ok:
for rel_name in [
"test_RelParent_RelChild",
"contact_test_relchild_test_ManagerId",
"test_relchild_relproject",
]:
try:
rel = client.tables.get_relationship(rel_name)
if rel:
client.tables.delete_relationship(rel.relationship_id)
print(f" (Cleaned up relationship: {rel_name})")
except Exception:
pass

for tbl in [rel_child_schema, rel_parent_schema, rel_m2m_schema]:
try:
if client.tables.get(tbl):
client.tables.delete(tbl)
print(f" (Cleaned up table: {tbl})")
except Exception:
pass
else:
print("Skipping cleanup -- resources may conflict with new test run.")

# --- Create parent and child tables ---
print("\nCreating relationship test tables...")

parent_info = backoff(
lambda: client.tables.create(
rel_parent_schema,
{"test_Code": "string"},
)
)
created_tables.append(rel_parent_schema)
print(f"[OK] Created parent table: {parent_info['table_schema_name']}")

child_info = backoff(
lambda: client.tables.create(
rel_child_schema,
{"test_Number": "string"},
)
)
created_tables.append(rel_child_schema)
print(f"[OK] Created child table: {child_info['table_schema_name']}")

proj_info = backoff(
lambda: client.tables.create(
rel_m2m_schema,
{"test_ProjectCode": "string"},
)
)
created_tables.append(rel_m2m_schema)
print(f"[OK] Created M:N table: {proj_info['table_schema_name']}")

# --- Wait for table metadata to propagate ---
wait_for_table_metadata(client, rel_parent_schema)
wait_for_table_metadata(client, rel_child_schema)
wait_for_table_metadata(client, rel_m2m_schema)

# --- Test 1: Create 1:N relationship (core API) ---
print("\n Test 1: Create 1:N relationship (core API)")
print(" " + "-" * 45)

lookup = LookupAttributeMetadata(
schema_name="test_ParentId",
display_name=Label(localized_labels=[LocalizedLabel(label="Parent", language_code=1033)]),
required_level="None",
)

relationship = OneToManyRelationshipMetadata(
schema_name="test_RelParent_RelChild",
referenced_entity=parent_info["table_logical_name"],
referencing_entity=child_info["table_logical_name"],
referenced_attribute=f"{parent_info['table_logical_name']}id",
cascade_configuration=CascadeConfiguration(
delete=CASCADE_BEHAVIOR_REMOVE_LINK,
assign=CASCADE_BEHAVIOR_NO_CASCADE,
merge=CASCADE_BEHAVIOR_NO_CASCADE,
),
)

result_1n = backoff(
lambda: client.tables.create_one_to_many_relationship(
lookup=lookup,
relationship=relationship,
)
)

assert result_1n.relationship_schema_name == "test_RelParent_RelChild"
assert result_1n.relationship_type == "one_to_many"
assert result_1n.lookup_schema_name is not None
rel_id_1n = result_1n.relationship_id
print(f" [OK] Created 1:N relationship: {result_1n.relationship_schema_name}")
print(f" Lookup: {result_1n.lookup_schema_name}")
print(f" ID: {rel_id_1n}")

# --- Test 2: Create lookup field (convenience API) ---
print("\n Test 2: Create lookup field (convenience API)")
print(" " + "-" * 45)

result_lookup = backoff(
lambda: client.tables.create_lookup_field(
referencing_table=child_info["table_logical_name"],
lookup_field_name="test_ManagerId",
referenced_table="contact",
display_name="Manager",
description="The record's manager contact",
required=False,
cascade_delete=CASCADE_BEHAVIOR_REMOVE_LINK,
)
)

assert result_lookup.relationship_type == "one_to_many"
assert result_lookup.lookup_schema_name is not None
rel_id_lookup = result_lookup.relationship_id
print(f" [OK] Created lookup: {result_lookup.lookup_schema_name}")
print(f" Relationship: {result_lookup.relationship_schema_name}")

# --- Test 3: Create N:N relationship ---
print("\n Test 3: Create N:N relationship")
print(" " + "-" * 45)

m2m = ManyToManyRelationshipMetadata(
schema_name="test_relchild_relproject",
entity1_logical_name=child_info["table_logical_name"],
entity2_logical_name=proj_info["table_logical_name"],
)

result_nn = backoff(lambda: client.tables.create_many_to_many_relationship(relationship=m2m))

assert result_nn.relationship_schema_name == "test_relchild_relproject"
assert result_nn.relationship_type == "many_to_many"
rel_id_nn = result_nn.relationship_id
print(f" [OK] Created N:N relationship: {result_nn.relationship_schema_name}")
print(f" ID: {rel_id_nn}")

# --- Test 4: Get relationship metadata ---
print("\n Test 4: Query relationship metadata")
print(" " + "-" * 45)

fetched_1n = client.tables.get_relationship("test_RelParent_RelChild")
assert fetched_1n is not None
assert fetched_1n.relationship_type == "one_to_many"
assert fetched_1n.relationship_id == rel_id_1n
print(f" [OK] Retrieved 1:N: {fetched_1n.relationship_schema_name}")
print(f" Referenced: {fetched_1n.referenced_entity}")
print(f" Referencing: {fetched_1n.referencing_entity}")

fetched_nn = client.tables.get_relationship("test_relchild_relproject")
assert fetched_nn is not None
assert fetched_nn.relationship_type == "many_to_many"
assert fetched_nn.relationship_id == rel_id_nn
print(f" [OK] Retrieved N:N: {fetched_nn.relationship_schema_name}")
print(f" Entity1: {fetched_nn.entity1_logical_name}")
print(f" Entity2: {fetched_nn.entity2_logical_name}")

# Non-existent relationship should return None
missing = client.tables.get_relationship("nonexistent_relationship_xyz")
assert missing is None
print(" [OK] Non-existent relationship returns None")

# --- Test 5: Delete relationships ---
print("\n Test 5: Delete relationships")
print(" " + "-" * 45)

backoff(lambda: client.tables.delete_relationship(rel_id_1n))
rel_id_1n = None
print(" [OK] Deleted 1:N relationship")

backoff(lambda: client.tables.delete_relationship(rel_id_lookup))
rel_id_lookup = None
print(" [OK] Deleted lookup relationship")

backoff(lambda: client.tables.delete_relationship(rel_id_nn))
rel_id_nn = None
print(" [OK] Deleted N:N relationship")

# Verify deletion
verify = client.tables.get_relationship("test_RelParent_RelChild")
assert verify is None
print(" [OK] Verified 1:N deletion (get returns None)")

print("\n[OK] All relationship tests passed!")

finally:
# Cleanup: delete any remaining relationships then tables
for rid in [rel_id_1n, rel_id_lookup, rel_id_nn]:
if rid:
try:
client.tables.delete_relationship(rid)
except Exception:
pass

for tbl in reversed(created_tables):
try:
backoff(lambda name=tbl: client.tables.delete(name))
print(f" (Cleaned up table: {tbl})")
except Exception as e:
print(f" [WARN] Could not delete {tbl}: {e}")


def _table_still_exists(client: DataverseClient, table_schema_name: Optional[str]) -> bool:
if not table_schema_name:
return False
Expand All @@ -403,6 +682,7 @@ def main():
print(" - Table Creation & Metadata Operations")
print(" - Record CRUD Operations")
print(" - Query Functionality")
print(" - Relationship Operations (1:N, N:N, lookup, get, delete)")
print(" - Interactive Cleanup")
print("=" * 70)
print("For installation validation, run examples/basic/installation_example.py first")
Expand All @@ -422,6 +702,9 @@ def main():
# Test querying
test_query_records(client, table_info)

# Test relationships
test_relationships(client)

# Success summary
print("\nFunctional Test Summary")
print("=" * 50)
Expand All @@ -430,6 +713,7 @@ def main():
print("[OK] Record Creation: Success")
print("[OK] Record Reading: Success")
print("[OK] Record Querying: Success")
print("[OK] Relationship Operations: Success")
print("\nYour PowerPlatform Dataverse Client SDK is fully functional!")

# Cleanup
Expand Down
8 changes: 8 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,11 @@ select = [
"UP", # pyupgrade
"B", # flake8-bugbear
]

[tool.pytest.ini_options]
testpaths = ["tests/unit"]
Comment thread
saurabhrb marked this conversation as resolved.
markers = [
"e2e: end-to-end tests requiring a live Dataverse environment (DATAVERSE_URL)",
]
# e2e tests require a live Dataverse environment:
# DATAVERSE_URL=https://yourorg.crm.dynamics.com pytest tests/e2e/ -v -s
2 changes: 2 additions & 0 deletions tests/e2e/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Copyright (c) Microsoft Corporation.
# Licensed under the MIT license.
Loading
Loading