Create new SLAInfo aspect on Datasets, Data Jobs, and DPI entities #7129

Closed
@@ -71,7 +71,8 @@ public class DataJobType implements SearchableEntityType<DataJob, String>, Brows
STATUS_ASPECT_NAME,
DOMAINS_ASPECT_NAME,
DEPRECATION_ASPECT_NAME,
DATA_PLATFORM_INSTANCE_ASPECT_NAME
DATA_PLATFORM_INSTANCE_ASPECT_NAME,
SLA_INFO_ASPECT_NAME
);
private static final Set<String> FACET_FIELDS = ImmutableSet.of("flow");
private final EntityClient _entityClient;
@@ -18,6 +18,7 @@
import com.linkedin.datahub.graphql.generated.DataJobProperties;
import com.linkedin.datahub.graphql.generated.Dataset;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.SLAInfo;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;
import com.linkedin.datahub.graphql.types.common.mappers.DeprecationMapper;
import com.linkedin.datahub.graphql.types.common.mappers.InstitutionalMemoryMapper;
@@ -98,6 +99,15 @@ public DataJob apply(@Nonnull final EntityResponse entityResponse) {
result.setDeprecation(DeprecationMapper.map(new Deprecation(data)));
} else if (DATA_PLATFORM_INSTANCE_ASPECT_NAME.equals(name)) {
result.setDataPlatformInstance(DataPlatformInstanceAspectMapper.map(new DataPlatformInstance(data)));
} else if (SLA_INFO_ASPECT_NAME.equals(name)) {
final com.linkedin.datajob.SLAInfo gmsSLAInfo = new com.linkedin.datajob.SLAInfo(data);
final SLAInfo slaInfo = new SLAInfo();
slaInfo.setSlaDefined(gmsSLAInfo.getSlaDefined());
slaInfo.setErrorStartedBy(gmsSLAInfo.getErrorStartedBy());
slaInfo.setWarnStartedBy(gmsSLAInfo.getWarnStartedBy());
slaInfo.setErrorFinishedBy(gmsSLAInfo.getErrorFinishedBy());
slaInfo.setWarnFinishedBy(gmsSLAInfo.getWarnFinishedBy());
result.setSlaInfo(slaInfo);
}
});

@@ -4,6 +4,7 @@
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.datahub.graphql.generated.DataProcessInstance;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.SLAInfo;
import com.linkedin.datahub.graphql.types.common.mappers.AuditStampMapper;
import com.linkedin.datahub.graphql.types.common.mappers.util.MappingHelper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
@@ -37,6 +38,7 @@ public DataProcessInstance apply(@Nonnull final EntityResponse entityResponse) {
EnvelopedAspectMap aspectMap = entityResponse.getAspects();
MappingHelper<DataProcessInstance> mappingHelper = new MappingHelper<>(aspectMap, result);
mappingHelper.mapToResult(DATA_PROCESS_INSTANCE_PROPERTIES_ASPECT_NAME, this::mapDataProcessProperties);
mappingHelper.mapToResult(SLA_INFO_ASPECT_NAME, this::mapSLAInfo);

return mappingHelper.getResult();
}
@@ -51,4 +53,15 @@ private void mapDataProcessProperties(@Nonnull DataProcessInstance dpi, @Nonnull
dpi.setExternalUrl(dataProcessInstanceProperties.getExternalUrl().toString());
}
}

private void mapSLAInfo(@Nonnull DataProcessInstance dpi, @Nonnull DataMap dataMap) {
final com.linkedin.datajob.SLAInfo gmsSLAInfo = new com.linkedin.datajob.SLAInfo(dataMap);
final SLAInfo slaInfo = new SLAInfo();
slaInfo.setSlaDefined(gmsSLAInfo.getSlaDefined());
slaInfo.setErrorStartedBy(gmsSLAInfo.getErrorStartedBy());
slaInfo.setWarnStartedBy(gmsSLAInfo.getWarnStartedBy());
slaInfo.setErrorFinishedBy(gmsSLAInfo.getErrorFinishedBy());
slaInfo.setWarnFinishedBy(gmsSLAInfo.getWarnFinishedBy());
dpi.setSlaInfo(slaInfo);
}
}
@@ -81,7 +81,8 @@ public class DatasetType implements SearchableEntityType<Dataset, String>, Brows
SCHEMA_METADATA_ASPECT_NAME,
DATA_PLATFORM_INSTANCE_ASPECT_NAME,
SIBLINGS_ASPECT_NAME,
EMBED_ASPECT_NAME
EMBED_ASPECT_NAME,
SLA_INFO_ASPECT_NAME
);

private static final Set<String> FACET_FIELDS = ImmutableSet.of("origin", "platform");
@@ -17,6 +17,7 @@
import com.linkedin.datahub.graphql.generated.DatasetEditableProperties;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FabricType;
import com.linkedin.datahub.graphql.generated.SLAInfo;
import com.linkedin.datahub.graphql.types.common.mappers.DataPlatformInstanceAspectMapper;
import com.linkedin.datahub.graphql.types.common.mappers.DeprecationMapper;
import com.linkedin.datahub.graphql.types.common.mappers.EmbedMapper;
@@ -105,6 +106,7 @@ public Dataset apply(@Nonnull final EntityResponse entityResponse) {
dataset.setFineGrainedLineages(UpstreamLineagesMapper.map(new UpstreamLineage(dataMap))));
mappingHelper.mapToResult(EMBED_ASPECT_NAME, (dataset, dataMap) ->
dataset.setEmbed(EmbedMapper.map(new Embed(dataMap))));
mappingHelper.mapToResult(SLA_INFO_ASPECT_NAME, this::mapSLAInfo);
return mappingHelper.getResult();
}

@@ -174,4 +176,15 @@ private void mapDomains(@Nonnull Dataset dataset, @Nonnull DataMap dataMap) {
final Domains domains = new Domains(dataMap);
dataset.setDomain(DomainAssociationMapper.map(domains, dataset.getUrn()));
}

private void mapSLAInfo(@Nonnull Dataset dataset, @Nonnull DataMap dataMap) {
final com.linkedin.datajob.SLAInfo gmsSLAInfo = new com.linkedin.datajob.SLAInfo(dataMap);
final SLAInfo slaInfo = new SLAInfo();
slaInfo.setSlaDefined(gmsSLAInfo.getSlaDefined());
slaInfo.setErrorStartedBy(gmsSLAInfo.getErrorStartedBy());
slaInfo.setWarnStartedBy(gmsSLAInfo.getWarnStartedBy());
slaInfo.setErrorFinishedBy(gmsSLAInfo.getErrorFinishedBy());
slaInfo.setWarnFinishedBy(gmsSLAInfo.getWarnFinishedBy());
dataset.setSlaInfo(slaInfo);
}
}
32 changes: 32 additions & 0 deletions datahub-graphql-core/src/main/resources/entity.graphql
@@ -1241,6 +1241,11 @@ type Dataset implements EntityWithRelationships & Entity & BrowsableEntity {
Privileges given to a user relevant to this entity
"""
privileges: EntityPrivileges

"""
User-defined SLA info set on the corresponding Dataset
"""
slaInfo: SLAInfo
}

type FineGrainedLineage {
@@ -5354,6 +5359,11 @@ type DataJob implements EntityWithRelationships & Entity & BrowsableEntity {
Privileges given to a user relevant to this entity
"""
privileges: EntityPrivileges

"""
User-defined SLA info set on the corresponding Data Job
"""
slaInfo: SLAInfo
}

"""
@@ -5401,6 +5411,11 @@ type DataProcessInstance implements EntityWithRelationships & Entity {
The link to view the task run in the source system
"""
externalUrl: String

"""
User-defined SLA info set on the corresponding data process
"""
slaInfo: SLAInfo
}

"""
@@ -5483,6 +5498,23 @@ enum DataProcessInstanceRunResultType {
UP_FOR_RETRY
}

"""
User-defined SLAs set on DataJobs, Datasets and DataProcessInstances
"""
type SLAInfo {
"""
Whether or not there is an SLA defined
"""
slaDefined: String!

"""
Different SLA types and levels
"""
errorStartedBy: Float
errorFinishedBy: Float
warnStartedBy: Float
warnFinishedBy: Float
}

"""
Deprecated, use DataJobProperties instead
@@ -73,6 +73,7 @@ public class Constants {
public static final String ORIGIN_ASPECT_NAME = "origin";
public static final String INPUT_FIELDS_ASPECT_NAME = "inputFields";
public static final String EMBED_ASPECT_NAME = "embed";
public static final String SLA_INFO_ASPECT_NAME = "slaInfo";

// User
public static final String CORP_USER_KEY_ASPECT_NAME = "corpUserKey";
38 changes: 38 additions & 0 deletions metadata-models/src/main/pegasus/com/linkedin/datajob/SLAInfo.pdl
@@ -0,0 +1,38 @@
namespace com.linkedin.datajob

@Aspect = {
"name": "slaInfo"
}

record SLAInfo {
Collaborator:

Hey @divyamanohar-stripe, remind me where we define the "time unit" here. Is the errorStartedBy representing a time after each hour, or after each day, or something else?

Contributor Author (@divyamanohar-stripe):

Hi John! So this implementation defines SLA in terms of "landing time", meaning the time data or an Airflow task lands. In Airflow, tasks (data jobs) have the concept of "logical/execution dates", which is the date the task is scheduled to run. This means a daily task would be created at UTC midnight every day, an hourly task would be created at the start of every hour, etc. If I set an SLA of 3 hours on a daily task, that means I expect the task to succeed at 3am (3 hours after the task is created at 12am UTC). If I set an SLA of 3 hours on an hourly task, that means I expect each hourly run to complete 3 hours after the scheduled time. So the 1pm run should complete by 4pm, the 2pm run should complete by 5pm, etc. For additional Airflow context, a task only starts running once it's created AND its dependencies are met (upstreams complete), so it's possible for a daily task to not start until 3am, for example, because it has many upstreams that need to complete (the root node will be kicked off at 12am).

I know that's a bit confusing, so lmk if you have any more questions about this!
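
To make the landing-time semantics concrete, here is a small illustrative sketch (not part of this PR; the dates and the 3-hour SLA are example values) showing the expected completion times for daily and hourly logical dates:

```java
import java.time.Duration;
import java.time.Instant;

// Illustrative only: how a 3-hour "landing time" SLA is interpreted against
// the logical/execution date of daily and hourly task instances.
public class LandingTimeSlaExample {
    public static void main(String[] args) {
        Duration sla = Duration.ofHours(3);

        // Daily task: the run for 2023-01-10 is created at 00:00 UTC,
        // so it is expected to succeed by 03:00 UTC.
        Instant dailyLogicalDate = Instant.parse("2023-01-10T00:00:00Z");
        System.out.println(dailyLogicalDate.plus(sla)); // 2023-01-10T03:00:00Z

        // Hourly task: the 13:00 run should finish by 16:00, the 14:00 run by 17:00, etc.
        Instant hourlyLogicalDate = Instant.parse("2023-01-10T13:00:00Z");
        System.out.println(hourlyLogicalDate.plus(sla)); // 2023-01-10T16:00:00Z
    }
}
```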

Collaborator (@jjoyce0510, Jan 27, 2023):

Let me test my understanding:

Each parent / root DAG has a specific "cadence" or "frequency" which is represented by the partition of data that each task within it is expected to process / consume (this is the "logical or execution date"). Let's say one DAG is scheduled to run each day and process an entire day of data, while another is scheduled to run each hour and process a specific hour of data.

The "cadence" or "frequency" of the root DAG is exactly aligned with the cadences of all Tasks within it. For example, any Task that is part of a daily-scheduled DAG is by definition a "daily task".

Before each DAG runs, a "partition identifier" is generated based on the cadence of the DAG. Again, this is input into each task in the DAG indicating which batch of data to process. For example, a daily DAG running at 12am on 01/10/2023 may start with a "partition identifier" of 01-09-2023 (or older), which means that the previous day's data should be processed. Moreover, an hourly DAG running at 2pm on the same day may start with a partition identifier of 01-10-2023-13-00, i.e. processing the previous 1 hour of data.

It's useful to call out the assumption that the "logical date" ( === "partition identifier") is not describing when exactly a Task or DAG actually ran. It simply describes the batch of data that a given run should consume (This is my hypothesis). This can hint at when it is supposed to run, but this is not a "globally true" expectation -- beyond Stripe. This aligns with my reading of the Airflow docs:

[Screenshot of the Airflow documentation]

If my understanding of this is correct, then the "start time" SLA of an individual Task is computed by taking this DAG "partition identifier" (e.g. 01-09-2023), then converting it into a numeric value like a milliseconds timestamp, and then adding the "errorStartedBy" to discern the "expected start time". The "end time" SLA of an individual Task is computed by taking the DAG partition identifier and adding the "errorFinishedBy" value to discern the "expected finished time", e.g.

Start Time SLA (ms) = timestampMillis(partition identifier) + errorStartedBy
Finished Time SLA (ms) = timestampMillis(partition identifier) + errorFinishedBy

For example

Start Time SLA (ms) = timestampMillis(01-10-2023-13) + errorStartedBy
Finished Time SLA (ms) = timestampMillis(01-10-2023-13) + errorFinishedBy

and whether the SLA is violated depends on the real start time of the task.

Start Time SLA Missed = real start time (ms) > Start Time SLA (ms)
End Time SLA Missed = real finished time (ms) > Finished Time SLA (ms)

Is this reading correct?

Collaborator:

(Examples with values for each variable will definitely help! I'm getting a bit lost in the time units)

Collaborator:

To be clear, the SLA for a Task is NOT simply relative to the start time of the parent DAG.

For example, if a daily parent DAG begins at 12:05am on 01-10-2023 (delayed), but has the "logical" or "partition identifier" as 01-10-2023-00, then the SLA for a Task inside of it is computed by adding the expected start and finish times to the partition identifier, not to the DAG start time.

SLA != DAG start time ms + Task errorStartedBy
SLA == logical / partition time ms + Task errorStartedBy

Collaborator (@jjoyce0510, Jan 27, 2023):

I did a bunch more thinking on this and have come to believe that the following variables are necessary to model and enforce any Data SLA definition for an individual process run (task or DAG):

  1. Data window start time (ms)
  2. Data window unit (s/m/h/d)
  3. Data window multiple (1x, 2x, 3x unit)
  4. Expected Processing Start Offset (ms)
  5. Expected Processing End Offset (ms)
  6. Real Processing Start Time (ms)
  7. Real Processing End Time (ms)

Where the "data window" (1-3) defines the window of data processed by a specific task run, which can be thought of as a bucket uniquely keyed on the combination of numbers 1-3.

I think Stripe's concept of a "logical/execution time" is really just another way of defining number 1, whereas the "frequency" with which the job runs (daily or hourly) is used to imply #2 and #3. (e.g. a "daily job" assumes a data window unit of "day" and a multiple of 1; thus, the processed "data window" = logical execution time + 1d -- or minus 1 day, depending on how you structure the logical times). My guess is that some tasks are actually configured to receive both the logical / execution time (#1) along with the data window definition (#2 + #3) as input params, in order to support running in either daily or hourly mode.

Using these inputs we can derive the SLA for a given run by first computing absolute times from #1-5:

Expected Processing Start Time (ms) = Data Window Start Time (ms) + Data Window Multiple * Data Window Unit * (ms / Data Window Unit) + Expected Processing Start Offset (ms) 
Expected Processing End Time (ms) = Data Window Start Time (ms) + Data Window Multiple * Data Window Unit * (ms / Data Window Unit) + Expected Processing End Offset (ms) 

Using these values, we simply compare against the "real" run values (6 & 7)

Missed Start Time SLA = Real Processing Start Time (ms) > Expected Processing Start Time (ms) 
Missed End Time SLA = Real Processing End Time (ms) > Expected Processing End Time (ms) 

Does this align with how you guys think of it?
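
For illustration only, here is a rough sketch of how the expected start/end times and the two checks from this model could be computed; the variable names and concrete values are made up, and the offsets are assumed to be relative to the end of the data window, as in the formulas above:

```java
import java.time.Duration;
import java.time.Instant;

// Sketch of the generalized data-window model proposed above (names, units, and values are assumptions).
public class DataWindowSlaSketch {
    public static void main(String[] args) {
        // 1-3: the data window -- start time, unit, and multiple.
        Instant dataWindowStart = Instant.parse("2023-01-10T13:00:00Z");
        Duration dataWindowUnit = Duration.ofHours(1);
        int dataWindowMultiple = 1;

        // 4-5: expected processing offsets, relative to the end of the data window.
        Duration expectedStartOffset = Duration.ofMinutes(30);
        Duration expectedEndOffset = Duration.ofHours(3);

        // 6-7: real run times.
        Instant realStart = Instant.parse("2023-01-10T14:20:00Z");
        Instant realEnd = Instant.parse("2023-01-10T17:10:00Z");

        Instant dataWindowEnd = dataWindowStart.plus(dataWindowUnit.multipliedBy(dataWindowMultiple));
        Instant expectedStart = dataWindowEnd.plus(expectedStartOffset); // 14:30
        Instant expectedEnd = dataWindowEnd.plus(expectedEndOffset);     // 17:00

        System.out.println("Missed start SLA: " + realStart.isAfter(expectedStart)); // false
        System.out.println("Missed end SLA:   " + realEnd.isAfter(expectedEnd));     // true
    }
}
```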

Contributor Author (@divyamanohar-stripe, Jan 27, 2023):

SLA == logical / partition time ms + Task errorStartedBy

This is correct. However, it's important to note that we have created another aspect called DataProcessInstanceExecution, which I intend to upstream right after this PR; it is the final puzzle piece for determining whether a run missed its SLA.

The only things we need to determine whether or not a DataProcessInstance missed its SLA are the execution/logical date, startDate, endDate, and the SLA definition (which is relative to the logical date). The DataProcessInstanceExecution aspect we created contains the logicalDate (partition date, e.g. 2023-01-01 for a daily task instance or 2023-01-01T13 for an hourly task instance), the startDate (when the task actually started running), and the endDate (when the task finished running).

For our UI components that have to do with SLA misses, we query for the SLA info aspect set on DataProcessInstance (DPI), as well as the DataProcessInstanceExecution aspect set on DPIs. All we need to do is check:

targetEndDate = logicalDate (timestamp) + errorFinishedBy (timedelta)
if endDate > targetEndDate:
   then we missed SLA

We don't really need or care about the cadence at which a task runs, which you mentioned (data window unit (s/m/h/d)).
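
For concreteness, a minimal sketch of that check (a hypothetical helper, not part of this PR; it assumes the logical date and the run's end date are available as timestamps, and that errorFinishedBy is an offset in seconds, since the unit isn't fixed by the aspect itself):

```java
import java.time.Duration;
import java.time.Instant;

// Minimal sketch of the SLA-miss check described above.
public class SlaMissCheck {
    static boolean missedFinishSla(Instant logicalDate, Instant endDate, float errorFinishedBy) {
        // targetEndDate = logicalDate + errorFinishedBy
        Instant targetEndDate = logicalDate.plus(Duration.ofSeconds((long) errorFinishedBy));
        return endDate.isAfter(targetEndDate);
    }

    public static void main(String[] args) {
        // Daily run partitioned at 2023-01-01T00:00Z with a 3-hour finish SLA that actually ended at 03:30Z.
        Instant logicalDate = Instant.parse("2023-01-01T00:00:00Z");
        Instant endDate = Instant.parse("2023-01-01T03:30:00Z");
        System.out.println(missedFinishSla(logicalDate, endDate, 3 * 3600f)); // true -> missed by 30 minutes
    }
}
```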

Collaborator:

This makes total sense!


@Searchable = {
"fieldName": "slaDefined",
"fieldType": "TEXT",
"addToFilters": true,
"hasValuesFieldName": "hasSLA",
"queryByDefault": false,
"filterNameOverride": "SLA Info"
}
slaDefined: string

@Searchable = {
"fieldType": "COUNT"
}
errorStartedBy: optional float

@Searchable = {
"fieldType": "COUNT"
}
errorFinishedBy: optional float

@Searchable = {
"fieldType": "COUNT"
}
warnStartedBy: optional float

@Searchable = {
"fieldType": "COUNT"
}
warnFinishedBy: optional float
}
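
Purely for illustration, populating the record generated from this PDL might look like the following; the setter names are assumed from standard Pegasus codegen (mirroring the getters used in the mappers above), and second-based offsets are an assumption rather than something this PR pins down:

```java
// Hypothetical construction of the slaInfo aspect's generated record.
public class SlaInfoAspectExample {
    public static void main(String[] args) {
        com.linkedin.datajob.SLAInfo slaInfo = new com.linkedin.datajob.SLAInfo();
        slaInfo.setSlaDefined("true");
        slaInfo.setWarnFinishedBy(2.5f * 3600);  // warn if the run hasn't finished 2.5h after the logical date
        slaInfo.setErrorFinishedBy(3f * 3600);   // error if the run hasn't finished 3h after the logical date
        System.out.println(slaInfo);
    }
}
```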
3 changes: 3 additions & 0 deletions metadata-models/src/main/resources/entity-registry.yml
@@ -22,6 +22,7 @@ entities:
- testResults
- siblings
- embed
- slaInfo
- name: dataHubPolicy
doc: DataHub Policies represent access policies granted to users or groups on metadata operations like edit, view etc.
category: internal
@@ -36,6 +37,7 @@
- domains
- deprecation
- versionInfo
- slaInfo
- name: dataFlow
category: core
keyAspect: dataFlowKey
@@ -52,6 +54,7 @@
- dataProcessInstanceProperties
- dataProcessInstanceRelationships
- dataProcessInstanceRunEvent
- slaInfo
- name: chart
keyAspect: chartKey
aspects: