feat(integration/spark): extract table name on delta merges adding table location to urn & spark streaming lineage fixes #13602
Conversation
✅ Meticulous spotted visual differences in 3 of 1318 screens tested, but all differences have already been approved: view differences detected. Meticulous evaluated ~9 hours of user flows against your PR. Last updated for commit f42a9ef. This comment will update as new commits are pushed.
Codecov Report: All modified and coverable lines are covered by tests ✅
❌ Unsupported file format: Upload processing failed due to unsupported file format. Please review the parser error message:
…-on-tables' of https://github.com/datahub-project/datahub into split-spark-merges-on-tables
Map<String, MetadataChangeProposalWrapper> schemaMap) {
List<MetadataChangeProposalWrapper> mcps = new ArrayList<>();
String pipelineName = conf.getOpenLineageConf().getPipelineName();
Can we move this out to a separate method? This method has become quite long.
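A minimal sketch of what that extraction could look like (the helper name is made up, and it assumes conf is reachable as a field; otherwise pass it in as a parameter):
private String resolvePipelineName() {
  // Same lookup that currently sits inline in the long conversion method.
  String pipelineName = conf.getOpenLineageConf().getPipelineName();
  // ... whatever pipeline-name handling currently follows inline ...
  return pipelineName;
}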
boolean isMergeIntoCommand = job.getName().contains("execute_merge_into_command_edge");
String tableName = null;
// If this is a MERGE INTO command and enhanced extraction is enabled, try to extract the target table name
Can we move this as well into a separate method, to make this method a bit more digestible?
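Something along these lines could keep the main method short (names, the Optional return, and the OpenLineage client Job type are assumptions, not from the PR):
private static java.util.Optional<String> getMergeIntoTargetTable(
    io.openlineage.client.OpenLineage.Job job, boolean enhancedExtractionEnabled) {
  boolean isMergeIntoCommand = job.getName().contains("execute_merge_into_command_edge");
  if (!isMergeIntoCommand || !enhancedExtractionEnabled) {
    return java.util.Optional.empty();
  }
  // Delegate to the table-name lookup discussed further down (SQL facet, symlinks, ...).
  return java.util.Optional.ofNullable(extractTableName(job)); // hypothetical helper
}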
// Method 1: Check for table name in the SQL facet (most reliable)
if (job.getFacets() != null && job.getFacets().getSql() != null) {
String sqlQuery = job.getFacets().getSql().getQuery();
if (sqlQuery != null && sqlQuery.toUpperCase().contains("MERGE INTO")) {
Can you move MERGE INTO to a constant?
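A hedged sketch of that (constant name is just a suggestion):
private static final String MERGE_INTO_KEYWORD = "MERGE INTO";

// ... then at the call site:
if (sqlQuery != null && sqlQuery.toUpperCase().contains(MERGE_INTO_KEYWORD)) {
  // existing MERGE INTO handling unchanged
}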
}
}
// Method 3: Check for table identifiers in symlinks
I feel like these methods could be combined into one helper method.
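A rough shape for that helper, assuming the lookups keep their current priority order (the method name and null-on-miss contract are assumptions):
private static String extractTableName(io.openlineage.client.OpenLineage.Job job) {
  // Method 1: table name from the SQL facet (most reliable).
  if (job.getFacets() != null && job.getFacets().getSql() != null) {
    String sqlQuery = job.getFacets().getSql().getQuery();
    if (sqlQuery != null && sqlQuery.toUpperCase().contains(MERGE_INTO_KEYWORD)) {
      // ... existing MERGE INTO target parsing, returning early on success ...
    }
  }
  // ... remaining checks (e.g. the symlink lookup below), in the same order ...
  return null;
}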
if (isMergeIntoCommand && tableName != null && datahubConf.isEnhancedMergeIntoExtraction()) {
// Create modified job names that include the table name
String tablePart = tableName.replace(".", "_").replace(" ", "_").toLowerCase();
String enhancedJobName = job.getName() + "." + tablePart;
Does the job name contain the operation? (I wonder if we should add merge_into to the job name to identify these jobs.)
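If the operation should be explicit in the name, one option is a fixed marker rather than relying on the raw Spark node name (the merge_into literal and its position are assumptions):
// Illustrative only: "<jobName>.merge_into.<table>"
String tablePart = tableName.replace(".", "_").replace(" ", "_").toLowerCase();
String enhancedJobName = job.getName() + ".merge_into." + tablePart;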
// Try to get table from 'catalogTable' if it exists
try {
Method getCatalogTable = relation.getClass().getMethod("catalogTable");
Can't we identify the class, cast it, and call the catalogTable method directly?
I'm worried about how we'd detect it if OpenLineage ever renamed this method to catalogTable2 or something.
}
/** Install a Logback appender using reflection. */
private static boolean installLogbackAppender(Object logger) {
These look magical. I hope this works :)
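For anyone reading along, the reflective core of such an install usually boils down to something like this (a sketch only, assuming logger is a ch.qos.logback.classic.Logger and appender is an already-built ch.qos.logback.core.Appender; not necessarily what this PR does):
try {
  Class<?> appenderInterface = Class.forName("ch.qos.logback.core.Appender");
  java.lang.reflect.Method addAppender =
      logger.getClass().getMethod("addAppender", appenderInterface);
  addAppender.invoke(logger, appender);
  return true;
} catch (ReflectiveOperationException e) {
  return false;
}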
/** Extract the query ID from a log message. */
private static String extractQueryId(String logMessage) {
// The query ID is typically in the format [queryId]
java.util.regex.Pattern pattern = java.util.regex.Pattern.compile("\\[([0-9a-f-]+)\\]");
Can you move this to a static field so the pattern is compiled only once?
// Precompiled patterns for better performance
private static final java.util.regex.Pattern BRACKET_QUERY_ID_PATTERN =
java.util.regex.Pattern.compile("\\[([0-9a-f-]+)\\]");
private static final java.util.regex.Pattern EQUALS_QUERY_ID_PATTERN =
java.util.regex.Pattern.compile("id = ([0-9a-f-]+)");
// Extract offset information for commit messages
if (logMessage.contains("Committing offset")) {
java.util.regex.Pattern offsetPattern =
java.util.regex.Pattern.compile("offset: (\\{[^\\}]+\\})");
same here
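Same treatment as the query-ID patterns, for instance (constant name is a suggestion):
private static final java.util.regex.Pattern OFFSET_PATTERN =
    java.util.regex.Pattern.compile("offset: (\\{[^\\}]+\\})");

// ... call site:
if (logMessage.contains("Committing offset")) {
  java.util.regex.Matcher offsetMatcher = OFFSET_PATTERN.matcher(logMessage);
  // existing offset handling unchanged
}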
metadata.put("logicalPlan", plan);
// Look for table references in the plan
java.util.regex.Pattern tablePattern =
same here
}
}
// Method 3: Check for table identifiers in symlinks
Can you split this into multiple methods?
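One possible slicing (purely illustrative names), as an alternative to the single combined helper sketched earlier: give each lookup its own small private method whose body is the existing block moved as-is, and keep a thin dispatcher:
private static String extractTargetTableName(io.openlineage.client.OpenLineage.Job job) {
  String fromSql = extractTableFromSqlFacet(job); // the "Method 1" block
  if (fromSql != null) {
    return fromSql;
  }
  return extractTableFromSymlinks(job); // the "Method 3" block
}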