-
Notifications
You must be signed in to change notification settings - Fork 3k
/
utils.py
320 lines (243 loc) · 9.06 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
import json
import logging
import os
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, List, Tuple
import requests
from datahub.cli import cli_utils, env_utils
from datahub.ingestion.run.pipeline import Pipeline
from joblib import Parallel, delayed
from requests.structures import CaseInsensitiveDict
from tests.consistency_utils import wait_for_writes_to_sync
# Fixed epoch-milliseconds timestamp used for deterministic "lastModified"
# values in generated aspects (see create_datahub_step_state_aspect).
TIME: int = 1581407189000
# Module-level logger following the standard getLogger(__name__) convention.
logger = logging.getLogger(__name__)
def get_frontend_session():
    """Log into the DataHub frontend as the admin user and return the session."""
    admin_user, admin_password = get_admin_credentials()
    return login_as(admin_user, admin_password)
def login_as(username: str, password: str):
    """Authenticate against the DataHub frontend and return a logged-in session."""
    frontend = get_frontend_url()
    return cli_utils.get_frontend_session_login_as(
        username=username, password=password, frontend_url=frontend
    )
def get_admin_username() -> str:
    """Return only the username half of the admin credentials."""
    username, _ = get_admin_credentials()
    return username
def get_admin_credentials():
    """Return the (username, password) pair for the admin user.

    Both values default to "datahub" and can be overridden through the
    ADMIN_USERNAME / ADMIN_PASSWORD environment variables.
    """
    username = os.getenv("ADMIN_USERNAME", "datahub")
    password = os.getenv("ADMIN_PASSWORD", "datahub")
    return (username, password)
def get_root_urn():
    """Return the URN of the built-in root "datahub" corp user."""
    return "urn:li:corpuser:datahub"
def get_gms_url():
    """GMS base URL; a non-empty DATAHUB_GMS_URL env var wins over the local default."""
    configured = os.getenv("DATAHUB_GMS_URL")
    return configured or "http://localhost:8080"
def get_frontend_url():
    """Frontend base URL; a non-empty DATAHUB_FRONTEND_URL env var wins over the default."""
    configured = os.getenv("DATAHUB_FRONTEND_URL")
    return configured or "http://localhost:9002"
def get_kafka_broker_url():
    """Kafka broker address; a non-empty DATAHUB_KAFKA_URL env var wins over the default."""
    configured = os.getenv("DATAHUB_KAFKA_URL")
    return configured or "localhost:9092"
def get_kafka_schema_registry():
    """Schema registry URL; a non-empty env var override wins over the local default."""
    # internal registry "http://localhost:8080/schema-registry/api/"
    configured = os.getenv("DATAHUB_KAFKA_SCHEMA_REGISTRY_URL")
    return configured or "http://localhost:8081"
def get_mysql_url():
    """MySQL host:port; a non-empty DATAHUB_MYSQL_URL env var wins over the default."""
    configured = os.getenv("DATAHUB_MYSQL_URL")
    return configured or "localhost:3306"
def get_mysql_username():
    """MySQL username; a non-empty DATAHUB_MYSQL_USERNAME env var wins over the default."""
    configured = os.getenv("DATAHUB_MYSQL_USERNAME")
    return configured or "datahub"
def get_mysql_password():
    """MySQL password; a non-empty DATAHUB_MYSQL_PASSWORD env var wins over the default."""
    configured = os.getenv("DATAHUB_MYSQL_PASSWORD")
    return configured or "datahub"
def get_sleep_info() -> Tuple[int, int]:
    """Return (seconds to sleep between retries, number of retries) from the env."""
    sleep_between = int(os.getenv("DATAHUB_TEST_SLEEP_BETWEEN", 20))
    sleep_times = int(os.getenv("DATAHUB_TEST_SLEEP_TIMES", 3))
    return sleep_between, sleep_times
def is_k8s_enabled():
    """True when the K8S_CLUSTER_ENABLED env flag is "true" or "yes" (case-insensitive)."""
    flag = os.getenv("K8S_CLUSTER_ENABLED", "false")
    return flag.lower() in ("true", "yes")
def wait_for_healthcheck_util(auth_session):
    """Assert both the frontend and GMS health endpoints answer with HTTP 200.

    check_endpoint returns None on success, so a truthy return (error message)
    fails the assertion. Frontend is checked first; a failure short-circuits.
    """
    for health_url in (f"{get_frontend_url()}/admin", f"{get_gms_url()}/health"):
        assert not check_endpoint(auth_session, health_url)
def check_endpoint(auth_session, url):
    """Probe *url* with a GET; return None when it answers 200, otherwise a message.

    Exits the process (SystemExit) when the request itself fails at the
    transport level.
    """
    try:
        response = auth_session.get(url)
    except requests.exceptions.RequestException as e:
        raise SystemExit(f"{url}: is Not reachable \nErr: {e}")
    if response.status_code == 200:
        return
    return f"{url}: is Not reachable, status_code: {response.status_code}"
def ingest_file_via_rest(auth_session, filename: str) -> Pipeline:
    """Ingest the metadata file *filename* through the DataHub REST sink.

    Runs a file-source -> datahub-rest pipeline authenticated with the
    session's GMS token, raises on any ingestion failure, then waits for the
    async writes to land before returning the pipeline.
    """
    recipe = {
        "source": {
            "type": "file",
            "config": {"filename": filename},
        },
        "sink": {
            "type": "datahub-rest",
            "config": {
                "server": auth_session.gms_url(),
                "token": auth_session.gms_token(),
            },
        },
    }
    pipeline = Pipeline.create(recipe)
    pipeline.run()
    pipeline.raise_from_status()
    wait_for_writes_to_sync()
    return pipeline
def delete_urn(graph_client, urn: str) -> None:
    """Hard-delete (permanently remove) the entity identified by *urn*."""
    graph_client.hard_delete_entity(urn)
def delete_urns(graph_client, urns: List[str]) -> None:
    """Hard-delete every entity in *urns*, one at a time and in order."""
    for entity_urn in urns:
        delete_urn(graph_client, entity_urn)
def delete_urns_from_file(
    graph_client, filename: str, shared_data: bool = False
) -> None:
    """Hard-delete every entity listed in the MCE/MCP JSON file *filename*.

    Skipped entirely (with a notice) when the CLEANUP_DATA env flag is false,
    to save time during local iteration. Deletions are issued in parallel and
    the function waits for the async writes to land before returning.
    """
    if not env_utils.get_boolean_env_variable("CLEANUP_DATA", True):
        print("Not cleaning data to save time")
        return

    def _delete_entry(entry):
        # MCPs carry the URN directly; MCEs nest it inside the snapshot union.
        if "entityUrn" in entry:
            urn = entry["entityUrn"]
        else:
            snapshot = list(entry["proposedSnapshot"].values())[0]
            urn = snapshot["urn"]
        delete_urn(graph_client, urn)

    with open(filename) as f:
        entries = json.load(f)
    Parallel(n_jobs=10)(delayed(_delete_entry)(entry) for entry in entries)
    wait_for_writes_to_sync()
# Fixed now value
# Captured once at import time (naive local time, no tzinfo) so every
# relative-day computation in this module shares the same reference point.
NOW: datetime = datetime.now()
def get_timestampmillis_at_start_of_day(relative_day_num: int) -> int:
    """
    Returns the time in milliseconds from epoch at the start of the day
    corresponding to `now + relative_day_num`
    """
    # Shift NOW by the requested number of days, then drop the time-of-day
    # component to land on local midnight of that day.
    target_day = NOW + timedelta(days=float(relative_day_num))
    midnight = datetime.combine(target_day.date(), datetime.min.time())
    return int(midnight.timestamp() * 1000)
def get_strftime_from_timestamp_millis(ts_millis: int) -> str:
    """Format an epoch-milliseconds timestamp as an ISO-8601 string in UTC."""
    epoch_seconds = ts_millis / 1000
    as_utc = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc)
    return as_utc.isoformat()
def create_datahub_step_state_aspect(
    username: str, onboarding_id: str
) -> Dict[str, Any]:
    """Build an UPSERT MCP dict recording *onboarding_id* as seen for *username*.

    The aspect value is a JSON string whose lastModified time is the module's
    fixed TIME constant, keeping generated files deterministic.
    """
    entity_urn = f"urn:li:dataHubStepState:urn:li:corpuser:{username}-{onboarding_id}"
    print(f"Creating dataHubStepState aspect for {entity_urn}")
    aspect_value = (
        f'{{"properties":{{}},"lastModified":'
        f'{{"actor":"urn:li:corpuser:{username}","time":{TIME}}}}}'
    )
    return {
        "auditHeader": None,
        "entityType": "dataHubStepState",
        "entityUrn": entity_urn,
        "changeType": "UPSERT",
        "aspectName": "dataHubStepStateProperties",
        "aspect": {
            "value": aspect_value,
            "contentType": "application/json",
        },
        "systemMetadata": None,
    }
def create_datahub_step_state_aspects(
    username: str, onboarding_ids: List[str], onboarding_filename: str
) -> None:
    """
    For a specific user, creates dataHubStepState aspects for each onboarding id in the list
    """
    aspects = [
        create_datahub_step_state_aspect(username, oid) for oid in onboarding_ids
    ]
    with open(onboarding_filename, "w") as out_file:
        json.dump(aspects, out_file, indent=2)
class TestSessionWrapper:
    """
    Many of the tests do not consider async writes. This
    class intercepts mutations using the requests library
    to simulate sync requests.
    """
    def __init__(self, requests_session):
        # Wraps an already-logged-in requests session and mints a personal
        # GMS access token up front so every proxied call can authenticate.
        self._upstream = requests_session
        self._frontend_url = get_frontend_url()
        self._gms_url = get_gms_url()
        self._gms_token_id, self._gms_token = self._generate_gms_token()
    def __getattr__(self, name):
        # Intercept method calls
        # Only reached for attributes NOT set in __init__, i.e. everything
        # delegated to the underlying session object.
        attr = getattr(self._upstream, name)
        if callable(attr):
            def wrapper(*args, **kwargs):
                # Pre-processing can be done here
                # Inject the bearer token into every proxied HTTP-verb call.
                if name in ("get", "head", "post", "put", "delete", "option", "patch"):
                    if "headers" not in kwargs:
                        kwargs["headers"] = CaseInsensitiveDict()
                    kwargs["headers"].update(
                        {"Authorization": f"Bearer {self._gms_token}"}
                    )
                result = attr(*args, **kwargs)
                # Post-processing can be done here
                if name in ("post", "put"):
                    # Wait for sync if writing
                    # delete is excluded for efficient test clean-up
                    self._wait(*args, **kwargs)
                return result
            return wrapper
        return attr
    def gms_token(self):
        # Bearer token string minted in __init__.
        return self._gms_token
    def gms_token_id(self):
        # Metadata id of the minted token; used by destroy() to revoke it.
        return self._gms_token_id
    def frontend_url(self):
        # Frontend base URL captured at construction time.
        return self._frontend_url
    def gms_url(self):
        # GMS base URL captured at construction time.
        return self._gms_url
    def _wait(self, *args, **kwargs):
        # Block until async writes land; skipped for login calls (first
        # positional arg is the URL), which mutate no metadata.
        if "/logIn" not in args[0]:
            print("TestSessionWrapper sync wait.")
            wait_for_writes_to_sync()
    def _generate_gms_token(self):
        # Issue a PERSONAL access token for the logged-in actor via the
        # frontend GraphQL API. Returns (token_id, token).
        actor_urn = self._upstream.cookies["actor"]
        # NOTE(review): local name `json` shadows the module-level `json`
        # import for the rest of this method.
        json = {
            "query": """mutation createAccessToken($input: CreateAccessTokenInput!) {
                createAccessToken(input: $input) {
                  accessToken
                  metadata {
                    id
                  }
                }
            }""",
            "variables": {
                "input": {
                    "type": "PERSONAL",
                    "actorUrn": actor_urn,
                    "duration": "ONE_DAY",
                    "name": "Test Session Token",
                    "description": "Token generated for smoke-tests",
                }
            },
        }
        response = self._upstream.post(
            f"{self._frontend_url}/api/v2/graphql", json=json
        )
        response.raise_for_status()
        return (
            response.json()["data"]["createAccessToken"]["metadata"]["id"],
            response.json()["data"]["createAccessToken"]["accessToken"],
        )
    def destroy(self):
        # Revoke the access token minted for this wrapper; no-op when no
        # token id was recorded.
        if self._gms_token_id:
            json = {
                "query": """mutation revokeAccessToken($tokenId: String!) {
                    revokeAccessToken(tokenId: $tokenId)
                }""",
                "variables": {"tokenId": self._gms_token_id},
            }
            response = self._upstream.post(
                f"{self._frontend_url}/api/v2/graphql", json=json
            )
            response.raise_for_status()