Commit
Showing 3 changed files with 145 additions and 5 deletions.
@@ -0,0 +1,66 @@
#! /usr/bin/env nix-shell
#! nix-shell dataset-hive-generator.py.nix -i python

import sys
import time

from pyhive import hive
from TCLIService.ttypes import TOperationState

import simplejson as json

HIVESTORE = 'localhost'

AVROLOADPATH = '../../metadata-events/mxe-schemas/src/renamed/avro/com/linkedin/mxe/MetadataChangeEvent.avsc'
KAFKATOPIC = 'MetadataChangeEvent'
BOOTSTRAP = 'localhost:9092'
SCHEMAREGISTRY = 'http://localhost:8081'


def hive_query(query):
    """
    Execute the query against the HiveStore and return all result rows.
    """
    cursor = hive.connect(HIVESTORE).cursor()
    cursor.execute(query, async_=True)
    status = cursor.poll().operationState
    # Poll until the asynchronous query leaves the INITIALIZED/RUNNING states,
    # streaming any server-side logs to stdout while we wait.
    while status in (TOperationState.INITIALIZED_STATE, TOperationState.RUNNING_STATE):
        logs = cursor.fetch_logs()
        for message in logs:
            sys.stdout.write(message)
        status = cursor.poll().operationState
    results = cursor.fetchall()
    return results


def build_hive_dataset_mce(dataset_name, schema, metadata):
    """
    Create the MetadataChangeEvent for a dataset from its name, raw schema and
    the comma-split detailed-table-information row of `describe extended`.
    """
    # The fixed offsets below assume the layout of Hive's detailed table
    # information and strip field prefixes such as ' owner:', ' createTime:' and ' location:'.
    actor, type, created_time, upstreams_dataset, sys_time = "urn:li:corpuser:" + metadata[2][7:], str(metadata[-1][11:-1]), int(metadata[3][12:]), metadata[-28][10:], int(time.time())
    owners = {"owners":[{"owner":actor,"type":"DATAOWNER"}],"lastModified":{"time":sys_time,"actor":actor}}
    upstreams = {"upstreams":[{"auditStamp":{"time":sys_time,"actor":actor},"dataset":"urn:li:dataset:(urn:li:dataPlatform:hive," + upstreams_dataset + ",PROD)","type":"TRANSFORMED"}]}
    elements = {"elements":[{"url":HIVESTORE,"description":"sample doc to describe upstreams","createStamp":{"time":sys_time,"actor":actor}}]}
    schema_name = {"schemaName":dataset_name,"platform":"urn:li:dataPlatform:hive","version":0,"created":{"time":created_time,"actor":actor},
                   "lastModified":{"time":sys_time,"actor":actor},"hash":"","platformSchema":{"com.linkedin.pegasus2avro.schema.OtherSchema":{"rawSchema": schema}},
                   "fields":[{"fieldPath":"","description":{"string":""},"nativeDataType":"string","type":{"type":{"com.linkedin.pegasus2avro.schema.StringType":{}}}}]}

    mce = {"auditHeader": None,
           "proposedSnapshot":{"com.linkedin.pegasus2avro.metadata.snapshot.DatasetSnapshot":
               {"urn": "urn:li:dataset:(urn:li:dataPlatform:hive," + dataset_name + ",PROD)",
                "aspects": [
                    {"com.linkedin.pegasus2avro.common.Ownership": owners},
                    {"com.linkedin.pegasus2avro.dataset.UpstreamLineage": upstreams},
                    {"com.linkedin.pegasus2avro.common.InstitutionalMemory": elements},
                    {"com.linkedin.pegasus2avro.schema.SchemaMetadata": schema_name}
                ]}},
           "proposedDelta": None}

    print(json.dumps(mce))


# Walk every database and table in the metastore and emit one MCE per table.
databases = hive_query('show databases')
for database in databases:
    tables = hive_query('show tables in ' + database[0])
    for table in tables:
        dataset_name = database[0] + '.' + table[0]
        description = hive_query('describe extended ' + dataset_name)
        build_hive_dataset_mce(dataset_name, str(description[:-1][:-1]), description[-1][1].split(','))

sys.exit(0)
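The script above only prints each MCE as JSON. The otherwise unused AVROLOADPATH, KAFKATOPIC, BOOTSTRAP and SCHEMAREGISTRY constants, together with the commented-out confluent-kafka dependency in the nix expression below, suggest the events are meant to be pushed to Kafka in a later step. A minimal sketch of that step, assuming confluent-kafka with its Avro extras is installed and the generated dict conforms to MetadataChangeEvent.avsc (produce_hive_dataset_mce is a hypothetical helper, not part of this commit):

# Hypothetical follow-up, not part of the committed script: publish an MCE
# dict to Kafka instead of printing it. Assumes confluent-kafka[avro].
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer

value_schema = avro.load(AVROLOADPATH)  # MetadataChangeEvent.avsc
producer = AvroProducer({'bootstrap.servers': BOOTSTRAP,
                         'schema.registry.url': SCHEMAREGISTRY},
                        default_value_schema=value_schema)

def produce_hive_dataset_mce(mce):
    # Serialize the dict against the Avro schema and send it to the MCE topic.
    producer.produce(topic=KAFKATOPIC, value=mce)
    producer.flush()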
contrib/metadata-ingestion/bin/dataset-hive-generator.py.nix
61 changes: 61 additions & 0 deletions
@@ -0,0 +1,61 @@
with import <nixpkgs> {} ;
let
  avro-python3-1_8 = python3Packages.buildPythonPackage rec {
    pname = "avro-python3" ;
    version = "1.8.2" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "f82cf0d66189600b1e6b442f650ad5aca6c189576723dcbf6f9ce096eab81bd6" ;
    } ;
    doCheck = false;
  } ;

  sasl = python3Packages.buildPythonPackage rec {
    pname = "sasl" ;
    version = "0.2.1" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "04f22e17bbebe0cd42471757a48c2c07126773c38741b1dad8d9fe724c16289d" ;
    } ;
    doCheck = false;
    propagatedBuildInputs = [ cyrus_sasl ] ++ (with python3Packages ; [ six ]) ;
  } ;

  thrift_sasl = python3Packages.buildPythonPackage rec {
    pname = "thrift_sasl" ;
    version = "0.4.2" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "6a1c54731cb3ce61bdc041d9dc36e21f0f56db0163bb7b57be84de3fda70922f" ;
    } ;
    doCheck = false;
    propagatedBuildInputs = with python3Packages ; [ thrift sasl ] ;
  } ;

  PyHive = python3Packages.buildPythonPackage rec {
    pname = "PyHive" ;
    version = "0.6.1" ;

    src = python3Packages.fetchPypi {
      inherit pname version ;
      sha256 = "a5f2b2f8bcd85a8cd80ab64ff8fbfe1c09515d266650a56f789a8d89ad66d7f4" ;
    } ;
    doCheck = false;
    propagatedBuildInputs = with python3Packages ; [ dateutil future thrift sasl thrift_sasl ] ;
  } ;

in
mkShell {
  buildInputs = (with python3Packages ; [
    python
    requests
    PyHive

    simplejson
    # avro-python3-1_8
    # confluent-kafka
  ]) ;
}
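With this expression checked in alongside the generator, the script's nix-shell shebang resolves PyHive and its SASL/Thrift dependencies automatically. As a hedged usage sketch (assuming Nix is installed, a Hive server is reachable at HIVESTORE, and the Python file carries the name referenced by the shebang), the environment can also be entered explicitly with:

nix-shell dataset-hive-generator.py.nix --run 'python dataset-hive-generator.py'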