Skip to content

Commit 7429075

Browse files
authored
feat(java-sdk): add utils classes to give equivalence with python uti… (#12002)
1 parent e19af9e commit 7429075

File tree

13 files changed

+782
-1
lines changed

13 files changed

+782
-1
lines changed

metadata-integration/java/datahub-client/build.gradle

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ dependencies {
2020
api project(':entity-registry')
2121
api project(':metadata-integration:java:datahub-event')
2222
implementation project(':metadata-integration:java:datahub-schematron:lib')
23+
2324
implementation(externalDependency.kafkaAvroSerializer) {
2425
exclude group: "org.apache.avro"
2526
}
@@ -60,10 +61,35 @@ task copyAvroSchemas {
6061
compileJava.dependsOn copyAvroSchemas
6162

6263

64+
// Add Python environment validation task
65+
task validatePythonEnv {
66+
doFirst {
67+
def venvPath = System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
68+
def isWindows = System.getProperty('os.name').toLowerCase().contains('windows')
69+
def pythonExe = isWindows ? "${venvPath}/Scripts/python.exe" : "${venvPath}/bin/python"
70+
71+
def result = exec {
72+
commandLine pythonExe, "-c", "import sys; print(sys.executable)"
73+
ignoreExitValue = true
74+
standardOutput = new ByteArrayOutputStream()
75+
errorOutput = new ByteArrayOutputStream()
76+
}
77+
78+
if (result.exitValue != 0) {
79+
throw new GradleException("Python virtual environment not properly set up at ${venvPath}")
80+
}
81+
}
82+
}
83+
6384
test {
6485
// to avoid simultaneous executions of tests when complete build is run
6586
mustRunAfter(":metadata-io:test")
6687
useJUnit()
88+
// Add Python environment configuration
89+
dependsOn validatePythonEnv
90+
dependsOn tasks.getByPath(":metadata-ingestion:installDev")
91+
systemProperty 'python.venv.path', System.getProperty('python.venv.path', '../../../metadata-ingestion/venv')
92+
finalizedBy jacocoTestReport
6793
}
6894

6995
task checkShadowJar(type: Exec) {
@@ -111,7 +137,6 @@ shadowJar {
111137
relocate 'org.checkerframework', 'datahub.shaded.org.checkerframework'
112138
relocate 'com.google.errorprone', 'datahub.shaded.com.google.errorprone'
113139
// Below jars added for kafka emitter only
114-
// relocate 'org.apache.avro', 'datahub.shaded.org.apache.avro'
115140
relocate 'com.thoughtworks.paranamer', 'datahub.shaded.com.thoughtworks.paranamer'
116141
relocate 'org.xerial.snappy', 'datahub.shaded.org.xerial.snappy'
117142
relocate 'org.apache.kafka', 'datahub.shaded.org.apache.kafka'
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
import click
2+
from typing import Dict, Any
3+
import json
4+
from dataclasses import dataclass
5+
from abc import ABC, abstractmethod
6+
from datahub.emitter.mcp_builder import DatabaseKey, SchemaKey
7+
8+
9+
class URNGenerator(ABC):
10+
@abstractmethod
11+
def generate(self, args: Dict[str, Any]) -> str:
12+
pass
13+
14+
15+
class DatabaseURNGenerator(URNGenerator):
16+
def generate(self, args: Dict[str, Any]) -> str:
17+
required_fields = ["platform", "database"]
18+
for field in required_fields:
19+
if field not in args:
20+
raise ValueError(f"Missing required field: {field}")
21+
22+
all_fields = required_fields + ["instance"]
23+
for arg in args:
24+
if arg not in all_fields:
25+
raise ValueError(f"Invalid field: {arg}")
26+
27+
database_key = DatabaseKey(
28+
platform=args["platform"],
29+
instance=args.get("instance"),
30+
database=args["database"],
31+
)
32+
return database_key.as_urn()
33+
34+
35+
class SchemaURNGenerator(URNGenerator):
36+
def generate(self, args: Dict[str, Any]) -> str:
37+
required_fields = ["platform", "database", "schema"]
38+
all_fields = required_fields + ["instance", "env"]
39+
for field in required_fields:
40+
if field not in args:
41+
raise ValueError(f"Missing required field: {field}")
42+
43+
for arg in args:
44+
if arg not in all_fields:
45+
raise ValueError(f"Invalid field: {arg}")
46+
47+
schema_key = SchemaKey(
48+
platform=args["platform"],
49+
instance=args.get("instance"),
50+
env=args.get("env"),
51+
database=args["database"],
52+
schema=args["schema"],
53+
)
54+
return schema_key.as_urn()
55+
56+
57+
URN_GENERATORS = {
58+
"database": DatabaseURNGenerator(),
59+
"schema": SchemaURNGenerator(),
60+
}
61+
62+
63+
def validate_key_value(ctx, param, value):
64+
if not value:
65+
return {}
66+
67+
result = {}
68+
for item in value:
69+
try:
70+
key, val = item.split("=", 1)
71+
result[key.strip()] = val.strip()
72+
except ValueError:
73+
raise click.BadParameter(
74+
f"Invalid key-value pair: {item}. Format should be key=value"
75+
)
76+
return result
77+
78+
79+
@click.command()
80+
@click.option(
81+
"--container-type",
82+
type=click.Choice(["database", "schema"]),
83+
required=True,
84+
help="The type of container to generate a URN for",
85+
)
86+
@click.option(
87+
"--param",
88+
"-p",
89+
multiple=True,
90+
callback=validate_key_value,
91+
help="Parameters in key=value format. Can be used multiple times.",
92+
)
93+
@click.option(
94+
"--output-format",
95+
type=click.Choice(["text", "json"]),
96+
default="text",
97+
help="Output format for the URN",
98+
)
99+
def generate_urn(container_type: str, param: Dict[str, str], output_format: str):
100+
"""Generate URNs for different types of containers.
101+
102+
Example usage:
103+
./container_urn_generator.py --container-type database -p platform=test-platform -p instance=DEV -p database=test-database
104+
"""
105+
try:
106+
generator = URN_GENERATORS[container_type]
107+
urn = generator.generate(param)
108+
109+
if output_format == "json":
110+
result = {"urn": urn, "container_type": container_type, "parameters": param}
111+
click.echo(json.dumps(result, indent=2))
112+
else:
113+
click.echo(urn)
114+
115+
except KeyError as e:
116+
raise click.UsageError(f"Unknown container type: {container_type}")
117+
except ValueError as e:
118+
raise click.UsageError(str(e))
119+
120+
121+
if __name__ == "__main__":
122+
generate_urn()
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
package io.datahubproject.models.util;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.linkedin.common.urn.Urn;
5+
import java.util.HashMap;
6+
import java.util.Map;
7+
import lombok.Data;
8+
import lombok.EqualsAndHashCode;
9+
import lombok.experimental.SuperBuilder;
10+
11+
@Data
12+
@SuperBuilder
13+
@EqualsAndHashCode(callSuper = true)
14+
@JsonInclude(JsonInclude.Include.NON_NULL)
15+
public abstract class ContainerKey extends DataHubKey {
16+
private String platform;
17+
private String instance;
18+
19+
private static final String URN_PREFIX = "urn:li:container:";
20+
private static final String URN_ENTITY = "container";
21+
private static final String PLATFORM_MAP_FIELD = "platform";
22+
private static final String INSTANCE_MAP_FIELD = "instance";
23+
24+
@Override
25+
public Map<String, String> guidDict() {
26+
27+
Map<String, String> bag = new HashMap<>();
28+
if (platform != null) bag.put(PLATFORM_MAP_FIELD, platform);
29+
if (instance != null) bag.put(INSTANCE_MAP_FIELD, instance);
30+
31+
return bag;
32+
}
33+
34+
public String asUrnString() {
35+
String guid = guid();
36+
return URN_PREFIX + guid;
37+
}
38+
39+
public Urn asUrn() {
40+
return Urn.createFromTuple(URN_ENTITY, guid());
41+
}
42+
}
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
package io.datahubproject.models.util;
2+
3+
import com.fasterxml.jackson.databind.ObjectMapper;
4+
import java.security.MessageDigest;
5+
import java.util.Map;
6+
import lombok.SneakyThrows;
7+
import lombok.extern.slf4j.Slf4j;
8+
9+
@Slf4j
10+
public class DataHubGuidGenerator {
11+
private static final ObjectMapper objectMapper = new ObjectMapper();
12+
13+
@SneakyThrows
14+
public static String dataHubGuid(Map<String, String> obj) {
15+
// Configure ObjectMapper for consistent serialization
16+
objectMapper.configure(
17+
com.fasterxml.jackson.databind.SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true);
18+
19+
// Convert map to JSON string with sorted keys
20+
String jsonKey = objectMapper.writeValueAsString(obj);
21+
22+
// Generate MD5 hash
23+
MessageDigest md = MessageDigest.getInstance("MD5");
24+
byte[] hashBytes = md.digest(jsonKey.getBytes());
25+
26+
// Convert byte array to hexadecimal string
27+
StringBuilder hexString = new StringBuilder();
28+
for (byte hashByte : hashBytes) {
29+
String hex = Integer.toHexString(0xff & hashByte);
30+
if (hex.length() == 1) {
31+
hexString.append('0');
32+
}
33+
hexString.append(hex);
34+
}
35+
36+
if (log.isDebugEnabled()) {
37+
log.debug("DataHub Guid for {} is : {}", jsonKey, hexString);
38+
}
39+
return hexString.toString();
40+
}
41+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.datahubproject.models.util;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import com.fasterxml.jackson.core.type.TypeReference;
5+
import com.fasterxml.jackson.databind.ObjectMapper;
6+
import java.util.Map;
7+
import lombok.Data;
8+
import lombok.experimental.SuperBuilder;
9+
10+
@Data
11+
@SuperBuilder
12+
@JsonInclude(JsonInclude.Include.NON_NULL)
13+
public abstract class DataHubKey {
14+
// Static ObjectMapper instance since it's thread-safe and expensive to create
15+
protected static final ObjectMapper MAPPER = new ObjectMapper();
16+
// Static TypeReference instance since it doesn't change
17+
private static final TypeReference<Map<String, String>> MAP_TYPE_REFERENCE =
18+
new TypeReference<Map<String, String>>() {};
19+
20+
static {
21+
MAPPER.setSerializationInclusion(JsonInclude.Include.NON_NULL);
22+
}
23+
24+
public Map<String, String> guidDict() {
25+
return MAPPER.convertValue(this, MAP_TYPE_REFERENCE);
26+
}
27+
28+
public String guid() {
29+
Map<String, String> bag = guidDict();
30+
return DataHubGuidGenerator.dataHubGuid(bag);
31+
}
32+
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
package io.datahubproject.models.util;
2+
3+
import com.fasterxml.jackson.annotation.JsonInclude;
4+
import java.util.Map;
5+
import lombok.Data;
6+
import lombok.EqualsAndHashCode;
7+
import lombok.experimental.SuperBuilder;
8+
9+
@Data
10+
@SuperBuilder
11+
@EqualsAndHashCode(callSuper = true)
12+
@JsonInclude(JsonInclude.Include.NON_NULL)
13+
public class DatabaseKey extends ContainerKey {
14+
private String database;
15+
16+
private static final String DATABASE_MAP_FIELD = "database";
17+
18+
@Override
19+
public Map<String, String> guidDict() {
20+
// Get the parent's GUID dictionary first
21+
Map<String, String> bag = super.guidDict();
22+
23+
// Add the database field if it's not null
24+
if (database != null) {
25+
bag.put(DATABASE_MAP_FIELD, database);
26+
}
27+
28+
return bag;
29+
}
30+
}

0 commit comments

Comments
 (0)