Skip to content

Commit

Permalink
fix(ingestion-web) sorting and filtering uses api (datahub-project#11844
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jayacryl authored Nov 13, 2024
1 parent e672ade commit 64e9114
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 80 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.client.EntityClient;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.query.filter.SortCriterion;
import com.linkedin.metadata.query.filter.SortOrder;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import graphql.schema.DataFetcher;
import graphql.schema.DataFetchingEnvironment;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;

/** Lists all ingestion sources stored within DataHub. Requires the MANAGE_INGESTION privilege. */
@Slf4j
public class ListIngestionSourcesResolver
implements DataFetcher<CompletableFuture<ListIngestionSourcesResult>> {

Expand Down Expand Up @@ -57,6 +59,22 @@ public CompletableFuture<ListIngestionSourcesResult> get(
final List<FacetFilterInput> filters =
input.getFilters() == null ? Collections.emptyList() : input.getFilters();

// construct sort criteria, defaulting to systemCreated
final SortCriterion sortCriterion;

// if query is expecting to sort by something, use that
final com.linkedin.datahub.graphql.generated.SortCriterion sortCriterionInput =
input.getSort();
if (sortCriterionInput != null) {
sortCriterion =
new SortCriterion()
.setField(sortCriterionInput.getField())
.setOrder(SortOrder.valueOf(sortCriterionInput.getSortOrder().name()));
} else {
// TODO: default to last executed
sortCriterion = null;
}

return GraphQLConcurrencyUtils.supplyAsync(
() -> {
try {
Expand All @@ -69,41 +87,32 @@ public CompletableFuture<ListIngestionSourcesResult> get(
Constants.INGESTION_SOURCE_ENTITY_NAME,
query,
buildFilter(filters, Collections.emptyList()),
null,
sortCriterion != null ? List.of(sortCriterion) : null,
start,
count);

final List<Urn> entitiesUrnList =
gmsResult.getEntities().stream().map(SearchEntity::getEntity).toList();
// Then, resolve all ingestion sources
final Map<Urn, EntityResponse> entities =
_entityClient.batchGetV2(
context.getOperationContext(),
Constants.INGESTION_SOURCE_ENTITY_NAME,
new HashSet<>(
gmsResult.getEntities().stream()
.map(SearchEntity::getEntity)
.collect(Collectors.toList())),
new HashSet<>(entitiesUrnList),
ImmutableSet.of(
Constants.INGESTION_INFO_ASPECT_NAME,
Constants.INGESTION_SOURCE_KEY_ASPECT_NAME));

final Collection<EntityResponse> sortedEntities =
entities.values().stream()
.sorted(
Comparator.comparingLong(
s ->
-s.getAspects()
.get(Constants.INGESTION_SOURCE_KEY_ASPECT_NAME)
.getCreated()
.getTime()))
.collect(Collectors.toList());
final List<EntityResponse> entitiesOrdered =
entitiesUrnList.stream().map(entities::get).filter(Objects::nonNull).toList();

// Now that we have entities we can bind this to a result.
final ListIngestionSourcesResult result = new ListIngestionSourcesResult();
result.setStart(gmsResult.getFrom());
result.setCount(gmsResult.getPageSize());
result.setTotal(gmsResult.getNumEntities());
result.setIngestionSources(
IngestionResolverUtils.mapIngestionSources(sortedEntities));
IngestionResolverUtils.mapIngestionSources(entitiesOrdered));
return result;

} catch (Exception e) {
Expand Down
5 changes: 5 additions & 0 deletions datahub-graphql-core/src/main/resources/ingestion.graphql
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,11 @@ input ListIngestionSourcesInput {
Optional Facet filters to apply to the result set
"""
filters: [FacetFilterInput!]

"""
Optional sort order. Defaults to use systemCreated.
"""
sort: SortCriterion
}

"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
public class ListIngestionSourceResolverTest {

private static final ListIngestionSourcesInput TEST_INPUT =
new ListIngestionSourcesInput(0, 20, null, null);
new ListIngestionSourcesInput(0, 20, null, null, null);

@Test
public void testGetSuccess() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package com.linkedin.datahub.upgrade.config;

import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.datahub.upgrade.system.ingestion.BackfillIngestionSourceInfoIndices;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Conditional;
import org.springframework.context.annotation.Configuration;

@Configuration
@Conditional(SystemUpdateCondition.NonBlockingSystemUpdateCondition.class)
public class BackfillIngestionSourceInfoIndicesConfig {

@Bean
public NonBlockingSystemUpgrade backfillIngestionSourceInfoIndices(
final OperationContext opContext,
final EntityService<?> entityService,
final AspectDao aspectDao,
@Value("${systemUpdate.ingestionIndices.enabled}") final boolean enabled,
@Value("${systemUpdate.ingestionIndices.batchSize}") final Integer batchSize,
@Value("${systemUpdate.ingestionIndices.delayMs}") final Integer delayMs,
@Value("${systemUpdate.ingestionIndices.limit}") final Integer limit) {
return new BackfillIngestionSourceInfoIndices(
opContext, entityService, aspectDao, enabled, batchSize, delayMs, limit);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.linkedin.datahub.upgrade.system.ingestion;

import com.google.common.collect.ImmutableList;
import com.linkedin.datahub.upgrade.UpgradeStep;
import com.linkedin.datahub.upgrade.system.NonBlockingSystemUpgrade;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import java.util.List;
import javax.annotation.Nonnull;

public class BackfillIngestionSourceInfoIndices implements NonBlockingSystemUpgrade {

private final List<UpgradeStep> _steps;

public BackfillIngestionSourceInfoIndices(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
boolean enabled,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
if (enabled) {
_steps =
ImmutableList.of(
new BackfillIngestionSourceInfoIndicesStep(
opContext, entityService, aspectDao, batchSize, batchDelayMs, limit));
} else {
_steps = ImmutableList.of();
}
}

@Override
public String id() {
return getClass().getSimpleName();
}

@Override
public List<UpgradeStep> steps() {
return _steps;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package com.linkedin.datahub.upgrade.system.ingestion;

import static com.linkedin.metadata.Constants.*;

import com.linkedin.common.urn.Urn;
import com.linkedin.datahub.upgrade.system.AbstractMCLStep;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.entity.AspectDao;
import com.linkedin.metadata.entity.EntityService;
import io.datahubproject.metadata.context.OperationContext;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class BackfillIngestionSourceInfoIndicesStep extends AbstractMCLStep {

private static final String UPGRADE_ID = BackfillIngestionSourceInfoIndices.class.getSimpleName();
private static final Urn UPGRADE_ID_URN = BootstrapStep.getUpgradeUrn(UPGRADE_ID);

public BackfillIngestionSourceInfoIndicesStep(
@Nonnull OperationContext opContext,
EntityService<?> entityService,
AspectDao aspectDao,
Integer batchSize,
Integer batchDelayMs,
Integer limit) {
super(opContext, entityService, aspectDao, batchSize, batchDelayMs, limit);
}

@Override
public String id() {
return UPGRADE_ID;
}

@Nonnull
@Override
protected String getAspectName() {
return INGESTION_INFO_ASPECT_NAME;
}

@Nullable
@Override
protected String getUrnLike() {
return "urn:li:" + INGESTION_SOURCE_ENTITY_NAME + ":%";
}

/**
* Returns whether the upgrade should proceed if the step fails after exceeding the maximum
* retries.
*/
@Override
public boolean isOptional() {
return true;
}
}
43 changes: 26 additions & 17 deletions datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import TabToolbar from '../../entity/shared/components/styled/TabToolbar';
import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal';
import { addToListIngestionSourcesCache, CLI_EXECUTOR_ID, removeFromListIngestionSourcesCache } from './utils';
import { DEFAULT_EXECUTOR_ID, SourceBuilderState, StringMapEntryInput } from './builder/types';
import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated';
import { IngestionSource, SortCriterion, SortOrder, UpdateIngestionSourceInput } from '../../../types.generated';
import { SearchBar } from '../../search/SearchBar';
import { useEntityRegistry } from '../../useEntityRegistry';
import { ExecutionDetailsModal } from './executions/ExecutionRequestDetailsModal';
Expand Down Expand Up @@ -60,16 +60,6 @@ export enum IngestionSourceType {
CLI,
}

export function shouldIncludeSource(source: any, sourceFilter: IngestionSourceType) {
if (sourceFilter === IngestionSourceType.CLI) {
return source.config.executorId === CLI_EXECUTOR_ID;
}
if (sourceFilter === IngestionSourceType.UI) {
return source.config.executorId !== CLI_EXECUTOR_ID;
}
return true;
}

const DEFAULT_PAGE_SIZE = 25;

const removeExecutionsFromIngestionSource = (source) => {
Expand Down Expand Up @@ -105,6 +95,7 @@ export const IngestionSourceList = () => {
// Set of removed urns used to account for eventual consistency
const [removedUrns, setRemovedUrns] = useState<string[]>([]);
const [sourceFilter, setSourceFilter] = useState(IngestionSourceType.ALL);
const [sort, setSort] = useState<SortCriterion>();
const [hideSystemSources, setHideSystemSources] = useState(true);

/**
Expand All @@ -115,16 +106,24 @@ export const IngestionSourceList = () => {
// Ingestion Source Default Filters
const filters = hideSystemSources
? [{ field: 'sourceType', values: [SYSTEM_INTERNAL_SOURCE_TYPE], negated: true }]
: undefined;
: [];
if (sourceFilter !== IngestionSourceType.ALL) {
filters.push({
field: 'sourceExecutorId',
values: [CLI_EXECUTOR_ID],
negated: sourceFilter !== IngestionSourceType.CLI,
});
}

// Ingestion Source Queries
const { loading, error, data, client, refetch } = useListIngestionSourcesQuery({
variables: {
input: {
start,
count: pageSize,
query: (query?.length && query) || undefined,
filters,
query: query?.length ? query : undefined,
filters: filters.length ? filters : undefined,
sort,
},
},
fetchPolicy: (query?.length || 0) > 0 ? 'no-cache' : 'cache-first',
Expand All @@ -138,9 +137,7 @@ export const IngestionSourceList = () => {

const totalSources = data?.listIngestionSources?.total || 0;
const sources = data?.listIngestionSources?.ingestionSources || [];
const filteredSources = sources.filter(
(source) => !removedUrns.includes(source.urn) && shouldIncludeSource(source, sourceFilter),
) as IngestionSource[];
const filteredSources = sources.filter((source) => !removedUrns.includes(source.urn)) as IngestionSource[];
const focusSource =
(focusSourceUrn && filteredSources.find((source) => source.urn === focusSourceUrn)) || undefined;

Expand Down Expand Up @@ -376,6 +373,17 @@ export const IngestionSourceList = () => {
setFocusSourceUrn(undefined);
};

const onChangeSort = (field, order) => {
setSort(
order
? {
sortOrder: order === 'ascend' ? SortOrder.Ascending : SortOrder.Descending,
field,
}
: undefined,
);
};

return (
<>
{!data && loading && <Message type="loading" content="Loading ingestion sources..." />}
Expand Down Expand Up @@ -438,6 +446,7 @@ export const IngestionSourceList = () => {
onView={onView}
onDelete={onDelete}
onRefresh={onRefresh}
onChangeSort={onChangeSort}
/>
<SourcePaginationContainer>
<Pagination
Expand Down
Loading

0 comments on commit 64e9114

Please sign in to comment.