Skip to content

Commit

Permalink
Add schema and generated java to data model, refactor Gobblin audit p…
Browse files Browse the repository at this point in the history
…rocessor
  • Loading branch information
alyiwang committed Sep 11, 2017
1 parent 6e87040 commit 2502f0d
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 36 deletions.
3 changes: 2 additions & 1 deletion gradle/scripts/license.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ subprojects {
exclude "**/jython/requests/**"
exclude "**/pyparsing.py"
excludes(["**/*.json", "**/*.avsc", "**/*.avro", "**/*.conf", "**/*.yaml", "**/*.xml"])
excludes(["**/*.txt", "**/*.csv", "**/*.md"])
excludes(["**/*.txt", "**/*.csv", "**/*.md", "**/*.job", "**/*.properties", "**/*.template"])
excludes(["**/com/linkedin/events/**", "**/gobblin/metrics/**"])
}

plugins.withType(PlayPlugin) {
Expand Down
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ modules.each { module ->
include "${module}"
}

gradle.ext.appBuildEnvironment = "OpenSource"
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,18 @@ public static String toStringReplaceNull(Object obj, String replacement) {
String string = String.valueOf(obj);
return string == null || string.equals("null") ? replacement : string;
}

/**
 * Converts an arbitrarily-typed map into a map from String to String by
 * applying {@link String#valueOf(Object)} to every key and value.
 *
 * <p>Note: a null key or value is rendered as the literal string "null",
 * matching String.valueOf semantics. If two distinct keys stringify to the
 * same String, the entry iterated last wins (source-map iteration order).
 *
 * @param map the source map; may contain null keys or values. A null map is
 *            treated as an empty map.
 * @param <K> key type of the source map
 * @param <V> value type of the source map
 * @return a new mutable HashMap containing the stringified entries; never null
 */
public static <K, V> Map<String, String> toStringMap(Map<K, V> map) {
  final Map<String, String> newMap = new HashMap<>();
  if (map == null) {
    // Be lenient: callers passing a missing/unset map get an empty result
    // instead of a NullPointerException.
    return newMap;
  }
  for (Map.Entry<K, V> entry : map.entrySet()) {
    // Plain put tolerates null values (unlike Collectors.toMap, which would NPE).
    newMap.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
  }
  return newMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.utils;

import java.util.HashMap;
import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.Test;

import static wherehows.common.utils.StringUtil.*;


/**
 * Unit tests for {@link StringUtil}.
 */
public class StringUtilTest {

  @Test
  public void testToStringMap() {
    CharSequence key = "foo";
    Object value = "bar";

    Map<CharSequence, Object> map = new HashMap<>();
    map.put(key, value);

    // Use the parameterized type rather than a raw Map to keep type safety
    // and avoid an unchecked-conversion warning.
    Map<String, String> newMap = toStringMap(map);

    // Exactly one entry, with both key and value stringified.
    Assert.assertEquals(newMap.size(), 1);
    Assert.assertTrue(newMap.containsKey("foo"));
    Assert.assertEquals(newMap.get("foo"), "bar");
  }
}
23 changes: 23 additions & 0 deletions wherehows-data-model/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
# Wherehows Data Model
This module contains the data model used by WhereHows, including the table DDLs for the MySQL DB, the Elasticsearch indices,
and the Avro schemas for Kafka events. It also includes the auto-generated Kafka event Java classes, to be used by other modules.

## Code Generation for Avro Schema
The Java code under src/main/java is auto-generated by avro-tools from the Avro schemas (.avsc) of the Kafka events.
These files should not be edited directly.

Note that we currently require Avro version 1.4 in Kafka-related tasks.

To generate the Java code, first download avro-tools-1.4.1.jar, then run the following command from wherehows-data-model/:
```
java -jar avro-tools-1.4.1.jar compile schema xxx.avsc src/main/java
```


## Build
```
$ ../gradlew build
BUILD SUCCESSFUL in 0s
4 actionable tasks: 2 executed, 2 up-to-date
```
35 changes: 35 additions & 0 deletions wherehows-data-model/avro/GobblinTrackingEvent_audit.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"type": "record",
"name": "GobblinTrackingEvent_audit",
"namespace": "gobblin.metrics",
"fields": [
{
"name": "timestamp",
"type": "long",
"doc": "Time at which event was created.",
"default": 0
},
{
"name": "namespace",
"type": [
"string",
"null"
],
"doc": "Namespace used for filtering of events."
},
{
"name": "name",
"type": "string",
"doc": "Event name."
},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"doc": "Event metadata.",
"default": {}
}
]
}
62 changes: 61 additions & 1 deletion wherehows-data-model/avro/MetadataChangeEvent.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -1397,7 +1397,67 @@
]
}
],
"doc": "Describe the purgeable entity fields"
"doc": "Human-entered compliance metadata"
},
{
"name": "suggestedCompliancePolicy",
"type": [
"null",
{
"type": "record",
"name": "SuggestedCompliancePolicy",
"fields": [
{
"name": "suggestedFieldClassifications",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "SuggestedFieldClassification",
"fields": [
{
"name": "suggestion",
"type": "ComplianceEntity",
"doc": "Suggestion for the field level compliance metadata."
},
{
"name": "confidenceLevel",
"type": "float",
"doc": "The confidence level for the suggestion."
}
]
}
},
"doc": "A list of suggested field-level compliance metadata."
},
{
"name": "suggestedDatasetClassification",
"type": {
"type": "map",
"values": {
"type": "record",
"name": "SuggestedDatasetClassification",
"fields": [
{
"name": "isContaining",
"type": "boolean",
"doc": "Whether the dataset contains the specific kind of data."
},
{
"name": "confidenceLevel",
"type": "float",
"doc": "The confidence level for the suggestion."
}
]
}
},
"doc": "A map of suggested dataset-level compliance metadata, where the key is the field name of DatasetClassification, and the value is the corresponding suggestion for that field."
}
]
}
],
"doc": "Machine-suggested compliance metadata",
"default": null
}
]
}
24 changes: 18 additions & 6 deletions wherehows-data-model/build.gradle
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
sourceSets {
main {
resources {
srcDir 'DDL'
srcDir 'ELASTICSEARCH'
}
plugins {
id "com.commercehub.gradle.plugin.avro" version "0.8.0" apply false
}

println gradle.appBuildEnvironment

if ("OpenSource" == gradle.appBuildEnvironment) {
println "apply gradle.plugin.avro"
apply plugin: "com.commercehub.gradle.plugin.avro"

avro {
fieldVisibility = "PUBLIC"
}
}

apply plugin: 'java'

dependencies {
compile externalDependency.avro
}
35 changes: 35 additions & 0 deletions wherehows-data-model/src/main/avro/GobblinTrackingEvent_audit.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
{
"type": "record",
"name": "GobblinTrackingEvent_audit",
"namespace": "gobblin.metrics",
"fields": [
{
"name": "timestamp",
"type": "long",
"doc": "Time at which event was created.",
"default": 0
},
{
"name": "namespace",
"type": [
"string",
"null"
],
"doc": "Namespace used for filtering of events."
},
{
"name": "name",
"type": "string",
"doc": "Event name."
},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"doc": "Event metadata.",
"default": {}
}
]
}
3 changes: 2 additions & 1 deletion wherehows-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@ apply plugin: 'application'
mainClassName = "wherehows.main.ApplicationStart"

dependencies {
compile project(':wherehows-dao')
compile project(':wherehows-common')
compile project(':wherehows-dao')
compile project(':wherehows-data-model')
compile externalDependency.jackson_databind
compile externalDependency.jackson_core
compile externalDependency.jackson_annotations
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ public DummyProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord

/**
* Simply print the message content
* @param record IndexedRecord
* @param indexedRecord IndexedRecord
*/
public void process(IndexedRecord record) {
log.info(record.toString());
public void process(IndexedRecord indexedRecord) {
log.info(indexedRecord.toString());
//System.out.println(record.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,16 @@
*/
package wherehows.processors;

import lombok.RequiredArgsConstructor;
import gobblin.metrics.GobblinTrackingEvent_audit;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.dao.DaoFactory;
import wherehows.service.GobblinTrackingAuditService;


@Slf4j
@RequiredArgsConstructor
public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
public class GobblinTrackingAuditProcessor extends KafkaMessageProcessor {

private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
Expand All @@ -30,26 +31,35 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {

private final GobblinTrackingAuditService gobblinTrackingAuditService;

public GobblinTrackingAuditProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {
super(daoFactory, producer);
gobblinTrackingAuditService =
new GobblinTrackingAuditService(DAO_FACTORY.getDatasetClassificationDao(), DAO_FACTORY.getDictDatasetDao());
}

/**
* Process a Gobblin tracking event audit record
* @param record
* @param topic
* @return null
* @param indexedRecord
* @throws Exception
*/
public void process(GenericData.Record record, String topic) throws Exception {
public void process(IndexedRecord indexedRecord) throws Exception {

if (record == null || record.get("name") == null) {
if (indexedRecord == null || indexedRecord.getClass() != GobblinTrackingEvent_audit.class) {
log.debug("Event record type error");
return;
}

final String name = record.get("name").toString();
GobblinTrackingEvent_audit record = (GobblinTrackingEvent_audit) indexedRecord;

String name = String.valueOf(record.name);
// only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) || name.equals(
DS_IGNORE_IDPC_AUDITOR)) {
// TODO: Re-enable this once it's fixed.
} else if (name.equals(METADATA_FILE_CLASSIFIER)) {
gobblinTrackingAuditService.updateHdfsDatasetSchema(record);
} else {
log.info("Gobblin audit message skipped.");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public KafkaMessageProcessor(DaoFactory daoFactory, KafkaProducer<String, Indexe

/**
* Abstract method 'process' to be implemented by specific processor
* @param record IndexedRecord
* @param indexedRecord IndexedRecord
* @throws Exception
*/
public abstract void process(IndexedRecord record) throws Exception;
public abstract void process(IndexedRecord indexedRecord) throws Exception;

}
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@
public class ProcessorFactory {
private final DaoFactory daoFactory;

public GobblinTrackingAuditProcessor getGobblinTrackingAuditProcessor() {
GobblinTrackingAuditService service =
new GobblinTrackingAuditService(daoFactory.getDatasetClassificationDao(), daoFactory.getDictDatasetDao());
return new GobblinTrackingAuditProcessor(service);
}

public JobExecutionLineageProcessor getJobExecutionLineageProcessor() {
return new JobExecutionLineageProcessor(new JobExecutionLineageService());
}
Expand Down
Loading

0 comments on commit 2502f0d

Please sign in to comment.