1+ import uuid
12from datetime import datetime
3+ from enum import Enum
24from pathlib import Path
35from typing import Any , List , Optional , Set , Union
46
1719)
1820from sqlalchemy .engine import Engine
1921
22+ from feast import usage
2023from feast .base_feature_view import BaseFeatureView
2124from feast .data_source import DataSource
2225from feast .entity import Entity
3235from feast .feature_view import FeatureView
3336from feast .infra .infra_object import Infra
3437from feast .on_demand_feature_view import OnDemandFeatureView
38+ from feast .project_metadata import ProjectMetadata
3539from feast .protos .feast .core .DataSource_pb2 import DataSource as DataSourceProto
3640from feast .protos .feast .core .Entity_pb2 import Entity as EntityProto
3741from feast .protos .feast .core .FeatureService_pb2 import (
156160 Column ("infra_proto" , LargeBinary , nullable = False ),
157161)
158162
163+
164+ class FeastMetadataKeys (Enum ):
165+ LAST_UPDATED_TIMESTAMP = "last_updated_timestamp"
166+ PROJECT_UUID = "project_uuid"
167+
168+
159169feast_metadata = Table (
160170 "feast_metadata" ,
161171 metadata ,
@@ -189,7 +199,7 @@ def teardown(self):
189199 stmt = delete (t )
190200 conn .execute (stmt )
191201
192- def refresh (self ):
202+ def refresh (self , project : Optional [ str ] ):
193203 # This method is a no-op since we're always reading the latest values from the db.
194204 pass
195205
@@ -459,6 +469,22 @@ def list_on_demand_feature_views(
459469 "feature_view_proto" ,
460470 )
461471
472+ def list_project_metadata (
473+ self , project : str , allow_cache : bool = False
474+ ) -> List [ProjectMetadata ]:
475+ with self .engine .connect () as conn :
476+ stmt = select (feast_metadata ).where (feast_metadata .c .project_id == project ,)
477+ rows = conn .execute (stmt ).all ()
478+ if rows :
479+ project_metadata = ProjectMetadata (project_name = project )
480+ for row in rows :
481+ if row ["metadata_key" ] == FeastMetadataKeys .PROJECT_UUID .value :
482+ project_metadata .project_uuid = row ["metadata_value" ]
483+ break
484+ # TODO(adchia): Add other project metadata in a structured way
485+ return [project_metadata ]
486+ return []
487+
462488 def apply_saved_dataset (
463489 self , saved_dataset : SavedDataset , project : str , commit : bool = True ,
464490 ):
@@ -629,6 +655,7 @@ def proto(self) -> RegistryProto:
629655 (self .list_feature_services , r .feature_services ),
630656 (self .list_saved_datasets , r .saved_datasets ),
631657 (self .list_validation_references , r .validation_references ),
658+ (self .list_project_metadata , r .project_metadata ),
632659 ]:
633660 objs : List [Any ] = lister (project ) # type: ignore
634661 if objs :
@@ -651,14 +678,16 @@ def commit(self):
651678 def _apply_object (
652679 self , table , project : str , id_field_name , obj , proto_field_name , name = None
653680 ):
681+ self ._maybe_init_project_metadata (project )
682+
654683 name = name or obj .name
655684 with self .engine .connect () as conn :
685+ update_datetime = datetime .utcnow ()
686+ update_time = int (update_datetime .timestamp ())
656687 stmt = select (table ).where (
657688 getattr (table .c , id_field_name ) == name , table .c .project_id == project
658689 )
659690 row = conn .execute (stmt ).first ()
660- update_datetime = datetime .utcnow ()
661- update_time = int (update_datetime .timestamp ())
662691 if hasattr (obj , "last_updated_timestamp" ):
663692 obj .last_updated_timestamp = update_datetime
664693
@@ -685,6 +714,30 @@ def _apply_object(
685714
686715 self ._set_last_updated_metadata (update_datetime , project )
687716
717+ def _maybe_init_project_metadata (self , project ):
718+ # Initialize project metadata if needed
719+ with self .engine .connect () as conn :
720+ update_datetime = datetime .utcnow ()
721+ update_time = int (update_datetime .timestamp ())
722+ stmt = select (feast_metadata ).where (
723+ feast_metadata .c .metadata_key == FeastMetadataKeys .PROJECT_UUID .value ,
724+ feast_metadata .c .project_id == project ,
725+ )
726+ row = conn .execute (stmt ).first ()
727+ if row :
728+ usage .set_current_project_uuid (row ["metadata_value" ])
729+ else :
730+ new_project_uuid = f"{ uuid .uuid4 ()} "
731+ values = {
732+ "metadata_key" : FeastMetadataKeys .PROJECT_UUID .value ,
733+ "metadata_value" : new_project_uuid ,
734+ "last_updated_timestamp" : update_time ,
735+ "project_id" : project ,
736+ }
737+ insert_stmt = insert (feast_metadata ).values (values )
738+ conn .execute (insert_stmt )
739+ usage .set_current_project_uuid (new_project_uuid )
740+
688741 def _delete_object (self , table , name , project , id_field_name , not_found_exception ):
689742 with self .engine .connect () as conn :
690743 stmt = delete (table ).where (
@@ -708,6 +761,8 @@ def _get_object(
708761 proto_field_name ,
709762 not_found_exception ,
710763 ):
764+ self ._maybe_init_project_metadata (project )
765+
711766 with self .engine .connect () as conn :
712767 stmt = select (table ).where (
713768 getattr (table .c , id_field_name ) == name , table .c .project_id == project
@@ -721,6 +776,7 @@ def _get_object(
721776 def _list_objects (
722777 self , table , project , proto_class , python_class , proto_field_name
723778 ):
779+ self ._maybe_init_project_metadata (project )
724780 with self .engine .connect () as conn :
725781 stmt = select (table ).where (table .c .project_id == project )
726782 rows = conn .execute (stmt ).all ()
@@ -736,15 +792,16 @@ def _list_objects(
736792 def _set_last_updated_metadata (self , last_updated : datetime , project : str ):
737793 with self .engine .connect () as conn :
738794 stmt = select (feast_metadata ).where (
739- feast_metadata .c .metadata_key == "last_updated_timestamp" ,
795+ feast_metadata .c .metadata_key
796+ == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
740797 feast_metadata .c .project_id == project ,
741798 )
742799 row = conn .execute (stmt ).first ()
743800
744801 update_time = int (last_updated .timestamp ())
745802
746803 values = {
747- "metadata_key" : "last_updated_timestamp" ,
804+ "metadata_key" : FeastMetadataKeys . LAST_UPDATED_TIMESTAMP . value ,
748805 "metadata_value" : f"{ update_time } " ,
749806 "last_updated_timestamp" : update_time ,
750807 "project_id" : project ,
@@ -753,7 +810,8 @@ def _set_last_updated_metadata(self, last_updated: datetime, project: str):
753810 update_stmt = (
754811 update (feast_metadata )
755812 .where (
756- feast_metadata .c .metadata_key == "last_updated_timestamp" ,
813+ feast_metadata .c .metadata_key
814+ == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
757815 feast_metadata .c .project_id == project ,
758816 )
759817 .values (values )
@@ -766,7 +824,8 @@ def _set_last_updated_metadata(self, last_updated: datetime, project: str):
766824 def _get_last_updated_metadata (self , project : str ):
767825 with self .engine .connect () as conn :
768826 stmt = select (feast_metadata ).where (
769- feast_metadata .c .metadata_key == "last_updated_timestamp" ,
827+ feast_metadata .c .metadata_key
828+ == FeastMetadataKeys .LAST_UPDATED_TIMESTAMP .value ,
770829 feast_metadata .c .project_id == project ,
771830 )
772831 row = conn .execute (stmt ).first ()
0 commit comments