fix: correct the way to catch the exception (#1727)
* fix: modify the etl script dependency

* fix: Correct the way to catch the exception

* fix: Stay compatible with downstream Kafka clusters where the Kafka topic message key cannot be empty

* fix: Adjust the Kafka message key; improve the field comment

* fix: Add the Avro schema required for the message key

Co-authored-by: Cobolbaby <[email protected]>
cobolbaby authored Jul 10, 2020
1 parent ed12808 commit 5dc6165
Showing 1 changed file with 16 additions and 9 deletions.
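The "correct the way to catch the exception" part of this commit reflects how confluent-kafka reports errors: produce() only enqueues a message locally, so a try/except around the call catches serialization or buffer errors at best, while broker-side delivery failures are surfaced through an on_delivery callback when poll() or flush() runs. That is why the diff below drops the try/except and relies on the delivery_report callback plus flush(). A minimal sketch of that pattern follows; the broker address, topic name, key, and value are placeholders rather than values from this repository.

# Minimal sketch of callback-based delivery reporting with confluent-kafka.
# The broker address, topic name, key, and value below are placeholders.
from confluent_kafka import Producer


def delivery_report(err, msg):
    # Called once per message by poll()/flush(); broker-side failures
    # surface here rather than as exceptions raised by produce().
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))


producer = Producer({'bootstrap.servers': 'localhost:9092',
                     'on_delivery': delivery_report})
producer.produce(topic='example-topic', key='example-key', value='example-value')
producer.flush()  # blocks until the outstanding delivery callbacks have run

The same 'on_delivery' mechanism applies to AvroProducer, which is what the changed script uses.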
25 changes: 16 additions & 9 deletions metadata-ingestion/sql-etl/common.py
@@ -56,7 +56,8 @@ def build_dataset_mce(platform, dataset_name, columns):
         fields.append({
             "fieldPath": column["name"],
             "nativeDataType": repr(column["type"]),
-            "type": { "type":get_column_type(column["type"]) }
+            "type": { "type":get_column_type(column["type"]) },
+            "description": column["comment"]
         })
 
     schema_metadata = {
@@ -80,21 +81,27 @@ def build_dataset_mce(platform, dataset_name, columns):
     }
 
 
+def delivery_report(err, msg):
+    """ Called once for each message produced to indicate delivery result.
+        Triggered by poll() or flush(). """
+    if err is not None:
+        print('Message delivery failed: {}'.format(err))
+    else:
+        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
+
+
 def produce_dataset_mce(mce, kafka_config):
     """
     Produces a MetadataChangeEvent to Kafka
     """
     conf = {'bootstrap.servers': kafka_config.bootstrap_server,
             'on_delivery': delivery_report,
             'schema.registry.url': kafka_config.schema_registry}
+    key_schema = avro.loads('{"type": "string"}')
     record_schema = avro.load(kafka_config.avsc_path)
-    producer = AvroProducer(conf, default_value_schema=record_schema)
-
-    try:
-        producer.produce(topic=kafka_config.kafka_topic, value=mce)
-        producer.poll(0)
-        print('\n%s has been successfully produced!\n' % mce)
-    except ValueError as e:
-        print('Message serialization failed %s' % e)
+    producer = AvroProducer(conf, default_key_schema=key_schema, default_value_schema=record_schema)
+
+    producer.produce(topic=kafka_config.kafka_topic, key=mce['proposedSnapshot'][1]['urn'], value=mce)
+    producer.flush()
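For illustration, a hypothetical caller of the updated produce_dataset_mce could look like the sketch below. The kafka_config stand-in only mirrors the attribute names the function reads (bootstrap_server, schema_registry, avsc_path, kafka_topic) with placeholder values, and the MCE literal is an assumption inferred from the indexing mce['proposedSnapshot'][1]['urn']: a pair of snapshot type name and snapshot record whose urn field now becomes the Kafka message key.

# Hypothetical usage sketch; the config values and the MCE literal are
# illustrative stand-ins inferred from the diff, not taken from this repo.
from types import SimpleNamespace

from common import produce_dataset_mce  # the module changed in this commit

kafka_config = SimpleNamespace(
    bootstrap_server='localhost:9092',        # Kafka broker (placeholder)
    schema_registry='http://localhost:8081',  # schema registry URL (placeholder)
    avsc_path='MetadataChangeEvent.avsc',     # value schema file (placeholder)
    kafka_topic='MetadataChangeEvent_v4',     # target topic (placeholder)
)

mce = {
    'auditHeader': None,
    'proposedSnapshot': (
        'com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot',  # assumed snapshot type
        {'urn': 'urn:li:dataset:(urn:li:dataPlatform:mysql,db.table,PROD)',
         'aspects': []},
    ),
    'proposedDelta': None,
}

# proposedSnapshot[1]['urn'] is what the new code sends as the message key,
# which satisfies Kafka setups that reject an empty key.
produce_dataset_mce(mce, kafka_config)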
