Skip to content

Commit

Permalink
Rewrite Kafka client master and worker (#731)
Browse files Browse the repository at this point in the history
  • Loading branch information
alyiwang authored Sep 8, 2017
1 parent e0e2acf commit d1e53c7
Show file tree
Hide file tree
Showing 15 changed files with 544 additions and 18 deletions.
6 changes: 3 additions & 3 deletions gradle/scripts/dependency.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,9 @@ ext.externalDependency = [
"akka" : "com.typesafe.akka:akka-actor_2.10:2.3.15",
"spring_jdbc" : "org.springframework:spring-jdbc:4.1.6.RELEASE",

"kafka" : "org.apache.kafka:kafka_2.10:0.10.0.1",
"kafka_clients" : "org.apache.kafka:kafka-clients:0.10.0.1",
"confluent_common_cfg": "io.confluent:common-config:3.0.1",
"kafka" : "org.apache.kafka:kafka_2.10:0.10.2.1",
"kafka_clients" : "org.apache.kafka:kafka-clients:0.10.2.1",
"confluent_avro_serde": "io.confluent:kafka-avro-serializer:3.3.0",

"parquet_avro" : "org.apache.parquet:parquet-avro:1.8.1",

Expand Down
2 changes: 1 addition & 1 deletion wherehows-common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ dependencies {
compile externalDependency.avro
compile externalDependency.kafka
compile externalDependency.kafka_clients
compile externalDependency.confluent_common_cfg
compile externalDependency.confluent_avro_serde
compile externalDependency.jackson_core
compile externalDependency.jackson_databind
compile externalDependency.guava
Expand Down
5 changes: 5 additions & 0 deletions wherehows-common/src/main/java/wherehows/common/Constant.java
Original file line number Diff line number Diff line change
Expand Up @@ -269,4 +269,9 @@ public class Constant {

// ump metrics
public static final String UMP_METRIC_CSV_FILE_KEY = "ump.metric_csv_file";

// kafka client
public static final String KAFKA_CONSUMER_TOPIC_KEY = "kafka.consumer.topic";
public static final String KAFKA_PRODUCER_TOPIC_KEY = "kafka.producer.topic";
public static final String KAFKA_PROCESSOR_KEY = "kafka.processor";
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,30 @@ private static String resolveEnviornmentalVariable(String value) {
return value;
}

private static String jobNameFromFile(File file) {
String filename = file.getName();
/**
 * Derives a job name from a job file by stripping the file extension.
 *
 * @param file the job file whose name is used
 * @return the file's name with everything from the last '.' onward removed
 */
public static String jobNameFromFile(File file) {
  final String name = file.getName();
  return jobNameFromFileName(name);
}

/**
 * Derives a job name from a path by stripping the extension of its last path element.
 *
 * @param path the path to the job file
 * @return the last path element's name with everything from the last '.' onward removed
 */
public static String jobNameFromPath(Path path) {
  final String name = path.getFileName().toString();
  return jobNameFromFileName(name);
}

/**
 * Get job name without file extension.
 *
 * <p>Only the last extension is removed, so {@code "a.b.job"} yields {@code "a.b"}. A name that
 * contains no '.' has no extension and is returned unchanged.
 *
 * @param filename String file name (not a full path)
 * @return String the file name without its last extension
 */
public static String jobNameFromFileName(String filename) {
  final int extensionStart = filename.lastIndexOf('.');
  // lastIndexOf returns -1 when there is no '.'; substring(0, -1) would throw
  // StringIndexOutOfBoundsException, so treat such names as already extension-free.
  if (extensionStart < 0) {
    return filename;
  }
  return filename.substring(0, extensionStart);
}

Expand All @@ -95,6 +117,23 @@ public static Map<String, Properties> getScheduledJobs(String dir) {
return jobs;
}

/**
 * Returns a map of job name to job properties which are enabled.
 *
 * <p>Only files whose absolute path ends with ".job" are considered. A job counts as disabled
 * when its properties contain the {@code Constant.JOB_DISABLED_KEY} key, regardless of the value.
 *
 * @param dir directory containing the job files
 * @return map of job name (file name without extension) to resolved properties; empty when the
 *     directory holds no enabled jobs or cannot be listed
 */
public static Map<String, Properties> getEnabledJobs(String dir) {
  Map<String, Properties> jobs = new HashMap<>();

  // File.listFiles() returns null (not an empty array) when dir is not a directory or an I/O
  // error occurs; iterating it directly would throw a NullPointerException.
  File[] files = new File(dir).listFiles();
  if (files == null) {
    return jobs;
  }

  for (File file : files) {
    if (!file.getAbsolutePath().endsWith(".job")) {
      continue;
    }
    Properties prop = getResolvedProperties(file.toPath());
    if (!prop.containsKey(Constant.JOB_DISABLED_KEY)) {
      // job name = file name without the extension.
      jobs.put(jobNameFromFile(file), prop);
    }
  }
  return jobs;
}

/**
* Returns a map of job name to job properties which are enabled and of certain type.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,7 @@ public void testGetScheduledJobs() throws IOException {
Path path2 = createPropertiesFile(propertyStr2);
Path path3 = createPropertiesFile(propertyStr3);

String filename1 = path1.getFileName().toString();
filename1 = filename1.substring(0, filename1.lastIndexOf('.'));
String filename1 = jobNameFromPath(path1);

String dir = path1.getParent().toString();

Expand All @@ -71,6 +70,30 @@ public void testGetScheduledJobs() throws IOException {
Files.deleteIfExists(path3);
}

/**
 * Verifies that getEnabledJobs returns every job file whose "job.disabled" key is absent
 * (here it only appears commented out), keyed by file name without extension.
 */
@Test
public void testGetEnabledJobs() throws IOException {
  String propertyStr1 = "job.class=test\n" + "job.cron.expr=0 0 1 * * ? *\n" + "#job.disabled=1\n" + "job.type=TEST1";
  String propertyStr2 = "job.class=test\n" + "#job.disabled=1\n";

  Path path1 = createPropertiesFile(propertyStr1);
  Path path2 = null;
  try {
    path2 = createPropertiesFile(propertyStr2);

    String filename1 = jobNameFromPath(path1);
    String filename2 = jobNameFromPath(path2);

    String dir = path1.getParent().toString();

    Map<String, Properties> jobs = getEnabledJobs(dir);

    Assert.assertEquals(jobs.size(), 2);
    Assert.assertEquals(jobs.get(filename1).getProperty("job.class"), "test");
    Assert.assertEquals(jobs.get(filename1).getProperty("job.disabled", ""), "");
    Assert.assertEquals(jobs.get(filename2).getProperty("job.class"), "test");
  } finally {
    // Clean up even when an assertion fails, so leftover .job files cannot change the job
    // count seen by other tests that list the same directory.
    Files.deleteIfExists(path1);
    if (path2 != null) {
      Files.deleteIfExists(path2);
    }
  }
}

@Test
public void testGetEnabledJobsByType() throws IOException {
String propertyStr1 = "job.class=test\n" + "job.cron.expr=0 0 1 * * ? *\n" + "#job.disabled=1\n" + "job.type=TEST1";
Expand All @@ -79,8 +102,7 @@ public void testGetEnabledJobsByType() throws IOException {
Path path1 = createPropertiesFile(propertyStr1);
Path path2 = createPropertiesFile(propertyStr2);

String filename1 = path1.getFileName().toString();
filename1 = filename1.substring(0, filename1.lastIndexOf('.'));
String filename1 = jobNameFromPath(path1);

String dir = path1.getParent().toString();

Expand Down
22 changes: 18 additions & 4 deletions wherehows-kafka/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,23 @@ dependencies {
compile externalDependency.mysql
compile externalDependency.guava
compile externalDependency.slf4j_simple

compile externalDependency.kafka_clients
compile externalDependency.confluent_avro_serde

testCompile externalDependency.testng
}

configurations.all {
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'log4j'

resolutionStrategy {
force 'org.apache.avro:avro:1.4.1'
}
}

configurations {
all*.exclude group: 'org.slf4j', module: 'slf4j-log4j12'
all*.exclude group: 'log4j'
}
test {
// enable TestNG support (default is JUnit)
useTestNG()
}
131 changes: 131 additions & 0 deletions wherehows-kafka/src/main/java/wherehows/actors/KafkaClientMaster.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.actors;

import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.lang.reflect.Constructor;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import wherehows.common.Constant;
import wherehows.common.utils.JobsUtil;
import wherehows.dao.DaoFactory;
import wherehows.msgs.KafkaCommMsg;
import wherehows.processors.KafkaMessageProcessor;

import static wherehows.main.ApplicationStart.DAO_FACTORY;
import static wherehows.utils.KafkaClientUtil.*;


/**
 * Akka actor that reads the enabled Kafka job definitions from a directory and, for each one,
 * creates a Kafka consumer (and optionally a producer), instantiates the configured message
 * processor and starts a {@link KafkaWorker} child actor to consume the topic.
 */
@Slf4j
public class KafkaClientMaster extends UntypedActor {

  private final String KAFKA_JOB_DIR;

  // Map of kafka job name to properties
  private Map<String, Properties> _kafkaJobList;

  // Map of kafka job name to worker actor
  private final Map<String, ActorRef> _kafkaWorkers = new HashMap<>();

  /**
   * @param kafkaJobDir directory that holds the Kafka ".job" property files
   */
  public KafkaClientMaster(String kafkaJobDir) {
    this.KAFKA_JOB_DIR = kafkaJobDir;
  }

  /**
   * Loads the enabled jobs and starts one worker per job. Stops this actor when there is no
   * enabled job. A job that fails to start is logged and skipped; the other jobs still start.
   */
  @Override
  public void preStart() throws Exception {

    _kafkaJobList = JobsUtil.getEnabledJobs(KAFKA_JOB_DIR);
    log.info("Kafka jobs: {}", _kafkaJobList.keySet());

    if (_kafkaJobList.isEmpty()) {
      context().stop(getSelf());
      return;
    }
    log.info("Starting KafkaClientMaster...");

    for (Map.Entry<String, Properties> entry : _kafkaJobList.entrySet()) {
      // handle one kafka topic
      startKafkaJob(entry.getKey(), entry.getValue());
    }
  }

  /**
   * Creates the Kafka clients, processor and worker actor for a single job.
   *
   * @param kafkaJobName job name, used as the key of the worker map and in log messages
   * @param props resolved job properties
   */
  private void startKafkaJob(String kafkaJobName, Properties props) {
    log.info("Create Kafka client with config: " + props);

    final String consumerTopic = props.getProperty(Constant.KAFKA_CONSUMER_TOPIC_KEY);
    final String processor = props.getProperty(Constant.KAFKA_PROCESSOR_KEY);
    final String producerTopic = props.getProperty(Constant.KAFKA_PRODUCER_TOPIC_KEY);

    if (consumerTopic == null || processor == null) {
      log.error("Missing required configs for: " + kafkaJobName);
      return;
    }

    KafkaConsumer<String, IndexedRecord> consumer = null;
    KafkaProducer<String, IndexedRecord> producer = null;
    // Becomes true once the worker actor exists; from then on the worker owns the consumer.
    boolean handedToWorker = false;
    try {
      // create consumer
      Properties consumerProp = getPropertyTrimPrefix(props, "consumer");

      consumer = getConsumer(consumerProp);
      consumer.subscribe(Collections.singletonList(consumerTopic));

      // create producer if configured
      if (producerTopic != null) {
        Properties producerProp = getPropertyTrimPrefix(props, "producer");

        producer = getProducer(producerProp);
      }

      // get processor instance
      Class<?> processorClass = Class.forName(processor);
      Constructor<?> ctor = processorClass.getConstructor(DaoFactory.class, KafkaProducer.class);
      KafkaMessageProcessor processorInstance = (KafkaMessageProcessor) ctor.newInstance(DAO_FACTORY, producer);

      // create worker
      ActorRef worker =
          getContext().actorOf(Props.create(KafkaWorker.class, consumerTopic, consumer, processorInstance));
      handedToWorker = true;

      // Keyed by job name (not topic) so that two jobs reading the same topic do not overwrite
      // each other's entry.
      _kafkaWorkers.put(kafkaJobName, worker);
      worker.tell("ApplicationStart", getSelf());

      log.info("Started Kafka job: " + kafkaJobName + " listening to topic: " + consumerTopic);
    } catch (Exception e) {
      log.error("Error starting Kafka job: " + kafkaJobName, e);
      if (!handedToWorker) {
        // No worker took ownership, so release the clients here instead of leaking them.
        closeClients(kafkaJobName, consumer, producer);
      }
    }
  }

  /**
   * Closes the clients of a job that failed to start; close failures are logged, not rethrown.
   */
  private void closeClients(String kafkaJobName, KafkaConsumer<String, IndexedRecord> consumer,
      KafkaProducer<String, IndexedRecord> producer) {
    if (consumer != null) {
      try {
        consumer.close();
      } catch (Exception e) {
        log.error("Error closing consumer for Kafka job: " + kafkaJobName, e);
      }
    }
    if (producer != null) {
      try {
        producer.close();
      } catch (Exception e) {
        log.error("Error closing producer for Kafka job: " + kafkaJobName, e);
      }
    }
  }

  /**
   * Logs {@link KafkaCommMsg} messages at debug level; every other message is unhandled.
   */
  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof KafkaCommMsg) {
      final KafkaCommMsg kafkaCommMsg = (KafkaCommMsg) message;
      log.debug(kafkaCommMsg.toString());
    } else {
      unhandled(message);
    }
  }

  /**
   * Clears the shared {@code KafkaWorker.RUNNING} flag, which the workers' poll loops check, and
   * stops every worker started by this actor.
   */
  @Override
  public void postStop() throws Exception {
    log.info("Terminating KafkaClientMaster...");
    KafkaWorker.RUNNING = false;
    for (ActorRef worker : _kafkaWorkers.values()) {
      getContext().stop(worker);
    }
  }
}
96 changes: 96 additions & 0 deletions wherehows-kafka/src/main/java/wherehows/actors/KafkaWorker.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.actors;

import akka.actor.UntypedActor;
import lombok.extern.slf4j.Slf4j;
import org.apache.avro.generic.IndexedRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import wherehows.processors.KafkaMessageProcessor;


/**
* Akka actor for listening to a Kafka topic and waiting for messages.
*/
@Slf4j
public class KafkaWorker extends UntypedActor {

public static boolean RUNNING = true;

private final String _topic;

private final KafkaConsumer<String, IndexedRecord> _consumer;

private final KafkaMessageProcessor _processor;

private final int _consumer_poll_interval = 1000;

private int _receivedRecordCount;

public KafkaWorker(String topic, KafkaConsumer<String, IndexedRecord> consumer, KafkaMessageProcessor processor) {
this._topic = topic;
this._consumer = consumer;
this._processor = processor;
this._receivedRecordCount = 0; // number of received kafka messages
}

@Override
public void onReceive(Object message) throws Exception {
if (message.equals("ApplicationStart")) {
try {
while (RUNNING) {
ConsumerRecords<String, IndexedRecord> records = _consumer.poll(_consumer_poll_interval);
for (ConsumerRecord<String, IndexedRecord> record : records) {
_receivedRecordCount++;

try {
_processor.process(record.value());
} catch (Exception e) {
log.error("Processor Error: ", e);
}

if (_receivedRecordCount % 1000 == 0) {
log.debug(_topic + " received " + _receivedRecordCount);
}
}

_consumer.commitSync();
}
} catch (Exception e) {
log.error("Consumer Error ", e);
} finally {
log.info("Shutting down consumer and worker for topic: " + _topic);
try {
_consumer.close();
} catch (Exception e) {
log.error("Error closing consumer for topic " + _topic + " : " + e.toString());
}
}
} else {
unhandled(message);
}
}

@Override
public void postStop() throws Exception {
log.info("Stop worker for topic: " + _topic);
try {
_consumer.close();
} catch (Exception e) {
log.error("Error closing consumer for topic " + _topic + " : " + e.toString());
}
}
}
Loading

0 comments on commit d1e53c7

Please sign in to comment.