Skip to content

Commit 3e89681

Browse files
committed
feat: Added Lineage APIs to get registry objects relationships
Signed-off-by: ntkathole <[email protected]>
1 parent 40d25c6 commit 3e89681

File tree

14 files changed

+1416
-6
lines changed

14 files changed

+1416
-6
lines changed

protos/feast/registry/RegistryServer.proto

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ service RegistryServer{
8888
rpc Refresh (RefreshRequest) returns (google.protobuf.Empty) {}
8989
rpc Proto (google.protobuf.Empty) returns (feast.core.Registry) {}
9090

91+
// Lineage RPCs
92+
rpc GetRegistryLineage (GetRegistryLineageRequest) returns (GetRegistryLineageResponse) {}
93+
rpc GetObjectRelationships (GetObjectRelationshipsRequest) returns (GetObjectRelationshipsResponse) {}
94+
9195
}
9296

9397
message RefreshRequest {
@@ -424,3 +428,39 @@ message DeleteProjectRequest {
424428
string name = 1;
425429
bool commit = 2;
426430
}
431+
432+
// Lineage
433+
434+
message EntityReference {
435+
string type = 1; // "dataSource", "entity", "featureView", "featureService"
436+
string name = 2;
437+
}
438+
439+
message EntityRelation {
440+
EntityReference source = 1;
441+
EntityReference target = 2;
442+
}
443+
444+
message GetRegistryLineageRequest {
445+
string project = 1;
446+
bool allow_cache = 2;
447+
string filter_object_type = 3;
448+
string filter_object_name = 4;
449+
}
450+
451+
message GetRegistryLineageResponse {
452+
repeated EntityRelation relationships = 1;
453+
repeated EntityRelation indirect_relationships = 2;
454+
}
455+
456+
message GetObjectRelationshipsRequest {
457+
string project = 1;
458+
string object_type = 2;
459+
string object_name = 3;
460+
bool include_indirect = 4;
461+
bool allow_cache = 5;
462+
}
463+
464+
message GetObjectRelationshipsResponse {
465+
repeated EntityRelation relationships = 1;
466+
}

sdk/python/feast/api/registry/rest/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from feast.api.registry.rest.entities import get_entity_router
55
from feast.api.registry.rest.feature_services import get_feature_service_router
66
from feast.api.registry.rest.feature_views import get_feature_view_router
7+
from feast.api.registry.rest.lineage import get_lineage_router
78
from feast.api.registry.rest.permissions import get_permission_router
89
from feast.api.registry.rest.projects import get_project_router
910
from feast.api.registry.rest.saved_datasets import get_saved_dataset_router
@@ -14,6 +15,7 @@ def register_all_routes(app: FastAPI, grpc_handler):
1415
app.include_router(get_data_source_router(grpc_handler))
1516
app.include_router(get_feature_service_router(grpc_handler))
1617
app.include_router(get_feature_view_router(grpc_handler))
18+
app.include_router(get_lineage_router(grpc_handler))
1719
app.include_router(get_permission_router(grpc_handler))
1820
app.include_router(get_project_router(grpc_handler))
1921
app.include_router(get_saved_dataset_router(grpc_handler))
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""REST API endpoints for registry lineage and relationships."""
2+
3+
from typing import Optional
4+
5+
from fastapi import APIRouter, HTTPException, Query
6+
7+
from feast.api.registry.rest.rest_utils import grpc_call
8+
from feast.protos.feast.registry import RegistryServer_pb2
9+
10+
11+
def get_lineage_router(grpc_handler) -> APIRouter:
12+
router = APIRouter()
13+
14+
@router.get("/lineage/registry")
15+
def get_registry_lineage(
16+
project: str = Query(...),
17+
allow_cache: bool = Query(True),
18+
filter_object_type: Optional[str] = Query(None),
19+
filter_object_name: Optional[str] = Query(None),
20+
):
21+
"""
22+
Get complete registry lineage with relationships and indirect relationships.
23+
Args:
24+
project: Project name
25+
allow_cache: Whether to allow cached data
26+
filter_object_type: Optional filter by object type (dataSource, entity, featureView, featureService)
27+
filter_object_name: Optional filter by object name
28+
Returns:
29+
Dictionary containing relationships and indirect_relationships arrays
30+
"""
31+
req = RegistryServer_pb2.GetRegistryLineageRequest(
32+
project=project,
33+
allow_cache=allow_cache,
34+
filter_object_type=filter_object_type or "",
35+
filter_object_name=filter_object_name or "",
36+
)
37+
response = grpc_call(grpc_handler.GetRegistryLineage, req)
38+
39+
return {
40+
"relationships": response.get("relationships", []),
41+
"indirect_relationships": response.get("indirectRelationships", []),
42+
}
43+
44+
@router.get("/lineage/objects/{object_type}/{object_name}")
45+
def get_object_relationships(
46+
object_type: str,
47+
object_name: str,
48+
project: str = Query(...),
49+
include_indirect: bool = Query(False),
50+
allow_cache: bool = Query(True),
51+
):
52+
"""
53+
Get relationships for a specific object.
54+
Args:
55+
object_type: Type of object (dataSource, entity, featureView, featureService)
56+
object_name: Name of the object
57+
project: Project name
58+
include_indirect: Whether to include indirect relationships
59+
allow_cache: Whether to allow cached data
60+
Returns:
61+
Dictionary containing relationships array for the specific object
62+
"""
63+
valid_types = ["dataSource", "entity", "featureView", "featureService"]
64+
if object_type not in valid_types:
65+
raise HTTPException(
66+
status_code=400,
67+
detail=f"Invalid object_type. Must be one of: {valid_types}",
68+
)
69+
70+
req = RegistryServer_pb2.GetObjectRelationshipsRequest(
71+
project=project,
72+
object_type=object_type,
73+
object_name=object_name,
74+
include_indirect=include_indirect,
75+
allow_cache=allow_cache,
76+
)
77+
response = grpc_call(grpc_handler.GetObjectRelationships, req)
78+
79+
return {"relationships": response.get("relationships", [])}
80+
81+
@router.get("/lineage/complete")
82+
def get_complete_registry_data(
83+
project: str = Query(...),
84+
allow_cache: bool = Query(True),
85+
):
86+
"""
87+
Get complete registry data.
88+
This endpoint provides all the data the UI currently loads:
89+
- All registry objects
90+
- Relationships
91+
- Indirect relationships
92+
- Merged feature view data
93+
Returns:
94+
Complete registry data structure.
95+
"""
96+
# Get lineage data
97+
lineage_req = RegistryServer_pb2.GetRegistryLineageRequest(
98+
project=project,
99+
allow_cache=allow_cache,
100+
)
101+
lineage_response = grpc_call(grpc_handler.GetRegistryLineage, lineage_req)
102+
103+
# Get all registry objects
104+
entities_req = RegistryServer_pb2.ListEntitiesRequest(
105+
project=project, allow_cache=allow_cache
106+
)
107+
entities_response = grpc_call(grpc_handler.ListEntities, entities_req)
108+
109+
data_sources_req = RegistryServer_pb2.ListDataSourcesRequest(
110+
project=project, allow_cache=allow_cache
111+
)
112+
data_sources_response = grpc_call(
113+
grpc_handler.ListDataSources, data_sources_req
114+
)
115+
116+
feature_views_req = RegistryServer_pb2.ListAllFeatureViewsRequest(
117+
project=project, allow_cache=allow_cache
118+
)
119+
feature_views_response = grpc_call(
120+
grpc_handler.ListAllFeatureViews, feature_views_req
121+
)
122+
123+
feature_services_req = RegistryServer_pb2.ListFeatureServicesRequest(
124+
project=project, allow_cache=allow_cache
125+
)
126+
feature_services_response = grpc_call(
127+
grpc_handler.ListFeatureServices, feature_services_req
128+
)
129+
130+
return {
131+
"project": project,
132+
"objects": {
133+
"entities": entities_response.get("entities", []),
134+
"dataSources": data_sources_response.get("dataSources", []),
135+
"featureViews": feature_views_response.get("featureViews", []),
136+
"featureServices": feature_services_response.get("featureServices", []),
137+
},
138+
"relationships": lineage_response.get("relationships", []),
139+
"indirectRelationships": lineage_response.get("indirectRelationships", []),
140+
}
141+
142+
return router

sdk/python/feast/infra/registry/base_registry.py

Lines changed: 138 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -788,16 +788,152 @@ def refresh(self, project: Optional[str] = None):
788788
"""Refreshes the state of the registry cache by fetching the registry state from the remote registry store."""
789789
raise NotImplementedError
790790

791+
# Lineage operations
792+
def get_registry_lineage(
793+
self,
794+
project: str,
795+
allow_cache: bool = False,
796+
filter_object_type: Optional[str] = None,
797+
filter_object_name: Optional[str] = None,
798+
) -> tuple[List[Any], List[Any]]:
799+
"""
800+
Get complete registry lineage with relationships and indirect relationships.
801+
Args:
802+
project: Feast project name
803+
allow_cache: Whether to allow returning data from a cached registry
804+
filter_object_type: Optional filter by object type (dataSource, entity, featureView, featureService)
805+
filter_object_name: Optional filter by object name
806+
Returns:
807+
Tuple of (direct_relationships, indirect_relationships)
808+
"""
809+
from feast.lineage.registry_lineage import RegistryLineageGenerator
810+
811+
# Create a registry proto with all objects
812+
registry_proto = self._build_registry_proto(project, allow_cache)
813+
814+
# Generate lineage
815+
lineage_generator = RegistryLineageGenerator()
816+
relationships, indirect_relationships = lineage_generator.generate_lineage(
817+
registry_proto
818+
)
819+
820+
# Apply filtering if specified
821+
if filter_object_type and filter_object_name:
822+
relationships = [
823+
rel
824+
for rel in relationships
825+
if (
826+
(
827+
rel.source.type.value == filter_object_type
828+
and rel.source.name == filter_object_name
829+
)
830+
or (
831+
rel.target.type.value == filter_object_type
832+
and rel.target.name == filter_object_name
833+
)
834+
)
835+
]
836+
indirect_relationships = [
837+
rel
838+
for rel in indirect_relationships
839+
if (
840+
(
841+
rel.source.type.value == filter_object_type
842+
and rel.source.name == filter_object_name
843+
)
844+
or (
845+
rel.target.type.value == filter_object_type
846+
and rel.target.name == filter_object_name
847+
)
848+
)
849+
]
850+
851+
return relationships, indirect_relationships
852+
853+
def get_object_relationships(
854+
self,
855+
project: str,
856+
object_type: str,
857+
object_name: str,
858+
include_indirect: bool = False,
859+
allow_cache: bool = False,
860+
) -> List[Any]:
861+
"""
862+
Get relationships for a specific object.
863+
Args:
864+
project: Feast project name
865+
object_type: Type of object (dataSource, entity, featureView, featureService)
866+
object_name: Name of the object
867+
include_indirect: Whether to include indirect relationships
868+
allow_cache: Whether to allow returning data from a cached registry
869+
Returns:
870+
List of relationships involving the specified object
871+
"""
872+
from feast.lineage.registry_lineage import (
873+
RegistryLineageGenerator,
874+
)
875+
876+
registry_proto = self._build_registry_proto(project, allow_cache)
877+
lineage_generator = RegistryLineageGenerator()
878+
879+
return lineage_generator.get_object_relationships(
880+
registry_proto, object_type, object_name, include_indirect=include_indirect
881+
)
882+
883+
def _build_registry_proto(
884+
self, project: str, allow_cache: bool = False
885+
) -> RegistryProto:
886+
"""Helper method to build a registry proto with all objects."""
887+
registry = RegistryProto()
888+
889+
# Add all entities
890+
entities = self.list_entities(project=project, allow_cache=allow_cache)
891+
for entity in entities:
892+
registry.entities.append(entity.to_proto())
893+
894+
# Add all data sources
895+
data_sources = self.list_data_sources(project=project, allow_cache=allow_cache)
896+
for data_source in data_sources:
897+
registry.data_sources.append(data_source.to_proto())
898+
899+
# Add all feature views
900+
feature_views = self.list_feature_views(
901+
project=project, allow_cache=allow_cache
902+
)
903+
for feature_view in feature_views:
904+
registry.feature_views.append(feature_view.to_proto())
905+
906+
# Add all stream feature views
907+
stream_feature_views = self.list_stream_feature_views(
908+
project=project, allow_cache=allow_cache
909+
)
910+
for stream_feature_view in stream_feature_views:
911+
registry.stream_feature_views.append(stream_feature_view.to_proto())
912+
913+
# Add all on-demand feature views
914+
on_demand_feature_views = self.list_on_demand_feature_views(
915+
project=project, allow_cache=allow_cache
916+
)
917+
for on_demand_feature_view in on_demand_feature_views:
918+
registry.on_demand_feature_views.append(on_demand_feature_view.to_proto())
919+
920+
# Add all feature services
921+
feature_services = self.list_feature_services(
922+
project=project, allow_cache=allow_cache
923+
)
924+
for feature_service in feature_services:
925+
registry.feature_services.append(feature_service.to_proto())
926+
927+
return registry
928+
791929
@staticmethod
792930
def _message_to_sorted_dict(message: Message) -> Dict[str, Any]:
793931
return json.loads(MessageToJson(message, sort_keys=True))
794932

795933
def to_dict(self, project: str) -> Dict[str, List[Any]]:
796934
"""Returns a dictionary representation of the registry contents for the specified project.
797-
798935
For each list in the dictionary, the elements are sorted by name, so this
799936
method can be used to compare two registries.
800-
801937
Args:
802938
project: Feast project to convert to a dict
803939
"""

0 commit comments

Comments
 (0)