-
Notifications
You must be signed in to change notification settings - Fork 3k
/
test_result_msg.py
46 lines (36 loc) · 1.32 KB
/
test_result_msg.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
import os
from slack_sdk import WebClient
# Module-level registry of collected statistics, keyed by stat name.
# Filled through add_datahub_stats() and read when the Slack message is built.
datahub_stats = {}


def add_datahub_stats(stat_name, stat_val):
    """Store ``stat_val`` under ``stat_name`` in the ``datahub_stats`` registry.

    A value already stored under the same name is overwritten.
    """
    datahub_stats.update({stat_name: stat_val})
def send_to_slack(passed: str) -> None:
    """Post the test status and the collected ``num-*`` stats to Slack.

    Configuration is read from environment variables:

    * ``SLACK_API_TOKEN`` and ``SLACK_CHANNEL`` -- required; when either one
      is unset the function returns without doing anything.
    * ``SLACK_THREAD_TS`` -- optional; when set it is passed as ``thread_ts``
      to ``chat_postMessage``.
    * ``TEST_IDENTIFIER`` -- label placed at the start of the message,
      defaulting to ``"LOCAL_TEST"``.

    Every ``datahub_stats`` entry whose key starts with ``"num-"`` adds one
    ``Num <entity> is <value>`` line to the message body.

    Args:
        passed: Status text inserted after ``"Status - "`` in the message.
    """
    slack_api_token = os.getenv("SLACK_API_TOKEN")
    slack_channel = os.getenv("SLACK_CHANNEL")
    if slack_api_token is None or slack_channel is None:
        # Slack is not configured; skip silently.
        return

    slack_thread_ts = os.getenv("SLACK_THREAD_TS")
    test_identifier = os.getenv("TEST_IDENTIFIER", "LOCAL_TEST")

    client = WebClient(token=slack_api_token)

    prefix = "num-"
    # Slice off only the leading prefix: str.replace() would also delete any
    # later "num-" inside the entity name (e.g. "num-foo-num-bar").
    message = "".join(
        f"Num {key[len(prefix):]} is {val}\n"
        for key, val in datahub_stats.items()
        if key.startswith(prefix)
    )

    post_kwargs = {
        "channel": slack_channel,
        "text": f"{test_identifier} Status - {passed}\n{message}",
    }
    # Only include thread_ts when one was provided, matching the original
    # behaviour of omitting the argument entirely otherwise.
    if slack_thread_ts is not None:
        post_kwargs["thread_ts"] = slack_thread_ts

    client.chat_postMessage(**post_kwargs)
def send_message(exitstatus):
    """Report the run's outcome to Slack, never letting an error escape.

    An ``exitstatus`` equal to 0 is reported as ``"PASSED"``; any other value
    is reported as ``"FAILED"``. Any exception raised while sending is printed
    and swallowed.
    """
    try:
        if exitstatus == 0:
            outcome = "PASSED"
        else:
            outcome = "FAILED"
        send_to_slack(outcome)
    except Exception as e:
        # Notification is best-effort: a Slack failure must not fail pytest.
        print(f"Exception happened for sending msg to slack {e}")