Skip to content

Commit 9eb7f14

Browse files
committed
Merge pull request #7 from czbernard/linkedin
Changes to the Gitorious crawler; move connection-closing calls into finally blocks in the Jython scripts
2 parents 8bc741d + 9ebe0b2 commit 9eb7f14

File tree

19 files changed

+733
-698
lines changed

19 files changed

+733
-698
lines changed

metadata-etl/src/main/java/metadata/etl/git/GitMetadataEtl.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import java.io.File;
1717
import java.io.InputStream;
1818
import java.util.List;
19+
import java.util.Map;
1920
import java.util.Properties;
2021
import metadata.etl.EtlJob;
2122
import org.slf4j.Logger;
@@ -56,13 +57,13 @@ public void extract() throws Exception {
5657
}
5758
FileWriter fw = new FileWriter(localDir + "/" + COMMIT_OUTPUT_FILE);
5859
for (String project : projects) {
59-
List<String> repos = GitUtil.getRepoListFromProject(GitUtil.getHttpsUrl(gitHost, project));
60-
for (String repo : repos) {
61-
String repoUri = GitUtil.getGitUrl(gitHost, repo);
60+
Map<String, String> repos = GitUtil.getRepoListFromProject(GitUtil.getHttpsUrl(gitHost, project));
61+
for (String repo : repos.keySet()) {
62+
String repoUri = repos.get(repo);
6263
String repoDir = localDir + "/" + repo;
6364
GitUtil.clone(repoUri, repoDir);
64-
List<GitUtil.CommitMetadata> commitMetadatas = GitUtil.getRepoMetadata(repoDir);
65-
for (GitUtil.CommitMetadata m : commitMetadatas) {
65+
List<GitUtil.CommitMetadata> commitMetadataList = GitUtil.getRepoMetadata(repoDir);
66+
for (GitUtil.CommitMetadata m : commitMetadataList) {
6667
fw.append(new GitCommitRecord(m, repoUri));
6768
}
6869
}

metadata-etl/src/main/resources/jython/AzkabanExtract.py

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -65,16 +65,18 @@ def __init__(self, args):
6565
print e
6666

6767
def run(self):
68-
self.collect_flow_jobs(self.metadata_folder + "/flow.csv", self.metadata_folder + "/job.csv", self.metadata_folder + "/dag.csv")
69-
self.collect_flow_owners(self.metadata_folder + "/owner.csv")
70-
self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
71-
self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.metadata_folder + "/job_exec.csv", self.lookback_period)
72-
self.az_cursor.close()
73-
self.az_con.close()
68+
try:
69+
self.collect_flow_jobs(self.metadata_folder + "/flow.csv", self.metadata_folder + "/job.csv", self.metadata_folder + "/dag.csv")
70+
self.collect_flow_owners(self.metadata_folder + "/owner.csv")
71+
self.collect_flow_schedules(self.metadata_folder + "/schedule.csv")
72+
self.collect_flow_execs(self.metadata_folder + "/flow_exec.csv", self.metadata_folder + "/job_exec.csv", self.lookback_period)
73+
finally:
74+
self.az_cursor.close()
75+
self.az_con.close()
7476

7577
def collect_flow_jobs(self, flow_file, job_file, dag_file):
7678
print "collect flow&jobs"
77-
query = "SELECT f.*, p.name as project_name FROM project_flows f inner join projects p on f.project_id = p.id and f.version = p.version where p.active = 1"
79+
query = "SELECT distinct f.*, p.name as project_name FROM project_flows f inner join projects p on f.project_id = p.id and f.version = p.version where p.active = 1"
7880
self.az_cursor.execute(query)
7981
rows = DbUtil.dict_cursor(self.az_cursor)
8082
flow_writer = FileWriter(flow_file)
@@ -89,7 +91,6 @@ def collect_flow_jobs(self, flow_file, job_file, dag_file):
8991
unzipped_content = gzip.GzipFile(mode='r', fileobj=StringIO.StringIO(row[json_column].tostring())).read()
9092
try:
9193
row[json_column] = json.loads(unzipped_content)
92-
#print json.dumps(row[json_column], indent=4)
9394
except:
9495
pass
9596

metadata-etl/src/main/resources/jython/DatasetTreeBuilder.py

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -26,21 +26,23 @@ def __init__(self, args):
2626
jdbc_driver = args[Constant.WH_DB_DRIVER_KEY]
2727
jdbc_url = args[Constant.WH_DB_URL_KEY]
2828
conn_mysql = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)
29-
query = "select distinct id, concat(SUBSTRING_INDEX(urn, ':///', 1), '/', SUBSTRING_INDEX(urn, ':///', -1)) p from dict_dataset order by urn"
3029
cur = conn_mysql.cursor()
31-
cur.execute(query)
32-
datasets = cur.fetchall()
33-
self.dataset_dict = dict()
34-
for dataset in datasets:
35-
current = self.dataset_dict
36-
path_arr = dataset[1].split('/')
37-
for name in path_arr:
38-
current = current.setdefault(name, {})
39-
current["__ID_OF_DATASET__"] = dataset[0]
40-
self.file_name = args[Constant.DATASET_TREE_FILE_NAME_KEY]
41-
self.value = []
42-
cur.close()
43-
conn_mysql.close()
30+
try:
31+
query = "select distinct id, concat(SUBSTRING_INDEX(urn, ':///', 1), '/', SUBSTRING_INDEX(urn, ':///', -1)) p from dict_dataset order by urn"
32+
cur.execute(query)
33+
datasets = cur.fetchall()
34+
self.dataset_dict = dict()
35+
for dataset in datasets:
36+
current = self.dataset_dict
37+
path_arr = dataset[1].split('/')
38+
for name in path_arr:
39+
current = current.setdefault(name, {})
40+
current["__ID_OF_DATASET__"] = dataset[0]
41+
self.file_name = args[Constant.DATASET_TREE_FILE_NAME_KEY]
42+
self.value = []
43+
finally:
44+
cur.close()
45+
conn_mysql.close()
4446

4547
def build_trie_helper(self, depth, path, current, current_dict):
4648
nodes = []

metadata-etl/src/main/resources/jython/FlowTreeBuilder.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -26,27 +26,29 @@ def __init__(self, args):
2626
jdbc_driver = args[Constant.WH_DB_DRIVER_KEY]
2727
jdbc_url = args[Constant.WH_DB_URL_KEY]
2828
conn_mysql = zxJDBC.connect(jdbc_url, username, password, jdbc_driver)
29-
query = "select distinct f.flow_id, f.flow_name, f.flow_group, ca.app_code from flow f join cfg_application ca on f.app_id = ca.app_id order by app_code, flow_name"
3029
cur = conn_mysql.cursor()
31-
cur.execute(query)
32-
flows = cur.fetchall()
33-
self.flow_dict = dict()
34-
for flow in flows:
35-
current = self.flow_dict
36-
# if needed, use flow[3].replace(' ', '.')
37-
current = current.setdefault(flow[3], {})
38-
if flow[2] is not None:
39-
current = current.setdefault(flow[2], {})
40-
# for oozie
41-
else:
42-
current = current.setdefault('NA', {})
30+
try:
31+
query = "select distinct f.flow_id, f.flow_name, f.flow_group, ca.app_code from flow f join cfg_application ca on f.app_id = ca.app_id order by app_code, flow_name"
32+
cur.execute(query)
33+
flows = cur.fetchall()
34+
self.flow_dict = dict()
35+
for flow in flows:
36+
current = self.flow_dict
37+
# if needed, use flow[3].replace(' ', '.')
38+
current = current.setdefault(flow[3], {})
39+
if flow[2] is not None:
40+
current = current.setdefault(flow[2], {})
41+
# for oozie
42+
else:
43+
current = current.setdefault('NA', {})
4344

44-
current = current.setdefault(flow[1], {})
45-
current["__ID_OF_FLOW__"] = flow[0]
46-
self.file_name = args[Constant.FLOW_TREE_FILE_NAME_KEY]
47-
self.value = []
48-
cur.close()
49-
conn_mysql.close()
45+
current = current.setdefault(flow[1], {})
46+
current["__ID_OF_FLOW__"] = flow[0]
47+
self.file_name = args[Constant.FLOW_TREE_FILE_NAME_KEY]
48+
self.value = []
49+
finally:
50+
cur.close()
51+
conn_mysql.close()
5052

5153
def build_trie_helper(self, depth, current, current_dict):
5254
nodes = []

metadata-etl/src/main/resources/jython/GitLoad.py

Lines changed: 24 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,22 +20,23 @@
2020

2121

2222
class GitLoad:
23-
24-
def __init__(self, args):
25-
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
26-
args[Constant.WH_DB_USERNAME_KEY],
27-
args[Constant.WH_DB_PASSWORD_KEY],
28-
args[Constant.WH_DB_DRIVER_KEY])
29-
self.wh_cursor = self.wh_con.cursor()
30-
self.app_id = int(args[Constant.APP_ID_KEY])
31-
32-
def run(self):
33-
self.load_from_stg()
34-
self.wh_cursor.close()
35-
self.wh_con.close()
36-
37-
def load_from_stg(self):
38-
query = """
23+
def __init__(self, args):
24+
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
25+
args[Constant.WH_DB_USERNAME_KEY],
26+
args[Constant.WH_DB_PASSWORD_KEY],
27+
args[Constant.WH_DB_DRIVER_KEY])
28+
self.wh_cursor = self.wh_con.cursor()
29+
self.app_id = int(args[Constant.APP_ID_KEY])
30+
31+
def run(self):
32+
try:
33+
self.load_from_stg()
34+
finally:
35+
self.wh_cursor.close()
36+
self.wh_con.close()
37+
38+
def load_from_stg(self):
39+
query = """
3940
INSERT IGNORE INTO source_code_commit_info
4041
(
4142
app_id, repository_urn, commit_id, file_path, file_name, commit_time, committer_name, committer_email,
@@ -46,11 +47,12 @@ def load_from_stg(self):
4647
from stg_source_code_commit_info s
4748
where s.app_id = {app_id}
4849
""".format(app_id=self.app_id)
49-
print query
50-
self.wh_cursor.execute(query)
51-
self.wh_con.commit()
50+
print query
51+
self.wh_cursor.execute(query)
52+
self.wh_con.commit()
53+
5254

5355
if __name__ == "__main__":
54-
props = sys.argv[1]
55-
git = GitLoad(props)
56-
git.run()
56+
props = sys.argv[1]
57+
git = GitLoad(props)
58+
git.run()

metadata-etl/src/main/resources/jython/GitTransform.py

Lines changed: 43 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -20,16 +20,16 @@
2020

2121

2222
class OwnerTransform:
23-
_tables = {"source_code_commit": {"columns": "repository_urn, commit_id, file_path, file_name, commit_time, committer_name, committer_email, author_name, author_email, message",
24-
"file": "commit.csv",
25-
"table": "stg_source_code_commit_info"}
26-
}
23+
_tables = {"source_code_commit": {"columns": "repository_urn, commit_id, file_path, file_name, commit_time, committer_name, committer_email, author_name, author_email, message",
24+
"file": "commit.csv",
25+
"table": "stg_source_code_commit_info"}
26+
}
2727

28-
_clear_staging_tempalte = """
28+
_clear_staging_tempalte = """
2929
DELETE FROM {table}
3030
"""
3131

32-
_read_file_template = """
32+
_read_file_template = """
3333
LOAD DATA LOCAL INFILE '{folder}/{file}'
3434
INTO TABLE {table}
3535
FIELDS TERMINATED BY '\x1a' ESCAPED BY '\0'
@@ -39,43 +39,46 @@ class OwnerTransform:
3939
wh_etl_exec_id = {wh_etl_exec_id};
4040
"""
4141

42-
def __init__(self, args):
43-
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
44-
args[Constant.WH_DB_USERNAME_KEY],
45-
args[Constant.WH_DB_PASSWORD_KEY],
46-
args[Constant.WH_DB_DRIVER_KEY])
47-
self.wh_cursor = self.wh_con.cursor()
48-
self.app_id = int(args[Constant.APP_ID_KEY])
49-
self.wh_etl_exec_id = int(args[Constant.WH_EXEC_ID_KEY])
50-
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
51-
self.metadata_folder = self.app_folder + "/" + str(self.app_id)
42+
def __init__(self, args):
43+
self.wh_con = zxJDBC.connect(args[Constant.WH_DB_URL_KEY],
44+
args[Constant.WH_DB_USERNAME_KEY],
45+
args[Constant.WH_DB_PASSWORD_KEY],
46+
args[Constant.WH_DB_DRIVER_KEY])
47+
self.wh_cursor = self.wh_con.cursor()
48+
self.app_id = int(args[Constant.APP_ID_KEY])
49+
self.wh_etl_exec_id = int(args[Constant.WH_EXEC_ID_KEY])
50+
self.app_folder = args[Constant.WH_APP_FOLDER_KEY]
51+
self.metadata_folder = self.app_folder + "/" + str(self.app_id)
5252

53-
def run(self):
54-
self.read_file_to_stg()
55-
self.wh_cursor.close()
56-
self.wh_con.close()
53+
def run(self):
54+
try:
55+
self.read_file_to_stg()
56+
finally:
57+
self.wh_cursor.close()
58+
self.wh_con.close()
5759

58-
def read_file_to_stg(self):
59-
t = self._tables["source_code_commit"]
60+
def read_file_to_stg(self):
61+
t = self._tables["source_code_commit"]
6062

61-
# Clear stagging table
62-
query = self._clear_staging_tempalte.format(table=t.get("table"))
63-
print query
64-
self.wh_cursor.execute(query)
65-
self.wh_con.commit()
63+
# Clear stagging table
64+
query = self._clear_staging_tempalte.format(table=t.get("table"))
65+
print query
66+
self.wh_cursor.execute(query)
67+
self.wh_con.commit()
68+
69+
# Load file into stagging table
70+
query = self._read_file_template.format(folder=self.metadata_folder,
71+
file=t.get("file"),
72+
table=t.get("table"),
73+
columns=t.get("columns"),
74+
app_id=self.app_id,
75+
wh_etl_exec_id=self.wh_etl_exec_id)
76+
print query
77+
self.wh_cursor.execute(query)
78+
self.wh_con.commit()
6679

67-
# Load file into stagging table
68-
query = self._read_file_template.format(folder=self.metadata_folder,
69-
file=t.get("file"),
70-
table=t.get("table"),
71-
columns=t.get("columns"),
72-
app_id=self.app_id,
73-
wh_etl_exec_id=self.wh_etl_exec_id)
74-
print query
75-
self.wh_cursor.execute(query)
76-
self.wh_con.commit()
7780

7881
if __name__ == "__main__":
79-
props = sys.argv[1]
80-
ot = OwnerTransform(props)
81-
ot.run()
82+
props = sys.argv[1]
83+
ot = OwnerTransform(props)
84+
ot.run()

metadata-etl/src/main/resources/jython/HdfsLoad.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,9 @@ def load_sample(self):
263263
l.db_id = args[Constant.DB_ID_KEY]
264264
l.wh_etl_exec_id = args[Constant.WH_EXEC_ID_KEY]
265265
l.conn_mysql = zxJDBC.connect(JDBC_URL, username, password, JDBC_DRIVER)
266-
l.load_metadata()
267-
l.load_field()
268-
l.load_sample()
269-
l.conn_mysql.close()
266+
try:
267+
l.load_metadata()
268+
l.load_field()
269+
l.load_sample()
270+
finally:
271+
l.conn_mysql.close()

0 commit comments

Comments
 (0)