Start adding Java ETL examples, starting with Kafka ETL. (#1805)
Start adding Java ETL examples, starting with Kafka ETL. We've had a few requests to start providing Java examples rather than Python, due to type safety. I've also started to add these to metadata-ingestion-examples to make it clearer that these are *examples*. They can be used directly or as a basis for other things. As we port to Java, we'll move examples to contrib.
John Plaisted authored Sep 11, 2020 · 1 parent 91486a2 · commit 6ece2d6
Showing 38 changed files with 453 additions and 27 deletions.
13 files renamed without changes.
@@ -0,0 +1,23 @@
# Python ETL examples

ETL scripts written in Python.

## Prerequisites

1. Before running any Python metadata ingestion job, you should make sure that the DataHub backend services are all running. The easiest way to do that is through [Docker images](../../docker).
2. You also need to build the `mxe-schemas` module as below.
   ```
   ./gradlew :metadata-events:mxe-schemas:build
   ```
   This is needed to generate `MetadataChangeEvent.avsc`, the schema for the `MetadataChangeEvent` Kafka topic.
3. All the scripts are written in Python 3 and most likely won't work with Python 2.x interpreters. You can verify your Python version using the following command.
   ```
   python --version
   ```
   We recommend using [pyenv](https://github.com/pyenv/pyenv) to install and manage your Python environment.
4. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below.
   ```
   pip install --user -r requirements.txt
   ```
@@ -0,0 +1,17 @@
# Kafka ETL

## Ingest metadata from Kafka to DataHub
The kafka_etl script provides an ETL channel for communicating with your Kafka cluster.
```
➜ Configure your ZooKeeper environment variable in the file.
ZOOKEEPER # Your ZooKeeper host.
➜ Configure your Kafka broker environment variables in the file.
AVROLOADPATH # Your model event in Avro format.
KAFKATOPIC # Your event topic.
BOOTSTRAP # Kafka bootstrap server.
SCHEMAREGISTRY # Kafka schema registry host.
➜ python kafka_etl.py
```
This will bootstrap DataHub with the metadata in your Kafka cluster as dataset entities.
7 files renamed without changes.
@@ -0,0 +1,47 @@
# Metadata Ingestion

This directory contains example apps for ingesting data into DataHub.

You are more than welcome to use these examples directly, or to use them as a reference for your own jobs.

See each example's README for more information.

### Common themes

All these examples ingest by firing MetadataChangeEvent Kafka events. They do not ingest directly into DataHub, though this is possible. Instead, the mce-consumer-job should be running, listening for these events, and performing the ingestion for us.
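A rough sketch of what firing one of these events can look like is below. This is illustrative only: the topic name and the generic record construction are assumptions, and the real examples build typed `MetadataChangeEvent` objects from the `mxe-schemas` module and send them with a producer like the one configured in `KafkaConfig` (included in this commit).

```
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public final class MceFiringSketch {
  // Hypothetical topic name, for illustration only.
  private static final String MCE_TOPIC = "MetadataChangeEvent";

  private MceFiringSketch() {
  }

  public static void fire(Producer<String, IndexedRecord> producer, Schema mceSchema) {
    // Build a generic record against MetadataChangeEvent.avsc; a real event would
    // populate a snapshot describing the entity being ingested.
    GenericRecord mce = new GenericData.Record(mceSchema);

    // The producer's KafkaAvroSerializer talks to the schema registry and Avro-encodes
    // the record; the mce-consumer-job picks the event up from here and ingests it.
    producer.send(new ProducerRecord<>(MCE_TOPIC, mce));
    producer.flush();
  }
}
```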
### A note on languages

We initially wrote these examples in Python (they still exist in `metadata-ingestion`; TODO to delete them once they're all ported). The idea was that these were very small example scripts that should've been easy to use. However, upon reflection, not all developers are familiar with Python, and the lack of types can hinder development. So the decision was made to port the examples to Java.

You're more than welcome to extrapolate these examples into whatever languages you like. At LinkedIn, we primarily use Java.

### Ingestion at LinkedIn

It is worth noting that we do not use any of these examples directly (in Java, Python, or anything else) at LinkedIn. We have several different pipelines for ingesting data; it all depends on the source.

- Some pipelines are based off other Kafka events, where we'll transform some existing Kafka event into a metadata event.
  - For example, we get Kafka events for Hive changes. We make MCEs out of those Hive events to ingest Hive data.
- For others, we've directly instrumented existing pipelines / apps / jobs to also emit metadata events.
  - For example, TODO? Gobblin?
- For others still, we've created a series of offline jobs to ingest data.
  - For example, we have an Azkaban job to process our HDFS datasets.

For some sources of data, one of these example scripts may work fine. For others, it may make more sense to have some custom logic, like in the list above. Notably, all these examples today are one-off: they run, fire events, and then stop. You may wish to build continuous ingestion pipelines instead.

### "Real" Ingestion Applications

We appreciate any contributions of apps you may wish to make to ingest data from other sources.

TODO this section feels a little weird. Are our ingestion apps not really real apps? :p LDAP is real, as is Kafka. Granted, these are just one-off apps to ingest. Maybe we should provide a library for these, then expose the one-off apps as examples?
@@ -0,0 +1,20 @@
plugins {
  id 'java'
}

dependencies {
  compile project(':metadata-dao-impl:kafka-producer')

  compile externalDependency.javaxInject
  compile externalDependency.kafkaAvroSerde
  compile externalDependency.kafkaSerializers
  compile externalDependency.lombok
  compile externalDependency.springBeans
  compile externalDependency.springBootAutoconfigure
  compile externalDependency.springCore
  compile externalDependency.springKafka

  annotationProcessor externalDependency.lombok

  runtime externalDependency.logbackClassic
}
42 changes: 42 additions & 0 deletions in ...ion-examples/common/src/main/java/com/linkedin/metadata/examples/configs/KafkaConfig.java
@@ -0,0 +1,42 @@
package com.linkedin.metadata.examples.configs;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class KafkaConfig {
  @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:29092}")
  private String kafkaBootstrapServers;

  @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
  private String kafkaSchemaRegistryUrl;

  @Bean(name = "kafkaEventProducer")
  public Producer<String, IndexedRecord> kafkaListenerContainerFactory(KafkaProperties properties) {
    KafkaProperties.Producer producerProps = properties.getProducer();

    producerProps.setKeySerializer(StringSerializer.class);
    producerProps.setValueSerializer(KafkaAvroSerializer.class);

    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
    if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
      producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
    } // else we rely on KafkaProperties which defaults to localhost:9092

    Map<String, Object> props = properties.buildProducerProperties();
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);

    return new KafkaProducer<>(props);
  }
}
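As a hedged usage sketch (the component below is hypothetical, not part of this commit), the `kafkaEventProducer` bean can be injected wherever events need to be fired:

```
import javax.inject.Inject;
import javax.inject.Named;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Component;

// Hypothetical consumer of the "kafkaEventProducer" bean defined above.
@Component
public class EventSenderSketch {
  @Inject
  @Named("kafkaEventProducer")
  private Producer<String, IndexedRecord> _producer;

  public void send(String topic, IndexedRecord event) {
    // Keys are plain strings; values are Avro records handled by KafkaAvroSerializer.
    _producer.send(new ProducerRecord<>(topic, event));
  }
}
```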
19 changes: 19 additions & 0 deletions in ...les/common/src/main/java/com/linkedin/metadata/examples/configs/SchemaRegistryConfig.java
@@ -0,0 +1,19 @@
package com.linkedin.metadata.examples.configs;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class SchemaRegistryConfig {
  @Value("${SCHEMAREGISTRY_URL:http://localhost:8081}")
  private String schemaRegistryUrl;

  @Bean(name = "schemaRegistryClient")
  public SchemaRegistryClient schemaRegistryFactory() {
    // 512 is the maximum number of schemas this client will cache.
    return new CachedSchemaRegistryClient(schemaRegistryUrl, 512);
  }
}
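A sketch of how the `schemaRegistryClient` bean might be used to fetch a topic's latest value schema; the helper below is hypothetical, but it relies only on the Confluent client API, and subject naming follows the registry's default `<topic>-value` convention:

```
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public final class SchemaLookupSketch {
  private SchemaLookupSketch() {
  }

  // Fetches the latest schema registered under the topic's value subject and
  // parses it into an Avro Schema object.
  public static Schema latestValueSchema(SchemaRegistryClient client, String topic) throws Exception {
    SchemaMetadata metadata = client.getLatestSchemaMetadata(topic + "-value");
    return new Schema.Parser().parse(metadata.getSchema());
  }
}
```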
26 changes: 26 additions & 0 deletions in ...examples/common/src/main/java/com/linkedin/metadata/examples/configs/ZooKeeperConfig.java
@@ -0,0 +1,26 @@
package com.linkedin.metadata.examples.configs;

import java.io.IOException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ZooKeeperConfig {
  @Value("${ZOOKEEPER:localhost:2181}")
  private String zookeeper;

  @Value("${ZOOKEEPER_TIMEOUT_MILLIS:3000}")
  private int timeoutMillis;

  @Bean(name = "zooKeeper")
  public ZooKeeper zooKeeperFactory() throws IOException {
    // No-op watcher: connection events are ignored here.
    Watcher noopWatcher = event -> {
    };

    return new ZooKeeper(zookeeper, timeoutMillis, noopWatcher);
  }
}
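A sketch of how the `zooKeeper` bean can be used to enumerate Kafka topics, which is what the kafka-etl example in this commit does; `/brokers/topics` is the standard znode path under which Kafka brokers register topics:

```
import java.util.List;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

public final class TopicListingSketch {
  private TopicListingSketch() {
  }

  // Each Kafka topic appears as a child znode of /brokers/topics.
  public static List<String> listTopics(ZooKeeper zooKeeper)
      throws KeeperException, InterruptedException {
    return zooKeeper.getChildren("/brokers/topics", false);
  }
}
```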
@@ -0,0 +1,40 @@
# Kafka ETL

A small application which reads existing Kafka topics from ZooKeeper, retrieves their schemas from the schema registry, and then fires an MCE for each schema.

## Running the Application

First, ensure that the services this depends on, such as the schema registry / ZooKeeper / mce-consumer-job / gms / etc., are all running.

This application can be run via gradle:

```
./gradlew :metadata-ingestion-examples:kafka-etl:bootRun
```

Or by building and running the jar:

```
./gradlew :metadata-ingestion-examples:kafka-etl:build
java -jar metadata-ingestion-examples/kafka-etl/build/libs/kafka-etl.jar
```

### Environment Variables

See the files under `src/main/java/com/linkedin/metadata/examples/kafka/config` for a list of customizable Spring environment variables.

### Common pitfalls

For events to be fired correctly, schemas must exist in the schema registry. If a topic was newly created but no schema has been registered for it yet, this application will fail to retrieve the schema for that topic. Check the output of the application to see if this happens. If you see a message like

```
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
```

then the odds are good that you need to register the schema for that topic.
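If you hit that error, one option is to register the schema programmatically using the same Confluent client configured in `SchemaRegistryConfig`; the helper below is a sketch (the registry's REST API works just as well), again assuming the default `<topic>-value` subject convention:

```
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.apache.avro.Schema;

public final class SchemaRegistrationSketch {
  private SchemaRegistrationSketch() {
  }

  // Registers an Avro schema under the topic's value subject; returns the schema id.
  public static int registerValueSchema(SchemaRegistryClient client, String topic, Schema schema)
      throws Exception {
    return client.register(topic + "-value", schema);
  }
}
```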
@@ -0,0 +1,29 @@
plugins {
  id 'org.springframework.boot'
  id 'java'
}

dependencies {
  compile project(':metadata-utils')
  compile project(':metadata-builders')
  compile project(':metadata-dao-impl:kafka-producer')
  compile project(':metadata-events:mxe-schemas')
  compile project(':metadata-ingestion-examples:common')

  compile externalDependency.javaxInject
  compile externalDependency.kafkaAvroSerde
  compile externalDependency.kafkaSerializers
  compile externalDependency.lombok
  compile externalDependency.springBeans
  compile externalDependency.springBootAutoconfigure
  compile externalDependency.springCore
  compile externalDependency.springKafka

  annotationProcessor externalDependency.lombok

  runtime externalDependency.logbackClassic
}

bootJar {
  mainClassName = 'com.linkedin.metadata.examples.kafka.KafkaEtlApplication'
}