Add schema and generated java to data model, refactor Gobblin audit processor
alyiwang committed Sep 11, 2017
1 parent 6e87040 commit 1b553b4
Showing 13 changed files with 162 additions and 35 deletions.
3 changes: 2 additions & 1 deletion gradle/scripts/license.gradle
@@ -12,7 +12,8 @@ subprojects {
exclude "**/jython/requests/**"
exclude "**/pyparsing.py"
excludes(["**/*.json", "**/*.avsc", "**/*.avro", "**/*.conf", "**/*.yaml", "**/*.xml"])
excludes(["**/*.txt", "**/*.csv", "**/*.md"])
excludes(["**/*.txt", "**/*.csv", "**/*.md", "**/*.job", "**/*.properties", "**/*.template"])
excludes(["**/com/linkedin/events/**", "**/gobblin/metrics/**"])
}

plugins.withType(PlayPlugin) {
1 change: 1 addition & 0 deletions settings.gradle
@@ -17,3 +17,4 @@ modules.each { module ->
include "${module}"
}

gradle.ext.appBuildEnvironment = "opensource"
@@ -129,4 +129,18 @@ public static String toStringReplaceNull(Object obj, String replacement) {
String string = String.valueOf(obj);
return string == null || string.equals("null") ? replacement : string;
}

/**
 * Convert a map with arbitrary key and value types to a map from String to String,
 * calling String.valueOf on each key and value.
 * @param map the source map; keys and values may be of any type
 * @return a new map with the stringified keys and values
 */
public static <T, K> Map<String, String> toStringMap(Map<T, K> map) {
final Map<String, String> newMap = new HashMap<>();
for (Map.Entry<T, K> entry : map.entrySet()) {
newMap.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
}
return newMap;
}
}
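As a usage sketch (an illustrative snippet, not part of the commit; ToStringMapSketch is an invented name), the new helper stringifies keys and values of any map via String.valueOf, so null values come back as the literal string "null":

```java
import java.util.HashMap;
import java.util.Map;

import wherehows.common.utils.StringUtil;

public class ToStringMapSketch {
  public static void main(String[] args) {
    Map<Object, Object> raw = new HashMap<>();
    raw.put(1, true);       // non-string key and value
    raw.put("size", null);  // null value becomes the string "null"

    Map<String, String> strings = StringUtil.toStringMap(raw);
    System.out.println(strings);  // e.g. {1=true, size=null}; iteration order may vary
  }
}
```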
@@ -0,0 +1,39 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.common.utils;

import java.util.HashMap;
import java.util.Map;
import org.testng.Assert;
import org.testng.annotations.Test;

import static wherehows.common.utils.StringUtil.*;


public class StringUtilTest {

@Test
public void testToStringMap() {
CharSequence key = "foo";
Object value = "bar";

Map<CharSequence, Object> map = new HashMap<>();
map.put(key, value);

Map<String, String> newMap = toStringMap(map);

Assert.assertTrue(newMap.containsKey("foo"));
Assert.assertEquals(newMap.get("foo"), "bar");
}
}
23 changes: 23 additions & 0 deletions wherehows-data-model/README.md
@@ -0,0 +1,23 @@
# WhereHows Data Model
This module contains the data model used by WhereHows, including the table DDLs for the MySQL DB, the Elasticsearch indices,
and the Avro schemas for Kafka events. It also includes the auto-generated Java classes for the Kafka events, to be used by other modules.

## Code Generation for Avro Schemas
The Java code here under src/main/java is auto-generated by the Avro tools from the Avro schemas (.avsc) of the Kafka events.
It should not be edited directly.

Note that we currently require Avro version 1.4 for Kafka-related tasks.

To generate the Java code, first download avro-tools-1.4.1.jar, then run the following from wherehows-data-model/:
```
java -jar avro-tools-1.4.1.jar compile schema xxx.avsc src/main/java
```
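
For example, the audit event schema added by this commit would be compiled with:
```
java -jar avro-tools-1.4.1.jar compile schema src/main/avro/GobblinTrackingEvent_audit.avsc src/main/java
```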


## Build
```
$ ../gradlew build
BUILD SUCCESSFUL in 0s
4 actionable tasks: 2 executed, 2 up-to-date
```
21 changes: 15 additions & 6 deletions wherehows-data-model/build.gradle
@@ -1,8 +1,17 @@
sourceSets {
main {
resources {
srcDir 'DDL'
srcDir 'ELASTICSEARCH'
}
plugins {
id "com.commercehub.gradle.plugin.avro" version "0.8.0" apply false
}

if (gradle.appBuildEnvironment == "opensource") {
apply plugin: "com.commercehub.gradle.plugin.avro"

avro {
fieldVisibility = "PUBLIC"
}
}

apply plugin: 'java'

dependencies {
compile externalDependency.avro
}
35 changes: 35 additions & 0 deletions wherehows-data-model/src/main/avro/GobblinTrackingEvent_audit.avsc
@@ -0,0 +1,35 @@
{
"type": "record",
"name": "GobblinTrackingEvent_audit",
"namespace": "gobblin.metrics",
"fields": [
{
"name": "timestamp",
"type": "long",
"doc": "Time at which event was created.",
"default": 0
},
{
"name": "namespace",
"type": [
"string",
"null"
],
"doc": "Namespace used for filtering of events."
},
{
"name": "name",
"type": "string",
"doc": "Event name."
},
{
"name": "metadata",
"type": {
"type": "map",
"values": "string"
},
"doc": "Event metadata.",
"default": {}
}
]
}
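As a quick sanity check (a sketch, not part of the commit; the class name and relative path are illustrative, run from wherehows-data-model/), the schema can be parsed and exercised with Avro's generic API:

```java
import java.io.File;
import java.util.HashMap;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;

public class ReadAuditSchema {
  public static void main(String[] args) throws Exception {
    // Schema.Parser is the modern Avro API; on the 1.4 line noted in the
    // README this would be the static Schema.parse(...) call instead.
    Schema schema = new Schema.Parser()
        .parse(new File("src/main/avro/GobblinTrackingEvent_audit.avsc"));

    // Build a generic record conforming to the schema.
    GenericData.Record event = new GenericData.Record(schema);
    event.put("timestamp", System.currentTimeMillis());
    event.put("namespace", "gobblin.metrics");
    event.put("name", "DaliLimitedRetentionAuditor");
    event.put("metadata", new HashMap<String, String>());

    // Reading a field back mirrors what the audit processor does with record.name.
    System.out.println(String.valueOf(event.get("name")));
  }
}
```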
3 changes: 2 additions & 1 deletion wherehows-kafka/build.gradle
@@ -3,8 +3,9 @@ apply plugin: 'application'
mainClassName = "wherehows.main.ApplicationStart"

dependencies {
compile project(':wherehows-dao')
compile project(':wherehows-common')
compile project(':wherehows-dao')
compile project(':wherehows-data-model')
compile externalDependency.jackson_databind
compile externalDependency.jackson_core
compile externalDependency.jackson_annotations
@@ -31,10 +31,10 @@ public DummyProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {

/**
* Simply print the message content
* @param record IndexedRecord
* @param indexedRecord IndexedRecord
*/
public void process(IndexedRecord record) {
log.info(record.toString());
public void process(IndexedRecord indexedRecord) {
log.info(indexedRecord.toString());
//System.out.println(record.toString());
}
}
@@ -13,15 +13,16 @@
*/
package wherehows.processors;

import lombok.RequiredArgsConstructor;
import gobblin.metrics.GobblinTrackingEvent_audit;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.dao.DaoFactory;
import wherehows.service.GobblinTrackingAuditService;


@Slf4j
@RequiredArgsConstructor
public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
public class GobblinTrackingAuditProcessor extends KafkaMessageProcessor {

private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
@@ -30,26 +31,35 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {

private final GobblinTrackingAuditService gobblinTrackingAuditService;

public GobblinTrackingAuditProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {
super(daoFactory, producer);
gobblinTrackingAuditService =
new GobblinTrackingAuditService(DAO_FACTORY.getDatasetClassificationDao(), DAO_FACTORY.getDictDatasetDao());
}

/**
* Process a Gobblin tracking event audit record
* @param record
* @param topic
* @return null
* @param indexedRecord
* @throws Exception
*/
public void process(GenericData.Record record, String topic) throws Exception {
public void process(IndexedRecord indexedRecord) throws Exception {

if (record == null || record.get("name") == null) {
if (indexedRecord == null || indexedRecord.getClass() != GobblinTrackingEvent_audit.class) {
log.debug("Event record type error");
return;
}

final String name = record.get("name").toString();
GobblinTrackingEvent_audit record = (GobblinTrackingEvent_audit) indexedRecord;

String name = String.valueOf(record.name);
// only handle "DaliLimitedRetentionAuditor", "DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) || name.equals(
DS_IGNORE_IDPC_AUDITOR)) {
// TODO: Re-enable this once it's fixed.
} else if (name.equals(METADATA_FILE_CLASSIFIER)) {
gobblinTrackingAuditService.updateHdfsDatasetSchema(record);
} else {
log.info("Gobblin audit message skipped.");
}
}
}
@@ -34,9 +34,9 @@ public KafkaMessageProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {

/**
* Abstract method 'process' to be implemented by specific processor
* @param record IndexedRecord
* @param indexedRecord IndexedRecord
* @throws Exception
*/
public abstract void process(IndexedRecord record) throws Exception;
public abstract void process(IndexedRecord indexedRecord) throws Exception;

}
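For reference, a minimal concrete subclass might look like this (a sketch following the DummyProcessor pattern above; LoggingProcessor is an invented name, and the constructor signature is assumed from the diff context):

```java
package wherehows.processors;

import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.dao.DaoFactory;

public class LoggingProcessor extends KafkaMessageProcessor {

  public LoggingProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {
    super(daoFactory, producer);
  }

  /**
   * Print the record instead of doing real work.
   */
  @Override
  public void process(IndexedRecord indexedRecord) {
    System.out.println(indexedRecord.toString());
  }
}
```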
@@ -25,12 +25,6 @@
public class ProcessorFactory {
private final DaoFactory daoFactory;

public GobblinTrackingAuditProcessor getGobblinTrackingAuditProcessor() {
GobblinTrackingAuditService service =
new GobblinTrackingAuditService(daoFactory.getDatasetClassificationDao(), daoFactory.getDictDatasetDao());
return new GobblinTrackingAuditProcessor(service);
}

public JobExecutionLineageProcessor getJobExecutionLineageProcessor() {
return new JobExecutionLineageProcessor(new JobExecutionLineageService());
}
@@ -16,6 +16,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableList;
import gobblin.metrics.GobblinTrackingEvent_audit;
import java.util.Date;
import java.util.List;
import java.util.Map;
@@ -24,13 +25,12 @@
import javax.persistence.NoResultException;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.GenericData;
import org.apache.commons.lang3.StringUtils;
import wherehows.common.utils.StringUtil;
import wherehows.dao.table.DatasetClassificationDao;
import wherehows.dao.table.DictDatasetDao;
import wherehows.models.table.DatasetClassification;
import wherehows.models.table.DictDataset;
import wherehows.utils.StringUtil;


@Slf4j
@@ -66,9 +66,9 @@ public class GobblinTrackingAuditService {
.add(Pattern.compile("/output/"))
.build();

public void updateHdfsDatasetSchema(GenericData.Record record) throws Exception {
Long timestamp = (Long) record.get("timestamp");
Map<String, String> metadata = StringUtil.convertObjectMapToStringMap(record.get("metadata"));
public void updateHdfsDatasetSchema(GobblinTrackingEvent_audit record) throws Exception {
Long timestamp = record.timestamp;
Map<String, String> metadata = StringUtil.toStringMap(record.metadata);

String datasetName = metadata.get("dataset");
if (StringUtils.isEmpty(datasetName) || isDatasetNameBlacklisted(datasetName)) {
@@ -155,7 +155,7 @@ private String getParentName(String datasetName) {
return "";
}

//TODO the retuen time should be timeStamp
//TODO the return time should be timeStamp
private int getsourceModifiedTime(String hdfsModifiedTime) {
long result = Long.parseLong(hdfsModifiedTime) / 1000;
if (hdfsModifiedTime == null || result > Integer.MAX_VALUE) {
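One caveat in the surrounding context lines: Long.parseLong runs before the null check, so a null hdfsModifiedTime would throw a NullPointerException before the guard is ever reached. A sketch of the safer ordering (the fallback value of 0 is an assumption, since the rest of the method body is truncated in this diff):

```java
private int getSourceModifiedTime(String hdfsModifiedTime) {
  // Guard against null before parsing, not after.
  if (hdfsModifiedTime == null) {
    return 0;  // assumed fallback; the original fallback is not visible here
  }
  long result = Long.parseLong(hdfsModifiedTime) / 1000;
  if (result > Integer.MAX_VALUE) {
    return 0;  // assumed fallback for values that overflow int
  }
  return (int) result;
}
```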
