Commit
Merge branch 'datahub-project:master' into bigquery-profiling
MugdhaHardikar-GSLab authored Jul 6, 2022
2 parents 32832d1 + 14f769e commit 256c44e
Showing 32 changed files with 51,727 additions and 51,025 deletions.
@@ -56,6 +56,14 @@ public static Authentication getAuthentication(DataFetchingEnvironment environme
return ((QueryContext) environment.getContext()).getAuthentication();
}

/**
 * @apiNote Use this method only when the facet filter fields carry the `.keyword` suffix, so
 * that they are matched against keyword fields in Elasticsearch.
 *
 * @param facetFilterInputs The list of facet filter inputs.
 * @param validFacetFields The set of valid facet fields to filter against.
 * @return A map of filter definitions to be used in Elasticsearch.
 */
@Nonnull
public static Map<String, String> buildFacetFilters(@Nullable List<FacetFilterInput> facetFilterInputs,
@Nonnull Set<String> validFacetFields) {
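For context on the `.keyword` note above: Elasticsearch analyzes `text` fields into tokens, so an exact-match `term` query must target the un-analyzed `keyword` subfield. A minimal sketch with the elasticsearch-py client — the index name here is hypothetical, not DataHub's actual index:

```python
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

# A `term` query against the analyzed `text` field compares the value to
# individual tokens ("my", "token", ...) and silently misses; targeting the
# `.keyword` subfield matches the whole, un-analyzed string.
resp = es.search(
    index="access_tokens",  # hypothetical index name
    query={"term": {"name.keyword": "my-token-name"}},
)
print(resp["hits"]["hits"])
```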
@@ -1,28 +1,24 @@
package com.linkedin.datahub.graphql.resolvers.auth;

import com.google.common.collect.ImmutableSet;
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.authorization.AuthorizationUtils;
import com.linkedin.datahub.graphql.exception.AuthorizationException;
import com.linkedin.datahub.graphql.generated.AccessTokenMetadata;
import com.linkedin.datahub.graphql.generated.EntityType;
import com.linkedin.datahub.graphql.generated.FacetFilterInput;
import com.linkedin.datahub.graphql.generated.ListAccessTokenInput;
import com.linkedin.datahub.graphql.generated.ListAccessTokenResult;
import com.linkedin.datahub.graphql.generated.AccessTokenMetadata;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.SearchResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;

import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import static com.linkedin.datahub.graphql.resolvers.ResolverUtils.*;
@@ -35,8 +31,6 @@
public class ListAccessTokensResolver implements DataFetcher<CompletableFuture<ListAccessTokenResult>> {

private static final String EXPIRES_AT_FIELD_NAME = "expiresAt";
private static final Set<String> FACET_FIELDS =
ImmutableSet.of("ownerUrn", "actorUrn", "name", "createdAt", "expiredAt", "description");

private final EntityClient _entityClient;

7 changes: 7 additions & 0 deletions datahub-web-react/src/Mocks.tsx
@@ -88,6 +88,13 @@ const user2 = {
editableInfo: {
pictureLink: null,
},
editableProperties: {
displayName: 'Test',
title: 'test',
pictureLink: null,
teams: [],
skills: [],
},
globalTags: {
tags: [
{
4 changes: 2 additions & 2 deletions datahub-web-react/src/app/entity/dataJob/DataJobEntity.tsx
@@ -18,8 +18,8 @@ import { SidebarDomainSection } from '../shared/containers/profile/sidebar/Domai
import { RunsTab } from './tabs/RunsTab';
import { EntityMenuItems } from '../shared/EntityDropdown/EntityDropdown';

const getDataJobPlatformName = (data: DataJob): string => {
return data.dataFlow?.platform.properties?.displayName || data.dataFlow?.platform.name || '';
const getDataJobPlatformName = (data?: DataJob): string => {
return data?.dataFlow?.platform?.properties?.displayName || data?.dataFlow?.platform?.name || '';
};

/**
27 changes: 27 additions & 0 deletions datahub-web-react/src/app/entity/shared/siblingUtils.ts
@@ -1,4 +1,5 @@
import merge from 'deepmerge';
import { unionBy } from 'lodash';
import { Entity, MatchedField, Maybe, SiblingProperties } from '../../../types.generated';

function cleanHelper(obj, visited) {
@@ -41,6 +42,31 @@ const combineMerge = (target, source, options) => {
return destination;
};

const mergeTags = (destinationArray, sourceArray, _options) => {
return unionBy(destinationArray, sourceArray, 'tag.urn');
};

const mergeTerms = (destinationArray, sourceArray, _options) => {
return unionBy(destinationArray, sourceArray, 'term.urn');
};

const mergeAssertions = (destinationArray, sourceArray, _options) => {
return unionBy(destinationArray, sourceArray, 'urn');
};

function getArrayMergeFunction(key) {
switch (key) {
case 'tags':
return mergeTags;
case 'terms':
return mergeTerms;
case 'assertions':
return mergeAssertions;
default:
return undefined;
}
}

const customMerge = (isPrimary, key) => {
if (key === 'upstream' || key === 'downstream') {
return (_secondary, primary) => primary;
@@ -51,6 +77,7 @@ const customMerge = (isPrimary, key) => {
if (key === 'tags' || key === 'terms' || key === 'assertions') {
return (secondary, primary) => {
return merge(secondary, primary, {
arrayMerge: getArrayMergeFunction(key),
customMerge: customMerge.bind({}, isPrimary),
});
};
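For context, the `unionBy` merges above concatenate the two sibling arrays and keep only the first entry per key, so destination entries win on conflicts. A rough Python analogue of lodash's `unionBy`, keyed the way `mergeTags` is — the dict shapes are illustrative:

```python
from typing import Any, Callable, Dict, List

def union_by(dest: List[Dict[str, Any]], source: List[Dict[str, Any]],
             key: Callable[[Dict[str, Any]], Any]) -> List[Dict[str, Any]]:
    """Concatenate dest + source, keeping only the first entry per key
    (so destination entries win on conflict), like lodash's unionBy."""
    seen = set()
    merged = []
    for item in dest + source:
        k = key(item)
        if k not in seen:
            seen.add(k)
            merged.append(item)
    return merged

primary = [{"tag": {"urn": "urn:li:tag:PII"}}]
sibling = [{"tag": {"urn": "urn:li:tag:PII"}}, {"tag": {"urn": "urn:li:tag:Gold"}}]
# The duplicate PII tag is dropped; Gold is appended from the sibling.
print(union_by(primary, sibling, key=lambda t: t["tag"]["urn"]))
```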
2 changes: 1 addition & 1 deletion datahub-web-react/src/app/useGetAuthenticatedUser.tsx
@@ -10,7 +10,7 @@ export function useGetAuthenticatedUser(skip?: boolean) {
if (!userUrn) {
throw new Error('Could not find logged in user.');
}
const { data, error } = useGetMeQuery({ skip });
const { data, error } = useGetMeQuery({ skip, fetchPolicy: 'cache-first' });
if (error) {
console.error(`Could not fetch logged in user from cache: ${error.message}`);
}
3 changes: 3 additions & 0 deletions datahub-web-react/src/graphql/dataFlow.graphql
@@ -69,6 +69,9 @@ query getDataFlow($urn: String!) {
urn
type
orchestrator
platform {
...platformFields
}
}
ownership {
...ownershipFields
2 changes: 1 addition & 1 deletion docs/architecture/metadata-ingestion.md
@@ -10,7 +10,7 @@ The figure below describes all the options possible for connecting your favorite

## Metadata Change Proposal: The Center Piece

The center piece for ingestion is are [Metadata Change Proposals] which represent requests to make a metadata change to an organization's Metadata Graph.
The center piece for ingestion are [Metadata Change Proposals] which represent requests to make a metadata change to an organization's Metadata Graph.
Metadata Change Proposals can be sent over Kafka, for highly scalable async publishing from source systems. They can also be sent directly to the HTTP endpoint exposed by the DataHub service tier to get synchronous success / failure responses.

## Pull-based Integration
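For context on the corrected sentence above, a minimal sketch of the synchronous HTTP path using the DataHub Python emitter — the server URL and URN are placeholders, and the exact `MetadataChangeProposalWrapper` signature varies across versions:

```python
from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.emitter.rest_emitter import DatahubRestEmitter
from datahub.metadata.schema_classes import ChangeTypeClass, DatasetPropertiesClass

# Synchronous path: POST the proposal to the DataHub service tier and get an
# immediate success / failure response. (A Kafka emitter covers the async path.)
emitter = DatahubRestEmitter(gms_server="http://localhost:8080")

mcp = MetadataChangeProposalWrapper(
    entityType="dataset",
    changeType=ChangeTypeClass.UPSERT,
    entityUrn="urn:li:dataset:(urn:li:dataPlatform:hive,demo.table,PROD)",
    aspectName="datasetProperties",
    aspect=DatasetPropertiesClass(description="Example description."),
)
emitter.emit_mcp(mcp)
```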
108 changes: 85 additions & 23 deletions metadata-ingestion/src/datahub/ingestion/source/looker.py
@@ -51,7 +51,11 @@
LookerExplore,
LookerUtil,
)
from datahub.metadata.com.linkedin.pegasus2avro.common import ChangeAuditStamps, Status
from datahub.metadata.com.linkedin.pegasus2avro.common import (
AuditStamp,
ChangeAuditStamps,
Status,
)
from datahub.metadata.com.linkedin.pegasus2avro.metadata.snapshot import (
ChartSnapshot,
DashboardSnapshot,
@@ -158,8 +162,8 @@ class LookerDashboardSourceConfig(LookerAPIConfig, LookerCommonConfig):
@validator("external_base_url", pre=True, always=True)
def external_url_defaults_to_api_config_base_url(
cls, v: Optional[str], *, values: Dict[str, Any], **kwargs: Dict[str, Any]
) -> str:
return v or values["base_url"]
) -> Optional[str]:
return v or values.get("base_url")

@validator("platform_instance")
def platform_instance_not_supported(cls, v: str) -> str:
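The change from `values["base_url"]` to `values.get("base_url")` above is not cosmetic: in pydantic v1, a field that fails its own validation is absent from `values`, so indexing would raise a `KeyError` that masks the real validation error. A standalone sketch of the pattern (the config class is a simplified stand-in, pydantic v1 style to match the code above):

```python
from typing import Any, Dict, Optional

from pydantic import BaseModel, validator


class StubConfig(BaseModel):  # simplified stand-in for the Looker config
    base_url: str
    external_base_url: Optional[str] = None

    @validator("external_base_url", pre=True, always=True)
    def default_to_base_url(
        cls, v: Optional[str], values: Dict[str, Any]
    ) -> Optional[str]:
        # If base_url failed its own validation it is missing from `values`;
        # .get() returns None here instead of raising a KeyError.
        return v or values.get("base_url")


print(StubConfig(base_url="https://company.looker.com").external_base_url)
# -> https://company.looker.com
```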
@@ -301,6 +305,10 @@ class LookerDashboard:
is_hidden: bool = False
owner: Optional[LookerUser] = None
strip_user_ids_from_email: Optional[bool] = True
last_updated_at: Optional[datetime.datetime] = None
last_updated_by: Optional[LookerUser] = None
deleted_at: Optional[datetime.datetime] = None
deleted_by: Optional[LookerUser] = None

def url(self, base_url):
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
@@ -460,6 +468,7 @@ def _get_looker_dashboard_element( # noqa: C901
)
for exp in explores:
self.explore_set.add((element.query.model, exp))

return LookerDashboardElement(
id=element.id,
title=element.title if element.title is not None else "",
@@ -712,7 +721,7 @@ def _make_dashboard_and_chart_mces(
description=looker_dashboard.description or "",
title=looker_dashboard.title,
charts=[mce.proposedSnapshot.urn for mce in chart_mces],
lastModified=ChangeAuditStamps(),
lastModified=self._get_change_audit_stamps(looker_dashboard),
dashboardUrl=looker_dashboard.url(self.source_config.external_base_url),
)

@@ -752,6 +761,46 @@ def get_ownership(
return ownership
return None

def _get_change_audit_stamps(
self, looker_dashboard: LookerDashboard
) -> ChangeAuditStamps:
change_audit_stamp: ChangeAuditStamps = ChangeAuditStamps()
if looker_dashboard.created_at is not None:
change_audit_stamp.created.time = round(
looker_dashboard.created_at.timestamp() * 1000
)
if looker_dashboard.owner is not None:
owner_urn = looker_dashboard.owner._get_urn(
self.source_config.strip_user_ids_from_email
)
if owner_urn:
change_audit_stamp.created.actor = owner_urn
if looker_dashboard.last_updated_at is not None:
change_audit_stamp.lastModified.time = round(
looker_dashboard.last_updated_at.timestamp() * 1000
)
if looker_dashboard.last_updated_by is not None:
updated_by_urn = looker_dashboard.last_updated_by._get_urn(
self.source_config.strip_user_ids_from_email
)
if updated_by_urn:
change_audit_stamp.lastModified.actor = updated_by_urn
if (
looker_dashboard.is_deleted
and looker_dashboard.deleted_by is not None
and looker_dashboard.deleted_at is not None
):
deleter_urn = looker_dashboard.deleted_by._get_urn(
self.source_config.strip_user_ids_from_email
)
if deleter_urn:
change_audit_stamp.deleted = AuditStamp(
actor=deleter_urn,
time=round(looker_dashboard.deleted_at.timestamp() * 1000),
)

return change_audit_stamp

folder_path_cache: Dict[str, str] = {}

def _get_folder_path(self, folder: FolderBase, client: Looker31SDK) -> str:
@@ -800,36 +849,43 @@ def _get_looker_dashboard(
if dashboard.id is None or dashboard.title is None:
raise ValueError("Both dashboard ID and title are None")

dashboard_owner = (
looker_dashboard = LookerDashboard(
id=dashboard.id,
title=dashboard.title,
description=dashboard.description,
dashboard_elements=dashboard_elements,
created_at=dashboard.created_at,
is_deleted=dashboard.deleted if dashboard.deleted is not None else False,
is_hidden=dashboard.hidden if dashboard.hidden is not None else False,
folder_path=dashboard_folder_path,
owner=self._get_looker_user(dashboard.user_id),
strip_user_ids_from_email=self.source_config.strip_user_ids_from_email,
last_updated_at=dashboard.updated_at,
last_updated_by=self._get_looker_user(dashboard.last_updater_id),
deleted_at=dashboard.deleted_at,
deleted_by=self._get_looker_user(dashboard.deleter_id),
)
return looker_dashboard

def _get_looker_user(self, user_id: Optional[int]) -> Optional[LookerUser]:
user = (
self.user_registry.get_by_id(
dashboard.user_id,
user_id,
self.source_config.transport_options.get_transport_options()
if self.source_config.transport_options is not None
else None,
)
if self.source_config.extract_owners and dashboard.user_id is not None
if self.source_config.extract_owners and user_id is not None
else None
)

if dashboard_owner is not None and self.source_config.extract_owners:
if user is not None and self.source_config.extract_owners:
# Keep track of how many user ids we were able to resolve
self.resolved_user_ids += 1
if dashboard_owner.email is None:
if user.email is None:
self.email_ids_missing += 1

looker_dashboard = LookerDashboard(
id=dashboard.id,
title=dashboard.title,
description=dashboard.description,
dashboard_elements=dashboard_elements,
created_at=dashboard.created_at,
is_deleted=dashboard.deleted if dashboard.deleted is not None else False,
is_hidden=dashboard.deleted if dashboard.deleted is not None else False,
folder_path=dashboard_folder_path,
owner=dashboard_owner,
strip_user_ids_from_email=self.source_config.strip_user_ids_from_email,
)
return looker_dashboard
return user

def process_dashboard(
self, dashboard_id: str
@@ -847,11 +903,17 @@ def process_dashboard(
"dashboard_elements",
"dashboard_filters",
"deleted",
"hidden",
"description",
"folder",
"user_id",
"created_at",
"updated_at",
"last_updater_id",
"deleted_at",
"deleter_id",
]
dashboard_object = self.client.dashboard(
dashboard_object: Dashboard = self.client.dashboard(
dashboard_id=dashboard_id,
fields=",".join(fields),
transport_options=self.source_config.transport_options.get_transport_options()
17 changes: 10 additions & 7 deletions metadata-ingestion/src/datahub/ingestion/source/looker_common.py
@@ -494,20 +494,23 @@ def _get_fields_from_sql_equality(sql_fragment: str) -> List[str]:
return field_match.findall(sql_fragment)

@classmethod
def __from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore":
def from_dict(cls, model_name: str, dict: Dict) -> "LookerExplore":
view_names = set()
joins = None
# always add the explore's name or the name from the from clause as the view on which this explore is built
view_names.add(dict.get("from", dict.get("name")))

if dict.get("joins", {}) != {}:
# additionally for join-based explores, pull in the linked views
assert "joins" in dict
view_names = set()
for join in dict["joins"]:
join_from = join.get("from")
view_names.add(join_from or join["name"])
sql_on = join.get("sql_on", None)
if sql_on is not None:
fields = cls._get_fields_from_sql_equality(sql_on)
joins = fields
for f in fields:
view_names.add(LookerUtil._extract_view_from_field(f))
else:
# non-join explore, get view_name from `from` field if possible, default to explore name
view_names = set(dict.get("from", dict.get("name")))

return LookerExplore(
model_name=model_name,
name=dict["name"],
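For context, the join branch above derives view dependencies from each join's `sql_on` clause by pulling out `${view.field}` references and keeping the view prefix. A self-contained sketch of that idea — the regex and helpers here are illustrative, not the exact ones in `looker_common.py`:

```python
import re
from typing import List, Set

FIELD_REF = re.compile(r"\$\{([^}]+)\}")  # matches ${view_name.field_name}

def fields_from_sql_equality(sql_fragment: str) -> List[str]:
    """Return every ${...} field reference in a LookML sql_on fragment."""
    return FIELD_REF.findall(sql_fragment)

def view_from_field(field: str) -> str:
    # "orders.user_id" -> "orders"
    return field.split(".")[0]

sql_on = "${orders.user_id} = ${users.id}"
views: Set[str] = {view_from_field(f) for f in fields_from_sql_equality(sql_on)}
print(sorted(views))  # ['orders', 'users']
```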