Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
linting errors
Signed-off-by: Pushkar Gupta <[email protected]>
  • Loading branch information
pushkarmoi committed May 14, 2024
commit 718a1ffa7eeaa9304f4964ed6da4dccb58dfbbdf
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def online_write_batch(
to show progress.
"""
self._init_writer(config=config)
assert self._writer is not None

for entity_key, features, event_timestamp, _ in data:
entity_id: str = compute_entity_id(
Expand Down Expand Up @@ -199,6 +200,7 @@ def update(
infrastructure corresponding to other feature views should be not be touched.
"""
self._init_writer(config=config)
assert self._writer is not None

# note: we assume tables_to_keep does not overlap with tables_to_delete

Expand All @@ -222,6 +224,7 @@ def teardown(
entities: Entities whose corresponding infrastructure should be deleted.
"""
self._init_writer(config=config)
assert self._writer is not None

# drop fields corresponding to this feature-view
for feature_view in tables:
Expand Down Expand Up @@ -275,7 +278,7 @@ def _init_writer(self, config: RepoConfig):
client_options = IKVOnlineStore._config_to_client_options(online_config)

self._writer = create_new_writer(client_options)
self._writer.startup() # blocking operation
self._writer.startup() # blocking operation

def _init_reader(self, config: RepoConfig):
"""Initializes ikv reader client."""
Expand All @@ -287,7 +290,7 @@ def _init_reader(self, config: RepoConfig):

if online_config.mount_directory and len(online_config.mount_directory) > 0:
self._reader = create_new_reader(client_options)
self._reader.startup() # blocking operation
self._reader.startup() # blocking operation

@staticmethod
def _config_to_client_options(config: IKVOnlineStoreConfig) -> ClientOptions:
Expand Down