Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: pulsar ingestion fails when Avro schema is missing namespace or name #12058

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

Alice-608
Copy link
Contributor

@Alice-608 Alice-608 commented Dec 8, 2024

fix: pulsar ingestion fails when Avro schema is missing namespace or name

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

@github-actions github-actions bot added ingestion PR or Issue related to the ingestion of metadata community-contribution PR or Issue raised by member(s) of DataHub Community labels Dec 8, 2024
@Alice-608
Copy link
Contributor Author

fix: pulsar ingestion fails when Avro schema is missing namespace or name

Problem Description
During the Pulsar ingestion process in DataHub, if the Avro schema for a topic is missing either the namespace or name field, the ingestion process crashes with the following error:
<class 'TypeError'>: unsupported operand type(s) for +: 'NoneType' and 'str'

The root cause is that the code attempts to concatenate these fields without checking if they exist:
self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")

Fix Description
This fix introduces the following changes:
Validation: Check for the presence of the namespace and name fields before concatenation.
Default Values: If either field is missing, assign default values:
namespace: "default_namespace"
name: "default_name"
Informational Logging: Log messages to inform users about missing fields and the default values used.
This ensures that the ingestion process no longer fails due to missing schema fields and continues processing metadata gracefully.

Modified Files
datahub/metadata-ingestion/src/datahub/ingestion/source/pulsar.py

@datahub-cyborg datahub-cyborg bot added the needs-review Label for PRs that need review from a maintainer. label Dec 8, 2024
Comment on lines +92 to +101
namespace = avro_schema.get("namespace")
name = avro_schema.get("name")
if not namespace:
logger.warning("namespace is missing in schema, using 'default_namespace'")
namespace = "default_namespace"
if not name:
logger.warning("name is missing in schema, using 'default_name'")
name = "default_name"
self.schema_name = namespace + "." + name

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Checking avro specs, both namespace and name are optional.
Only name is required in the case of record or enum and optional if eg primitive types or arrays.
So the fix makes sense to cover that scenario.

I'm concerned about having such a fallback values, since they do not match the original avro schema definition.

Ideally, if both namespace and name are missed, schema_name should be null or empty string.
This schema_name is used as the schemaName for the SchemaMetadataKey, which is required and len > 1.

/**
* Schema name e.g. PageViewEvent, identity.Profile, ams.account_management_tracking
*/
@validate.strlen = {
"max" : 500,
"min" : 1
}
schemaName: string

So what about something like that:

Suggested change
namespace = avro_schema.get("namespace")
name = avro_schema.get("name")
if not namespace:
logger.warning("namespace is missing in schema, using 'default_namespace'")
namespace = "default_namespace"
if not name:
logger.warning("name is missing in schema, using 'default_name'")
name = "default_name"
self.schema_name = namespace + "." + name
self.schema_name = "null"
if avro_schema.get("namespace") and avro_schema.get("name"):
self.schema_name = avro_schema.get("namespace") + "." + avro_schema.get("name")
elif avro_schema.get("namespace"):
self.schema_name = avro_schema.get("namespace")
elif avro_schema.get("name"):
self.schema_name = avro_schema.get("name")

WDYT @hsheth2 ? Other solutions may require to update the SchemaMetadataKey definition.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah that seems pretty reasonable

we don't really use the schema_name for much, so a fallback value of null feels reasonable over throwing an exception / reporting a warning

@datahub-cyborg datahub-cyborg bot added pending-submitter-response Issue/request has been reviewed but requires a response from the submitter and removed needs-review Label for PRs that need review from a maintainer. labels Dec 11, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
community-contribution PR or Issue raised by member(s) of DataHub Community ingestion PR or Issue related to the ingestion of metadata pending-submitter-response Issue/request has been reviewed but requires a response from the submitter
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants