Start adding java ETL examples, starting with kafka etl. (#1805)

We've had a few requests to start providing Java examples rather than Python due to type safety.

I've also started to add these to metadata-ingestion-examples to make it clearer these are *examples*. They can be used directly or as a basis for other things.

As we port to Java we'll move examples to contrib.
John Plaisted authored Sep 11, 2020
1 parent 91486a2 commit 6ece2d6
Showing 38 changed files with 453 additions and 27 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -46,7 +46,7 @@ Please follow the [DataHub Quickstart Guide](docs/quickstart.md) to get a copy o
* [Frontend](datahub-frontend)
* [Web App](datahub-web)
* [Generalized Metadata Service](gms)
* [Metadata Ingestion](metadata-ingestion)
* [Metadata Ingestion](metadata-ingestion-examples)
* [Metadata Processing Jobs](metadata-jobs)

## Releases
9 changes: 5 additions & 4 deletions build.gradle
@@ -44,7 +44,8 @@ project.ext.externalDependency = [
'httpClient': 'org.apache.httpcomponents:httpclient:4.5.9',
'jacksonCore': 'com.fasterxml.jackson.core:jackson-core:2.9.7',
'jacksonDataBind': 'com.fasterxml.jackson.core:jackson-databind:2.9.7',
"javatuples": "org.javatuples:javatuples:1.2",
'javatuples': 'org.javatuples:javatuples:1.2',
'javaxInject' : 'javax.inject:javax.inject:1',
'jerseyCore': 'org.glassfish.jersey.core:jersey-client:2.25.1',
'jerseyGuava': 'org.glassfish.jersey.bundles.repackaged:jersey-guava:2.25.1',
'jsonSimple': 'com.googlecode.json-simple:json-simple:1.1.1',
@@ -57,16 +58,16 @@ project.ext.externalDependency = [
'mariadbConnector': 'org.mariadb.jdbc:mariadb-java-client:2.6.0',
'mockito': 'org.mockito:mockito-core:3.0.0',
'mysqlConnector': 'mysql:mysql-connector-java:5.1.47',
"neo4jHarness": "org.neo4j.test:neo4j-harness:3.4.11",
"neo4jJavaDriver": "org.neo4j.driver:neo4j-java-driver:4.0.0",
'neo4jHarness': 'org.neo4j.test:neo4j-harness:3.4.11',
'neo4jJavaDriver': 'org.neo4j.driver:neo4j-java-driver:4.0.0',
'parseqTest': 'com.linkedin.parseq:parseq:3.0.7:test',
'playDocs': 'com.typesafe.play:play-docs_2.11:2.6.18',
'playGuice': 'com.typesafe.play:play-guice_2.11:2.6.18',
'playJavaJdbc': 'com.typesafe.play:play-java-jdbc_2.11:2.6.18',
'playTest': 'com.typesafe.play:play-test_2.11:2.6.18',
'postgresql': 'org.postgresql:postgresql:42.2.14',
'reflections': 'org.reflections:reflections:0.9.11',
"rythmEngine": "org.rythmengine:rythm-engine:1.3.0",
'rythmEngine': 'org.rythmengine:rythm-engine:1.3.0',
'servletApi': 'javax.servlet:javax.servlet-api:3.1.0',
'springBeans': 'org.springframework:spring-beans:5.2.3.RELEASE',
'springContext': 'org.springframework:spring-context:5.2.3.RELEASE',
File renamed without changes.
23 changes: 23 additions & 0 deletions contrib/metadata-ingestion/python/README.md
@@ -0,0 +1,23 @@
# Python ETL examples

ETL scripts written in Python.

## Prerequisites

1. Before running any Python metadata ingestion job, make sure that the DataHub backend services are all running.
The easiest way to do that is through [Docker images](../../docker).
2. You also need to build the `mxe-schemas` module as below.
```
./gradlew :metadata-events:mxe-schemas:build
```
This is needed to generate `MetadataChangeEvent.avsc`, which is the schema for the `MetadataChangeEvent` Kafka topic.
3. All the scripts are written using Python 3 and most likely won't work with Python 2.x interpreters.
You can verify the version of your Python using the following command.
```
python --version
```
We recommend using [pyenv](https://github.com/pyenv/pyenv) to install and manage your Python environment.
4. Before launching each ETL ingestion pipeline, you can install/verify the library versions as below.
```
pip install --user -r requirements.txt
```
17 changes: 17 additions & 0 deletions contrib/metadata-ingestion/python/kafka-etl/README.md
@@ -0,0 +1,17 @@
# Kafka ETL

## Ingest metadata from Kafka to DataHub
`kafka_etl` provides an ETL channel for communicating with your Kafka cluster.
```
➜ Configure your Kafka environment variable in the file.
ZOOKEEPER # Your zookeeper host.
➜ Configure your Kafka broker environment variables in the file.
AVROLOADPATH # Your model event in avro format.
KAFKATOPIC # Your event topic.
BOOTSTRAP # Kafka bootstrap server.
SCHEMAREGISTRY # Kafka schema registry host.
➜ python kafka_etl.py
```
This will bootstrap DataHub with the metadata in your Kafka cluster as dataset entities.
File renamed without changes.
15 changes: 10 additions & 5 deletions metadata-events/mxe-avro-1.7/build.gradle
@@ -12,19 +12,24 @@ dependencies {
avsc project(':metadata-events:mxe-schemas')
}

def genDir = file("src/generated/java")

task avroCodeGen(type: com.commercehub.gradle.plugin.avro.GenerateAvroJavaTask, dependsOn: configurations.avsc) {
source("$rootDir/metadata-events/mxe-schemas/src/renamed/avro")
outputDir = file("src/generated/java")
outputDir = genDir
dependsOn(':metadata-events:mxe-schemas:renameNamespace')
}

compileJava.source(avroCodeGen.outputs)
build.dependsOn avroCodeGen

clean {
project.delete('src/generated')
idea {
module {
sourceDirs += genDir
generatedSourceDirs += genDir
}
}

avroCodeGen.dependsOn(':metadata-events:mxe-schemas:renameNamespace')
project.rootProject.tasks.idea.dependsOn(avroCodeGen)

// Exclude classes from avro-schemas
jar {
@@ -53,7 +53,7 @@ private EventUtils() {

@Nonnull
private static Schema getAvroSchemaFromResource(@Nonnull String resourcePath) {
URL url = Resources.getResource(resourcePath);
URL url = EventUtils.class.getClassLoader().getResource(resourcePath);
try {
return Schema.parse(Resources.toString(url, Charsets.UTF_8));
} catch (IOException e) {
47 changes: 47 additions & 0 deletions metadata-ingestion-examples/README.md
@@ -0,0 +1,47 @@
# Metadata Ingestion

This directory contains example apps for ingesting data into DataHub.

You are more than welcome to use these examples directly, or use them as a reference for your own jobs.

See each example's README for more information.

### Common themes

All these examples ingest by firing MetadataChangeEvent Kafka events. They do not ingest directly into DataHub, though
this is possible. Instead, the mce-consumer-job should be running; it listens for these events and performs the
ingestion for us.
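
The decoupling described above can be sketched with an in-memory queue standing in for the Kafka topic. This is only an illustration under stated assumptions: the class and method names (`McePipelineSketch`, `fireEvents`, `consumeAndIngest`) and the `MCE:` string prefix are hypothetical, and a real MetadataChangeEvent is an Avro record sent through Kafka rather than a string.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

// Hypothetical, in-memory illustration only; none of these names are DataHub or Kafka APIs.
final class McePipelineSketch {
  private static final String PREFIX = "MCE:";

  /** The example app's role: fire one event per dataset onto the topic, then stop. */
  static void fireEvents(List<String> datasetNames, Queue<String> topic) {
    for (String name : datasetNames) {
      topic.add(PREFIX + name);
    }
  }

  /** The consumer job's role: drain the topic and perform the actual ingestion. */
  static List<String> consumeAndIngest(Queue<String> topic) {
    List<String> ingested = new ArrayList<>();
    String event;
    while ((event = topic.poll()) != null) {
      // Strip the marker prefix to recover the dataset name.
      ingested.add(event.substring(PREFIX.length()));
    }
    return ingested;
  }

  public static void main(String[] args) {
    Queue<String> topic = new ArrayDeque<>(); // stand-in for the Kafka topic
    fireEvents(Arrays.asList("orders", "users"), topic);
    System.out.println("queued events: " + topic.size());
    System.out.println("ingested datasets: " + consumeAndIngest(topic));
  }
}
```

The producer never touches the store directly, so the example apps can exit as soon as their events are queued.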

### A note on languages

We initially wrote these examples in Python (they still exist in `metadata-ingestion`; TODO to delete them once they're
all ported). The idea was that these were very small example scripts that should've been easy to use. However, upon
reflection, not all developers are familiar with Python, and the lack of types can hinder development. So the decision
was made to port the examples to Java.

You're more than welcome to adapt these examples to whatever languages you like. At LinkedIn, we primarily use
Java.

### Ingestion at LinkedIn

It is worth noting that we do not use any of these examples directly (in Java, Python, or anything else) at LinkedIn. We
have several different pipelines for ingesting data; it all depends on the source.

- Some pipelines are based on other Kafka events, where we'll transform some existing Kafka event to a metadata event.
  - For example, we get Kafka events for Hive changes. We make MCEs out of those Hive events to ingest Hive data.
- For others, we've directly instrumented existing pipelines / apps / jobs to also emit metadata events.
  - For example, TODO? Gobblin?
- For others still, we've created a series of offline jobs to ingest data.
  - For example, we have an Azkaban job to process our HDFS datasets.

For some sources of data, one of these example scripts may work fine. For others, it may make more sense to have some
custom logic, like the above list. In particular, all of these examples are currently one-off (they run, fire events,
and then stop); you may wish to build continuous ingestion pipelines instead.
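
As a rough sketch of the first kind of pipeline above, where an existing event is transformed into a metadata event, the mapping step might look like the following. Everything here is hypothetical: `TableChangeEvent`, `MetadataEvent`, and the `database.table` naming are invented for illustration and are not LinkedIn's actual event types or DataHub's models.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch; these types are not DataHub models or real Hive event classes.
final class EventTransformSketch {
  /** Stand-in for an existing Kafka event announcing that a Hive table changed. */
  static final class TableChangeEvent {
    final String database;
    final String table;

    TableChangeEvent(String database, String table) {
      this.database = database;
      this.table = table;
    }
  }

  /** Stand-in for a metadata event; a real MCE carries a typed snapshot instead. */
  static final class MetadataEvent {
    final String platform;
    final String datasetName;

    MetadataEvent(String platform, String datasetName) {
      this.platform = platform;
      this.datasetName = datasetName;
    }
  }

  /** Maps one source event to one metadata event (the naming scheme is an assumption). */
  static MetadataEvent toMetadataEvent(TableChangeEvent change) {
    return new MetadataEvent("hive", change.database + "." + change.table);
  }

  /** Transforms a batch; a continuous pipeline would apply the same mapping per record. */
  static List<MetadataEvent> transformAll(List<TableChangeEvent> changes) {
    List<MetadataEvent> events = new ArrayList<>();
    for (TableChangeEvent change : changes) {
      events.add(toMetadataEvent(change));
    }
    return events;
  }

  public static void main(String[] args) {
    List<TableChangeEvent> changes =
        Arrays.asList(new TableChangeEvent("sales", "orders"), new TableChangeEvent("crm", "users"));
    for (MetadataEvent event : transformAll(changes)) {
      System.out.println(event.platform + " dataset: " + event.datasetName);
    }
  }
}
```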

### "Real" Ingestion Applications

We appreciate any contributions of apps you may wish to make to ingest data from other sources.

TODO this section feels a little weird. Are our ingestion apps not really real apps? :p LDAP is real, as is kafka.
Granted, these are just one off apps to ingest. Maybe we should provide a library for these, then expose the one off
apps as examples?
20 changes: 20 additions & 0 deletions metadata-ingestion-examples/common/build.gradle
@@ -0,0 +1,20 @@
plugins {
  id 'java'
}

dependencies {
  compile project(':metadata-dao-impl:kafka-producer')

  compile externalDependency.javaxInject
  compile externalDependency.kafkaAvroSerde
  compile externalDependency.kafkaSerializers
  compile externalDependency.lombok
  compile externalDependency.springBeans
  compile externalDependency.springBootAutoconfigure
  compile externalDependency.springCore
  compile externalDependency.springKafka

  annotationProcessor externalDependency.lombok

  runtime externalDependency.logbackClassic
}
@@ -0,0 +1,42 @@
package com.linkedin.metadata.examples.configs;

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import java.util.Arrays;
import java.util.Map;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class KafkaConfig {
  @Value("${KAFKA_BOOTSTRAP_SERVER:localhost:29092}")
  private String kafkaBootstrapServers;

  @Value("${KAFKA_SCHEMAREGISTRY_URL:http://localhost:8081}")
  private String kafkaSchemaRegistryUrl;

  @Bean(name = "kafkaEventProducer")
  public Producer<String, IndexedRecord> kafkaListenerContainerFactory(KafkaProperties properties) {
    KafkaProperties.Producer producerProps = properties.getProducer();

    producerProps.setKeySerializer(StringSerializer.class);
    producerProps.setValueSerializer(KafkaAvroSerializer.class);

    // KAFKA_BOOTSTRAP_SERVER has precedence over SPRING_KAFKA_BOOTSTRAP_SERVERS
    if (kafkaBootstrapServers != null && kafkaBootstrapServers.length() > 0) {
      producerProps.setBootstrapServers(Arrays.asList(kafkaBootstrapServers.split(",")));
    } // else we rely on KafkaProperties which defaults to localhost:9092

    Map<String, Object> props = properties.buildProducerProperties();
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, kafkaSchemaRegistryUrl);

    return new KafkaProducer<>(props);
  }
}
@@ -0,0 +1,19 @@
package com.linkedin.metadata.examples.configs;

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class SchemaRegistryConfig {
  @Value("${SCHEMAREGISTRY_URL:http://localhost:8081}")
  private String schemaRegistryUrl;

  @Bean(name = "schemaRegistryClient")
  public SchemaRegistryClient schemaRegistryFactory() {
    return new CachedSchemaRegistryClient(schemaRegistryUrl, 512);
  }
}
@@ -0,0 +1,26 @@
package com.linkedin.metadata.examples.configs;

import java.io.IOException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class ZooKeeperConfig {
  @Value("${ZOOKEEPER:localhost:2181}")
  private String zookeeper;

  @Value("${ZOOKEEPER_TIMEOUT_MILLIS:3000}")
  private int timeoutMillis;

  @Bean(name = "zooKeeper")
  public ZooKeeper zooKeeperFactory() throws IOException {
    Watcher noopWatcher = event -> {
    };

    return new ZooKeeper(zookeeper, timeoutMillis, noopWatcher);
  }
}
40 changes: 40 additions & 0 deletions metadata-ingestion-examples/kafka-etl/README.md
@@ -0,0 +1,40 @@
# Kafka ETL

A small application which reads existing Kafka topics from ZooKeeper, retrieves their schema from the schema registry,
and then fires an MCE for each schema.
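
The flow described above can be sketched without any Kafka or ZooKeeper dependencies. This is a minimal illustration, not the application's actual code: `KafkaEtlFlowSketch`, `SchemaEvent`, and `buildEvents` are hypothetical names, the `Supplier` stands in for the ZooKeeper topic listing, and the `Function` stands in for the schema registry lookup.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch; these names are illustrative and are not the classes added in this commit.
final class KafkaEtlFlowSketch {
  /** Stand-in for an MCE: pairs a topic with the schema that describes it. */
  static final class SchemaEvent {
    final String topic;
    final String schema;

    SchemaEvent(String topic, String schema) {
      this.topic = topic;
      this.schema = schema;
    }
  }

  /**
   * Lists topics, looks up each topic's schema, and builds one event per schema.
   * topicLister stands in for ZooKeeper; schemaLookup stands in for the schema registry.
   */
  static List<SchemaEvent> buildEvents(Supplier<List<String>> topicLister,
      Function<String, String> schemaLookup) {
    List<SchemaEvent> events = new ArrayList<>();
    for (String topic : topicLister.get()) {
      events.add(new SchemaEvent(topic, schemaLookup.apply(topic)));
    }
    return events;
  }

  public static void main(String[] args) {
    // An in-memory map plays the role of the schema registry.
    Map<String, String> registry = new HashMap<>();
    registry.put("orders", "{\"type\":\"record\",\"name\":\"Order\",\"fields\":[]}");
    registry.put("users", "{\"type\":\"record\",\"name\":\"User\",\"fields\":[]}");

    List<SchemaEvent> events = buildEvents(() -> Arrays.asList("orders", "users"), registry::get);
    for (SchemaEvent event : events) {
      System.out.println("would fire event for " + event.topic + " with schema " + event.schema);
    }
  }
}
```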

## Running the Application

First, ensure that the services this application depends on (schema registry, ZooKeeper, mce-consumer-job, gms, etc.)
are all running.

This application can be run via Gradle:

```
./gradlew :metadata-ingestion-examples:kafka-etl:bootRun
```

Or by building and running the jar:

```
./gradlew :metadata-ingestion-examples:kafka-etl:build
java -jar metadata-ingestion-examples/kafka-etl/build/libs/kafka-etl.jar
```

### Environment Variables

See the files under `src/main/java/com/linkedin/metadata/examples/kafka/config` for a list of customizable Spring
environment variables.

### Common pitfalls

For events to be fired correctly, schemas must exist in the schema registry. If a topic was newly created, but no schema
has been registered for it yet, this application will fail to retrieve the schema for that topic. Check the output of
the application to see if this happens. If you see a message like

```
io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject not found.; error code: 40401
```

then you most likely need to register a schema for this topic.
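
One way to spot this situation up front is to check every topic and collect the ones with no schema instead of stopping at the first failure. The sketch below is an assumption about how that could be done and not what the application does today: `MissingSchemaSketch`, `SchemaNotFoundException`, and `findTopicsWithoutSchema` are hypothetical names, and the exception merely stands in for the registry's "Subject not found" error.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Hypothetical sketch; the exception below only imitates the registry's "Subject not found" error.
final class MissingSchemaSketch {
  /** Stand-in for the failure raised when a topic has no registered schema. */
  static final class SchemaNotFoundException extends RuntimeException {
    SchemaNotFoundException(String topic) {
      super("no schema registered for topic " + topic);
    }
  }

  /** Tries every topic and returns the ones whose schema lookup failed. */
  static List<String> findTopicsWithoutSchema(List<String> topics,
      Function<String, String> schemaLookup) {
    List<String> missing = new ArrayList<>();
    for (String topic : topics) {
      try {
        schemaLookup.apply(topic);
      } catch (SchemaNotFoundException e) {
        // Record the topic and keep going so one gap does not hide the others.
        missing.add(topic);
      }
    }
    return missing;
  }

  public static void main(String[] args) {
    // Fake lookup: only "brand-new-topic" lacks a schema.
    Function<String, String> lookup = topic -> {
      if (topic.equals("brand-new-topic")) {
        throw new SchemaNotFoundException(topic);
      }
      return "{}";
    };
    List<String> missing = findTopicsWithoutSchema(Arrays.asList("orders", "brand-new-topic"), lookup);
    System.out.println("topics missing a schema: " + missing);
  }
}
```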
29 changes: 29 additions & 0 deletions metadata-ingestion-examples/kafka-etl/build.gradle
@@ -0,0 +1,29 @@
plugins {
  id 'org.springframework.boot'
  id 'java'
}

dependencies {
  compile project(':metadata-utils')
  compile project(':metadata-builders')
  compile project(':metadata-dao-impl:kafka-producer')
  compile project(':metadata-events:mxe-schemas')
  compile project(':metadata-ingestion-examples:common')

  compile externalDependency.javaxInject
  compile externalDependency.kafkaAvroSerde
  compile externalDependency.kafkaSerializers
  compile externalDependency.lombok
  compile externalDependency.springBeans
  compile externalDependency.springBootAutoconfigure
  compile externalDependency.springCore
  compile externalDependency.springKafka

  annotationProcessor externalDependency.lombok

  runtime externalDependency.logbackClassic
}

bootJar {
  mainClassName = 'com.linkedin.metadata.examples.kafka.KafkaEtlApplication'
}