Data dispatcher implementation #13

Open · wants to merge 22 commits into main

Changes from 1 commit (of 22)
7bcd145
created data dispatcher class, test page, and config file
LTrestka May 30, 2023
b30344f
changed data_dispatcher name
LTrestka May 31, 2023
f1a91b7
Data dispatcher test page now has the ability to log in with username…
LTrestka Jun 9, 2023
40793fc
Merge branch 'data_dispatcher_implementation' of https://github.com/f…
LTrestka Jun 9, 2023
991943b
merge conflict resolved
LTrestka Jun 9, 2023
939f438
* Data dispatcher implemented up to the point of submission.
LTrestka Jul 18, 2023
94d4551
Cleaned up UI, added stability and compatibility between sam and data…
LTrestka Jul 31, 2023
6ff236b
submission broker looks at all existing launches at once and quickly …
Aug 15, 2023
6fbc33d
Updated submission agent to fetch data dispatcher jobs
LTrestka Aug 15, 2023
f0757b4
Updates to submission agent to fix broken jobsub_id format.
LTrestka Aug 21, 2023
373e207
Added a "dataset only" option for dd submissions, which is automatica…
LTrestka Aug 22, 2023
04d10ce
Finishing touches with faster queries, updates to split types (to run…
LTrestka Aug 29, 2023
72d766c
Several minor bug fixes
LTrestka Sep 2, 2023
56b37b8
LTrestka Oct 4, 2023
941afbb
added docs
LTrestka Oct 26, 2023
c82e1d7
Refined docs for github
LTrestka Nov 7, 2023
2c01090
added html
LTrestka Nov 7, 2023
f1e7486
Updates to dd login
LTrestka Nov 7, 2023
bd9171c
Added the ability to use campaign keywords in campaign/stage fields -…
LTrestka Nov 7, 2023
4c30b7e
* Added test split functionality
LTrestka Dec 8, 2023
05f0c0e
added files to gitignore
LTrestka Dec 8, 2023
ade1723
LTrestka Dec 8, 2023
Finishing touches with faster queries, updates to split types (to run param launch, rather than project launch)

Modifications to UI

LTrestka authored and $GIT_AUTHOR_NAME committed Aug 29, 2023
commit 04d10ce65deb6d5fd148880f7fb2a38599ed5a00
12 changes: 6 additions & 6 deletions submission_broker/config/submission_agent.cfg
@@ -15,16 +15,16 @@ poms_user=poms
 poms_user_header=X-Shib-Userid
 submission_uri=https://%(lens_host)s/lens/query
 lens_server="https://landscape.fnal.gov"
-running_query = {"query":"{submissions(group: \"%%s\" %%s){pomsTaskID id POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\")}}","operationName":null}
+running_query = {"query":"{submissions(group: \"%%s\" %%s){pomsTaskID id POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\") POMS_DATA_DISPATCHER_TASK_ID:env(name:\"POMS_DATA_DISPATCHER_TASK_ID\")}}","operationName":null}
 full_query = {"query":"{submissions(group: \"%%s\" %%s){id pomsTaskID done running idle held failed completed cancelled } }","operationName":null}
 submission_project_query = {"query":"{submission(id:\"%%s\"){id SAM_PROJECT:env(name:\"SAM_PROJECT\") SAM_PROJECT_NAME:env(name:\"SAM_PROJECT_NAME\") args}}","operationName":null}
 submission_info_query = {"query":"{submission(id:\"%%s\"){id pomsTaskID POMS_ENV:env(name:\"POMS_ENV\") done running idle held failed completed cancelled } }","operationName":null}
-append_submission_sid = submission(pomsTaskID:%%s){pomsTaskID id done error running idle held failed completed cancelled args POMS_ENV:env(name:\"POMS_ENV\") POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\")}
-append_submission_jid = submission(id:\"%%s\"){pomsTaskID id done error running idle held failed completed cancelled args POMS_ENV:env(name:\"POMS_ENV\") POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\")}
-append_job = job(id:\"%%s\"){ id status done error pomsTaskID SAM_PROJECT:env(name:\"SAM_PROJECT\") args SAM_PROJECT_NAME:env(name:\"SAM_PROJECT_NAME\") POMS_DATA_DISPATCHER_PROJECT_ID:env(name: \"POMS_DATA_DISPATCHER_PROJECT_ID\")}
+append_submission_sid = submission(pomsTaskID:%%s){pomsTaskID id done error running idle held failed completed cancelled args POMS_ENV:env(name:\"POMS_ENV\") POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\") POMS_DATA_DISPATCHER_TASK_ID:env(name:\"POMS_DATA_DISPATCHER_TASK_ID\")}
+append_submission_jid = submission(id:\"%%s\"){pomsTaskID id done error running idle held failed completed cancelled args POMS_ENV:env(name:\"POMS_ENV\") POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\") POMS_DATA_DISPATCHER_TASK_ID:env(name:\"POMS_DATA_DISPATCHER_TASK_ID\")}
+append_job = job(id:\"%%s\"){ id status done error pomsTaskID SAM_PROJECT:env(name:\"SAM_PROJECT\") args SAM_PROJECT_NAME:env(name:\"SAM_PROJECT_NAME\") POMS_DATA_DISPATCHER_PROJECT_ID:env(name:\"POMS_DATA_DISPATCHER_PROJECT_ID\") POMS_DATA_DISPATCHER_TASK_ID:env(name:\"POMS_DATA_DISPATCHER_TASK_ID\")}
 all_jobs_query_base = {"query": "{%%s}"}
 poms_running_query=%(poms_uri)s/running_submissions?campaign_id_list=
-poms_dd_complete_query_one=%(poms_uri)s/calculate_dd_project_completion?project_id=
-poms_dd_complete_query_all=%(poms_uri)s/calculate_dd_project_completion?project_ids=%%s
+poms_dd_complete_query_one=%(poms_uri)s/calculate_dd_project_completion?dd_submission=
+poms_dd_complete_query_all=%(poms_uri)s/calculate_dd_project_completion?dd_submissions=%%s
 poms_env="dev"

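A note on how these templates are consumed: ConfigParser expands the %(poms_uri)s-style references when the file is read, while the doubled %% survives as a literal %s placeholder that the agent fills in at query time; that is how the batched dd_submissions endpoint receives its comma-separated id list (see get_dd_task_statuses in the agent diff below). A minimal sketch, assuming Python's stdlib configparser and illustrative task ids:

    # Sketch: reading the templates above and building the batched status URL.
    # The ids are hypothetical; the section and option names come from the diff.
    from configparser import ConfigParser

    cfg = ConfigParser()
    cfg.read("submission_broker/config/submission_agent.cfg")

    # %(poms_uri)s was expanded at read time; %%s has become a literal %s.
    template = cfg.get("submission_agent", "poms_dd_complete_query_all")

    dd_task_ids = ["1234", "5678"]  # hypothetical task ids
    url = template % ",".join(dd_task_ids)
    # e.g. .../calculate_dd_project_completion?dd_submissions=1234,5678

Batching all ids into one request is what lets the broker check every existing launch in a single round trip, per the Aug 15 commit message, instead of one completion query per project.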
85 changes: 54 additions & 31 deletions submission_broker/submission_agent.py
@@ -69,6 +69,7 @@ def __init__(self, config, poms_uri=None, submission_uri=None):
         self.known["maxjobs"] = {}
         self.known["poms_task_id"] = {}
         self.known["jobsub_job_id"] = {}
+        self.known["dd_task_id"] = {}
         self.known["dd_project_id"] = {}
         self.known["not_on_server"] = {
             "submissions": {},
@@ -374,17 +375,17 @@ def maybe_report(self, entry):

     def maybe_report_data_dispatcher(self, entry, key, val):

-        dd_status, report_status = self.get_dd_status(entry, val)
+        dd_status, report_status, val = self.get_dd_status(entry, val)

         submission_update = {
             "submission_id": entry["pomsTaskID"],
             "jobsub_job_id": entry["id"],
-            "dd_project_id": key,
+            "dd_task_id": key,
         }
         update = False

-        if val != self.known["dd_project_id"].get(entry["pomsTaskID"], None):
-            self.known["dd_project_id"][entry["pomsTaskID"]] = key
+        if val != self.known["dd_task_id"].get(entry["pomsTaskID"], None):
+            self.known["dd_task_id"][entry["pomsTaskID"]] = key
             update = True

         if entry.get("id", None) and entry["id"] != self.known["jobsub_job_id"].get(entry["pomsTaskID"], None):
@@ -403,6 +404,11 @@ def maybe_report_data_dispatcher(self, entry, key, val):
             self.known["pct"][entry["pomsTaskID"]] = val
             submission_update["pct_complete"] = val
             update = True
+
+        if entry.get("POMS_DATA_DISPATCHER_PROJECT_ID", None) and entry["POMS_DATA_DISPATCHER_PROJECT_ID"] != self.known["dd_project_id"].get(entry["pomsTaskID"], None):
+            self.known["dd_project_id"][entry["pomsTaskID"]] = entry["POMS_DATA_DISPATCHER_PROJECT_ID"]
+            submission_update["dd_project_id"] = entry["POMS_DATA_DISPATCHER_PROJECT_ID"]
+            update = True

         if update:
             return submission_update
@@ -458,7 +464,7 @@ def get_running_submissions_POMS(self, group):
             htr = self.psess.get(url)
             flist = htr.json()
             print("poms running_submissions: ", repr(flist))
-            ddict = [ {'pomsTaskID': x[0], 'id': x[1], "POMS_DATA_DISPATCHER_PROJECT_ID": x[3]} for x in flist if x[2] == group]
+            ddict = [ {'pomsTaskID': x[0], 'id': x[1], "POMS_DATA_DISPATCHER_TASK_ID": x[3]} for x in flist if x[2] == group]
             print("poms running_submissions for " , group, ": ", repr(ddict))
             htr.close()
             return ddict
@@ -472,7 +478,7 @@ def get_all_running_submissions_POMS(self, exp_list):
             htr = self.psess.get(url)
             flist = htr.json()
             print("poms running_submissions: ", repr(flist))
-            ddict = [ {'pomsTaskID': x[0], 'id': x[1], "POMS_DATA_DISPATCHER_PROJECT_ID": x[3]} for x in flist if x[2] in exp_list]
+            ddict = [ {'pomsTaskID': x[0], 'id': x[1], "POMS_DATA_DISPATCHER_TASK_ID": x[3]} for x in flist if x[2] in exp_list]
             print("poms running_submissions for all experiments: %s" % repr(ddict))
             htr.close()
             return [submission for submission in ddict
@@ -482,19 +488,19 @@ def get_all_running_submissions_POMS(self, exp_list):
             logging.exception("running_submissons_POMS")
             return {}

-    def get_dd_project_statuses(self, project_ids):
+    def get_dd_task_statuses(self, dd_task_ids):
         start = datetime.now()
-        url = self.cfg.get("submission_agent", "poms_dd_complete_query_all") % ",".join(project_ids)
+        url = self.cfg.get("submission_agent", "poms_dd_complete_query_all") % ",".join(dd_task_ids)
         try:
-            LOGIT.info("getting poms data dispatcher project statuses for projects: %s", project_ids)
+            LOGIT.info("getting poms data dispatcher project statuses for projects: %s", dd_task_ids)
             htr = self.psess.get(url)
             dd_statuses = htr.json()
             elapsed_time = datetime.now() - start
-            LOGIT.info("got poms data dispatcher project statuses for projects: %s | Elapsed Time: %s.%s seconds" % (repr(dd_statuses), elapsed_time.seconds, elapsed_time.microseconds))
+            LOGIT.info("got poms data dispatcher project statuses for dd_submissions: %s | Elapsed Time: %s.%s seconds" % (repr(dd_statuses), elapsed_time.seconds, elapsed_time.microseconds))
             htr.close()
             return dd_statuses
         except:
-            logging.exception("get_dd_project_statuses")
+            logging.exception("get_dd_task_statuses")
             return {}


@@ -539,7 +545,7 @@ def check_submissions(self, group=None, full_list=None, since=""):
         elapsed_time = datetime.now() - start
         LOGIT.info("Got data on all running jobs | elapsed time: %s.%s seconds" % (elapsed_time.seconds, elapsed_time.microseconds))
         submissions_to_update = {}
-        dd_project_entries_to_check = {}
+        dd_task_entries_to_check = {}
         for entry in submissions.values():
             if not entry:
                 continue
@@ -564,23 +570,23 @@ def check_submissions(self, group=None, full_list=None, since=""):
             if self.known["status"].get(id, None) == "Completed":
                 continue

-            do_sam = entry.get("POMS_DATA_DISPATCHER_PROJECT_ID",'') == ''
+            do_sam = entry.get("POMS_DATA_DISPATCHER_TASK_ID",'') == ''

             if do_sam:
                 update_submission = self.maybe_report(entry)
                 if update_submission:
                     submissions_to_update[pomsTaskID] = update_submission
             else:
-                LOGIT.info("Added dd_project: %s" % entry["POMS_DATA_DISPATCHER_PROJECT_ID"])
-                dd_project_entries_to_check[entry["POMS_DATA_DISPATCHER_PROJECT_ID"]] = entry
+                LOGIT.info("Added dd_task: %s" % entry["POMS_DATA_DISPATCHER_TASK_ID"])
+                dd_task_entries_to_check[entry["POMS_DATA_DISPATCHER_TASK_ID"]] = entry

-        if len(dd_project_entries_to_check) > 0:
-            project_ids = list(dd_project_entries_to_check.keys())
-            dd_project_statuses = self.get_dd_project_statuses(project_ids)
-            for key, val in dd_project_statuses.items():
-                if key == "project_ids":
+        if len(dd_task_entries_to_check) > 0:
+            dd_task_ids = list(dd_task_entries_to_check.keys())
+            dd_task_statuses = self.get_dd_task_statuses(dd_task_ids)
+            for key, val in dd_task_statuses.items():
+                if key == "dd_task_ids":
                     continue
-                entry = dd_project_entries_to_check.get(key)
+                entry = dd_task_entries_to_check.get(key)
                 update_submission = self.maybe_report_data_dispatcher(entry, key, val)
                 if update_submission:
                     submissions_to_update[entry["pomsTaskID"]] = update_submission
@@ -625,32 +631,49 @@ def check_submissions(self, group=None, full_list=None, since=""):
     def get_dd_status(self, entry, dd_pct):

         status = entry.get("status", None)
+        if isinstance(dd_pct, str):
+            ntot = (int(entry["running"]) + int(entry["idle"]) +
+                    int(entry["held"]) + int(entry["completed"]) +
+                    int(entry["failed"]) + int(entry["cancelled"]))
+
+            if ntot >= self.known["maxjobs"].get(entry["pomsTaskID"], 0):
+                self.known["maxjobs"][entry["pomsTaskID"]] = ntot
+            else:
+                ntot = self.known["maxjobs"][entry["pomsTaskID"]]
+
+            ncomp = int(entry["completed"]) + int(entry["failed"]) + int(entry["cancelled"])
+
+            if ntot > 0:
+                dd_pct = ncomp * 100.0 / ntot
+            else:
+                dd_pct = 0
+
         if entry["done"]:
             if entry["error"] or dd_pct < 80:
-                return "Completed with failures", self.dd_status_map.get("Completed with failures")
+                return "Completed with failures", self.dd_status_map.get("Completed with failures"), dd_pct
             if entry["cancelled"] > 1 and (entry["running"] + entry["idle"] + entry["held"] + entry["failed"] + entry["completed"]) == 0:
-                return "Cancelled", "Cancelled"
-            return "Completed", "Completed"
+                return "Cancelled", "Cancelled", dd_pct
+            return "Completed", "Completed", dd_pct
         else:
             if entry["id"] or self.known["jobsub_job_id"].get(entry["pomsTaskID"], None):
                 if entry["held"] > 0:
-                    return "Held", "Held"
+                    return "Held", "Held", dd_pct
                 if entry["idle"]:
-                    return "Idle", "Idle"
+                    return "Idle", "Idle", dd_pct
                 if dd_pct > 0 and dd_pct < 100:
-                    return "Running", "Running"
+                    return "Running", "Running", dd_pct
                 if dd_pct == 0:
-                    return "Submitted Pending Start", self.dd_status_map.get("Submitted Pending Start")
+                    return "Submitted Pending Start", self.dd_status_map.get("Submitted Pending Start"), dd_pct
                 if dd_pct == 100:
-                    return "Completed", "Completed"
+                    return "Completed", "Completed", dd_pct
             else:
                 unknown_job = self.known["unknown_jobs"].get(entry["pomsTaskID"], 0)
                 if unknown_job < 5:
                     unknown_job += 1
                     self.known["unknown_jobs"][entry["pomsTaskID"]] = unknown_job
-                    return "Attempting to get Job Id (Attempt: %d of 5)" % unknown_job, "Unknown"
+                    return "Attempting to get Job Id (Attempt: %d of 5)" % unknown_job, "Unknown", dd_pct
                 else:
-                    return "Failed to Launch", self.dd_status_map.get("Failed to Launch")
+                    return "Failed to Launch", self.dd_status_map.get("Failed to Launch"), dd_pct



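With this commit, get_dd_status returns a three-tuple (display status, reportable status, percent complete), so maybe_report_data_dispatcher can forward the recomputed percentage alongside the mapped status. When the dispatcher hands back a string rather than a number, the percentage is rebuilt from the Landscape job counts, keeping a high-water mark for the total since finished jobs drop out of the queue. A standalone sketch of that fallback arithmetic (the helper name and values are illustrative, not the agent's API):

    # Sketch of the completion-percentage fallback added in get_dd_status.
    # `entry` mimics the job-count fields returned by the lens query above.
    def percent_complete(entry, max_seen=0):
        ntot = (int(entry["running"]) + int(entry["idle"]) +
                int(entry["held"]) + int(entry["completed"]) +
                int(entry["failed"]) + int(entry["cancelled"]))
        ntot = max(ntot, max_seen)  # high-water mark: jobs leave the queue when done
        ncomp = int(entry["completed"]) + int(entry["failed"]) + int(entry["cancelled"])
        return ncomp * 100.0 / ntot if ntot > 0 else 0

    entry = {"running": 3, "idle": 1, "held": 0,
             "completed": 5, "failed": 1, "cancelled": 0}  # hypothetical counts
    print(percent_complete(entry))  # 60.0

The percentage then drives the status ladder in the diff: 0 maps to "Submitted Pending Start", anything strictly between 0 and 100 to "Running", 100 to "Completed", and a finished entry below 80 percent is reported as "Completed with failures".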