Skip to content

Commit

Permalink
Merge pull request #693 from hangzhang925/master
Browse files Browse the repository at this point in the history
Refactor KAFKA processor to wherehows-kafka module
  • Loading branch information
Hang Zhang(hzhang2) authored Aug 23, 2017
2 parents da41eba + 4b57ab8 commit 03d533e
Show file tree
Hide file tree
Showing 42 changed files with 2,831 additions and 13 deletions.
4 changes: 4 additions & 0 deletions gradle/scripts/dependency.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,9 @@ ext.externalDependency = [

"hibernate_core" : "org.hibernate:hibernate-core:5.2.5.Final",
"hibernate_hikaricp" : "org.hibernate:hibernate-hikaricp:5.2.5.Final",

"lombok" : "org.projectlombok:lombok:1.16.18",
"slf4j_simple" : "org.slf4j:slf4j-simple:1.7.25",

"guava" : "com.google.guava:guava:23.0",
]
1 change: 1 addition & 0 deletions settings.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ def modules = [
"wherehows-etl",
"wherehows-frontend",
"wherehows-hadoop",
"wherehows-kafka",
"wherehows-web",
]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ private void updateHdfsDatasetSchema(GenericData.Record record) throws Exception
private void updateDatasetClassificationResult(String urn, String classificationResult) {
try {
DatasetClassification record = new DatasetClassification(urn, classificationResult, new Date());
datasetClassificationDao.updateDatasetClassification(record);
datasetClassificationDao.update(record);
} catch (Exception e) {
logger.info("unable to update classification result due to {}", e.getMessage());
}
Expand Down
37 changes: 37 additions & 0 deletions wherehows-dao/src/main/java/wherehows/dao/BaseDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.dao;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;


public class BaseDao {

final EntityManagerFactory entityManagerFactory;

public BaseDao(EntityManagerFactory factory) {
this.entityManagerFactory = factory;
}

public void update(Object record) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
entityManager.getTransaction().begin();
entityManager.merge(record);
entityManager.getTransaction().commit();
entityManager.close();
}


}
47 changes: 47 additions & 0 deletions wherehows-dao/src/main/java/wherehows/dao/ClusterInfoDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.dao;

import java.util.List;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import lombok.SneakyThrows;
import wherehows.models.ClusterInfo;


public class ClusterInfoDao extends BaseDao {

public ClusterInfoDao(EntityManagerFactory factory) {
super(factory);
}

@SneakyThrows
public List<ClusterInfo> findAll() {
EntityManager entityManager = entityManagerFactory.createEntityManager();
CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaQuery<ClusterInfo> criteria = cb.createQuery(ClusterInfo.class);
Root<ClusterInfo> entityRoot = criteria.from(ClusterInfo.class);
criteria.select(entityRoot);
try {
return entityManager.createQuery(criteria)
.getResultList();
} finally {
entityManager.close();
}

}
}
17 changes: 17 additions & 0 deletions wherehows-dao/src/main/java/wherehows/dao/DaoFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package wherehows.dao;

import javax.persistence.EntityManagerFactory;
import wherehows.models.DictDataset;


public class DaoFactory {
Expand All @@ -35,4 +36,20 @@ public DaoFactory(EntityManagerFactory entityManagerFactory) {
public DatasetClassificationDao getDatasetClassificationDao() {
return new DatasetClassificationDao(entityManagerFactory);
}

public DictDatasetDao getDictDatasetDao() {
return new DictDatasetDao(entityManagerFactory);
}

public FieldDetailDao getDictFieldDetailDao() {
return new FieldDetailDao(entityManagerFactory);
}

public DatasetSchemaInfoDao getDatasetSchemaInfoDao() {
return new DatasetSchemaInfoDao(entityManagerFactory);
}

public ClusterInfoDao getClusterInfoDao() {
return new ClusterInfoDao(entityManagerFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,10 @@
import wherehows.models.DatasetClassification;


public class DatasetClassificationDao {

private final EntityManagerFactory entityManagerFactory;
public class DatasetClassificationDao extends BaseDao {

public DatasetClassificationDao(EntityManagerFactory factory) {
this.entityManagerFactory = factory;
}

public void updateDatasetClassification(DatasetClassification record) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
entityManager.getTransaction().begin();
entityManager.merge(record);
entityManager.getTransaction().commit();
entityManager.close();
super(factory);
}

public DatasetClassification getDatasetClassification(String urn) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.dao;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import lombok.SneakyThrows;
import wherehows.models.DatasetSchemaInfo;


public class DatasetSchemaInfoDao extends BaseDao {

public DatasetSchemaInfoDao(EntityManagerFactory factory) {
super(factory);
}

@SneakyThrows
public DatasetSchemaInfo findById(int datasetId) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaQuery<DatasetSchemaInfo> criteria = cb.createQuery(DatasetSchemaInfo.class);
Root<DatasetSchemaInfo> entityRoot = criteria.from(DatasetSchemaInfo.class);
criteria.select(entityRoot).where(cb.equal(entityRoot.get("dataset_id"), datasetId));

try {
return entityManager.createQuery(criteria).getSingleResult();
} finally {
entityManager.close();
}
}
}
61 changes: 61 additions & 0 deletions wherehows-dao/src/main/java/wherehows/dao/DictDatasetDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.dao;

import java.util.Objects;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import lombok.SneakyThrows;
import wherehows.models.DictDataset;


public class DictDatasetDao extends BaseDao {

public DictDatasetDao(EntityManagerFactory factory) {
super(factory);
}

@SneakyThrows
public DictDataset findByUrn(String urn) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaQuery<DictDataset> criteria = cb.createQuery(DictDataset.class);
Root<DictDataset> entityRoot = criteria.from(DictDataset.class);
criteria.select(entityRoot);
criteria.where(cb.equal(entityRoot.get("urn"), urn));
try {
return entityManager.createQuery(criteria).getSingleResult();
} finally {
entityManager.close();
}
}

@SneakyThrows
public DictDataset findById(int datasetId) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaQuery<DictDataset> criteria = cb.createQuery(DictDataset.class);
Root<DictDataset> entityRoot = criteria.from(DictDataset.class);
criteria.select(entityRoot);
criteria.where(cb.equal(entityRoot.get("dataset_id"), datasetId));
try {
return entityManager.createQuery(criteria).getSingleResult();
} finally {
entityManager.close();
}
}
}
59 changes: 59 additions & 0 deletions wherehows-dao/src/main/java/wherehows/dao/FieldDetailDao.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.dao;

import java.util.List;
import java.util.Objects;
import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.criteria.CriteriaBuilder;
import javax.persistence.criteria.CriteriaQuery;
import javax.persistence.criteria.Root;
import lombok.SneakyThrows;
import wherehows.models.DictFieldDetail;


public class FieldDetailDao extends BaseDao {

private static final String DELETE_BY_DATASET_ID = "DELETE FROM dict_field_detail WHERE dataset_id = :datasetId";

public FieldDetailDao(EntityManagerFactory factory) {
super(factory);
}

@SneakyThrows
public List<DictFieldDetail> findById(int datasetId) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
CriteriaBuilder cb = entityManager.getCriteriaBuilder();
CriteriaQuery<DictFieldDetail> criteria = cb.createQuery(DictFieldDetail.class);
Root<DictFieldDetail> entityRoot = criteria.from(DictFieldDetail.class);
criteria.select(entityRoot);
criteria.where(cb.equal(entityRoot.get("dataset_Id"), datasetId));
try {
return entityManager.createQuery(criteria).getResultList();
} finally {
entityManager.close();
}
}

@SneakyThrows
public void deleteByDatasetId(int datasetId) {
EntityManager entityManager = entityManagerFactory.createEntityManager();
try {
entityManager.createQuery(DELETE_BY_DATASET_ID).executeUpdate();
} finally {
entityManager.close();
}
}
}
57 changes: 57 additions & 0 deletions wherehows-dao/src/main/java/wherehows/models/ClusterInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package wherehows.models;

import java.util.Date;
import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@Entity
@Table(name = "cfg_cluster")
@NoArgsConstructor
@AllArgsConstructor
public class ClusterInfo {

@Id
@Column(name = "cluster_id")
int clusterId;

@Column(name = "cluster_code")
String clusterCode;

@Column(name = "cluster_short_name")
String clusterShortName;

@Column(name = "cluster_type")
String clusterType;

@Column(name = "deployment_tier_code")
String deploymentTierCode;

@Column(name = "data_center_code")
String datacenterCode;

@Column(name = "last_modified")
Date lastModified;

@Column(name = "description")
String description;
}
Loading

0 comments on commit 03d533e

Please sign in to comment.