
Feature/update logic mssql lineage #10959

Open
wants to merge 15 commits into master
Conversation

@sleeperdeep (Contributor) commented Jul 22, 2024

The lineage-extraction logic for the MSSQL source was updated.

Summary by CodeRabbit

  • New Features

    • Introduced enhanced SQL Server lineage tracking with new configuration options.
    • Added properties for improved management of SQL procedure dependencies and lineage data representation.
    • Created a comprehensive JSON file outlining metadata updates for better data governance and lineage visibility.
  • Bug Fixes

    • Resolved issues by streamlining metadata processing and enhancing lineage data retrieval methods.
  • Documentation

    • Updated JSON files to reflect new metadata structure and lineage tracking capabilities, ensuring clarity in data management.
  • Chores

    • Updated timestamps and identifiers in JSON configurations to enhance data accuracy and relevance.

…mssql objects; 3) add mssql_lineage flag for config
@github-actions github-actions bot added the ingestion PR or Issue related to the ingestion of metadata label Jul 22, 2024

coderabbitai bot commented Jul 22, 2024

Walkthrough

The recent updates enhance the SQL Server ingestion process by integrating detailed lineage tracking capabilities, refining the management of SQL procedure dependencies, and modernizing JSON metadata structures. New attributes and methods facilitate improved data governance and traceability, while configuration options allow for customized lineage extraction from MSSQL databases.

Changes

Files | Change Summary
.../job_models.py, .../source.py | Enhanced ProcedureDependency and ProcedureLineageStream classes with new attributes and methods for improved dependency tracking and lineage management in SQL Server ingestion.
.../golden_files/*.json | Updated JSON files to reflect new metadata structures, including changes to entity types, properties, and timestamps for better lineage tracking and data governance.
.../setup/setup.sql | Introduced SQL commands to create a new database, schema, tables, and stored procedures, enhancing the database structure for effective lineage tracking.

Sequence Diagram(s)

```mermaid
sequenceDiagram
    participant User
    participant SQLServerConfig
    participant LineageTracker
    participant Database

    User->>SQLServerConfig: Set mssql_lineage = True
    SQLServerConfig->>LineageTracker: Enable lineage tracking
    LineageTracker->>Database: Retrieve dependencies
    Database->>LineageTracker: Provide metadata and lineage info
    LineageTracker->>User: Return lineage data
```

A Rabbit's Whimsy
In fields of data, hops so bright,
Procedures dance in day and night.
With lineage tracked, they twirl and play,
Ensuring insights never stray.
So let us cheer, with hearts sincere,
For the paths of data, now so clear! 🐰✨


@github-actions github-actions bot added the community-contribution PR or Issue raised by member(s) of DataHub Community label Jul 22, 2024
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Outside diff range, codebase verification and nitpick comments (1)
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json (1)

Issues found in JSON structure and data consistency

The JSON file contains some entries with null values for entityUrn, entityType, and aspectName, which indicates potential issues with data consistency. Please verify and correct the following entries:

  • Entries with null values for entityUrn
  • Entries with null values for entityType
  • Entries with null values for aspectName

These inconsistencies need to be resolved to ensure the integrity of the metadata.
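A check like the one the bot describes can be sketched in Python. The field names `entityUrn`, `entityType`, and `aspectName` are taken from the golden file; the sample records below are illustrative, not taken from the actual file:

```python
import json

REQUIRED_FIELDS = ("entityUrn", "entityType", "aspectName")

def find_null_fields(records):
    """Return {index: [field names]} for records whose required
    metadata fields are null or absent."""
    problems = {}
    for i, rec in enumerate(records):
        missing = [f for f in REQUIRED_FIELDS if rec.get(f) is None]
        if missing:
            problems[i] = missing
    return problems

# Illustrative sample mimicking entries in a golden MCE file.
sample = json.loads("""[
  {"entityUrn": "urn:li:container:abc", "entityType": "container", "aspectName": "status"},
  {"entityUrn": null, "entityType": null, "aspectName": null}
]""")

print(find_null_fields(sample))  # the second entry is flagged
```

Running this against the real golden file (via `json.load(open(path))`) would surface exactly the inconsistent entries the review calls out.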

Analysis chain

Line range hint 1-1963:
Ensure overall consistency and completeness of the JSON structure.

Verify that all changes are consistent and the JSON structure is complete and correctly formatted.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify overall consistency and completeness of the JSON structure.

# Test: Use jq to validate JSON structure. Expect: Valid JSON structure.
jq empty metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json

Length of output: 102


Script:

#!/bin/bash
# Description: Verify the consistency of entity types, URNs, and aspects within the JSON file.

# Test 1: Check for unique entity URNs.
jq '.[].entityUrn' metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json | sort | uniq -c

# Test 2: Check for consistent entity types.
jq '.[].entityType' metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json | sort | uniq -c

# Test 3: Check for consistent aspect names.
jq '.[].aspectName' metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json | sort | uniq -c

Length of output: 2285

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR, between commits 20574cf and 8d6b7f9.

Files selected for processing (8)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (3 hunks)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (10 hunks)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json (1 hunks)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (26 hunks)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json (3 hunks)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json (3 hunks)
  • metadata-ingestion/tests/integration/sql_server/setup/setup.sql (1 hunks)
  • metadata-ingestion/tests/integration/sql_server/source_files/mssql_lineage.yml (1 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

550-550: Remove unnecessary `True if ... else False` (SIM210)
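The SIM210 finding can be illustrated in isolation: a membership test already evaluates to a `bool`, so wrapping it in a conditional expression is redundant. The source string below stands in for what `inspect.getsource(self.get_db_name)` would return:

```python
# Stand-in for the source text of get_db_name, as inspect.getsource would return it.
src = """return str(engine.url.database).strip('"').lower()"""

# Flagged by Ruff SIM210: redundant conditional around a boolean expression.
db_force_converting_verbose = True if "lower()" in src else False

# Equivalent and idiomatic: the membership test already yields a bool.
db_force_converting = "lower()" in src

assert db_force_converting_verbose == db_force_converting
```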

Additional comments not posted (60)
metadata-ingestion/tests/integration/sql_server/source_files/mssql_lineage.yml (1)

15-18: LGTM!

The sink configuration looks good.

metadata-ingestion/tests/integration/sql_server/setup/setup.sql (5)

96-97: LGTM!

The database creation command looks good.


100-101: LGTM!

The schema creation command looks good.


102-105: LGTM!

The table creation commands look good.

Also applies to: 108-109


106-107: LGTM!

The view creation command looks good.


110-117: LGTM!

The stored procedure creation command looks good.

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (3)

18-26: LGTM!

The changes to the ProcedureDependency class look good.

Also applies to: 29-31


36-76: LGTM!

The changes to the ProcedureLineageStream class look good.


119-119: LGTM!

The changes to the MSSQLProceduresContainer class look good.

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json (19)

2-22: LGTM!

The JSON object representing the container entity is well-formed and contains all necessary fields.


23-38: LGTM!

The JSON object representing the container entity with the status aspect is well-formed and contains all necessary fields.


39-54: LGTM!

The JSON object representing the container entity with the dataPlatformInstance aspect is well-formed and contains all necessary fields.


55-72: LGTM!

The JSON object representing the container entity with the subTypes aspect is well-formed and contains all necessary fields.


73-88: LGTM!

The JSON object representing the container entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.


89-197: LGTM!

The JSON object representing the container entity with various aspects is well-formed and contains all necessary fields.


198-213: LGTM!

The JSON object representing the dataset entity with the container aspect is well-formed and contains all necessary fields.


214-286: LGTM!

The JSON object representing the dataset snapshot with various aspects is well-formed and contains all necessary fields.


287-304: LGTM!

The JSON object representing the dataset entity with the subTypes aspect is well-formed and contains all necessary fields.


305-329: LGTM!

The JSON object representing the dataset entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.


330-345: LGTM!

The JSON object representing the dataset entity with the container aspect is well-formed and contains all necessary fields.


346-418: LGTM!

The JSON object representing the dataset snapshot with various aspects is well-formed and contains all necessary fields.


419-436: LGTM!

The JSON object representing the dataset entity with the subTypes aspect is well-formed and contains all necessary fields.


437-461: LGTM!

The JSON object representing the dataset entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.


462-477: LGTM!

The JSON object representing the dataset entity with the container aspect is well-formed and contains all necessary fields.


478-550: LGTM!

The JSON object representing the dataset snapshot with various aspects is well-formed and contains all necessary fields.


551-568: LGTM!

The JSON object representing the dataset entity with the subTypes aspect is well-formed and contains all necessary fields.


569-593: LGTM!

The JSON object representing the dataset entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.


594-894: LGTM!

The JSON object representing various entities with multiple aspects is well-formed and contains all necessary fields.

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (4)

109-112: LGTM!

The new boolean field mssql_lineage for enabling automatic lineage tracking is well-implemented and follows the existing pattern for configuration options.
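A minimal sketch of how such an opt-in flag typically gates lineage work in an ingestion config. This uses a plain dataclass as a stand-in (the real `SQLServerConfig` is a pydantic model), and every name here except `mssql_lineage` is illustrative:

```python
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class SQLServerConfigSketch:
    # Opt-in flag mirroring the PR's `mssql_lineage` option; lineage
    # extraction runs only when it is explicitly enabled.
    mssql_lineage: bool = False

def get_upstream_lineage(config: SQLServerConfigSketch,
                         dependencies: List[str]) -> Optional[List[str]]:
    """Return deduplicated upstream dataset names, or None when disabled."""
    if not config.mssql_lineage:
        return None
    return sorted(set(dependencies))

# Disabled by default: no lineage is emitted.
assert get_upstream_lineage(SQLServerConfigSketch(), ["db.s.t"]) is None
# Enabled: duplicates collapse into a stable, sorted list.
assert get_upstream_lineage(
    SQLServerConfigSketch(mssql_lineage=True), ["db.s.t", "db.s.t"]
) == ["db.s.t"]
```

Defaulting the flag to `False` keeps the new behavior strictly opt-in, which matches the backward-compatible pattern the review praises.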


273-294: LGTM!

The new method get_ucs is well-implemented and correctly retrieves upstream lineage information based on the mssql_lineage configuration.


578-609: LGTM!

The new method _populate_object_links is well-implemented and correctly retrieves SQL expression dependencies.


889-910: LGTM!

The new method _extract_procedure_dependency is well-implemented and correctly extracts dependencies for a stored procedure.

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json (2)

115-119: Verify the updated job_id and timestamps.

The job_id, date_created, and date_modified fields have been updated. Ensure these values are consistent with other related metadata and correctly formatted.

Verification successful

Verified: The updated job_id and timestamps are consistent and correctly formatted.

The job_id, date_created, and date_modified fields have been updated and are consistent across the relevant files.

  • job_id: 9c897afa-fbd6-43a5-bbf1-f904f80529fc
  • date_created: 2024-07-18 14:21:24.070000
  • date_modified: 2024-07-18 14:21:24.250000
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify consistency of updated job_id and timestamps.

# Test: Search for the updated job_id and timestamps. Expect: Consistent values across the file.
rg --type json '9c897afa-fbd6-43a5-bbf1-f904f80529fc'
rg --type json '2024-07-18 14:21:24.070000'
rg --type json '2024-07-18 14:21:24.250000'

Length of output: 1115


1962-1963: Verify the updated timestamps for Proc.With.SpecialChar.

The date_created and date_modified fields have been updated. Ensure these values are consistent with other related metadata and correctly formatted.

Verification successful

The updated timestamps for Proc.With.SpecialChar are consistent and correctly formatted.

The date_created and date_modified fields have been verified and appear consistently across the relevant files.

  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify consistency of updated timestamps for `Proc.With.SpecialChar`.

# Test: Search for the updated timestamps. Expect: Consistent values across the file.
rg --type json '2024-07-18 14:21:23.980000'

Length of output: 685

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json (2)

115-119: Verify the consistency and necessity of metadata updates.

The job_id, date_created, and date_modified properties have been updated. Ensure that these updates are consistent with the changes in the metadata and are necessary for the test cases.


1962-1963: Verify the consistency and necessity of metadata updates.

The date_created and date_modified properties for dataJobInfo have been updated. Ensure that these updates are consistent with the changes in the metadata and are necessary for the test cases.

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (24)

Line range hint 14-22:
Consistency check for status aspect.

The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.


Line range hint 23-31:
Accuracy check for dataPlatformInstance aspect.

The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.


Line range hint 32-40:
Ensure subTypes aspect is correctly defined.

The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Database.


Line range hint 41-49:
Validate browsePathsV2 aspect.

The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.


Line range hint 2345-2355:
Consistency check for status aspect.

The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.


Line range hint 2361-2371:
Accuracy check for dataPlatformInstance aspect.

The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.


2377-2387: Ensure subTypes aspect is correctly defined.

The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Database.


115-119: Ensure dataJobInfo aspect is complete and accurate.

The dataJobInfo aspect includes customProperties with job details such as job_id, job_name, description, date_created, and date_modified. Ensure that these properties are accurate and complete.

Verification successful

Verification Complete: dataJobInfo aspect is accurate and complete.

The dataJobInfo aspect in the file metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json contains the required properties (job_id, job_name, description, date_created, and date_modified) and appears to be accurate.

  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the accuracy of `dataJobInfo` aspect.

# Test: Search for the `dataJobInfo` aspect in the codebase. Expect: Correct job details.
rg --type json -A 5 $'"dataJobInfo"'

Length of output: 70859


Line range hint 2396-2406:
Validate browsePathsV2 aspect.

The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.


Line range hint 1-13:
Ensure containerProperties are complete and accurate.

The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and complete.

Verification successful

Verify the accuracy of containerProperties for mssql

The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and consistent across the codebase.

  • File: metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
  • Lines: 112-122

The properties appear consistent with other instances of containerProperties for mssql across the codebase.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.

# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, and database values.
rg --type json -A 5 $'"containerProperties"'

Length of output: 372597


Line range hint 2345-2355:
Consistency check for status aspect.

The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.


Line range hint 2361-2371:
Accuracy check for dataPlatformInstance aspect.

The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.


Line range hint 1964-1974:
Consistency check for dataJobInputOutput aspect.

The dataJobInputOutput aspect specifies input and output datasets and jobs. Ensure that these fields are correctly set.


1962-1963: Ensure dataJobInfo aspect is complete and accurate.

The dataJobInfo aspect includes customProperties with job details such as code, input parameters, date_created, and date_modified. Ensure that these properties are accurate and complete.


2377-2387: Ensure subTypes aspect is correctly defined.

The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Database.


2324-2335: Ensure containerProperties are complete and accurate.

The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and complete.

Verification successful

Ensure containerProperties are complete and accurate.

The containerProperties aspect includes customProperties with platform, env, and database. Based on the verification, these properties are accurate and consistently defined across the codebase.

  • The platform is correctly set to mssql.
  • The env is correctly set to PROD.
  • The database is correctly set to lineagedb.
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.

# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, and database values.
rg --type json -A 5 $'"containerProperties"'

Length of output: 372597


Line range hint 2396-2406:
Validate browsePathsV2 aspect.

The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.


Line range hint 2434-2444:
Consistency check for status aspect.

The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.


Line range hint 2450-2460:
Accuracy check for dataPlatformInstance aspect.

The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.


Line range hint 2330-2340:
Ensure containerProperties are complete and accurate.

The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and complete.

Verification successful

Ensure containerProperties are complete and accurate.

The containerProperties aspect in the provided snippet includes customProperties with "platform": "mssql", "env": "PROD", and "database": "lineagedb". These values are consistent with other instances of containerProperties for mssql within the codebase. No discrepancies or missing properties were found.

  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.

# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, and database values.
rg --type json -A 5 $'"containerProperties"'

Length of output: 372597


Line range hint 2466-2476:
Ensure subTypes aspect is correctly defined.

The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Schema.


Line range hint 2484-2494:
Validate browsePathsV2 aspect.

The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.


Line range hint 2417-2427:
Ensure containerProperties are complete and accurate.

The containerProperties aspect includes customProperties with platform, env, database, and schema. Ensure that these properties are accurate and complete.

Verification successful

Ensure containerProperties are complete and accurate.

The containerProperties aspect includes customProperties with platform, env, database, and schema. Ensure that these properties are accurate and complete.

  • File: metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
  • Lines: 2409-2426

The properties platform, env, database, and schema should be consistent with the expected values for an MSSQL platform in a production environment.

Scripts executed

The following scripts were executed for the analysis:

Script:

#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.

# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, database, and schema values.
rg --type json -A 5 $'"containerProperties"'

Length of output: 372597


Line range hint 120-130:
Consistency check for dataJobInputOutput aspect.

The dataJobInputOutput aspect specifies input and output datasets and jobs. Ensure that these fields are correctly set.

Comment on lines 296 to 576
referenced_type
FROM @referenced_info
WHERE id = @id
);
END TRY
BEGIN CATCH
PRINT ERROR_MESSAGE()
END CATCH;
END;

IF @referenced_object_type = 'SN'
BEGIN
BEGIN TRY
SET @SQL = 'SELECT
' + CAST(@id as NVARCHAR(10)) + '
, syn_obj.name
, syn_obj.type
, syn_obj.object_id
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.synonyms AS synon
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS syn_obj
ON OBJECT_ID(synon.base_object_name) = syn_obj.object_id
WHERE synon.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
PRINT @SQL;
INSERT INTO @synonym_info(
id
, referenced_name
, referenced_type
, referenced_object_id
)
EXEC sp_executesql @SQL;

UPDATE #ProceduresDependencies
SET
referenced_object_type = si.referenced_type,
referenced_entity_name = si.referenced_name,
referenced_id = si.referenced_object_id
FROM #ProceduresDependencies pd
INNER JOIN @synonym_info si
ON si.id = pd.id
WHERE CURRENT OF proc_depend_cursor;
END TRY
BEGIN CATCH
PRINT ERROR_MESSAGE()
END CATCH;
END;

FETCH NEXT FROM proc_depend_cursor INTO
@id
, @referenced_database_name
, @referenced_id
, @referenced_object_type;
END;
CLOSE proc_depend_cursor;
DEALLOCATE proc_depend_cursor;
END;
"""
)

_dependencies = conn.execute(
"""
SELECT DISTINCT
*
FROM #ProceduresDependencies
WHERE referenced_object_type IN ('U ', 'V ', 'P ');
"""
)

# NOTE:
# as of 19.07.2024 self.get_db_name forcibly converts the db name to lowercase:
# return str(engine.url.database).strip('"').lower()
# to avoid inconsistency, referenced_database_name in this module mirrors that behavior
# remove after the underlying method is fixed
db_force_converting = (
True if "lower()" in inspect_objects.getsource(self.get_db_name) else False
)

for row in _dependencies:

if row:
_key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}"

_procedures_dependencies.setdefault(_key, []).append(
{
"referenced_database_name": row[
"referenced_database_name"
].lower()
if db_force_converting
else row["referenced_database_name"],
"referenced_schema_name": row["referenced_schema_name"],
"referenced_entity_name": row["referenced_entity_name"],
"referenced_object_type": row["referenced_object_type"],
"is_selected": row["is_selected"],
"is_select_all": row["is_select_all"],
"is_updated": row["is_updated"],
}
)

trans.commit()

self.procedures_dependencies = _procedures_dependencies
LGTM! Minor refactor suggestion.

The new method _populate_stored_procedures_dependencies is well-implemented and correctly gathers dependencies for stored procedures.

However, there is a minor code smell related to the unnecessary use of True if ... else False. This can be simplified as follows:

- db_force_converting = (True if "lower()" in inspect_objects.getsource(self.get_db_name) else False)
+ db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _populate_stored_procedures_dependencies(self, conn: Connection) -> None:
_procedures_dependencies: Dict[str, List[Dict[str, str]]] = {}
trans = conn.begin()
conn.execute(
"""
BEGIN;
IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL
DROP TABLE #ProceduresDependencies;
CREATE TABLE #ProceduresDependencies(
id INT NOT NULL IDENTITY(1,1) PRIMARY KEY
, procedure_id INT
, current_db NVARCHAR(128)
, procedure_schema NVARCHAR(128)
, procedure_name NVARCHAR(128)
, type VARCHAR(2)
, referenced_server_name NVARCHAR(128)
, referenced_database_name NVARCHAR(128)
, referenced_schema_name NVARCHAR(128)
, referenced_entity_name NVARCHAR(128)
, referenced_id INT
, referenced_object_type VARCHAR(2)
, is_selected INT
, is_updated INT
, is_select_all INT
, is_all_columns_found INT
);
BEGIN TRY
WITH dependencies
AS (
SELECT
pro.object_id AS procedure_id
, db_name() AS current_db
, schema_name(pro.schema_id) AS procedure_schema
, pro.NAME AS procedure_name
, pro.type AS type
, p.referenced_id AS referenced_id
, CASE
WHEN p.referenced_server_name IS NULL AND p.referenced_id IS NOT NULL
THEN @@SERVERNAME
ELSE p.referenced_server_name
END AS referenced_server_name
, CASE
WHEN p.referenced_database_name IS NULL AND p.referenced_id IS NOT NULL
THEN db_name()
ELSE p.referenced_database_name
END AS referenced_database_name
, CASE
WHEN p.referenced_schema_name IS NULL AND p.referenced_id IS NOT NULL
THEN schema_name(ref_obj.schema_id)
ELSE p.referenced_schema_name
END AS referenced_schema_name
, p.referenced_entity_name AS referenced_entity_name
, ref_obj.type AS referenced_object_type
, p.is_selected AS is_selected
, p.is_updated AS is_updated
, p.is_select_all AS is_select_all
, p.is_all_columns_found AS is_all_columns_found
FROM sys.procedures AS pro
CROSS apply sys.dm_sql_referenced_entities(
Concat(schema_name(pro.schema_id), '.', pro.NAME),
'OBJECT') AS p
LEFT JOIN sys.objects AS ref_obj
ON p.referenced_id = ref_obj.object_id
)
INSERT INTO #ProceduresDependencies (
procedure_id
, current_db
, procedure_schema
, procedure_name
, type
, referenced_server_name
, referenced_database_name
, referenced_schema_name
, referenced_entity_name
, referenced_id
, referenced_object_type
, is_selected
, is_updated
, is_select_all
, is_all_columns_found)
SELECT DISTINCT
d.procedure_id
, d.current_db
, d.procedure_schema
, d.procedure_name
, d.type
, d.referenced_server_name
, d.referenced_database_name
, d.referenced_schema_name
, d.referenced_entity_name
, d.referenced_id
, d.referenced_object_type
, d.is_selected
, d.is_updated
, d.is_select_all
, d.is_all_columns_found
FROM dependencies AS d
WHERE d.referenced_database_name != 'msdb';
END TRY
BEGIN CATCH
IF ERROR_NUMBER() = 2020 or ERROR_NUMBER() = 942
PRINT ERROR_MESSAGE()
ELSE
THROW;
END CATCH;
DECLARE @id INT;
DECLARE @referenced_server_name NVARCHAR(128);
DECLARE @referenced_database_name NVARCHAR(128);
DECLARE @referenced_id INT;
DECLARE @referenced_object_type VARCHAR(2);
DECLARE @referenced_info TABLE (
id INT
, referenced_type VARCHAR(2)
, referenced_schema NVARCHAR(128)
, referenced_name NVARCHAR(128)
);
DECLARE @synonym_info TABLE (
id INT
, referenced_name NVARCHAR(1035)
, referenced_type VARCHAR(2)
, referenced_object_id INT
);
DECLARE @SQL NVARCHAR(MAX);
DECLARE proc_depend_cursor CURSOR FOR
SELECT
id
, referenced_database_name
, referenced_id
, referenced_object_type
FROM #ProceduresDependencies;
OPEN proc_depend_cursor
FETCH NEXT FROM proc_depend_cursor INTO
@id
, @referenced_database_name
, @referenced_id
, @referenced_object_type;
WHILE @@FETCH_STATUS = 0
BEGIN;
IF @referenced_id IS NOT NULL
BEGIN
BEGIN TRY
SET @SQL = 'SELECT
' + CAST(@id AS NVARCHAR(10)) + '
, o.type
, s.name
, o.name
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS o
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.schemas AS s
ON o.schema_id = s.schema_id
WHERE o.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
INSERT INTO @referenced_info(
id
, referenced_type
, referenced_schema
, referenced_name
)
EXEC sp_executesql @SQL;
UPDATE #ProceduresDependencies
SET
referenced_object_type = ri.referenced_type,
referenced_schema_name = ri.referenced_schema,
referenced_entity_name = ri.referenced_name
FROM #ProceduresDependencies pd
INNER JOIN @referenced_info ri
ON ri.id = pd.id
WHERE CURRENT OF proc_depend_cursor;
SET @referenced_object_type = (
SELECT
referenced_type
FROM @referenced_info
WHERE id = @id
);
END TRY
BEGIN CATCH
PRINT ERROR_MESSAGE()
END CATCH;
END;
IF @referenced_object_type = 'SN'
BEGIN
BEGIN TRY
SET @SQL = 'SELECT
' + CAST(@id as NVARCHAR(10)) + '
, syn_obj.name
, syn_obj.type
, syn_obj.object_id
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.synonyms AS synon
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS syn_obj
ON OBJECT_ID(synon.base_object_name) = syn_obj.object_id
WHERE synon.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
PRINT @SQL;
INSERT INTO @synonym_info(
id
, referenced_name
, referenced_type
, referenced_object_id
)
EXEC sp_executesql @SQL;
UPDATE #ProceduresDependencies
SET
referenced_object_type = si.referenced_type,
referenced_entity_name = si.referenced_name,
referenced_id = si.referenced_object_id
FROM #ProceduresDependencies pd
INNER JOIN @synonym_info si
ON si.id = pd.id
WHERE CURRENT OF proc_depend_cursor;
END TRY
BEGIN CATCH
PRINT ERROR_MESSAGE()
END CATCH;
END;
FETCH NEXT FROM proc_depend_cursor INTO
@id
, @referenced_database_name
, @referenced_id
, @referenced_object_type;
END;
CLOSE proc_depend_cursor;
DEALLOCATE proc_depend_cursor;
END;
"""
)
_dependencies = conn.execute(
"""
SELECT DISTINCT
*
FROM #ProceduresDependencies
WHERE referenced_object_type IN ('U ', 'V ', 'P ');
"""
)
# PAY ATTENTION:
# as of 19.07.2024, the self.get_db_name method forcibly converts the db name to lowercase:
# return str(engine.url.database).strip('"').lower()
# to avoid inconsistency, referenced_database_name in this module repeats the same behavior
# remove after fixing
db_force_converting = (
True if "lower()" in inspect_objects.getsource(self.get_db_name) else False
)
for row in _dependencies:
if row:
_key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}"
_procedures_dependencies.setdefault(_key, []).append(
{
"referenced_database_name": row[
"referenced_database_name"
].lower()
if db_force_converting
else row["referenced_database_name"],
"referenced_schema_name": row["referenced_schema_name"],
"referenced_entity_name": row["referenced_entity_name"],
"referenced_object_type": row["referenced_object_type"],
"is_selected": row["is_selected"],
"is_select_all": row["is_select_all"],
"is_updated": row["is_updated"],
}
)
trans.commit()
self.procedures_dependencies = _procedures_dependencies
Tools
Ruff

550-550: Remove unnecessary True if ... else False

(SIM210)

# to avoid inconsistency, referenced_database_name in this module repeats the same behavior
# remove after fixing
db_force_converting = (
True if "lower()" in inspect_objects.getsource(self.get_db_name) else False
Collaborator

this seems extremely hacky, and it's not clear to me why this is necessary?

Contributor Author

The MSSQL db_name() function, which is used in the dependency-extraction query, returns the DB name exactly as it was declared. If the DB name was created in CamelCase, or contains any uppercase characters, the function returns the value as-is:
(screenshot: db_name() returning the database name with its original casing)

But get_db_name transforms the name to lowercase. To avoid this inconsistency, this small check is implemented.
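A minimal sketch of that guard, using a hypothetical stand-in class (the real `SQLServerSource.get_db_name` derives the name from `engine.url.database`; everything here is illustrative):

```python
import inspect


class FakeSource:
    """Hypothetical stand-in for SQLServerSource, mirroring the
    forced-lowercase get_db_name behavior discussed above."""

    def get_db_name(self) -> str:
        # The real implementation reads engine.url.database; the
        # literal "DemoDB" here is just a placeholder.
        return str("DemoDB").strip('"').lower()

    def normalize_db_name(self, name: str) -> str:
        # Detect whether get_db_name still lowercases its result, and
        # mirror that casing so lineage keys stay consistent with the
        # dataset names emitted elsewhere.
        force_lower = "lower()" in inspect.getsource(self.get_db_name)
        return name.lower() if force_lower else name


src = FakeSource()
print(src.normalize_db_name("DemoDB"))  # prints "demodb"
```

If the forced lowercasing is ever removed from `get_db_name`, the guard flips off automatically and names pass through unchanged.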

Collaborator

now that #10773 has been merged, can we remove this?

def as_input_datasets(self) -> List[str]:
return [
make_dataset_urn_with_platform_instance(
platform=dep.source,
Collaborator

dep.source is always mssql right?

Contributor Author
@sleeperdeep sleeperdeep Aug 1, 2024

Correct, because procedure dependencies are extracted by the _populate_stored_procedures_dependencies method, whose query is built around:

@@ -237,6 +270,343 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "SQLServerSource":
config = SQLServerConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_ucs(self, inspector: Inspector, key: str) -> List[UpstreamClass]:
Collaborator

what is ucs? can we expand out the acronym

Contributor Author

This is an acronym of UpstreamClass (UpstreamClass -> ucs). Can rename it if it is confusing)

Collaborator

Let's just call it get_upstreams

END CATCH;
END;

FETCH NEXT FROM proc_depend_cursor INTO
Collaborator

why does this need to be a procedure, rather than a simpler SQL statement?

It's also a bit difficult for me to understand all of the logic in here - some comments and explanation would be useful

Contributor
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 2

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 8d6b7f9 and 6dd44dd.

Files selected for processing (2)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (5 hunks)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

550-550: Remove unnecessary True if ... else False

(SIM210)

Additional comments not posted (13)
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (7)

34-45: Ensure correctness in filtering dependencies for input datasets.

The get_input_datasets property correctly filters dependencies based on type and incoming status. Ensure that the types ("U ", "V ") and dep.incoming are correct and consistent with the rest of the codebase.


47-57: Ensure correctness in filtering dependencies for output datasets.

The get_output_datasets property correctly filters dependencies based on type and outgoing status. Ensure that the types ("U ", "V ") and dep.outgoing are correct and consistent with the rest of the codebase.


60-71: Ensure correctness in filtering dependencies for input data jobs.

The get_input_datajobs property correctly filters dependencies based on type and incoming status. Ensure that the type ("P ",) and dep.incoming are correct and consistent with the rest of the codebase.
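The three filter properties reviewed above follow one pattern: filter the dependency list by MSSQL object-type code and by direction. A self-contained sketch (the class and field names are illustrative stand-ins, not the PR's actual job_models classes):

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Dependency:
    # Illustrative stand-in for a procedure dependency record.
    type: str       # "U " = user table, "V " = view, "P " = stored procedure
    incoming: bool  # the procedure reads from this object
    outgoing: bool  # the procedure writes to this object
    name: str


@dataclass
class LineageStream:
    dependencies: List[Dependency]

    @property
    def input_datasets(self) -> List[str]:
        # Tables and views read by the procedure.
        return [d.name for d in self.dependencies
                if d.type in ("U ", "V ") and d.incoming]

    @property
    def output_datasets(self) -> List[str]:
        # Tables and views written by the procedure.
        return [d.name for d in self.dependencies
                if d.type in ("U ", "V ") and d.outgoing]

    @property
    def input_datajobs(self) -> List[str]:
        # Other procedures this procedure depends on.
        return [d.name for d in self.dependencies
                if d.type == "P " and d.incoming]


stream = LineageStream([
    Dependency("U ", True, False, "db.dbo.src_table"),
    Dependency("V ", False, True, "db.dbo.out_view"),
    Dependency("P ", True, False, "db.dbo.child_proc"),
])
print(stream.input_datasets)   # ['db.dbo.src_table']
print(stream.output_datasets)  # ['db.dbo.out_view']
print(stream.input_datajobs)   # ['db.dbo.child_proc']
```

Note the trailing space in the type codes ("U ", "V ", "P "): `sys.objects.type` is CHAR(2), so the codes are blank-padded and any comparison must match that padding.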


115-115: Ensure correct initialization of the schema attribute.

The schema attribute is initialized to an empty string. Ensure that this is the intended default value and that it is correctly used throughout the codebase.


234-237: Ensure correctness in the renamed method get_datajob_input_output_aspect.

The method name change improves clarity. Ensure that the method correctly returns the DataJobInputOutputClass aspect.


Line range hint 242-249:
Ensure correctness in the renamed method get_datajob_info_aspect.

The method name change improves clarity. Ensure that the method correctly returns the DataJobInfoClass aspect.


Line range hint 280-285:
Ensure correctness in the renamed method get_dataflow_info_aspect.

The method name change improves clarity. Ensure that the method correctly returns the DataFlowInfoClass aspect.

metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (6)

109-112: Ensure correctness of the new configuration option mssql_lineage.

The new boolean configuration option mssql_lineage is correctly integrated. Ensure that it is used consistently throughout the codebase.
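A sketch of the config-flag gating pattern such an option typically drives (only the flag name mssql_lineage comes from the PR; the class and method names here are hypothetical):

```python
from typing import Dict, List


class LineageExtractor:
    """Illustrative stand-in showing how a boolean config flag gates
    lineage extraction; not the PR's actual SQLServerSource."""

    def __init__(self, config: Dict[str, bool]):
        # mssql_lineage defaults to off, so existing ingestion
        # pipelines behave the same unless the flag is set.
        self.include_lineage = config.get("mssql_lineage", False)
        self.full_lineage: Dict[str, List[str]] = {}

    def get_upstreams(self, key: str) -> List[str]:
        # With the flag off, skip lineage lookups entirely.
        if not self.include_lineage:
            return []
        return self.full_lineage.get(key, [])


ex = LineageExtractor({"mssql_lineage": True})
ex.full_lineage["demodb.dbo.my_view"] = ["demodb.dbo.src_table"]
print(ex.get_upstreams("demodb.dbo.my_view"))  # ['demodb.dbo.src_table']
print(LineageExtractor({}).get_upstreams("demodb.dbo.my_view"))  # []
```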


175-176: Initialize full_lineage and procedures_dependencies dictionaries.

The dictionaries full_lineage and procedures_dependencies are correctly initialized to hold lineage and dependencies information.


185-204: Ensure error handling in _populate_stored_procedures_dependencies.

The method _populate_stored_procedures_dependencies is correctly integrated with error handling. Ensure that the error messages provide sufficient context for debugging.


273-295: Ensure correctness in the get_ucs method.

The get_ucs method correctly retrieves upstream classes based on the configuration flag. Ensure that the lineage data is processed correctly when lineage tracking is enabled.


889-911: Ensure correctness in the _extract_procedure_dependency method.

The _extract_procedure_dependency method correctly extracts procedure dependencies and constructs the ProcedureLineageStream. Ensure that the dependencies are correctly filtered and represented.


Line range hint 841-886:
Ensure correctness in the loop_stored_procedures method.

The loop_stored_procedures method correctly integrates the new lineage tracking logic. Ensure that the dependencies are correctly extracted and assigned to the data_job.

Comment on lines 296 to 577
(quoted diff omitted — duplicates the `_populate_stored_procedures_dependencies` SQL and row-aggregation code shown earlier in this PR)

Contributor

Optimize the _populate_stored_procedures_dependencies method.

The _populate_stored_procedures_dependencies method is well-implemented but contains a minor code smell related to the unnecessary use of True if ... else False. This can be simplified.

- db_force_converting = (True if "lower()" in inspect_objects.getsource(self.get_db_name) else False)
+ db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
Contributor
@coderabbitai coderabbitai bot left a comment

Actionable comments posted: 1

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between 6dd44dd and b9f357b.

Files selected for processing (1)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

552-552: Remove unnecessary True if ... else False

(SIM210)

Additional comments not posted (3)
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (3)

110-113: LGTM! New configuration option added.

The addition of the mssql_lineage configuration option is correct and well-documented.


276-297: LGTM! Function correctly integrates new configuration option.

The get_ucs function correctly integrates the mssql_lineage configuration option to conditionally process lineage data.


889-911: LGTM! New function _extract_procedure_dependency added.

The _extract_procedure_dependency function is well-implemented and correctly consolidates the logic for extracting dependencies, improving clarity and maintainability.

Comment on lines 299 to 576
(quoted diff omitted — duplicates the `_populate_stored_procedures_dependencies` SQL and row-aggregation code shown earlier in this PR)
Contributor

Optimize the _populate_stored_procedures_dependencies method.

The _populate_stored_procedures_dependencies method is well-implemented but contains a minor code smell related to the unnecessary use of True if ... else False. This can be simplified.

- db_force_converting = (True if "lower()" in inspect_objects.getsource(self.get_db_name) else False)
+ db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def _populate_stored_procedures_dependencies(self, conn: Connection) -> None:
trans = conn.begin()
conn.execute(
"""
BEGIN;
IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL
DROP TABLE #ProceduresDependencies;
CREATE TABLE #ProceduresDependencies(
id INT NOT NULL IDENTITY(1,1) PRIMARY KEY
, procedure_id INT
, current_db NVARCHAR(128)
, procedure_schema NVARCHAR(128)
, procedure_name NVARCHAR(128)
, type VARCHAR(2)
, referenced_server_name NVARCHAR(128)
, referenced_database_name NVARCHAR(128)
, referenced_schema_name NVARCHAR(128)
, referenced_entity_name NVARCHAR(128)
, referenced_id INT
, referenced_object_type VARCHAR(2)
, is_selected INT
, is_updated INT
, is_select_all INT
, is_all_columns_found INT
);
BEGIN TRY
WITH dependencies
AS (
SELECT
pro.object_id AS procedure_id
, db_name() AS current_db
, schema_name(pro.schema_id) AS procedure_schema
, pro.NAME AS procedure_name
, pro.type AS type
, p.referenced_id AS referenced_id
, CASE
WHEN p.referenced_server_name IS NULL AND p.referenced_id IS NOT NULL
THEN @@SERVERNAME
ELSE p.referenced_server_name
END AS referenced_server_name
, CASE
WHEN p.referenced_database_name IS NULL AND p.referenced_id IS NOT NULL
THEN db_name()
ELSE p.referenced_database_name
END AS referenced_database_name
, CASE
WHEN p.referenced_schema_name IS NULL AND p.referenced_id IS NOT NULL
THEN schema_name(ref_obj.schema_id)
ELSE p.referenced_schema_name
END AS referenced_schema_name
, p.referenced_entity_name AS referenced_entity_name
, ref_obj.type AS referenced_object_type
, p.is_selected AS is_selected
, p.is_updated AS is_updated
, p.is_select_all AS is_select_all
, p.is_all_columns_found AS is_all_columns_found
FROM sys.procedures AS pro
CROSS apply sys.dm_sql_referenced_entities(
Concat(schema_name(pro.schema_id), '.', pro.NAME),
'OBJECT') AS p
LEFT JOIN sys.objects AS ref_obj
ON p.referenced_id = ref_obj.object_id
)
INSERT INTO #ProceduresDependencies (
procedure_id
, current_db
, procedure_schema
, procedure_name
, type
, referenced_server_name
, referenced_database_name
, referenced_schema_name
, referenced_entity_name
, referenced_id
, referenced_object_type
, is_selected
, is_updated
, is_select_all
, is_all_columns_found)
SELECT DISTINCT
d.procedure_id
, d.current_db
, d.procedure_schema
, d.procedure_name
, d.type
, d.referenced_server_name
, d.referenced_database_name
, d.referenced_schema_name
, d.referenced_entity_name
, d.referenced_id
, d.referenced_object_type
, d.is_selected
, d.is_updated
, d.is_select_all
, d.is_all_columns_found
FROM dependencies AS d
WHERE d.referenced_database_name != 'msdb';
END TRY
BEGIN CATCH
IF ERROR_NUMBER() = 2020 or ERROR_NUMBER() = 942
PRINT ERROR_MESSAGE()
ELSE
THROW;
END CATCH;
DECLARE @id INT;
DECLARE @referenced_server_name NVARCHAR(128);
DECLARE @referenced_database_name NVARCHAR(128);
DECLARE @referenced_id INT;
DECLARE @referenced_object_type VARCHAR(2);
DECLARE @referenced_info TABLE (
id INT
, referenced_type VARCHAR(2)
, referenced_schema NVARCHAR(128)
, referenced_name NVARCHAR(128)
);
DECLARE @synonym_info TABLE (
id INT
, referenced_name NVARCHAR(1035)
, referenced_type VARCHAR(2)
, referenced_object_id INT
);
DECLARE @SQL NVARCHAR(MAX);
DECLARE proc_depend_cursor CURSOR FOR
SELECT
id
, referenced_database_name
, referenced_id
, referenced_object_type
FROM #ProceduresDependencies;
OPEN proc_depend_cursor
FETCH NEXT FROM proc_depend_cursor INTO
@id
, @referenced_database_name
, @referenced_id
, @referenced_object_type;
WHILE @@FETCH_STATUS = 0
BEGIN;
IF @referenced_id IS NOT NULL
BEGIN
BEGIN TRY
SET @SQL = 'SELECT
' + CAST(@id AS NVARCHAR(10)) + '
, o.type
, s.name
, o.name
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS o
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.schemas AS s
ON o.schema_id = s.schema_id
WHERE o.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
INSERT INTO @referenced_info(
id
, referenced_type
, referenced_schema
, referenced_name
)
EXEC sp_executesql @SQL;
UPDATE #ProceduresDependencies
SET
referenced_object_type = ri.referenced_type,
referenced_schema_name = ri.referenced_schema,
referenced_entity_name = ri.referenced_name
FROM #ProceduresDependencies pd
INNER JOIN @referenced_info ri
ON ri.id = pd.id
WHERE CURRENT OF proc_depend_cursor;
SET @referenced_object_type = (
SELECT
referenced_type
FROM @referenced_info
WHERE id = @id
);
END TRY
BEGIN CATCH
PRINT ERROR_MESSAGE()
END CATCH;
END;
IF @referenced_object_type = 'SN'
BEGIN
BEGIN TRY
SET @SQL = 'SELECT
' + CAST(@id as NVARCHAR(10)) + '
, syn_obj.name
, syn_obj.type
, syn_obj.object_id
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.synonyms AS synon
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS syn_obj
ON OBJECT_ID(synon.base_object_name) = syn_obj.object_id
WHERE synon.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
PRINT @SQL;
INSERT INTO @synonym_info(
id
, referenced_name
, referenced_type
, referenced_object_id
)
EXEC sp_executesql @SQL;
UPDATE #ProceduresDependencies
SET
referenced_object_type = si.referenced_type,
referenced_entity_name = si.referenced_name,
referenced_id = si.referenced_object_id
FROM #ProceduresDependencies pd
INNER JOIN @synonym_info si
ON si.id = pd.id
WHERE CURRENT OF proc_depend_cursor;
END TRY
BEGIN CATCH
PRINT ERROR_MESSAGE()
END CATCH;
END;
FETCH NEXT FROM proc_depend_cursor INTO
@id
, @referenced_database_name
, @referenced_id
, @referenced_object_type;
END;
CLOSE proc_depend_cursor;
DEALLOCATE proc_depend_cursor;
END;
"""
)
_dependencies = conn.execute(
"""
SELECT DISTINCT
*
FROM #ProceduresDependencies
WHERE referenced_object_type IN ('U ', 'V ', 'P ');
"""
)
            # PAY ATTENTION:
            # As of 19.07.2024, the self.get_db_name method forcibly converts the db name to lowercase:
            # return str(engine.url.database).strip('"').lower()
            # To avoid inconsistency, referenced_database_name in this module repeats the same behavior.
            # Remove after fixing.
db_force_converting = (
True if "lower()" in inspect_objects.getsource(self.get_db_name) else False
)
for row in _dependencies:
if row:
_key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}"
self.procedures_dependencies[_key].append(
{
"referenced_database_name": row[
"referenced_database_name"
].lower()
if db_force_converting
else row["referenced_database_name"],
"referenced_schema_name": row["referenced_schema_name"],
"referenced_entity_name": row["referenced_entity_name"],
"referenced_object_type": row["referenced_object_type"],
"is_selected": row["is_selected"],
"is_select_all": row["is_select_all"],
"is_updated": row["is_updated"],
}
)
trans.commit()
db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
Tools
Ruff

552-552: Remove unnecessary True if ... else False (SIM210)

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b9f357b and b952656.

Files selected for processing (1)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
Files skipped from review as they are similar to previous changes (1)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py

Collaborator

@hsheth2 hsheth2 left a comment


Still have a few high level questions about what this does

@@ -237,6 +270,343 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "SQLServerSource":
config = SQLServerConfig.parse_obj(config_dict)
return cls(config, ctx)

def get_ucs(self, inspector: Inspector, key: str) -> List[UpstreamClass]:
Collaborator


Let's just call it get_upstreams

FROM sys.sql_expression_dependencies
LEFT JOIN sys.objects AS so_dst ON so_dst.object_id = referencing_id
LEFT JOIN sys.objects AS so_src ON so_src.object_id = referenced_id
WHERE so_dst.type in ('U ', 'V ', 'P ')
Collaborator


will the dst.type ever be a USER_TABLE?

"""
)
# {"U ": "USER_TABLE", "V ": "VIEW", "P ": "SQL_STORED_PROCEDURE"}
for row in _links:
Collaborator


do both this query and the other one give store procedure dependencies? bit confused by this

if upstreams:
yield MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
aspect=UpstreamLineage(upstreams=upstreams),
Collaborator


we also try to generate view lineage using sql parsing. That is capable of generating column lineage as well

This will likely overwrite that stuff, which doesn't feel right

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Review details

Configuration used: CodeRabbit UI
Review profile: CHILL

Commits

Files that changed from the base of the PR and between b9f357b and 3763980.

Files selected for processing (4)
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json (1 hunks)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (26 hunks)
  • metadata-ingestion/tests/integration/sql_server/setup/setup.sql (1 hunks)
Files not reviewed due to server errors (2)
  • metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json
  • metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
Additional comments not posted (15)
metadata-ingestion/tests/integration/sql_server/setup/setup.sql (9)

96-96: LGTM! Database creation statement is correct.

The CREATE DATABASE LINEAGEDB statement is straightforward and correct.


100-100: LGTM! Schema creation statement is correct.

The CREATE SCHEMA schema_with_lineage statement is straightforward and correct.


102-102: LGTM! Table creation statement is correct.

The CREATE TABLE [schema_with_lineage].[table_number_one] (id VARCHAR(MAX)) statement is straightforward and correct.


104-104: LGTM! Table creation statement is correct.

The CREATE TABLE [schema_with_lineage].[table_number_two] (is_updated BIT) statement is straightforward and correct.


106-106: LGTM! View creation statement is correct.

The CREATE VIEW [schema_with_lineage].[view_of_table_number_two] AS SELECT is_updated FROM [schema_with_lineage].[table_number_two] statement is straightforward and correct.


108-108: LGTM! Table creation statement is correct.

The CREATE TABLE [schema_with_lineage].[table_number_three] (order_number INT, weight VARCHAR(MAX)) statement is straightforward and correct.


110-116: LGTM! Stored procedure creation statement is correct.

The CREATE PROCEDURE [schema_with_lineage].[procedure_number_one] statement is straightforward and correct.


118-126: LGTM! Stored procedure creation statement is correct.

The CREATE PROCEDURE [schema_with_lineage].[procedure_number_two] statement is straightforward and correct.


128-135: LGTM! Stored procedure creation statement is correct.

The CREATE PROCEDURE [schema_with_lineage].[procedure_number_three] statement is straightforward and correct.

metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (6)

Line range hint 1-1:
Verify the correctness of entity type changes.

The entity type has been changed from dataFlow and dataJob to container. Ensure that this change aligns with the intended refactor and does not introduce inconsistencies in the metadata structure.

Also applies to: 14-14, 27-27, 40-40, 53-53


Line range hint 3-3:
Verify the correctness of aspect name changes.

The aspect names have been updated from status to more descriptive names like containerProperties, dataPlatformInstance, etc. Ensure that these changes are consistent and align with the intended refactor.

Also applies to: 16-16, 29-29, 42-42, 55-55


Line range hint 5-8:
Verify the correctness and completeness of custom properties.

New custom properties have been added to various entities, including platform, env, database, and schema. Ensure that these properties enhance the metadata's descriptive capabilities and are consistent with the rest of the structure.

Also applies to: 18-21, 31-34, 44-47, 57-60


118-119: Verify the correctness of date modifications.

The date_created and date_modified fields have been updated to reflect the current date. Ensure that these dates accurately reflect the recent changes and are consistent across all entities.

Also applies to: 1962-1963, 2368-2369, 2456-2457, 2674-2675


2324-2335: Verify the correctness and completeness of new entities.

Several new container entities have been introduced, each with its own set of properties and metadata. Ensure that these entities expand the scope of the metadata being captured and provide more granular insights into the SQL Server environment.

Also applies to: 2412-2423, 2521-2532, 2629-2640, 2738-2751


4600-4623: Verify the correctness of property removal.

Certain properties, such as procedure_depends_on and depending_on_procedure, have been removed. Ensure that this removal streamlines the data structure and focuses on more relevant information.

Also applies to: 4658-4671, 4705-4718, 4721-4735, 4737-4750

@sleeperdeep
Contributor Author

@hsheth2
Can we proceed with review?

@sleeperdeep
Contributor Author

@hsheth2
Hi! I found a request from one of the users about the possibility of building automatic lineage for stored procedures. Link to the Slack thread:
https://datahubspace.slack.com/archives/CUMUWQU66/p1725532378625809?thread_ts=1721023302.303249&cid=CUMUWQU66
My solution covers this case. It seems this is the current direction of development for the mssql module. I briefly described my approach; we can continue the discussion here.

@sleeperdeep sleeperdeep requested a review from hsheth2 September 6, 2024 17:14
@hsheth2
Collaborator

hsheth2 commented Sep 12, 2024

I would like to get this merged in, but I do want to make sure I understand it enough that I would feel confident maintaining this code in the future

To that end, I had left a few comments/questions earlier which seem like they're still pending

Regarding the explicit generation of UpstreamClass aspect - the issue is that if we emit an upstreamLineage aspect in _process_table or _process_view, and then we also generate one from get_view_lineage, then the latter will overwrite the former. Ideally we'd move towards creating a single SqlParsingBuilder instance (or maybe even migrate to the more sophisticated SqlParsingAggregator), and then only add to lineage to that instance throughout those other methods.

The other high level comment is that datahub/ingestion/source/sql/mssql/source.py is getting somewhat unwieldy - it might be useful to split some of the logic out into a separate stored_procedure_extractor.py file
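The overwrite concern can be sketched without any DataHub imports. The pattern described above is: one shared builder collects lineage edges from tables, views, and stored procedures, and a single upstream aspect per dataset is emitted at the end, so no code path clobbers another's output. This is a plain-Python illustration of the idea, not the actual SqlParsingBuilder/SqlParsingAggregator API:

```python
from collections import defaultdict


class LineageBuilder:
    """Toy stand-in for a shared lineage builder."""

    def __init__(self) -> None:
        # dataset urn -> set of upstream urns, merged across all code paths
        self._upstreams: dict = defaultdict(set)

    def add_lineage(self, downstream: str, upstream: str) -> None:
        self._upstreams[downstream].add(upstream)

    def gen_aspects(self):
        # Emit exactly one merged aspect per downstream dataset.
        for downstream, ups in self._upstreams.items():
            yield downstream, sorted(ups)


builder = LineageBuilder()
builder.add_lineage("view_one", "table_one")  # e.g. from _process_view
builder.add_lineage("view_one", "table_two")  # e.g. from procedure parsing

aspects = dict(builder.gen_aspects())
print(aspects)  # {'view_one': ['table_one', 'table_two']}
```

If each code path instead emitted its own upstreamLineage aspect for `view_one`, the last write would win and the other edges would be lost; merging first avoids that.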

@sleeperdeep
Contributor Author

@hsheth2
First of all, the general logic is as follows:

1. Extracting the object's dependencies in the _process_table method:

   upstreams = self.get_upstreams(inspector=inspector, key=_key)

2. Doing the same, but in the _process_view method:

   upstreams = self.get_upstreams(inspector=inspector, key=_key)

3. Repeating the same, but in the loop_stored_procedures method.

I don't see the 'get_view_lineage' method that you've mentioned.

So, based on this logic, why do you think we would overwrite something? Let's assume the following simple situation:

CREATE TABLE table_one (id VARCHAR(MAX));
CREATE VIEW view_one AS (SELECT * FROM table_one);
CREATE PROCEDURE [procedure_one] AS
BEGIN
    DECLARE @t1 VARCHAR(MAX);
    SELECT @t1 = [id] FROM table_one;
    INSERT INTO table_one VALUES (UPPER(@t1));
END;

We have a table and a view. For the table, the downstream dependency is the view; for the view, the upstream dependency is the table. When we execute this code, an 'UpstreamClass' aspect will be created only for the view:
{
  "fineGrainedLineages": [
    {
      "downstreamType": "FIELD",
      "confidenceScore": 1.0,
      "downstreams": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.view_one,QA),id)"
      ],
      "upstreamType": "FIELD_SET",
      "upstreams": [
        "urn:li:schemaField:(urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.table_one,QA),id)"
      ]
    }
  ],
  "upstreams": [
    {
      "type": "VIEW",
      "auditStamp": {"actor": "urn:li:corpuser:unknown", "time": 0},
      "dataset": "urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.table_one,QA)"
    }
  ]
}
For the table, nothing will be created.

For stored procedures the situation is the same: a 'dataJobInputOutput' aspect will be generated:

def get_datajob_input_output_aspect(self) -> DataJobInputOutputClass:

{
  "inputDatajobs": [],
  "inputDatasets": ["urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.table_one,QA)"],
  "outputDatasets": ["urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.table_one,QA)"]
}
But the 'UpstreamClass' aspect for the table will not be overwritten.

But I agree with you about the complexity of datahub/ingestion/source/sql/mssql/source.py. I'll make some changes to address it.

Waiting for your response. Thanks!

@sleeperdeep
Contributor Author

@hsheth2 I pushed a new commit. Briefly: I tried to reduce the unwieldiness of this module, and I slightly corrected one of the lineage queries.
For your other questions, I tried to reply above.
Can you please check it? I cannot verify why Vercel failed for this commit.
Thanks!

@sleeperdeep
Contributor Author

@hsheth2 Can you please check the current changes? I don't understand why the Vercel deployment failed.

@sleeperdeep
Contributor Author

@hsheth2 Can you please help me with this issue?

Collaborator

@hsheth2 hsheth2 left a comment


Overall this is looking decent - had a bunch of small comments

The biggest thing missing now is docs - both around permissions, and around what the mssql connector is now capable of. E.g. extracting lineage from stored procedures and things is a pretty cool feature, and our docs should make sure folks know about it :)

@@ -0,0 +1 @@
EXEC [{{procedure_db}}].dbo.sp_helptext '{{procedure_name}}';
Collaborator


will we require additional permissions to be able to execute procedures?

@@ -0,0 +1 @@
EXEC [{{procedure_db}}].dbo.sp_helptext '{{procedure_name}}';
Collaborator


imo it's more clear to have the short sql queries in the .py file, and only move the large ones out

we can do something like this to avoid the weird indentation

  def method(self, ...):
    conn.execute(f"""\
select * 
from whatever
""")
    ... other statements

platform_instance=dep.server,
)
for dep in self.dependencies
if dep.type in ("P ",) and dep.incoming
Collaborator


can you add a comment in the code explaining what "P " is for
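A hedged sketch of the comment being asked for: the two-character codes come from SQL Server's sys.objects.type column, which is char(2) and therefore pads single-letter codes with a trailing space. The mapping below mirrors the one already present in the PR's own comment ("U " / "V " / "P "), with "SN" (synonym) taken from the dependency-resolution script:

```python
# sys.objects.type is char(2), so single-letter codes carry a trailing space.
SQL_OBJECT_TYPES = {
    "U ": "USER_TABLE",
    "V ": "VIEW",
    "P ": "SQL_STORED_PROCEDURE",
    "SN": "SYNONYM",
}


def is_stored_procedure(type_code: str) -> bool:
    # Compare against the padded code exactly as sys.objects stores it.
    return SQL_OBJECT_TYPES.get(type_code) == "SQL_STORED_PROCEDURE"


print(is_stored_procedure("P "))  # True
```

Naming the codes once also makes filters like `dep.type in ("P ",)` self-documenting.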

with inspector.engine.connect() as conn:
if self.config.use_odbc:
self._add_output_converters(conn)
self._populate_table_descriptions(conn, db_name)
self._populate_column_descriptions(conn, db_name)
if self.config.mssql_lineage:
for inspector in self.get_inspectors():
db_name = self.get_db_name(inspector)
Collaborator


if there a reason we're looping over self.get_inspectors() twice?

"""
path_to_table_description_query = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
"sql_queries/table_description_query.sql",
Collaborator


we should have a constant at the top of the file like SQL_QUERIES_DIR

this should use the pathlib library

this code can then become query = (SQL_QUERIES_DIR / "filename.sql").read_text()
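A runnable sketch of this suggestion, assuming a module-level constant and pathlib. A temporary directory stands in for the connector's real sql_queries/ folder here; in source.py it would be `Path(__file__).parent / "sql_queries"`, and the file contents below are placeholders:

```python
import tempfile
from pathlib import Path

# Stand-in for: SQL_QUERIES_DIR = Path(__file__).parent / "sql_queries"
SQL_QUERIES_DIR = Path(tempfile.mkdtemp())

# Create a placeholder query file purely so the example is self-contained.
(SQL_QUERIES_DIR / "table_description_query.sql").write_text(
    "SELECT * FROM sys.extended_properties;"
)

# The one-liner the reviewer proposes:
query = (SQL_QUERIES_DIR / "table_description_query.sql").read_text()
print(query)
```

This replaces the os.path.join/open boilerplate with a single expression and keeps the directory defined in exactly one place.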

IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL
DROP TABLE #ProceduresDependencies;

CREATE TABLE #ProceduresDependencies(
Collaborator


will we need any new permissions for this? if so, we need to update the documentation

}
)

trans.commit()
Collaborator


why do we need to commit this? shouldn't we be rolling it back?

also, do we need a try...finally to ensure rollback is called?
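A minimal sketch of the try...finally shape being suggested: the temp-table population is scratch work, so the transaction can be rolled back whether or not the queries raise. FakeTransaction is a stand-in for a SQLAlchemy transaction object, not the real API:

```python
class FakeTransaction:
    """Stand-in for a SQLAlchemy transaction; records rollback calls."""

    def __init__(self) -> None:
        self.rolled_back = False

    def rollback(self) -> None:
        self.rolled_back = True


def extract_dependencies(trans: "FakeTransaction") -> None:
    try:
        pass  # run the #ProceduresDependencies queries here
    finally:
        # Nothing needs to persist: always roll back, even on error.
        trans.rollback()


trans = FakeTransaction()
extract_dependencies(trans)
print(trans.rolled_back)  # True
```

The finally block guarantees the rollback runs even if a dependency query throws mid-way.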

@@ -664,3 +829,9 @@ def get_identifier(
if self.config.convert_urns_to_lowercase
else qualified_table_name
)

@staticmethod
def parameterize_query(query: str, params: Dict[str, str]) -> str:
Collaborator


we can use query.format(...)
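A quick sketch of the str.format replacement, using the sp_helptext template and the LINEAGEDB objects from the PR's setup.sql. Note that str.format expects single braces, so the doubled {{ }} placeholders in the current .sql files would need adjusting:

```python
# Single-brace placeholders for str.format (the .sql files currently use {{ }}).
template = "EXEC [{procedure_db}].dbo.sp_helptext '{procedure_name}';"

query = template.format(
    procedure_db="LINEAGEDB", procedure_name="procedure_number_one"
)
print(query)  # EXEC [LINEAGEDB].dbo.sp_helptext 'procedure_number_one';
```

This drops the custom parameterize_query helper entirely in favor of the standard library.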

DECLARE @t1 INT = 1
UPDATE [schema_with_lineage].[view_of_table_number_two] SET [is_updated] = CASE
WHEN @t1 = 1 THEN 1
ELSE 0
Collaborator


this file seems to have a mix of tabs and spaces for indentation - was that intentional?

@sleeperdeep sleeperdeep requested a review from hsheth2 October 29, 2024 09:58
@sleeperdeep
Contributor Author

@hsheth2 I fixed the code according to your comments. Please check the PR again.

@hsheth2
Collaborator

hsheth2 commented Nov 27, 2024

@sleeperdeep we recently merged this PR #11912, which attempts to do a similar thing as this PR. The main advantage of that other one is that it also generates column-level lineage.

@datahub-cyborg datahub-cyborg bot added the pending-submitter-response Issue/request has been reviewed but requires a response from the submitter label Nov 27, 2024
@sleeperdeep
Contributor Author

sleeperdeep commented Dec 5, 2024

@hsheth2 Thanks for your work! Please don't close the current PR until I've tested your merged PR and compared it with my approach. I'll let you know after testing whether SQL parsing satisfies my purposes and whether it offers any advantages. Thanks.

@datahub-cyborg datahub-cyborg bot added needs-review Label for PRs that need review from a maintainer. and removed pending-submitter-response Issue/request has been reviewed but requires a response from the submitter labels Dec 5, 2024
@hsheth2 hsheth2 added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Dec 11, 2024
Labels
community-contribution PR or Issue raised by member(s) of DataHub Community ingestion PR or Issue related to the ingestion of metadata pending-submitter-response Issue/request has been reviewed but requires a response from the submitter