Create new SLAInfo aspect on Datasets, Data Jobs, and DPI entities #7129

Contributor

@divyamanohar-stripe divyamanohar-stripe commented Jan 25, 2023

Checklist

  • The PR conforms to DataHub's Contributing Guideline (particularly Commit Message Format)
  • Links to related issues (if applicable)
  • Tests for the changes have been added/updated (if applicable)
  • Docs related to the changes have been added/updated (if applicable). If a new feature has been added a Usage Guide has been added for the same.
  • For any breaking change/potential downtime/deprecation/big changes an entry has been made in Updating DataHub

[STRIPE]
This PR creates a new SLAInfo aspect on the DataJob, Dataset, and DataProcessInstance entities so that users can emit metadata about the SLAs set on these entities. The aspect covers different SLA levels and different SLA types.

This has been added and implemented at Stripe, and in an effort to upstream our UI features, we are pushing up some of our metadata model changes.

@github-actions github-actions bot added the product PR or Issue related to the DataHub UI/UX label Jan 25, 2023
@anshbansal anshbansal added the community-contribution PR or Issue raised by member(s) of DataHub Community label Jan 25, 2023
"name": "slaInfo"
}

record SLAInfo {
Collaborator

Hey @divyamanohar-stripe, remind me where we define the "time unit" here. Does errorStartedBy represent a time after each hour, after each day, or something else?

Contributor Author

Hi John! This implementation defines SLA in terms of "landing time", meaning the time at which data or an Airflow task lands. In Airflow, tasks (data jobs) have the concept of a "logical/execution date", which is the date the task is scheduled to run. This means a daily task would be created at UTC midnight every day, an hourly task would be created at the start of every hour, etc. If I set an SLA of 3 hours on a daily task, I expect the task to succeed by 3am (3 hours after the task is created at 12am UTC). If I set an SLA of 3 hours on an hourly task, I expect each hourly run to complete within 3 hours of its scheduled time, so the 1pm run should complete by 4pm, the 2pm run by 5pm, etc. For additional Airflow context, a task only starts running once it is created AND its dependencies are met (its upstreams complete), so a daily task might not start until 3am, for example, because it has many upstreams that need to complete (the root node will be kicked off at 12am).

I know that's a bit confusing, so lmk if you have any more questions about this!
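To make the landing-time arithmetic concrete, here is a minimal Python sketch of the two examples above. The function name `sla_deadline` and the specific dates are illustrative assumptions and are not part of this PR.

```python
from datetime import datetime, timedelta, timezone


def sla_deadline(logical_date: datetime, sla: timedelta) -> datetime:
    """Return the time by which a run must land: logical date plus SLA."""
    return logical_date + sla


# Hypothetical SLA of 3 hours, as in the examples above.
sla = timedelta(hours=3)

# Daily task: created at UTC midnight, so it is expected to succeed by 03:00 UTC.
daily_run = datetime(2023, 1, 10, 0, 0, tzinfo=timezone.utc)
daily_deadline = sla_deadline(daily_run, sla)

# Hourly task: the 1pm run is expected to complete by 4pm.
hourly_run = datetime(2023, 1, 10, 13, 0, tzinfo=timezone.utc)
hourly_deadline = sla_deadline(hourly_run, sla)

print(daily_deadline.isoformat())
print(hourly_deadline.isoformat())
```

Note that the deadline depends only on the logical date, not on when the task actually started, which is why a task with slow upstreams can miss its SLA without ever running long itself.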

Collaborator

@jjoyce0510 jjoyce0510 Jan 27, 2023

Let me test my understanding:

Each parent / root DAG has a specific "cadence" or "frequency", which is represented by the partition of data that each task within it is expected to process / consume (this is the "logical or execution date"). Let's say one DAG is scheduled to run each day and process an entire day of data, while another is scheduled to run each hour and process a specific hour of data.

The "cadence" or "frequency" of the root DAG is exactly aligned with the cadences of all Tasks within it. For example, any Task that is part of a daily-scheduled DAG is by definition a "daily task".

Before each DAG runs, a "partition identifier" is generated based on the cadence of the DAG. Again, this is input into each task in the DAG, indicating which batch of data to process. For example, a daily DAG running at 12am on 01/10/2023 may start with a "partition identifier" of 01-09-2023 (or older), which means that the previous day's data should be processed. Moreover, an hourly DAG running at 2pm on the same day may start with a partition identifier of 01-10-2023-13-00, i.e. processing the previous 1 hour of data.

It's useful to call out the assumption that the "logical date" ( === "partition identifier") is not describing when exactly a Task or DAG actually ran. It simply describes the batch of data that a given run should consume (This is my hypothesis). This can hint at when it is supposed to run, but this is not a "globally true" expectation -- beyond Stripe. This aligns with my reading of the Airflow docs:

[Screenshot of the Airflow documentation: Screen Shot 2023-01-26 at 5 16 54 PM]

If my understanding of this is correct, then the "start time" SLA of an individual Task is computed by taking this DAG "partition identifier" (e.g. 01-09-2023), then converting it into a numeric value like a milliseconds timestamp, and then adding the "errorStartedBy" to discern the "expected start time". The "end time" SLA of an individual Task is computed by taking the DAG partition identifier and adding the "errorFinishedBy" value to discern the "expected finished time", e.g.

Start Time SLA (ms) = timestampMillis(partition identifier) + errorStartedBy
Finished Time SLA (ms) = timestampMillis(partition identifier) + errorFinishedBy

For example

Start Time SLA (ms) = timestampMillis(01-10-2023-13) + errorStartedBy
Finished Time SLA (ms) = timestampMillis(01-10-2023-13) + errorFinishedBy

and whether an SLA is violated depends on the real start and finish times of the task.

Start Time SLA Missed = real start time (ms) > Start Time SLA (ms)
End Time SLA Missed = real finished time (ms) > Finished Time SLA (ms)

Is this reading correct?
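The reading above can be sketched in Python as follows. This is only an illustration under stated assumptions: it treats errorStartedBy and errorFinishedBy as millisecond offsets, assumes the partition identifier has the form MM-DD-YYYY-HH in UTC, and uses hypothetical helper names (`timestamp_millis`, `sla_status`) that do not exist in DataHub.

```python
from datetime import datetime, timezone

HOUR_MS = 60 * 60 * 1000


def timestamp_millis(partition_id: str) -> int:
    """Parse an 'MM-DD-YYYY-HH' partition identifier (assumed UTC) into epoch ms."""
    parsed = datetime.strptime(partition_id, "%m-%d-%Y-%H").replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)


def sla_status(partition_id, error_started_by_ms, error_finished_by_ms,
               real_start_ms, real_finish_ms):
    """Compare real start/finish times against SLAs anchored on the partition."""
    base = timestamp_millis(partition_id)
    start_sla_ms = base + error_started_by_ms
    finished_sla_ms = base + error_finished_by_ms
    return {
        "start_sla_missed": real_start_ms > start_sla_ms,
        "finished_sla_missed": real_finish_ms > finished_sla_ms,
    }


# Hypothetical run for partition 01-10-2023-13: must start within 1h and
# finish within 3h; it starts 30 minutes in and finishes 4 hours in.
base = timestamp_millis("01-10-2023-13")
status = sla_status(
    "01-10-2023-13",
    error_started_by_ms=1 * HOUR_MS,
    error_finished_by_ms=3 * HOUR_MS,
    real_start_ms=base + HOUR_MS // 2,
    real_finish_ms=base + 4 * HOUR_MS,
)
print(status)
```

In this example the start SLA is met and the finish SLA is missed, since the run lands one hour past its target.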

Collaborator

(Examples with values for each variable will definitely help! I'm getting a bit lost in the time units)

Collaborator

To be clear, the SLA for a Task is NOT simply relative to the start time of the parent DAG.

For example, if a daily parent DAG begins at 12:05am on 01-10-2023 (delayed) but has a "logical" time or "partition identifier" of 01-10-2023-00, then the SLA for a Task inside of it is computed by adding the expected start and finish offsets to the partition identifier, not to the DAG start time.

SLA != DAG start time ms + Task errorStartBy
SLA == logical / partition time ms + Task errorStartBy
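A small Python sketch of why the choice of base matters, using the delayed DAG from the example above. The 3-minute start offset and the 12:06am task start are made-up values chosen only to show the two bases giving different verdicts.

```python
from datetime import datetime, timedelta, timezone

# Partition identifier 01-10-2023-00, i.e. the logical time of the run.
logical_time = datetime(2023, 1, 10, 0, 0, tzinfo=timezone.utc)
# The parent DAG actually began five minutes late.
dag_start = datetime(2023, 1, 10, 0, 5, tzinfo=timezone.utc)

# Hypothetical values: the task must start within 3 minutes, and starts at 12:06am.
error_started_by = timedelta(minutes=3)
task_start = datetime(2023, 1, 10, 0, 6, tzinfo=timezone.utc)

# Anchored on the logical time (the intended definition): the SLA is missed.
missed_vs_logical = task_start > logical_time + error_started_by

# Anchored on the DAG start time (NOT the definition): the delay is hidden.
missed_vs_dag_start = task_start > dag_start + error_started_by

print(missed_vs_logical, missed_vs_dag_start)
```

Anchoring on the logical time means an upstream delay counts against the SLA instead of silently shifting the deadline.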

Collaborator

@jjoyce0510 jjoyce0510 Jan 27, 2023

I did a bunch more thinking on this and have come to believe that the following variables are necessary to model and enforce any data SLA definition for an individual process run (task or DAG):

  1. Data window start time (ms)
  2. Data window unit (s/m/h/d)
  3. Data window multiple (1x, 2x, 3x unit)
  4. Expected Processing Start Offset (ms)
  5. Expected Processing End Offset (ms)
  6. Real Processing Start Time (ms)
  7. Real Processing End Time (ms)

Where the "data window" (1-3) defines the window of data processed by a specific task run, which can be thought of as a bucket uniquely keyed on the combination of numbers 1-3.

I think Stripe's concept of a "logical/execution time" is really just another way of defining #1, whereas the "frequency" with which the job runs (daily or hourly) is used to imply #2 and #3 (e.g. a "daily job" assumes a data window unit of "day" and a multiple of 1; thus, the processed "data window" = logical execution time + 1d, or minus 1 day, depending on how you structure the logical times). My guess is that some tasks are actually configured to receive both the logical / execution time (#1) and the data window definition (#2 + #3) as input params, in order to support running in either daily or hourly mode.

Using these inputs we can derive the SLA for a given run by first computing absolute times from #1-5:

Expected Processing Start Time (ms) = Data Window Start Time (ms) + Data Window Multiple * Data Window Unit * (ms / Data Window Unit) + Expected Processing Start Offset (ms) 
Expected Processing End Time (ms) = Data Window Start Time (ms) + Data Window Multiple * Data Window Unit * (ms / Data Window Unit) + Expected Processing End Offset (ms) 

Using these values, we simply compare against the "real" run values (6 & 7)

Missed Start Time SLA = Real Processing Start Time (ms) > Expected Processing Start Time (ms) 
Missed End Time SLA = Real Processing End Time (ms) > Expected Processing End Time (ms) 

Does this align with how you guys think of it?
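The seven-variable model above could be sketched in Python like this. The class name `DataWindowSla`, its field names, and the sample values are hypothetical and only mirror the numbered list; nothing here is part of the proposed aspect.

```python
from dataclasses import dataclass

# Milliseconds per data window unit (s/m/h/d).
UNIT_MS = {"s": 1_000, "m": 60_000, "h": 3_600_000, "d": 86_400_000}


@dataclass(frozen=True)
class DataWindowSla:
    window_start_ms: int           # 1. data window start time
    window_unit: str               # 2. data window unit
    window_multiple: int           # 3. data window multiple
    expected_start_offset_ms: int  # 4. expected processing start offset
    expected_end_offset_ms: int    # 5. expected processing end offset

    @property
    def window_end_ms(self) -> int:
        """End of the data window: start plus multiple * unit."""
        return self.window_start_ms + self.window_multiple * UNIT_MS[self.window_unit]

    @property
    def expected_start_ms(self) -> int:
        return self.window_end_ms + self.expected_start_offset_ms

    @property
    def expected_end_ms(self) -> int:
        return self.window_end_ms + self.expected_end_offset_ms

    def missed_start(self, real_start_ms: int) -> bool:  # uses 6
        return real_start_ms > self.expected_start_ms

    def missed_end(self, real_end_ms: int) -> bool:      # uses 7
        return real_end_ms > self.expected_end_ms


# Hypothetical daily job: one-day window, start within 1h and end within 3h
# of the window closing.
sla = DataWindowSla(
    window_start_ms=1_673_222_400_000,  # 2023-01-09T00:00:00Z
    window_unit="d",
    window_multiple=1,
    expected_start_offset_ms=UNIT_MS["h"],
    expected_end_offset_ms=3 * UNIT_MS["h"],
)
print(sla.missed_start(sla.window_end_ms + 30 * UNIT_MS["m"]))
print(sla.missed_end(sla.window_end_ms + 4 * UNIT_MS["h"]))
```

Here the offsets are measured from the end of the data window, following the formulas above; a model that anchors directly on the logical time would drop the window unit and multiple entirely.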

Contributor Author

@divyamanohar-stripe divyamanohar-stripe Jan 27, 2023

SLA == logical / partition time ms + Task errorStartBy

This is correct, however, it's important to note that we have created another aspect called DataProcessInstanceExecution which I intend to upstream right after this PR that is the final puzzle piece for being able to determine whether a run missed SLA or not.

The only things we need to determine whether a DataProcessInstance missed its SLA are the execution/logical date, the startDate, the endDate, and the SLA defined (which is relative to the logical date). The DataProcessInstanceExecution aspect contains the logicalDate (the partition date, e.g. 2023-01-01 for a daily task instance or 2023-01-01T13 for an hourly task instance), the startDate (when the task actually started running), and the endDate (when the task finished running).

For our UI components that have to do with SLA misses, we query for the SLA info aspect set on DataProcessInstance (DPI), as well as the DataProcessInstanceExecution aspect set on DPIs. All we need to do is check:

targetEndDate = logicalDate (timestamp) + errorFinishedBy (timedelta)
if endDate > targetEndDate:
   then we missed SLA

We don't really need or care about the cadence at which a task runs, which you mentioned (data window unit (s/m/h/d)).
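The check above, written out as a runnable Python sketch. The `Execution` dataclass is a hypothetical stand-in for the fields described for DataProcessInstanceExecution (logicalDate, startDate, endDate); it is not DataHub's generated class, and the sample run is invented for illustration.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone


@dataclass(frozen=True)
class Execution:
    """Stand-in for the logicalDate / startDate / endDate fields of a run."""
    logical_date: datetime
    start_date: datetime
    end_date: datetime


def missed_sla(execution: Execution, error_finished_by: timedelta) -> bool:
    """A run misses SLA when it ends after logicalDate + errorFinishedBy."""
    target_end_date = execution.logical_date + error_finished_by
    return execution.end_date > target_end_date


# Hypothetical hourly run for 2023-01-01T13 with a 3-hour errorFinishedBy.
run = Execution(
    logical_date=datetime(2023, 1, 1, 13, 0, tzinfo=timezone.utc),
    start_date=datetime(2023, 1, 1, 13, 20, tzinfo=timezone.utc),
    end_date=datetime(2023, 1, 1, 16, 30, tzinfo=timezone.utc),
)
print(missed_sla(run, timedelta(hours=3)))
```

The cadence never appears in the check: the logical date already identifies the run, so the same comparison works for daily and hourly instances alike.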

Collaborator

This makes total sense!

@RyanHolstien
Collaborator

Closing, reference PR

Labels
community-contribution PR or Issue raised by member(s) of DataHub Community poc-marathon-dec-2023 product PR or Issue related to the DataHub UI/UX
5 participants