Skip to content

Commit 021e9ea

Browse files
committed
feat: Support cache_mode for registries
Signed-off-by: ntkathole <[email protected]>
1 parent 0a951ce commit 021e9ea

File tree

4 files changed

+83
-4
lines changed

4 files changed

+83
-4
lines changed

sdk/python/feast/feature_store.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1073,6 +1073,17 @@ def apply(
10731073

10741074
self._registry.commit()
10751075

1076+
# Refresh the registry cache to ensure that changes are immediately visible
1077+
# This is especially important for UI and other clients that may be reading
1078+
# from the registry, as it ensures they see the updated state without waiting
1079+
# for the cache TTL to expire.
1080+
#
1081+
# Behavior by cache_mode:
1082+
# - sync mode: Immediate consistency - refresh after apply
1083+
# - thread mode: Eventual consistency - skip refresh, background thread handles it
1084+
if self.config.registry.cache_mode == "sync":
1085+
self.refresh_registry()
1086+
10761087
def teardown(self):
10771088
"""Tears down all local and cloud resources for the feature store."""
10781089
tables: List[FeatureView] = []

sdk/python/feast/infra/registry/sql.py

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,6 @@ class SqlRegistryConfig(RegistryConfig):
234234
sqlalchemy_config_kwargs: Dict[str, Any] = {"echo": False}
235235
""" Dict[str, Any]: Extra arguments to pass to SQLAlchemy.create_engine. """
236236

237-
cache_mode: StrictStr = "sync"
238-
""" str: Cache mode type, Possible options are sync and thread(asynchronous caching using threading library)"""
239-
240237
thread_pool_executor_worker_count: StrictInt = 0
241238
""" int: Number of worker threads to use for asynchronous caching in SQL Registry. If set to 0, it doesn't use ThreadPoolExecutor. """
242239

sdk/python/feast/repo_config.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -152,6 +152,12 @@ class RegistryConfig(FeastBaseModel):
152152
set to infinity by setting TTL to 0 seconds, which means the cache will only be loaded once and will never
153153
expire. Users can manually refresh the cache by calling feature_store.refresh_registry() """
154154

155+
cache_mode: StrictStr = "sync"
156+
"""str: Cache mode type. Possible options are 'sync' (immediate refresh after each write operation) and
157+
'thread' (asynchronous background refresh at cache_ttl_seconds intervals). In 'sync' mode, registry changes
158+
are immediately visible. In 'thread' mode, changes may take up to
159+
cache_ttl_seconds to be visible."""
160+
155161
s3_additional_kwargs: Optional[Dict[str, str]] = None
156162
""" Dict[str, str]: Extra arguments to pass to boto3 when writing the registry file to S3. """
157163

sdk/python/tests/unit/local_feast_tests/test_local_feature_store.py

Lines changed: 66 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
from datetime import datetime, timedelta
22
from tempfile import mkstemp
3+
from unittest.mock import patch
34

45
import pytest
56
from pytest_lazyfixture import lazy_fixture
@@ -18,7 +19,7 @@
1819
from feast.permissions.action import AuthzedAction
1920
from feast.permissions.permission import Permission
2021
from feast.permissions.policy import RoleBasedPolicy
21-
from feast.repo_config import RepoConfig
22+
from feast.repo_config import RegistryConfig, RepoConfig
2223
from feast.stream_feature_view import stream_feature_view
2324
from feast.types import Array, Bytes, Float32, Int64, String, ValueType, from_value_type
2425
from tests.integration.feature_repos.universal.feature_views import TAGS
@@ -797,3 +798,67 @@ def feature_store_with_local_registry():
797798
entity_key_serialization_version=3,
798799
)
799800
)
801+
802+
803+
@pytest.mark.parametrize(
804+
"test_feature_store",
805+
[lazy_fixture("feature_store_with_local_registry")],
806+
)
807+
def test_apply_refreshes_registry_cache_sync_mode(test_feature_store):
808+
"""Test that apply() refreshes registry cache when cache_mode is 'sync' (default)"""
809+
# Create a simple entity (no FeatureView to avoid file path issues)
810+
entity = Entity(name="test_entity", join_keys=["id"])
811+
812+
# Mock the refresh_registry method to verify it's called
813+
with patch.object(test_feature_store, "refresh_registry") as mock_refresh:
814+
# Apply the entity
815+
test_feature_store.apply([entity])
816+
817+
# Verify refresh_registry was called once (due to sync mode)
818+
mock_refresh.assert_called_once()
819+
820+
test_feature_store.teardown()
821+
822+
823+
@pytest.mark.parametrize(
824+
"test_feature_store",
825+
[lazy_fixture("feature_store_with_local_registry")],
826+
)
827+
def test_apply_skips_refresh_registry_cache_thread_mode(test_feature_store):
828+
"""Test that apply() skips registry refresh when cache_mode is 'thread'"""
829+
# Create a simple entity
830+
entity = Entity(name="test_entity", join_keys=["id"])
831+
832+
# Temporarily change cache_mode to 'thread'
833+
original_cache_mode = test_feature_store.config.registry.cache_mode
834+
test_feature_store.config.registry.cache_mode = "thread"
835+
836+
try:
837+
# Mock the refresh_registry method to verify it's NOT called
838+
with patch.object(test_feature_store, "refresh_registry") as mock_refresh:
839+
# Apply the entity
840+
test_feature_store.apply([entity])
841+
842+
# Verify refresh_registry was NOT called (due to thread mode)
843+
mock_refresh.assert_not_called()
844+
finally:
845+
# Restore original cache_mode
846+
test_feature_store.config.registry.cache_mode = original_cache_mode
847+
848+
test_feature_store.teardown()
849+
850+
851+
def test_registry_config_cache_mode_default():
852+
"""Test that RegistryConfig has cache_mode with default value 'sync'"""
853+
config = RegistryConfig()
854+
assert hasattr(config, "cache_mode")
855+
assert config.cache_mode == "sync"
856+
857+
858+
def test_registry_config_cache_mode_can_be_set():
859+
"""Test that RegistryConfig cache_mode can be set to different values"""
860+
config = RegistryConfig(cache_mode="thread")
861+
assert config.cache_mode == "thread"
862+
863+
config = RegistryConfig(cache_mode="sync")
864+
assert config.cache_mode == "sync"

0 commit comments

Comments
 (0)