-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Create new SLAInfo aspect on Datasets, Data Jobs, and DPI entities #7129
Closed
divyamanohar-stripe
wants to merge
5
commits into
datahub-project:master
from
divyamanohar-stripe:divyamanohar/sla-info-aspect
Closed
Changes from all commits
Commits
Show all changes
5 commits
Select commit
Hold shift + click to select a range
aa74450
Create new SLAInfo aspect on Datasets, Data Jobs, and DPI entities
divyamanohar-stripe c1b1056
Merge branch 'datahub-project:master' into divyamanohar/sla-info-aspect
divyamanohar-stripe 4fd1e4d
add whitespace before {
divyamanohar-stripe 41a18b8
Merge branch 'master' into divyamanohar/sla-info-aspect
divyamanohar-stripe c5aa947
Merge branch 'master' into divyamanohar/sla-info-aspect
divyamanohar-stripe File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
38 changes: 38 additions & 0 deletions
38
metadata-models/src/main/pegasus/com/linkedin/datajob/SLAInfo.pdl
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
namespace com.linkedin.datajob | ||
|
||
@Aspect = { | ||
"name": "slaInfo" | ||
} | ||
|
||
record SLAInfo { | ||
|
||
@Searchable = { | ||
"fieldName": "slaDefined", | ||
"fieldType": "TEXT", | ||
"addToFilters": true, | ||
"hasValuesFieldName": "hasSLA", | ||
"queryByDefault": false, | ||
"filterNameOverride": "SLA Info" | ||
} | ||
slaDefined: string | ||
|
||
@Searchable = { | ||
"fieldType": "COUNT" | ||
} | ||
errorStartedBy: optional float | ||
|
||
@Searchable = { | ||
"fieldType": "COUNT" | ||
} | ||
errorFinishedBy: optional float | ||
|
||
@Searchable = { | ||
"fieldType": "COUNT" | ||
} | ||
warnStartedBy: optional float | ||
|
||
@Searchable = { | ||
"fieldType": "COUNT" | ||
} | ||
warnFinishedBy: optional float | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hey @divyamanohar-stripe remind me where we define the "time unit" here.. Is the errorStartedBy representing a time after each hour or after each day or something else?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi John! So this implementation defines SLA in terms of "landing time", meaning the time data or an Airflow tasks lands. In Airflow, tasks (data jobs) have the concepts of "logical/execution dates" which is the date the task is scheduled to run. This means daily task would be created at UTC midnight every day, an hourly task would be created at the start of every hour...etc. If I set an SLA of 3 hours on a daily task, that means I expect the task to succeed at 3am (3 hours after the task is created at 12am UTC). If I set an SLA of 3 hours on an hourly task, that means I expect each hourly run to complete 3 hours after the scheduled time. So the 1pm run should complete by 4pm, the 2pm run should complete by 5 pm..etc. For additional Airflow context, a task only starts running once it's created AND it's dependencies are met (upstreams complete), so it's possible for a daily task to not start until 3am for example because it has many upstreams that need to compelte (the root node will be kicked off at 12am).
I know that's a bit confusing, so lmk if you have any more questions about this!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Let me test my understanding:
Each parent / root DAG has a specific "cadence" or "frequency" which is represented by the partition of data that each task within it is expected to process / consume (This is the "logical or execution date"). Let's say one DAG is scheduled to run each day and process an entire Day of data, while another is scheduled to run each hour and process a specific hour of data.
The type of "cadence" or "frequency" of the root DAG is exactly aligned the cadences of all Tasks within it. For example, any Task that is part of a daily schedule DAG is by definition a "daily task".
Before each DAG runs, a "partition identifier" is generated based on the cadence of the DAG. Again this is input into each task in the DAG indicating which batch of data process. For example, a daily DAG running at 12am on 01/10/2023 may start with a "partition identifier" of
01-09-2023
(or older), which means that the previous day's data should be processed. Moreover, an hourly DAG running at 2pm on the same day may start with a partition identifier of01-10-2023-13-00
, ie processing the previous 1 hour of data.It's useful to call out the assumption that the "logical date" ( === "partition identifier") is not describing when exactly a Task or DAG actually ran. It simply describes the batch of data that a given run should consume (This is my hypothesis). This can hint at when it is supposed to run, but this is not a "globally true" expectation -- beyond Stripe. This aligns with my reading of the Airflow docs:
If my understanding of this is correct, then the "start time" SLA of an individual Task is computed by taking this DAG "partition identifier" (e.g. 01-09-2023), then converting it into a numeric value like a milliseconds timestamp, and then adding the "errorStartedBy" to discern the "expected start time". The "end time" SLA of an individual Task is computed by taking the DAG partition identifier and adding the "errorFinishedBy" value to discern the "expected finished time", e.g.
Start Time SLA (ms) = timestampMillis(partition identifier) + errorStartedBy
Finished Time SLA (ms) = timestampMillis(partition identifier) + errorFinishedBy
For example
Start Time SLA (ms) = timestampMillis(01-10-2023-13) + errorStartedBy
Finished Time SLA (ms) = timestampMillis(01-10-2023-13) + errorFinishedBy
and where violated depends on the real start time of the task.
Start Time SLA Missed = real start time (ms) > Start Time SLA (ms)
End Time SLA Missed = real finished time (ms) > Finished Time SLA (ms)
Is this reading correct?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(Examples with values for each variable will definitely help! I'm getting a bit lost in the time units)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be clear, the SLA for a Task is NOT simply relative to the start time of the parent DAG.
For example, if a daily parent DAG begins at 12:05am on 01-10-2023 (delayed), but has the "logical" or "partition identifier" as 01-10-2023-00 then the SLA for a Task inside inside of it is computed based on adding the expected start and finish times to the partition identifier and not to the DAG start time.
SLA != DAG start time ms + Task errorStartBy
SLA == logical / partition time ms + Task errorStartBy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I did a bunch more thinking on this and have come to believe that the following variables are necessary to model ANY enforce any Data SLA definition for an individual process run (task or dag):
Where the "data window" (1-3) defines the window of data processed by a specific task run, which can be thought of as a bucket uniquely keyed on the combination of numbers 1-3.
I think Stripe's concept of a "logical/execution time" is really just another way of defining number 1, whereas the "frequency" with which the job runs is (Daily or Hourly) is used to imply #2 and #3. (e.g. a "daily job" assumes a data window unit of "day" and a multiple of 1.. thus, the processed "data window" = logical execution time + 1d -- or minus 1 day, depending on how you structure the logical times). My guess is that some tasks are actually configured to receive both the logical / execution time (#1) along with the data window definition (#2 + #3) as input params, in order to support running in either daily or hourly mode.
Using these inputs we can derive the SLA for a given run by first computing absolute times from #1-5:
Using these values, we simply compare against the "real" run values (6 & 7)
Does this align with how you guys think of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is correct, however, it's important to note that we have created another aspect called
DataProcessInstanceExecution
which I intend to upstream right after this PR that is the final puzzle piece for being able to determine whether a run missed SLA or not.The only thing we need to determine whether or not a DataProcessInstance missed its SLA is the execution/logical date, startDate, endDate, and the SLA defined (which is relative to the logical date) We have another aspect we created called
DataProcessInstanceExecution
which contains the logicalDate (partition date, ex. 2023-01-01 for a daily task instance or 2023-01-01T13 for some hourly task instance), startDate (when the task actually started running) and the endDate (when the task finished running).For our UI components that have to do with SLA misses, we query for the SLA info aspect set on DataProcessInstance (DPI), as well as the DataProcessInstanceExecution aspect set on DPIs. All we need to do is check:
We don't really need or care about the cadence a task runs which you mentioned (Data window unit (s/m/h/d))
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This makes total sense!