Skip to content

Commit b48ef76

Browse files
committed
Add schema and generated java to data model, refactor Gobblin audit processor
1 parent d1e53c7 commit b48ef76

File tree

13 files changed

+194
-29
lines changed

13 files changed

+194
-29
lines changed

gradle/scripts/license.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,8 @@ subprojects {
1212
exclude "**/jython/requests/**"
1313
exclude "**/pyparsing.py"
1414
excludes(["**/*.json", "**/*.avsc", "**/*.avro", "**/*.conf", "**/*.yaml", "**/*.xml"])
15-
excludes(["**/*.txt", "**/*.csv", "**/*.md"])
15+
excludes(["**/*.txt", "**/*.csv", "**/*.md", "**/*.job", "**/*.properties", "**/*.template"])
16+
excludes(["**/com/linkedin/events/**", "**/gobblin/metrics/**"])
1617
}
1718

1819
plugins.withType(PlayPlugin) {

wherehows-common/src/main/java/wherehows/common/utils/StringUtil.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,18 @@ public static String toStringReplaceNull(Object obj, String replacement) {
129129
String string = String.valueOf(obj);
130130
return string == null || string.equals("null") ? replacement : string;
131131
}
132+
133+
/**
134+
* Convert a map from object to object to a map from string to string
135+
* Calling String.valueOf on both key and value
136+
* @param map
137+
* @return
138+
*/
139+
public static <T, K> Map<String, String> toStringMap(Map<T, K> map) {
140+
final Map<String, String> newMap = new HashMap<>();
141+
for (Map.Entry<T, K> entry : map.entrySet()) {
142+
newMap.put(String.valueOf(entry.getKey()), String.valueOf(entry.getValue()));
143+
}
144+
return newMap;
145+
}
132146
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/**
2+
* Copyright 2015 LinkedIn Corp. All rights reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
*/
14+
package wherehows.common.utils;
15+
16+
import java.util.HashMap;
17+
import java.util.Map;
18+
import org.testng.Assert;
19+
import org.testng.annotations.Test;
20+
21+
import static wherehows.common.utils.StringUtil.*;
22+
23+
24+
public class StringUtilTest {
25+
26+
@Test
27+
public void testToStringMap() {
28+
CharSequence key = "foo";
29+
Object value = "bar";
30+
31+
Map<CharSequence, Object> map = new HashMap<>();
32+
map.put(key, value);
33+
34+
Map newMap = toStringMap(map);
35+
36+
Assert.assertTrue(newMap.containsKey("foo"));
37+
Assert.assertEquals(newMap.get("foo"), "bar");
38+
}
39+
}

wherehows-data-model/README.md

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
# Wherehows Data Model
2+
This module contains the data model used by WhereHows, including the table DDLs for the MySQL DB, the Elasticsearch indices,
3+
and the Avro schemas for Kafka events. It also includes the auto-generated Kafka event Java classes, to be used by other modules.
4+
5+
## Code Generation for Avro Schema
6+
The Java code here under src/main/java is auto-generated by the Avro tools from the Avro schemas (.avsc) of the Kafka events.
7+
They should not be edited directly.
8+
9+
Note that we currently require Avro version 1.4 for the Kafka-related tasks.
10+
11+
To generate Java code, first download avro-tools-1.4.1.jar, then run the following command from wherehows-data-model/:
12+
```
13+
java -jar avro-tools-1.4.1.jar compile schema xxx.avsc src/main/java
14+
```
15+
16+
17+
## Build
18+
```
19+
$ ../gradlew build
20+
21+
BUILD SUCCESSFUL in 0s
22+
4 actionable tasks: 2 executed, 2 up-to-date
23+
```
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
{
2+
"type": "record",
3+
"name": "GobblinTrackingEvent_audit",
4+
"namespace": "gobblin.metrics",
5+
"fields": [
6+
{
7+
"name": "timestamp",
8+
"type": "long",
9+
"doc": "Time at which event was created.",
10+
"default": 0
11+
},
12+
{
13+
"name": "namespace",
14+
"type": [
15+
"string",
16+
"null"
17+
],
18+
"doc": "Namespace used for filtering of events."
19+
},
20+
{
21+
"name": "name",
22+
"type": "string",
23+
"doc": "Event name."
24+
},
25+
{
26+
"name": "metadata",
27+
"type": {
28+
"type": "map",
29+
"values": "string"
30+
},
31+
"doc": "Event metadata.",
32+
"default": {}
33+
}
34+
]
35+
}

wherehows-data-model/build.gradle

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
apply plugin: 'java'
2+
3+
dependencies {
4+
compile 'org.apache.avro:avro:1.4.1'
5+
}
6+
17
sourceSets {
28
main {
39
resources {
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Autogenerated by Avro
3+
*
4+
* DO NOT EDIT DIRECTLY
5+
*/
6+
package gobblin.metrics;
7+
8+
@SuppressWarnings("all")
9+
public class GobblinTrackingEvent_audit extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
10+
public static final org.apache.avro.Schema SCHEMA$ = org.apache.avro.Schema.parse("{\"type\":\"record\",\"name\":\"GobblinTrackingEvent_audit\",\"namespace\":\"gobblin.metrics\",\"fields\":[{\"name\":\"timestamp\",\"type\":\"long\",\"doc\":\"Time at which event was created.\",\"default\":0},{\"name\":\"namespace\",\"type\":[\"string\",\"null\"],\"doc\":\"Namespace used for filtering of events.\"},{\"name\":\"name\",\"type\":\"string\",\"doc\":\"Event name.\"},{\"name\":\"metadata\",\"type\":{\"type\":\"map\",\"values\":\"string\"},\"doc\":\"Event metadata.\",\"default\":{}}]}");
11+
/** Time at which event was created. */
12+
public long timestamp;
13+
/** Namespace used for filtering of events. */
14+
public java.lang.CharSequence namespace;
15+
/** Event name. */
16+
public java.lang.CharSequence name;
17+
/** Event metadata. */
18+
public java.util.Map<java.lang.CharSequence,java.lang.CharSequence> metadata;
19+
public org.apache.avro.Schema getSchema() { return SCHEMA$; }
20+
// Used by DatumWriter. Applications should not call.
21+
public java.lang.Object get(int field$) {
22+
switch (field$) {
23+
case 0: return timestamp;
24+
case 1: return namespace;
25+
case 2: return name;
26+
case 3: return metadata;
27+
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
28+
}
29+
}
30+
// Used by DatumReader. Applications should not call.
31+
@SuppressWarnings(value="unchecked")
32+
public void put(int field$, java.lang.Object value$) {
33+
switch (field$) {
34+
case 0: timestamp = (java.lang.Long)value$; break;
35+
case 1: namespace = (java.lang.CharSequence)value$; break;
36+
case 2: name = (java.lang.CharSequence)value$; break;
37+
case 3: metadata = (java.util.Map<java.lang.CharSequence,java.lang.CharSequence>)value$; break;
38+
default: throw new org.apache.avro.AvroRuntimeException("Bad index");
39+
}
40+
}
41+
}

wherehows-kafka/build.gradle

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,9 @@ apply plugin: 'application'
33
mainClassName = "wherehows.main.ApplicationStart"
44

55
dependencies {
6-
compile project(':wherehows-dao')
76
compile project(':wherehows-common')
7+
compile project(':wherehows-dao')
8+
compile project(':wherehows-data-model')
89
compile externalDependency.jackson_databind
910
compile externalDependency.jackson_core
1011
compile externalDependency.jackson_annotations

wherehows-kafka/src/main/java/wherehows/processors/DummyProcessor.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,10 @@ public DummyProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord
3131

3232
/**
3333
* Simply print the message content
34-
* @param record IndexedRecord
34+
* @param indexedRecord IndexedRecord
3535
*/
36-
public void process(IndexedRecord record) {
37-
log.info(record.toString());
36+
public void process(IndexedRecord indexedRecord) {
37+
log.info(indexedRecord.toString());
3838
//System.out.println(record.toString());
3939
}
4040
}

wherehows-kafka/src/main/java/wherehows/processors/GobblinTrackingAuditProcessor.java

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,16 @@
1313
*/
1414
package wherehows.processors;
1515

16-
import lombok.RequiredArgsConstructor;
16+
import gobblin.metrics.GobblinTrackingEvent_audit;
1717
import lombok.extern.slf4j.Slf4j;
18-
import org.apache.avro.generic.GenericData;
18+
import org.apache.avro.generic.IndexedRecord;
19+
import org.apache.kafka.clients.producer.KafkaProducer;
20+
import wherehows.dao.DaoFactory;
1921
import wherehows.service.GobblinTrackingAuditService;
2022

2123

2224
@Slf4j
23-
@RequiredArgsConstructor
24-
public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
25+
public class GobblinTrackingAuditProcessor extends KafkaMessageProcessor {
2526

2627
private static final String DALI_LIMITED_RETENTION_AUDITOR = "DaliLimitedRetentionAuditor";
2728
private static final String DALI_AUTOPURGED_AUDITOR = "DaliAutoPurgeAuditor";
@@ -30,26 +31,35 @@ public class GobblinTrackingAuditProcessor extends KafkaConsumerProcessor {
3031

3132
private final GobblinTrackingAuditService gobblinTrackingAuditService;
3233

34+
public GobblinTrackingAuditProcessor(DaoFactory daoFactory, KafkaProducer<String, IndexedRecord> producer) {
35+
super(daoFactory, producer);
36+
gobblinTrackingAuditService =
37+
new GobblinTrackingAuditService(DAO_FACTORY.getDatasetClassificationDao(), DAO_FACTORY.getDictDatasetDao());
38+
}
39+
3340
/**
3441
* Process a Gobblin tracking event audit record
35-
* @param record
36-
* @param topic
37-
* @return null
42+
* @param indexedRecord
3843
* @throws Exception
3944
*/
40-
public void process(GenericData.Record record, String topic) throws Exception {
45+
public void process(IndexedRecord indexedRecord) throws Exception {
4146

42-
if (record == null || record.get("name") == null) {
47+
if (indexedRecord == null || indexedRecord.getClass() != GobblinTrackingEvent_audit.class) {
48+
log.debug("Event record type error");
4349
return;
4450
}
4551

46-
final String name = record.get("name").toString();
52+
GobblinTrackingEvent_audit record = (GobblinTrackingEvent_audit) indexedRecord;
53+
54+
String name = String.valueOf(record.name);
4755
// only handle "DaliLimitedRetentionAuditor","DaliAutoPurgeAuditor" and "DsIgnoreIDPCAuditor"
4856
if (name.equals(DALI_LIMITED_RETENTION_AUDITOR) || name.equals(DALI_AUTOPURGED_AUDITOR) || name.equals(
4957
DS_IGNORE_IDPC_AUDITOR)) {
5058
// TODO: Re-enable this once it's fixed.
5159
} else if (name.equals(METADATA_FILE_CLASSIFIER)) {
5260
gobblinTrackingAuditService.updateHdfsDatasetSchema(record);
61+
} else {
62+
log.info("Gobblin audit message skipped.");
5363
}
5464
}
5565
}

0 commit comments

Comments
 (0)