Feature/update logic mssql lineage #10959
base: master
…mssql objects; 3) add mssql_lineage flag for config
Walkthrough
The recent updates enhance the SQL Server ingestion process by integrating detailed lineage tracking capabilities, refining the management of SQL procedure dependencies, and modernizing JSON metadata structures. New attributes and methods facilitate improved data governance and traceability, while configuration options allow for customized lineage extraction from MSSQL databases.
Sequence Diagram(s)
sequenceDiagram
participant User
participant SQLServerConfig
participant LineageTracker
participant Database
User->>SQLServerConfig: Set mssql_lineage = True
SQLServerConfig->>LineageTracker: Enable lineage tracking
LineageTracker->>Database: Retrieve dependencies
Database->>LineageTracker: Provide metadata and lineage info
LineageTracker->>User: Return lineage data
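The flow in the diagram can be sketched in Python as a flag that gates a dependency lookup. This is only an illustration: the flag name mssql_lineage comes from the PR, while SketchConfig, collect_lineage, and the fetch_dependencies callback are hypothetical names that do not appear in the DataHub codebase.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class SketchConfig:
    # Hypothetical stand-in for the source config; only the flag name is from the PR.
    mssql_lineage: bool = False


def collect_lineage(
    config: SketchConfig,
    fetch_dependencies: Callable[[], Dict[str, List[str]]],
) -> Dict[str, List[str]]:
    """Return lineage data only when the flag is enabled."""
    if not config.mssql_lineage:
        # Lineage tracking is off, so the database is never queried.
        return {}
    # Assumption: the callback returns {downstream object: [upstream objects]}.
    return fetch_dependencies()
```

Passing the lookup as a callback keeps the sketch independent of any database connection, so the gating logic can be exercised on its own.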
Actionable comments posted: 2
Outside diff range, codebase verification and nitpick comments (1)
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json (1)
Issues found in JSON structure and data consistency
The JSON file contains some entries with null values for entityUrn, entityType, and aspectName, which indicates potential issues with data consistency. Please verify and correct the following entries:
- Entries with null values for entityUrn
- Entries with null values for entityType
- Entries with null values for aspectName
These inconsistencies need to be resolved to ensure the integrity of the metadata.
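A check like the one described above could be sketched in Python as follows. It assumes the golden file is a JSON array of objects, which is what the jq expressions in this review imply; the function names are illustrative and are not part of the PR.

```python
import json
from collections import Counter
from typing import Any, Dict, List

# The three fields the review flags as sometimes being null.
FIELDS = ("entityUrn", "entityType", "aspectName")


def count_missing_fields(records: List[Dict[str, Any]]) -> Counter:
    """Count, per field, how many records lack it or hold null."""
    missing: Counter = Counter()
    for record in records:
        for field in FIELDS:
            # .get() yields None both for an absent key and for a JSON null.
            if record.get(field) is None:
                missing[field] += 1
    return missing


def count_missing_in_file(path: str) -> Counter:
    """Load a JSON array from disk and run the same check."""
    with open(path, encoding="utf-8") as handle:
        return count_missing_fields(json.load(handle))
```

An absent key and an explicit null are counted the same way here, which matches how jq prints null for both cases.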
Analysis chain
Line range hint 1-1963: Ensure overall consistency and completeness of the JSON structure.
Verify that all changes are consistent and the JSON structure is complete and correctly formatted.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify overall consistency and completeness of the JSON structure.
# Test: Use jq to validate JSON structure. Expect: Valid JSON structure.
jq empty metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json
Length of output: 102
Script:
#!/bin/bash
# Description: Verify the consistency of entity types, URNs, and aspects within the JSON file.
# Test 1: Check for unique entity URNs.
jq '.[].entityUrn' metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json | sort | uniq -c
# Test 2: Check for consistent entity types.
jq '.[].entityType' metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json | sort | uniq -c
# Test 3: Check for consistent aspect names.
jq '.[].aspectName' metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json | sort | uniq -c
Length of output: 2285
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (8)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (3 hunks)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (10 hunks)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json (1 hunks)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (26 hunks)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json (3 hunks)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json (3 hunks)
- metadata-ingestion/tests/integration/sql_server/setup/setup.sql (1 hunks)
- metadata-ingestion/tests/integration/sql_server/source_files/mssql_lineage.yml (1 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
550-550: Remove unnecessary True if ... else False (SIM210)
Additional comments not posted (60)
metadata-ingestion/tests/integration/sql_server/source_files/mssql_lineage.yml (1)
15-18: LGTM! The sink configuration looks good.
metadata-ingestion/tests/integration/sql_server/setup/setup.sql (5)
96-97: LGTM! The database creation command looks good.
100-101: LGTM! The schema creation command looks good.
102-105: LGTM! The table creation commands look good. Also applies to: 108-109
106-107: LGTM! The view creation command looks good.
110-117: LGTM! The stored procedure creation command looks good.
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (3)
18-26: LGTM! The changes to the ProcedureDependency class look good. Also applies to: 29-31
36-76: LGTM! The changes to the ProcedureLineageStream class look good.
119-119: LGTM! The changes to the MSSQLProceduresContainer class look good.
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json (19)
2-22: LGTM! The JSON object representing the container entity is well-formed and contains all necessary fields.
23-38: LGTM! The JSON object representing the container entity with the status aspect is well-formed and contains all necessary fields.
39-54: LGTM! The JSON object representing the container entity with the dataPlatformInstance aspect is well-formed and contains all necessary fields.
55-72: LGTM! The JSON object representing the container entity with the subTypes aspect is well-formed and contains all necessary fields.
73-88: LGTM! The JSON object representing the container entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.
89-197: LGTM! The JSON object representing the container entity with various aspects is well-formed and contains all necessary fields.
198-213: LGTM! The JSON object representing the dataset entity with the container aspect is well-formed and contains all necessary fields.
214-286: LGTM! The JSON object representing the dataset snapshot with various aspects is well-formed and contains all necessary fields.
287-304: LGTM! The JSON object representing the dataset entity with the subTypes aspect is well-formed and contains all necessary fields.
305-329: LGTM! The JSON object representing the dataset entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.
330-345: LGTM! The JSON object representing the dataset entity with the container aspect is well-formed and contains all necessary fields.
346-418: LGTM! The JSON object representing the dataset snapshot with various aspects is well-formed and contains all necessary fields.
419-436: LGTM! The JSON object representing the dataset entity with the subTypes aspect is well-formed and contains all necessary fields.
437-461: LGTM! The JSON object representing the dataset entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.
462-477: LGTM! The JSON object representing the dataset entity with the container aspect is well-formed and contains all necessary fields.
478-550: LGTM! The JSON object representing the dataset snapshot with various aspects is well-formed and contains all necessary fields.
551-568: LGTM! The JSON object representing the dataset entity with the subTypes aspect is well-formed and contains all necessary fields.
569-593: LGTM! The JSON object representing the dataset entity with the browsePathsV2 aspect is well-formed and contains all necessary fields.
594-894: LGTM! The JSON object representing various entities with multiple aspects is well-formed and contains all necessary fields.
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (4)
109-112: LGTM! The new boolean field mssql_lineage for enabling automatic lineage tracking is well-implemented and follows the existing pattern for configuration options.
273-294: LGTM! The new method get_ucs is well-implemented and correctly retrieves upstream lineage information based on the mssql_lineage configuration.
578-609: LGTM! The new method _populate_object_links is well-implemented and correctly retrieves SQL expression dependencies.
889-910: LGTM! The new method _extract_procedure_dependency is well-implemented and correctly extracts dependencies for a stored procedure.
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json (2)
115-119: Verify the updated job_id and timestamps.
The job_id, date_created, and date_modified fields have been updated. Ensure these values are consistent with other related metadata and correctly formatted.
Verification successful
Verified: The updated job_id and timestamps are consistent and correctly formatted.
The job_id, date_created, and date_modified fields have been updated and are consistent across the relevant files.
- job_id: 9c897afa-fbd6-43a5-bbf1-f904f80529fc
- date_created: 2024-07-18 14:21:24.070000
- date_modified: 2024-07-18 14:21:24.250000
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify consistency of updated job_id and timestamps.
# Test: Search for the updated job_id and timestamps. Expect: Consistent values across the file.
rg --type json '9c897afa-fbd6-43a5-bbf1-f904f80529fc'
rg --type json '2024-07-18 14:21:24.070000'
rg --type json '2024-07-18 14:21:24.250000'
Length of output: 1115
1962-1963: Verify the updated timestamps for Proc.With.SpecialChar.
The date_created and date_modified fields have been updated. Ensure these values are consistent with other related metadata and correctly formatted.
Verification successful
The updated timestamps for Proc.With.SpecialChar are consistent and correctly formatted.
The date_created and date_modified fields have been verified and appear consistently across the relevant files.
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_to_file.json
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify consistency of updated timestamps for `Proc.With.SpecialChar`.
# Test: Search for the updated timestamps. Expect: Consistent values across the file.
rg --type json '2024-07-18 14:21:23.980000'
Length of output: 685
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_with_lower_case_urn.json (2)
115-119: Verify the consistency and necessity of metadata updates.
The job_id, date_created, and date_modified properties have been updated. Ensure that these updates are consistent with the changes in the metadata and are necessary for the test cases.
1962-1963: Verify the consistency and necessity of metadata updates.
The date_created and date_modified properties for dataJobInfo have been updated. Ensure that these updates are consistent with the changes in the metadata and are necessary for the test cases.
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (24)
Line range hint 14-22: Consistency check for status aspect.
The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.
Line range hint 23-31: Accuracy check for dataPlatformInstance aspect.
The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.
Line range hint 32-40: Ensure subTypes aspect is correctly defined.
The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Database.
Line range hint 41-49: Validate browsePathsV2 aspect.
The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.
Line range hint 2345-2355: Consistency check for status aspect.
The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.
Line range hint 2361-2371: Accuracy check for dataPlatformInstance aspect.
The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.
2377-2387: Ensure subTypes aspect is correctly defined.
The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Database.
115-119: Ensure dataJobInfo aspect is complete and accurate.
The dataJobInfo aspect includes customProperties with job details such as job_id, job_name, description, date_created, and date_modified. Ensure that these properties are accurate and complete.
Verification successful
Verification Complete: dataJobInfo aspect is accurate and complete.
The dataJobInfo aspect in the file metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json contains the required properties (job_id, job_name, description, date_created, and date_modified) and appears to be accurate.
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the accuracy of `dataJobInfo` aspect.
# Test: Search for the `dataJobInfo` aspect in the codebase. Expect: Correct job details.
rg --type json -A 5 $'"dataJobInfo"'
Length of output: 70859
Line range hint 2396-2406: Validate browsePathsV2 aspect.
The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.
Line range hint 1-13: Ensure containerProperties are complete and accurate.
The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and complete.
Verification successful
Verify the accuracy of containerProperties for mssql
The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and consistent across the codebase.
- File: metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
- Lines: 112-122
The properties appear consistent with other instances of containerProperties for mssql across the codebase.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.
# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, and database values.
rg --type json -A 5 $'"containerProperties"'
Length of output: 372597
Line range hint 2345-2355: Consistency check for status aspect.
The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.
Line range hint 2361-2371: Accuracy check for dataPlatformInstance aspect.
The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.
Line range hint 1964-1974: Consistency check for dataJobInputOutput aspect.
The dataJobInputOutput aspect specifies input and output datasets and jobs. Ensure that these fields are correctly set.
1962-1963: Ensure dataJobInfo aspect is complete and accurate.
The dataJobInfo aspect includes customProperties with job details such as code, input parameters, date_created, and date_modified. Ensure that these properties are accurate and complete.
2377-2387: Ensure subTypes aspect is correctly defined.
The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Database.
2324-2335: Ensure containerProperties are complete and accurate.
The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and complete.
Verification successful
Ensure containerProperties are complete and accurate.
The containerProperties aspect includes customProperties with platform, env, and database. Based on the verification, these properties are accurate and consistently defined across the codebase.
- The platform is correctly set to mssql.
- The env is correctly set to PROD.
- The database is correctly set to lineagedb.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.
# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, and database values.
rg --type json -A 5 $'"containerProperties"'
Length of output: 372597
Line range hint 2396-2406: Validate browsePathsV2 aspect.
The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.
Line range hint 2434-2444: Consistency check for status aspect.
The status aspect indicates whether the entity is removed. Ensure that the removed field is correctly set to false.
Line range hint 2450-2460: Accuracy check for dataPlatformInstance aspect.
The dataPlatformInstance aspect specifies the platform. Ensure that the platform is correctly set to urn:li:dataPlatform:mssql.
Line range hint 2330-2340: Ensure containerProperties are complete and accurate.
The containerProperties aspect includes customProperties with platform, env, and database. Ensure that these properties are accurate and complete.
Verification successful
Ensure containerProperties are complete and accurate.
The containerProperties aspect in the provided snippet includes customProperties with "platform": "mssql", "env": "PROD", and "database": "lineagedb". These values are consistent with other instances of containerProperties for mssql within the codebase. No discrepancies or missing properties were found.
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.
# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, and database values.
rg --type json -A 5 $'"containerProperties"'
Length of output: 372597
Line range hint 2466-2476: Ensure subTypes aspect is correctly defined.
The subTypes aspect specifies the type names. Ensure that the type names are correctly defined as Schema.
Line range hint 2484-2494: Validate browsePathsV2 aspect.
The browsePathsV2 aspect defines the path. Ensure that the path is correctly set and complete.
Line range hint 2417-2427: Ensure containerProperties are complete and accurate.
The containerProperties aspect includes customProperties with platform, env, database, and schema. Ensure that these properties are accurate and complete.
Verification successful
Ensure containerProperties are complete and accurate.
The containerProperties aspect includes customProperties with platform, env, database, and schema. Ensure that these properties are accurate and complete.
- File: metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json
- Lines: 2409-2426
The properties platform, env, database, and schema should be consistent with the expected values for an MSSQL platform in a production environment.
Scripts executed
The following scripts were executed for the analysis:
Script:
#!/bin/bash
# Description: Verify the accuracy of `containerProperties`.
# Test: Search for the container properties in the codebase. Expect: Correct platform, environment, database, and schema values.
rg --type json -A 5 $'"containerProperties"'
Length of output: 372597
Line range hint 120-130: Consistency check for dataJobInputOutput aspect.
The dataJobInputOutput aspect specifies input and output datasets and jobs. Ensure that these fields are correctly set.
metadata-ingestion/tests/integration/sql_server/source_files/mssql_lineage.yml
LGTM! Minor refactor suggestion.
The new method _populate_stored_procedures_dependencies is well-implemented and correctly gathers dependencies for stored procedures.
However, there is a minor code smell related to the unnecessary use of True if ... else False. This can be simplified as follows:
- db_force_converting = (True if "lower()" in inspect_objects.getsource(self.get_db_name) else False)
+ db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
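The simplification is safe because the in operator on strings already evaluates to a bool. A minimal sketch, using a plain string in place of the result of inspect_objects.getsource(self.get_db_name); both function names are illustrative only:

```python
def uses_lowercase_verbose(source_text: str) -> bool:
    # The pattern Ruff flags as SIM210: the conditional expression adds nothing.
    return True if "lower()" in source_text else False


def uses_lowercase(source_text: str) -> bool:
    # Equivalent form: a membership test on str already yields a bool.
    return "lower()" in source_text
```

Both functions return the same value for every input, so the rewrite changes readability only, not behavior.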
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _populate_stored_procedures_dependencies(self, conn: Connection) -> None: | |
_procedures_dependencies: Dict[str, List[Dict[str, str]]] = {} | |
trans = conn.begin() | |
conn.execute( | |
""" | |
BEGIN; | |
IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL | |
DROP TABLE #ProceduresDependencies; | |
CREATE TABLE #ProceduresDependencies( | |
id INT NOT NULL IDENTITY(1,1) PRIMARY KEY | |
, procedure_id INT | |
, current_db NVARCHAR(128) | |
, procedure_schema NVARCHAR(128) | |
, procedure_name NVARCHAR(128) | |
, type VARCHAR(2) | |
, referenced_server_name NVARCHAR(128) | |
, referenced_database_name NVARCHAR(128) | |
, referenced_schema_name NVARCHAR(128) | |
, referenced_entity_name NVARCHAR(128) | |
, referenced_id INT | |
, referenced_object_type VARCHAR(2) | |
, is_selected INT | |
, is_updated INT | |
, is_select_all INT | |
, is_all_columns_found INT | |
); | |
BEGIN TRY | |
WITH dependencies | |
AS ( | |
SELECT | |
pro.object_id AS procedure_id | |
, db_name() AS current_db | |
, schema_name(pro.schema_id) AS procedure_schema | |
, pro.NAME AS procedure_name | |
, pro.type AS type | |
, p.referenced_id AS referenced_id | |
, CASE | |
WHEN p.referenced_server_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN @@SERVERNAME | |
ELSE p.referenced_server_name | |
END AS referenced_server_name | |
, CASE | |
WHEN p.referenced_database_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN db_name() | |
ELSE p.referenced_database_name | |
END AS referenced_database_name | |
, CASE | |
WHEN p.referenced_schema_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN schema_name(ref_obj.schema_id) | |
ELSE p.referenced_schema_name | |
END AS referenced_schema_name | |
, p.referenced_entity_name AS referenced_entity_name | |
, ref_obj.type AS referenced_object_type | |
, p.is_selected AS is_selected | |
, p.is_updated AS is_updated | |
, p.is_select_all AS is_select_all | |
, p.is_all_columns_found AS is_all_columns_found | |
FROM sys.procedures AS pro | |
CROSS apply sys.dm_sql_referenced_entities( | |
Concat(schema_name(pro.schema_id), '.', pro.NAME), | |
'OBJECT') AS p | |
LEFT JOIN sys.objects AS ref_obj | |
ON p.referenced_id = ref_obj.object_id | |
) | |
INSERT INTO #ProceduresDependencies ( | |
procedure_id | |
, current_db | |
, procedure_schema | |
, procedure_name | |
, type | |
, referenced_server_name | |
, referenced_database_name | |
, referenced_schema_name | |
, referenced_entity_name | |
, referenced_id | |
, referenced_object_type | |
, is_selected | |
, is_updated | |
, is_select_all | |
, is_all_columns_found) | |
SELECT DISTINCT | |
d.procedure_id | |
, d.current_db | |
, d.procedure_schema | |
, d.procedure_name | |
, d.type | |
, d.referenced_server_name | |
, d.referenced_database_name | |
, d.referenced_schema_name | |
, d.referenced_entity_name | |
, d.referenced_id | |
, d.referenced_object_type | |
, d.is_selected | |
, d.is_updated | |
, d.is_select_all | |
, d.is_all_columns_found | |
FROM dependencies AS d | |
WHERE d.referenced_database_name != 'msdb'; | |
END TRY | |
BEGIN CATCH | |
IF ERROR_NUMBER() = 2020 or ERROR_NUMBER() = 942 | |
PRINT ERROR_MESSAGE() | |
ELSE | |
THROW; | |
END CATCH; | |
DECLARE @id INT; | |
DECLARE @referenced_server_name NVARCHAR(128); | |
DECLARE @referenced_database_name NVARCHAR(128); | |
DECLARE @referenced_id INT; | |
DECLARE @referenced_object_type VARCHAR(2); | |
DECLARE @referenced_info TABLE ( | |
id INT | |
, referenced_type VARCHAR(2) | |
, referenced_schema NVARCHAR(128) | |
, referenced_name NVARCHAR(128) | |
); | |
DECLARE @synonym_info TABLE ( | |
id INT | |
, referenced_name NVARCHAR(1035) | |
, referenced_type VARCHAR(2) | |
, referenced_object_id INT | |
); | |
DECLARE @SQL NVARCHAR(MAX); | |
DECLARE proc_depend_cursor CURSOR FOR | |
SELECT | |
id | |
, referenced_database_name | |
, referenced_id | |
, referenced_object_type | |
FROM #ProceduresDependencies; | |
OPEN proc_depend_cursor | |
FETCH NEXT FROM proc_depend_cursor INTO | |
@id | |
, @referenced_database_name | |
, @referenced_id | |
, @referenced_object_type; | |
WHILE @@FETCH_STATUS = 0 | |
BEGIN; | |
IF @referenced_id IS NOT NULL | |
BEGIN | |
BEGIN TRY | |
SET @SQL = 'SELECT | |
' + CAST(@id AS NVARCHAR(10)) + ' | |
, o.type | |
, s.name | |
, o.name | |
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS o | |
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.schemas AS s | |
ON o.schema_id = s.schema_id | |
WHERE o.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';'; | |
INSERT INTO @referenced_info( | |
id | |
, referenced_type | |
, referenced_schema | |
, referenced_name | |
) | |
EXEC sp_executesql @SQL; | |
UPDATE #ProceduresDependencies | |
SET | |
referenced_object_type = ri.referenced_type, | |
referenced_schema_name = ri.referenced_schema, | |
referenced_entity_name = ri.referenced_name | |
FROM #ProceduresDependencies pd | |
INNER JOIN @referenced_info ri | |
ON ri.id = pd.id | |
WHERE CURRENT OF proc_depend_cursor; | |
SET @referenced_object_type = ( | |
SELECT | |
referenced_type | |
FROM @referenced_info | |
WHERE id = @id | |
); | |
END TRY | |
BEGIN CATCH | |
PRINT ERROR_MESSAGE() | |
END CATCH; | |
END; | |
IF @referenced_object_type = 'SN' | |
BEGIN | |
BEGIN TRY | |
SET @SQL = 'SELECT | |
' + CAST(@id as NVARCHAR(10)) + ' | |
, syn_obj.name | |
, syn_obj.type | |
, syn_obj.object_id | |
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.synonyms AS synon | |
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS syn_obj | |
ON OBJECT_ID(synon.base_object_name) = syn_obj.object_id | |
WHERE synon.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';'; | |
PRINT @SQL; | |
INSERT INTO @synonym_info( | |
id | |
, referenced_name | |
, referenced_type | |
, referenced_object_id | |
) | |
EXEC sp_executesql @SQL; | |
UPDATE #ProceduresDependencies | |
SET | |
referenced_object_type = si.referenced_type, | |
referenced_entity_name = si.referenced_name, | |
referenced_id = si.referenced_object_id | |
FROM #ProceduresDependencies pd | |
INNER JOIN @synonym_info si | |
ON si.id = pd.id | |
WHERE CURRENT OF proc_depend_cursor; | |
END TRY | |
BEGIN CATCH | |
PRINT ERROR_MESSAGE() | |
END CATCH; | |
END; | |
FETCH NEXT FROM proc_depend_cursor INTO | |
@id | |
, @referenced_database_name | |
, @referenced_id | |
, @referenced_object_type; | |
END; | |
CLOSE proc_depend_cursor; | |
DEALLOCATE proc_depend_cursor; | |
END; | |
""" | |
) | |
_dependencies = conn.execute( | |
""" | |
SELECT DISTINCT | |
* | |
FROM #ProceduresDependencies | |
WHERE referenced_object_type IN ('U ', 'V ', 'P '); | |
""" | |
) | |
# PAY ATTENTION: | |
# at 19.07.2024 self.get_db_name method converts db name to lowercase forcely: | |
# return str(engine.url.database).strip('"').lower() | |
# to avoid inconsistency, referenced_database_name in this module repeats the same behavior | |
# remove after fixing | |
db_force_converting = ( | |
True if "lower()" in inspect_objects.getsource(self.get_db_name) else False | |
) | |
for row in _dependencies: | |
if row: | |
_key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}" | |
_procedures_dependencies.setdefault(_key, []).append( | |
{ | |
"referenced_database_name": row[ | |
"referenced_database_name" | |
].lower() | |
if db_force_converting | |
else row["referenced_database_name"], | |
"referenced_schema_name": row["referenced_schema_name"], | |
"referenced_entity_name": row["referenced_entity_name"], | |
"referenced_object_type": row["referenced_object_type"], | |
"is_selected": row["is_selected"], | |
"is_select_all": row["is_select_all"], | |
"is_updated": row["is_updated"], | |
} | |
) | |
trans.commit() | |
self.procedures_dependencies = _procedures_dependencies | |
db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name) |
Tools
Ruff
550-550: Remove unnecessary True if ... else False (SIM210)
# to avoid inconsistency, referenced_database_name in this module repeats the same behavior
# remove after fixing
db_force_converting = (
    True if "lower()" in inspect_objects.getsource(self.get_db_name) else False
This seems extremely hacky, and it's not clear to me why it is necessary.

The MSSQL db_name() function, which is used in the dependency-extraction query, returns the name of the database exactly as it was declared. If the name was written in CamelCase, or contains any uppercase characters, the function returns the value as is.
However, get_db_name converts the name to lowercase. This small check was added to avoid that inconsistency.

Now that #10773 has been merged, can we remove this?
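The mismatch discussed in this thread can be sketched as follows. This is a minimal illustration, not code from the PR: the helper procedure_key and the database name "DemoData" are hypothetical, and the sketch only assumes what the thread states, namely that db_name() preserves case while get_db_name lowercases.

```python
def procedure_key(db: str, schema: str, name: str, lowercase_db: bool) -> str:
    """Build a 'db.schema.procedure' lookup key (hypothetical helper, not from the PR)."""
    db_part = db.lower() if lowercase_db else db
    return f"{db_part}.{schema}.{name}"


# db_name() returns the name as declared, e.g. "DemoData" (made-up example name).
key_from_query = procedure_key("DemoData", "dbo", "load_orders", lowercase_db=False)

# A lowercased name, as get_db_name is described to produce in this thread.
key_from_source = procedure_key("demodata", "dbo", "load_orders", lowercase_db=False)

# Without normalization the two keys differ, so a dictionary lookup would miss.
mismatch = key_from_query != key_from_source

# Lowercasing the query-side name makes the keys line up again.
normalized_match = (
    procedure_key("DemoData", "dbo", "load_orders", lowercase_db=True)
    == key_from_source
)
```

Applying the same normalization on both sides is what the check in the PR tries to achieve; whether it is still needed depends on how get_db_name behaves after #10773.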
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py
def as_input_datasets(self) -> List[str]:
    return [
        make_dataset_urn_with_platform_instance(
            platform=dep.source,
dep.source is always mssql, right?

Correct. Procedure dependencies are extracted by the _populate_stored_procedures_dependencies method, whose query is built around:
- sys.procedures
- sys.dm_sql_referenced_entities
- sys.objects
So we assume that all related entities are MSSQL objects by default.
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py
@@ -237,6 +270,343 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "SQLServerSource":
        config = SQLServerConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_ucs(self, inspector: Inspector, key: str) -> List[UpstreamClass]:
What is ucs? Can we expand the acronym?

It is an acronym derived from UpstreamClass: UpstreamClass -> ucs. I can rename it if it is confusing.

Let's just call it get_upstreams.
END CATCH;
END;

FETCH NEXT FROM proc_depend_cursor INTO
Why does this need to be a procedure rather than a simpler SQL statement?
It's also a bit difficult for me to understand all of the logic in here; some comments and explanation would be useful.
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
Actionable comments posted: 2
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (2)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (5 hunks)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
550-550: Remove unnecessary True if ... else False (SIM210)
Additional comments not posted (13)
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/job_models.py (7)
34-45: Ensure correctness in filtering dependencies for input datasets.
The get_input_datasets property correctly filters dependencies based on type and incoming status. Ensure that the types ("U ", "V ") and dep.incoming are correct and consistent with the rest of the codebase.
47-57: Ensure correctness in filtering dependencies for output datasets.
The get_output_datasets property correctly filters dependencies based on type and outgoing status. Ensure that the types ("U ", "V ") and dep.outgoing are correct and consistent with the rest of the codebase.
60-71: Ensure correctness in filtering dependencies for input data jobs.
The get_input_datajobs property correctly filters dependencies based on type and incoming status. Ensure that the type ("P ",) and dep.incoming are correct and consistent with the rest of the codebase.
115-115: Ensure correct initialization of the schema attribute.
The schema attribute is initialized to an empty string. Ensure that this is the intended default value and that it is correctly used throughout the codebase.
234-237: Ensure correctness in the renamed method get_datajob_input_output_aspect.
The method name change improves clarity. Ensure that the method correctly returns the DataJobInputOutputClass aspect.
Line range hint 242-249: Ensure correctness in the renamed method get_datajob_info_aspect.
The method name change improves clarity. Ensure that the method correctly returns the DataJobInfoClass aspect.
Line range hint 280-285: Ensure correctness in the renamed method get_dataflow_info_aspect.
The method name change improves clarity. Ensure that the method correctly returns the DataFlowInfoClass aspect.
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (6)
109-112: Ensure correctness of the new configuration option mssql_lineage.
The new boolean configuration option mssql_lineage is correctly integrated. Ensure that it is used consistently throughout the codebase.
175-176: Initialize full_lineage and procedures_dependencies dictionaries.
The dictionaries full_lineage and procedures_dependencies are correctly initialized to hold lineage and dependencies information.
185-204: Ensure error handling in _populate_stored_procedures_dependencies.
The method _populate_stored_procedures_dependencies is correctly integrated with error handling. Ensure that the error messages provide sufficient context for debugging.
273-295: Ensure correctness in the get_ucs method.
The get_ucs method correctly retrieves upstream classes based on the configuration flag. Ensure that the lineage data is processed correctly when lineage tracking is enabled.
889-911: Ensure correctness in the _extract_procedure_dependency method.
The _extract_procedure_dependency method correctly extracts procedure dependencies and constructs the ProcedureLineageStream. Ensure that the dependencies are correctly filtered and represented.
Line range hint 841-886: Ensure correctness in the loop_stored_procedures method.
The loop_stored_procedures method correctly integrates the new lineage tracking logic. Ensure that the dependencies are correctly extracted and assigned to the data_job.
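The filtering described for get_input_datasets, get_output_datasets and get_input_datajobs can be sketched as below. This is an illustration under stated assumptions, not the PR's implementation: the Dependency dataclass and the three free functions are hypothetical stand-ins, and only the type codes ("U ", "V ", "P ") and the incoming/outgoing flags come from the review notes above. The codes are the space-padded two-character values of sys.objects.type: user table, view and stored procedure.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Dependency:
    # Hypothetical stand-in for the PR's dependency model.
    name: str
    type: str        # sys.objects type code, space-padded to two characters
    incoming: bool   # the procedure reads from this object
    outgoing: bool   # the procedure writes to this object


def input_datasets(deps: List[Dependency]) -> List[str]:
    """Tables and views that the procedure reads from."""
    return [d.name for d in deps if d.type in ("U ", "V ") and d.incoming]


def output_datasets(deps: List[Dependency]) -> List[str]:
    """Tables and views that the procedure writes to."""
    return [d.name for d in deps if d.type in ("U ", "V ") and d.outgoing]


def input_datajobs(deps: List[Dependency]) -> List[str]:
    """Stored procedures that this procedure depends on."""
    return [d.name for d in deps if d.type in ("P ",) and d.incoming]


deps = [
    Dependency("dbo.orders", "U ", incoming=True, outgoing=False),
    Dependency("dbo.orders_summary", "U ", incoming=False, outgoing=True),
    Dependency("dbo.active_customers", "V ", incoming=True, outgoing=False),
    Dependency("dbo.refresh_cache", "P ", incoming=True, outgoing=False),
]
```

Note that the comparison uses the padded codes; a value such as "U" without the trailing space would not match.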
Optimize the _populate_stored_procedures_dependencies method.
The _populate_stored_procedures_dependencies method is well-implemented but contains a minor code smell: the unnecessary use of True if ... else False. This can be simplified.
- db_force_converting = (True if "lower()" in inspect_objects.getsource(self.get_db_name) else False)
+ db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
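A short sketch of why the two forms in this suggestion are equivalent: the in operator on strings already returns a bool, so wrapping it in True if ... else False adds nothing. The variable source_text is an assumption used for illustration; it holds the line quoted in the PR's code comment rather than the output of a real inspect call.

```python
# Stand-in for the text that inspect_objects.getsource(self.get_db_name)
# would return; here it is just the line quoted in the PR's comment.
source_text = "return str(engine.url.database).strip('\"').lower()"

# Form flagged by Ruff rule SIM210.
verbose = True if "lower()" in source_text else False

# Simplified form: a membership test already evaluates to a bool.
simplified = "lower()" in source_text
```

This only simplifies the expression. It does not address the separate concern raised in the conversation about inspecting a method's source text at runtime.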
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _populate_stored_procedures_dependencies(self, conn: Connection) -> None: | |
_procedures_dependencies: Dict[str, List[Dict[str, str]]] = {} | |
trans = conn.begin() | |
conn.execute( | |
""" | |
BEGIN; | |
IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL | |
DROP TABLE #ProceduresDependencies; | |
CREATE TABLE #ProceduresDependencies( | |
id INT NOT NULL IDENTITY(1,1) PRIMARY KEY | |
, procedure_id INT | |
, current_db NVARCHAR(128) | |
, procedure_schema NVARCHAR(128) | |
, procedure_name NVARCHAR(128) | |
, type VARCHAR(2) | |
, referenced_server_name NVARCHAR(128) | |
, referenced_database_name NVARCHAR(128) | |
, referenced_schema_name NVARCHAR(128) | |
, referenced_entity_name NVARCHAR(128) | |
, referenced_id INT | |
, referenced_object_type VARCHAR(2) | |
, is_selected INT | |
, is_updated INT | |
, is_select_all INT | |
, is_all_columns_found INT | |
); | |
BEGIN TRY | |
WITH dependencies | |
AS ( | |
SELECT | |
pro.object_id AS procedure_id | |
, db_name() AS current_db | |
, schema_name(pro.schema_id) AS procedure_schema | |
, pro.NAME AS procedure_name | |
, pro.type AS type | |
, p.referenced_id AS referenced_id | |
, CASE | |
WHEN p.referenced_server_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN @@SERVERNAME | |
ELSE p.referenced_server_name | |
END AS referenced_server_name | |
, CASE | |
WHEN p.referenced_database_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN db_name() | |
ELSE p.referenced_database_name | |
END AS referenced_database_name | |
, CASE | |
WHEN p.referenced_schema_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN schema_name(ref_obj.schema_id) | |
ELSE p.referenced_schema_name | |
END AS referenced_schema_name | |
, p.referenced_entity_name AS referenced_entity_name | |
, ref_obj.type AS referenced_object_type | |
, p.is_selected AS is_selected | |
, p.is_updated AS is_updated | |
, p.is_select_all AS is_select_all | |
, p.is_all_columns_found AS is_all_columns_found | |
FROM sys.procedures AS pro | |
CROSS apply sys.dm_sql_referenced_entities( | |
Concat(schema_name(pro.schema_id), '.', pro.NAME), | |
'OBJECT') AS p | |
LEFT JOIN sys.objects AS ref_obj | |
ON p.referenced_id = ref_obj.object_id | |
) | |
INSERT INTO #ProceduresDependencies ( | |
procedure_id | |
, current_db | |
, procedure_schema | |
, procedure_name | |
, type | |
, referenced_server_name | |
, referenced_database_name | |
, referenced_schema_name | |
, referenced_entity_name | |
, referenced_id | |
, referenced_object_type | |
, is_selected | |
, is_updated | |
, is_select_all | |
, is_all_columns_found) | |
SELECT DISTINCT | |
d.procedure_id | |
, d.current_db | |
, d.procedure_schema | |
, d.procedure_name | |
, d.type | |
, d.referenced_server_name | |
, d.referenced_database_name | |
, d.referenced_schema_name | |
, d.referenced_entity_name | |
, d.referenced_id | |
, d.referenced_object_type | |
, d.is_selected | |
, d.is_updated | |
, d.is_select_all | |
, d.is_all_columns_found | |
FROM dependencies AS d | |
WHERE d.referenced_database_name != 'msdb'; | |
END TRY | |
BEGIN CATCH | |
IF ERROR_NUMBER() = 2020 or ERROR_NUMBER() = 942 | |
PRINT ERROR_MESSAGE() | |
ELSE | |
THROW; | |
END CATCH; | |
DECLARE @id INT; | |
DECLARE @referenced_server_name NVARCHAR(128); | |
DECLARE @referenced_database_name NVARCHAR(128); | |
DECLARE @referenced_id INT; | |
DECLARE @referenced_object_type VARCHAR(2); | |
DECLARE @referenced_info TABLE ( | |
id INT | |
, referenced_type VARCHAR(2) | |
, referenced_schema NVARCHAR(128) | |
, referenced_name NVARCHAR(128) | |
); | |
DECLARE @synonym_info TABLE ( | |
id INT | |
, referenced_name NVARCHAR(1035) | |
, referenced_type VARCHAR(2) | |
, referenced_object_id INT | |
); | |
DECLARE @SQL NVARCHAR(MAX); | |
DECLARE proc_depend_cursor CURSOR FOR | |
SELECT | |
id | |
, referenced_database_name | |
, referenced_id | |
, referenced_object_type | |
FROM #ProceduresDependencies; | |
OPEN proc_depend_cursor | |
FETCH NEXT FROM proc_depend_cursor INTO | |
@id | |
, @referenced_database_name | |
, @referenced_id | |
, @referenced_object_type; | |
WHILE @@FETCH_STATUS = 0 | |
BEGIN; | |
IF @referenced_id IS NOT NULL | |
BEGIN | |
BEGIN TRY | |
SET @SQL = 'SELECT | |
' + CAST(@id AS NVARCHAR(10)) + ' | |
, o.type | |
, s.name | |
, o.name | |
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS o | |
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.schemas AS s | |
ON o.schema_id = s.schema_id | |
WHERE o.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';'; | |
INSERT INTO @referenced_info( | |
id | |
, referenced_type | |
, referenced_schema | |
, referenced_name | |
) | |
EXEC sp_executesql @SQL; | |
UPDATE #ProceduresDependencies | |
SET | |
referenced_object_type = ri.referenced_type, | |
referenced_schema_name = ri.referenced_schema, | |
referenced_entity_name = ri.referenced_name | |
FROM #ProceduresDependencies pd | |
INNER JOIN @referenced_info ri | |
ON ri.id = pd.id | |
WHERE CURRENT OF proc_depend_cursor; | |
SET @referenced_object_type = ( | |
SELECT | |
referenced_type | |
FROM @referenced_info | |
WHERE id = @id | |
); | |
END TRY | |
BEGIN CATCH | |
PRINT ERROR_MESSAGE() | |
END CATCH; | |
END; | |
IF @referenced_object_type = 'SN' | |
BEGIN | |
BEGIN TRY | |
SET @SQL = 'SELECT | |
' + CAST(@id as NVARCHAR(10)) + ' | |
, syn_obj.name | |
, syn_obj.type | |
, syn_obj.object_id | |
FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.synonyms AS synon | |
INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS syn_obj | |
ON OBJECT_ID(synon.base_object_name) = syn_obj.object_id | |
WHERE synon.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';'; | |
PRINT @SQL; | |
INSERT INTO @synonym_info( | |
id | |
, referenced_name | |
, referenced_type | |
, referenced_object_id | |
) | |
EXEC sp_executesql @SQL; | |
UPDATE #ProceduresDependencies | |
SET | |
referenced_object_type = si.referenced_type, | |
referenced_entity_name = si.referenced_name, | |
referenced_id = si.referenced_object_id | |
FROM #ProceduresDependencies pd | |
INNER JOIN @synonym_info si | |
ON si.id = pd.id | |
WHERE CURRENT OF proc_depend_cursor; | |
END TRY | |
BEGIN CATCH | |
PRINT ERROR_MESSAGE() | |
END CATCH; | |
END; | |
FETCH NEXT FROM proc_depend_cursor INTO | |
@id | |
, @referenced_database_name | |
, @referenced_id | |
, @referenced_object_type; | |
END; | |
CLOSE proc_depend_cursor; | |
DEALLOCATE proc_depend_cursor; | |
END; | |
""" | |
) | |
_dependencies = conn.execute( | |
""" | |
SELECT DISTINCT | |
* | |
FROM #ProceduresDependencies | |
WHERE referenced_object_type IN ('U ', 'V ', 'P '); | |
""" | |
) | |
# PAY ATTENTION: | |
# at 19.07.2024 self.get_db_name method converts db name to lowercase forcely: | |
# return str(engine.url.database).strip('"').lower() | |
# to avoid inconsistency, referenced_database_name in this module repeats the same behavior | |
# remove after fixing | |
db_force_converting = ( | |
True if "lower()" in inspect_objects.getsource(self.get_db_name) else False | |
) | |
for row in _dependencies: | |
if row: | |
_key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}" | |
_procedures_dependencies.setdefault(_key, []).append( | |
{ | |
"referenced_database_name": row[ | |
"referenced_database_name" | |
].lower() | |
if db_force_converting | |
else row["referenced_database_name"], | |
"referenced_schema_name": row["referenced_schema_name"], | |
"referenced_entity_name": row["referenced_entity_name"], | |
"referenced_object_type": row["referenced_object_type"], | |
"is_selected": row["is_selected"], | |
"is_select_all": row["is_select_all"], | |
"is_updated": row["is_updated"], | |
} | |
) | |
trans.commit() | |
self.procedures_dependencies = _procedures_dependencies | |
db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name) | |
for row in _dependencies: | |
if row: | |
_key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}" | |
_procedures_dependencies.setdefault(_key, []).append( | |
{ | |
"referenced_database_name": row[ | |
"referenced_database_name" | |
].lower() | |
if db_force_converting | |
else row["referenced_database_name"], | |
"referenced_schema_name": row["referenced_schema_name"], | |
"referenced_entity_name": row["referenced_entity_name"], | |
"referenced_object_type": row["referenced_object_type"], | |
"is_selected": row["is_selected"], | |
"is_select_all": row["is_select_all"], | |
"is_updated": row["is_updated"], | |
} | |
) |
Actionable comments posted: 1
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
Additional context used
Ruff
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
552-552: Remove unnecessary True if ... else False (SIM210)
Additional comments not posted (3)
metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (3)
110-113: LGTM! New configuration option added.
The addition of the mssql_lineage configuration option is correct and well-documented.
276-297: LGTM! Function correctly integrates new configuration option.
The get_ucs function correctly integrates the mssql_lineage configuration option to conditionally process lineage data.
889-911: LGTM! New function _extract_procedure_dependency added.
The _extract_procedure_dependency function is well-implemented and correctly consolidates the logic for extracting dependencies, improving clarity and maintainability.
Optimize the _populate_stored_procedures_dependencies method.
The _populate_stored_procedures_dependencies method is well-implemented but contains a minor code smell: the unnecessary use of True if ... else False. This can be simplified.
- db_force_converting = (True if "lower()" in inspect_objects.getsource(self.get_db_name) else False)
+ db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _populate_stored_procedures_dependencies(self, conn: Connection) -> None: | |
trans = conn.begin() | |
conn.execute( | |
""" | |
BEGIN; | |
IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL | |
DROP TABLE #ProceduresDependencies; | |
CREATE TABLE #ProceduresDependencies( | |
id INT NOT NULL IDENTITY(1,1) PRIMARY KEY | |
, procedure_id INT | |
, current_db NVARCHAR(128) | |
, procedure_schema NVARCHAR(128) | |
, procedure_name NVARCHAR(128) | |
, type VARCHAR(2) | |
, referenced_server_name NVARCHAR(128) | |
, referenced_database_name NVARCHAR(128) | |
, referenced_schema_name NVARCHAR(128) | |
, referenced_entity_name NVARCHAR(128) | |
, referenced_id INT | |
, referenced_object_type VARCHAR(2) | |
, is_selected INT | |
, is_updated INT | |
, is_select_all INT | |
, is_all_columns_found INT | |
); | |
BEGIN TRY | |
WITH dependencies | |
AS ( | |
SELECT | |
pro.object_id AS procedure_id | |
, db_name() AS current_db | |
, schema_name(pro.schema_id) AS procedure_schema | |
, pro.NAME AS procedure_name | |
, pro.type AS type | |
, p.referenced_id AS referenced_id | |
, CASE | |
WHEN p.referenced_server_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN @@SERVERNAME | |
ELSE p.referenced_server_name | |
END AS referenced_server_name | |
, CASE | |
WHEN p.referenced_database_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN db_name() | |
ELSE p.referenced_database_name | |
END AS referenced_database_name | |
, CASE | |
WHEN p.referenced_schema_name IS NULL AND p.referenced_id IS NOT NULL | |
THEN schema_name(ref_obj.schema_id) | |
ELSE p.referenced_schema_name | |
END AS referenced_schema_name | |
, p.referenced_entity_name AS referenced_entity_name | |
, ref_obj.type AS referenced_object_type | |
, p.is_selected AS is_selected | |
, p.is_updated AS is_updated | |
, p.is_select_all AS is_select_all | |
, p.is_all_columns_found AS is_all_columns_found | |
FROM sys.procedures AS pro | |
CROSS apply sys.dm_sql_referenced_entities( | |
Concat(schema_name(pro.schema_id), '.', pro.NAME), | |
'OBJECT') AS p | |
LEFT JOIN sys.objects AS ref_obj | |
ON p.referenced_id = ref_obj.object_id | |
) | |
INSERT INTO #ProceduresDependencies ( | |
procedure_id | |
, current_db | |
, procedure_schema | |
, procedure_name | |
, type | |
, referenced_server_name | |
, referenced_database_name | |
, referenced_schema_name | |
, referenced_entity_name | |
, referenced_id | |
, referenced_object_type | |
, is_selected | |
, is_updated | |
, is_select_all | |
, is_all_columns_found) | |
SELECT DISTINCT | |
d.procedure_id | |
, d.current_db | |
, d.procedure_schema | |
, d.procedure_name | |
, d.type | |
, d.referenced_server_name | |
, d.referenced_database_name | |
, d.referenced_schema_name | |
, d.referenced_entity_name | |
, d.referenced_id | |
, d.referenced_object_type | |
, d.is_selected | |
, d.is_updated | |
, d.is_select_all | |
, d.is_all_columns_found | |
FROM dependencies AS d | |
WHERE d.referenced_database_name != 'msdb'; | |
END TRY | |
BEGIN CATCH | |
IF ERROR_NUMBER() = 2020 or ERROR_NUMBER() = 942 | |
PRINT ERROR_MESSAGE() | |
ELSE | |
THROW; | |
END CATCH; | |
DECLARE @id INT; | |
DECLARE @referenced_server_name NVARCHAR(128); | |
DECLARE @referenced_database_name NVARCHAR(128); | |
DECLARE @referenced_id INT; | |
DECLARE @referenced_object_type VARCHAR(2); | |
DECLARE @referenced_info TABLE ( | |
id INT | |
, referenced_type VARCHAR(2) | |
, referenced_schema NVARCHAR(128) | |
, referenced_name NVARCHAR(128) | |
); | |
DECLARE @synonym_info TABLE ( | |
id INT | |
, referenced_name NVARCHAR(1035) | |
, referenced_type VARCHAR(2) | |
, referenced_object_id INT | |
); | |
DECLARE @SQL NVARCHAR(MAX); | |
DECLARE proc_depend_cursor CURSOR FOR | |
SELECT | |
id | |
, referenced_database_name | |
, referenced_id | |
, referenced_object_type | |
FROM #ProceduresDependencies; | |
OPEN proc_depend_cursor | |
FETCH NEXT FROM proc_depend_cursor INTO | |
@id | |
, @referenced_database_name | |
, @referenced_id | |
, @referenced_object_type; | |
WHILE @@FETCH_STATUS = 0 | |
BEGIN; | |
IF @referenced_id IS NOT NULL | |
BEGIN | |
            BEGIN TRY
                SET @SQL = 'SELECT
                        ' + CAST(@id AS NVARCHAR(10)) + '
                        , o.type
                        , s.name
                        , o.name
                    FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS o
                    INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.schemas AS s
                        ON o.schema_id = s.schema_id
                    WHERE o.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
                INSERT INTO @referenced_info(
                    id
                    , referenced_type
                    , referenced_schema
                    , referenced_name
                )
                EXEC sp_executesql @SQL;
                UPDATE #ProceduresDependencies
                SET
                    referenced_object_type = ri.referenced_type,
                    referenced_schema_name = ri.referenced_schema,
                    referenced_entity_name = ri.referenced_name
                FROM #ProceduresDependencies pd
                INNER JOIN @referenced_info ri
                    ON ri.id = pd.id
                WHERE CURRENT OF proc_depend_cursor;
                SET @referenced_object_type = (
                    SELECT
                        referenced_type
                    FROM @referenced_info
                    WHERE id = @id
                );
            END TRY
            BEGIN CATCH
                PRINT ERROR_MESSAGE()
            END CATCH;
        END;
        IF @referenced_object_type = 'SN'
        BEGIN
            BEGIN TRY
                SET @SQL = 'SELECT
                        ' + CAST(@id as NVARCHAR(10)) + '
                        , syn_obj.name
                        , syn_obj.type
                        , syn_obj.object_id
                    FROM ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.synonyms AS synon
                    INNER JOIN ' + COALESCE(QUOTENAME(@referenced_database_name), '') + '.sys.objects AS syn_obj
                        ON OBJECT_ID(synon.base_object_name) = syn_obj.object_id
                    WHERE synon.object_id = ' + CAST(@referenced_id AS NVARCHAR(10)) + ';';
                PRINT @SQL;
                INSERT INTO @synonym_info(
                    id
                    , referenced_name
                    , referenced_type
                    , referenced_object_id
                )
                EXEC sp_executesql @SQL;
                UPDATE #ProceduresDependencies
                SET
                    referenced_object_type = si.referenced_type,
                    referenced_entity_name = si.referenced_name,
                    referenced_id = si.referenced_object_id
                FROM #ProceduresDependencies pd
                INNER JOIN @synonym_info si
                    ON si.id = pd.id
                WHERE CURRENT OF proc_depend_cursor;
            END TRY
            BEGIN CATCH
                PRINT ERROR_MESSAGE()
            END CATCH;
        END;
        FETCH NEXT FROM proc_depend_cursor INTO
            @id
            , @referenced_database_name
            , @referenced_id
            , @referenced_object_type;
    END;
    CLOSE proc_depend_cursor;
    DEALLOCATE proc_depend_cursor;
END;
"""
)
_dependencies = conn.execute(
    """
    SELECT DISTINCT
        *
    FROM #ProceduresDependencies
    WHERE referenced_object_type IN ('U ', 'V ', 'P ');
    """
)
# PAY ATTENTION:
# at 19.07.2024 self.get_db_name method converts db name to lowercase forcely:
# return str(engine.url.database).strip('"').lower()
# to avoid inconsistency, referenced_database_name in this module repeats the same behavior
# remove after fixing
db_force_converting = (
    True if "lower()" in inspect_objects.getsource(self.get_db_name) else False
)
for row in _dependencies:
    if row:
        _key = f"{row['current_db'].lower() if db_force_converting else row['current_db']}.{row['procedure_schema']}.{row['procedure_name']}"
        self.procedures_dependencies[_key].append(
            {
                "referenced_database_name": row[
                    "referenced_database_name"
                ].lower()
                if db_force_converting
                else row["referenced_database_name"],
                "referenced_schema_name": row["referenced_schema_name"],
                "referenced_entity_name": row["referenced_entity_name"],
                "referenced_object_type": row["referenced_object_type"],
                "is_selected": row["is_selected"],
                "is_select_all": row["is_select_all"],
                "is_updated": row["is_updated"],
            }
        )
trans.commit()
db_force_converting = "lower()" in inspect_objects.getsource(self.get_db_name)
Tools
Ruff
552-552: Remove unnecessary True if ... else False (SIM210)
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (1)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
Files skipped from review as they are similar to previous changes (1)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
Still have a few high level questions about what this does
@@ -237,6 +270,343 @@ def create(cls, config_dict: Dict, ctx: PipelineContext) -> "SQLServerSource":
        config = SQLServerConfig.parse_obj(config_dict)
        return cls(config, ctx)

    def get_ucs(self, inspector: Inspector, key: str) -> List[UpstreamClass]:
Let's just call it get_upstreams
FROM sys.sql_expression_dependencies
LEFT JOIN sys.objects AS so_dst ON so_dst.object_id = referencing_id
LEFT JOIN sys.objects AS so_src ON so_src.object_id = referenced_id
WHERE so_dst.type in ('U ', 'V ', 'P ')
will the dst.type ever be a USER_TABLE?
    """
)
# {"U ": "USER_TABLE", "V ": "VIEW", "P ": "SQL_STORED_PROCEDURE"}
for row in _links:
do both this query and the other one give stored procedure dependencies? bit confused by this
if upstreams:
    yield MetadataChangeProposalWrapper(
        entityUrn=dataset_urn,
        aspect=UpstreamLineage(upstreams=upstreams),
we also try to generate view lineage using sql parsing. That is capable of generating column lineage as well
This will likely overwrite that stuff, which doesn't feel right
…3. update golden files
Actionable comments posted: 0
Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Files selected for processing (4)
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py (12 hunks)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json (1 hunks)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (26 hunks)
- metadata-ingestion/tests/integration/sql_server/setup/setup.sql (1 hunks)
Files not reviewed due to server errors (2)
- metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_lineage.json
- metadata-ingestion/src/datahub/ingestion/source/sql/mssql/source.py
Additional comments not posted (15)
metadata-ingestion/tests/integration/sql_server/setup/setup.sql (9)
- 96-96: LGTM! Database creation statement is correct. The CREATE DATABASE LINEAGEDB statement is straightforward and correct.
- 100-100: LGTM! Schema creation statement is correct. The CREATE SCHEMA schema_with_lineage statement is straightforward and correct.
- 102-102: LGTM! Table creation statement is correct. The CREATE TABLE [schema_with_lineage].[table_number_one] (id VARCHAR(MAX)) statement is straightforward and correct.
- 104-104: LGTM! Table creation statement is correct. The CREATE TABLE [schema_with_lineage].[table_number_two] (is_updated BIT) statement is straightforward and correct.
- 106-106: LGTM! View creation statement is correct. The CREATE VIEW [schema_with_lineage].[view_of_table_number_two] AS SELECT is_updated FROM [schema_with_lineage].[table_number_two] statement is straightforward and correct.
- 108-108: LGTM! Table creation statement is correct. The CREATE TABLE [schema_with_lineage].[table_number_three] (order_number INT, weight VARCHAR(MAX)) statement is straightforward and correct.
- 110-116: LGTM! Stored procedure creation statement is correct. The CREATE PROCEDURE [schema_with_lineage].[procedure_number_one] statement is straightforward and correct.
- 118-126: LGTM! Stored procedure creation statement is correct. The CREATE PROCEDURE [schema_with_lineage].[procedure_number_two] statement is straightforward and correct.
- 128-135: LGTM! Stored procedure creation statement is correct. The CREATE PROCEDURE [schema_with_lineage].[procedure_number_three] statement is straightforward and correct.
metadata-ingestion/tests/integration/sql_server/golden_files/golden_mces_mssql_no_db_with_filter.json (6)
- Line range hint 1-1: Verify the correctness of entity type changes. The entity type has been changed from dataFlow and dataJob to container. Ensure that this change aligns with the intended refactor and does not introduce inconsistencies in the metadata structure. Also applies to: 14-14, 27-27, 40-40, 53-53
- Line range hint 3-3: Verify the correctness of aspect name changes. The aspect names have been updated from status to more descriptive names like containerProperties, dataPlatformInstance, etc. Ensure that these changes are consistent and align with the intended refactor. Also applies to: 16-16, 29-29, 42-42, 55-55
- Line range hint 5-8: Verify the correctness and completeness of custom properties. New custom properties have been added to various entities, including platform, env, database, and schema. Ensure that these properties enhance the metadata's descriptive capabilities and are consistent with the rest of the structure. Also applies to: 18-21, 31-34, 44-47, 57-60
- 118-119: Verify the correctness of date modifications. The date_created and date_modified fields have been updated to reflect the current date. Ensure that these dates accurately reflect the recent changes and are consistent across all entities. Also applies to: 1962-1963, 2368-2369, 2456-2457, 2674-2675
- 2324-2335: Verify the correctness and completeness of new entities. Several new container entities have been introduced, each with its own set of properties and metadata. Ensure that these entities expand the scope of the metadata being captured and provide more granular insights into the SQL Server environment. Also applies to: 2412-2423, 2521-2532, 2629-2640, 2738-2751
- 4600-4623: Verify the correctness of property removal. Certain properties, such as procedure_depends_on and depending_on_procedure, have been removed. Ensure that this removal streamlines the data structure and focuses on more relevant information. Also applies to: 4658-4671, 4705-4718, 4721-4735, 4737-4750
@hsheth2 |
@hsheth2 |
I would like to get this merged in, but I do want to make sure I understand it enough that I would feel confident maintaining this code in the future.
To that end, I had left a few comments/questions earlier which seem like they're still pending.
Regarding the explicit generation of UpstreamClass aspect - the issue is that if we emit an upstreamLineage aspect in _process_table or _process_view, and then we also generate one from
The other high level comment is that
@hsheth2
method:
method:
method:
I don't see any 'get_view_lineage' method that you've mentioned. So, based on this logic, why do you think we will overwrite something? Let's assume the following simple situation:
We have a table and a view. For the table, the downstream dependency is the view. For the view, the upstream dependency is the table. When we execute this code, we will create the 'UpstreamClass' aspect only for the view:
For stored procedures the situation is the same: the 'dataJobInputOutput' aspect will be generated:
{"inputDatajobs":[],"inputDatasets":["urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.table_one,QA)"],"outputDatasets":["urn:li:dataset:(urn:li:dataPlatform:mssql,master.dbo.table_one,QA)"]}
But for the table, the 'UpstreamClass' aspect will not be overwritten. I do agree with you about the overcomplexity, though. Waiting for your response. Thanks!
…e setup and golden files for mssql testing
@hsheth2 I pushed a new commit. Briefly: I tried to make this module less unwieldy, and I slightly corrected one of the lineage queries.
@hsheth2 Can you please check the current changes? I don't understand why the Vercel deployment failed.
@hsheth2 Can you please help me with this issue?
Overall this is looking decent - had a bunch of small comments
The biggest thing missing now is docs - both around permissions, and around what the mssql connector is now capable of. E.g. extracting lineage from stored procedures and things is a pretty cool feature, and our docs should make sure folks know about it :)
@@ -0,0 +1 @@
EXEC [{{procedure_db}}].dbo.sp_helptext '{{procedure_name}}';
will we require additional permissions to be able to execute procedures?
@@ -0,0 +1 @@
EXEC [{{procedure_db}}].dbo.sp_helptext '{{procedure_name}}';
imo it's more clear to have the short sql queries in the .py file, and only move the large ones out
we can do something like this to avoid the weird indentation
def method(self, ...):
    conn.execute(f"""\
select *
from whatever
""")
    ... other statements
        platform_instance=dep.server,
    )
    for dep in self.dependencies
    if dep.type in ("P ",) and dep.incoming
can you add a comment in the code explaining what "P " is for
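One way to address this, sketched here as an assumption and not as the PR's actual code, is to give the object type codes names. The constant and function names below are hypothetical. The meanings come from the mapping comment elsewhere in this diff, and the trailing space is kept because the PR's queries compare against the padded two-character values.

```python
# Hypothetical named constants for the sys.objects.type codes used in this PR.
# The column is char(2), so one-letter codes are padded with a trailing space.
MSSQL_USER_TABLE = "U "
MSSQL_VIEW = "V "
MSSQL_STORED_PROCEDURE = "P "

# Same mapping as the comment in the diff, expressed with the constants.
MSSQL_OBJECT_TYPE_NAMES = {
    MSSQL_USER_TABLE: "USER_TABLE",
    MSSQL_VIEW: "VIEW",
    MSSQL_STORED_PROCEDURE: "SQL_STORED_PROCEDURE",
}


def is_stored_procedure(object_type: str) -> bool:
    """Return True when the padded type code denotes a SQL stored procedure."""
    return object_type == MSSQL_STORED_PROCEDURE
```

With something like this, the filter could read `if is_stored_procedure(dep.type) and dep.incoming`, which documents itself.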
    with inspector.engine.connect() as conn:
        if self.config.use_odbc:
            self._add_output_converters(conn)
        self._populate_table_descriptions(conn, db_name)
        self._populate_column_descriptions(conn, db_name)
if self.config.mssql_lineage:
    for inspector in self.get_inspectors():
        db_name = self.get_db_name(inspector)
is there a reason we're looping over self.get_inspectors() twice?
"""
path_to_table_description_query = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    "sql_queries/table_description_query.sql",
we should have a constant at the top of the file like SQL_QUERIES_DIR
this should use the pathlib library
this code can then become query = (SQL_QUERIES_DIR / "filename.sql").read_text()
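The pathlib suggestion above could look roughly like the sketch below. The helper names `sql_queries_dir` and `load_query` are hypothetical, and the `sql_queries/` directory next to the module is assumed from the `os.path.join` call in the diff.

```python
from pathlib import Path


def sql_queries_dir(module_file: str) -> Path:
    """Return the sql_queries/ directory that sits next to the given module file."""
    return Path(module_file).resolve().parent / "sql_queries"


def load_query(queries_dir: Path, filename: str) -> str:
    """Read one .sql file from the queries directory."""
    return (queries_dir / filename).read_text()


# In source.py this would become a module-level constant, for example:
# SQL_QUERIES_DIR = sql_queries_dir(__file__)
# query = load_query(SQL_QUERIES_DIR, "table_description_query.sql")
```

Passing the module path in explicitly keeps the helper easy to test; in the connector itself the constant would simply be computed once from `__file__`.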
IF OBJECT_ID('tempdb.dbo.#ProceduresDependencies', 'U') IS NOT NULL
    DROP TABLE #ProceduresDependencies;

CREATE TABLE #ProceduresDependencies(
will we need any new permissions for this? if so, we need to update the documentation
        }
    )

trans.commit()
why do we need to commit this? shouldn't we be rolling it back?
also, do we need a try...finally to ensure rollback is called?
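The try...finally pattern raised here can be sketched as follows. This is only an illustration: it uses the standard-library sqlite3 module as a stand-in for the SQLAlchemy connection and transaction in the PR, and the `scratch` table and `read_scratch_rows` function are hypothetical. Whether rolling back is the right choice for #ProceduresDependencies is still the open question in this thread.

```python
import sqlite3


def read_scratch_rows(conn: sqlite3.Connection) -> list:
    """Write scratch rows, read them back, and always roll the writes back."""
    try:
        # The INSERT implicitly opens a transaction on the connection.
        conn.execute("INSERT INTO scratch(name) VALUES ('proc_one'), ('proc_two')")
        # Materialize the result before the rollback discards the rows.
        return [row[0] for row in conn.execute("SELECT name FROM scratch ORDER BY name")]
    finally:
        # Runs on success and on error, so scratch data never persists.
        conn.rollback()


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE scratch(name TEXT)")
rows = read_scratch_rows(conn)
remaining = conn.execute("SELECT COUNT(*) FROM scratch").fetchone()[0]
```

The key point is that results are copied into Python objects inside the `try`, so the cleanup in `finally` cannot lose them.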
@@ -664,3 +829,9 @@ def get_identifier(
            if self.config.convert_urns_to_lowercase
            else qualified_table_name
        )

    @staticmethod
    def parameterize_query(query: str, params: Dict[str, str]) -> str:
we can use query.format(...)
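A minimal sketch of that suggestion, assuming the templates are switched to single-brace placeholders: `str.format` treats `{{` and `}}` as escaped literal braces, so the `{{procedure_db}}` style used in the .sql files would not be substituted as-is. `PROCEDURE_TEXT_QUERY` and `render_query` are hypothetical names.

```python
# Hypothetical template using single braces so that str.format substitutes them.
PROCEDURE_TEXT_QUERY = "EXEC [{procedure_db}].dbo.sp_helptext '{procedure_name}';"


def render_query(template: str, **params: str) -> str:
    """Fill named placeholders; values are inserted verbatim, with no escaping."""
    return template.format(**params)


rendered = render_query(
    PROCEDURE_TEXT_QUERY,
    procedure_db="LINEAGEDB",
    procedure_name="procedure_number_one",
)

# Double braces are an escape in str.format, so this placeholder is left unfilled.
unfilled = "[{{procedure_db}}]".format(procedure_db="LINEAGEDB")
```

Because the values are pasted into the SQL text without escaping, this is only appropriate for trusted identifiers such as names read from the catalog.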
DECLARE @t1 INT = 1
UPDATE [schema_with_lineage].[view_of_table_number_two] SET [is_updated] = CASE
WHEN @t1 = 1 THEN 1
ELSE 0
this file seems to have a mix of tabs and spaces for indentation - was that intentional?
@hsheth2 I fixed the code according to your comments. Please check the PR again.
@sleeperdeep we recently merged this PR #11912, which attempts to do a similar thing as this PR. The main advantage of that other one is that it also generates column-level lineage.
@hsheth2 thanks for your work! Please don't close the current PR until I have tested your merged PR and compared it with my approach. After testing, I'll let you know whether SQL parsing satisfies my purposes and whether this PR still has some advantages. Thanks.
Logic of extracting lineage for mssql source was updated.
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Chores