Skip to content

Commit b274d44

Browse files
authored
fix: Write batching does not include multiple partitions (#245)
* fix: Write one row per batch, validate non-negative ttl, refactor logic to remove duplicate code * deprecate key_batch_size for a split read and write key batch size config. Write multiple batches defined by write_key_batch_size * set default batch size to 100, fix normal feature views to batch per key not row * adjust write batching to only batch the same entity key rows together to avoid batching across partitions * skip inserting sorted fv row when ttl is negative * batch everything when batch size is none or 0 * refactor _get_ttl * fix formatting * address all none case * _get_ttl only uses feature view ttl * fix tests * fix linting
1 parent 97e6b17 commit b274d44

5 files changed

Lines changed: 184 additions & 152 deletions

File tree

go/internal/feast/onlinestore/cassandraonlinestore.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ type CassandraConfig struct {
5656
loadBalancingPolicy gocql.HostSelectionPolicy
5757
connectionTimeoutMillis int64
5858
requestTimeoutMillis int64
59-
keyBatchSize int
59+
readBatchSize int
6060
}
6161

6262
const (
@@ -199,12 +199,17 @@ func extractCassandraConfig(onlineStoreConfig map[string]any) (*CassandraConfig,
199199
}
200200
cassandraConfig.requestTimeoutMillis = int64(requestTimeoutMillis.(float64))
201201

202-
keyBatchSize, ok := onlineStoreConfig["key_batch_size"]
202+
readBatchSize, ok := onlineStoreConfig["read_batch_size"]
203203
if !ok {
204-
keyBatchSize = 10.0
205-
log.Warn().Msg("key_batch_size not specified, defaulting to batches of size 10")
204+
if legacyBatchSize, ok := onlineStoreConfig["key_batch_size"]; ok {
205+
readBatchSize = legacyBatchSize
206+
log.Warn().Msg("key_batch_size is deprecated, please use read_batch_size instead")
207+
} else {
208+
readBatchSize = 100.0
209+
log.Warn().Msg("read_batch_size not specified, defaulting to batches of size 100")
210+
}
206211
}
207-
cassandraConfig.keyBatchSize = int(keyBatchSize.(float64))
212+
cassandraConfig.readBatchSize = int(readBatchSize.(float64))
208213

209214
return &cassandraConfig, nil
210215
}
@@ -255,14 +260,14 @@ func NewCassandraOnlineStore(project string, config *registry.RepoConfig, online
255260
}
256261
store.session = createdSession
257262

258-
if cassandraConfig.keyBatchSize <= 0 || cassandraConfig.keyBatchSize > 100 {
259-
return nil, fmt.Errorf("key_batch_size must be greater than zero and less than 100")
260-
} else if cassandraConfig.keyBatchSize == 1 {
263+
if cassandraConfig.readBatchSize <= 0 || cassandraConfig.readBatchSize > 100 {
264+
return nil, fmt.Errorf("read_batch_size must be greater than zero and less than or equal to 100")
265+
} else if cassandraConfig.readBatchSize == 1 {
261266
log.Info().Msg("key batching is disabled")
262267
} else {
263-
log.Info().Msgf("key batching is enabled with a batch size of %d", cassandraConfig.keyBatchSize)
268+
log.Info().Msgf("key batching is enabled with a batch size of %d", cassandraConfig.readBatchSize)
264269
}
265-
store.KeyBatchSize = cassandraConfig.keyBatchSize
270+
store.KeyBatchSize = cassandraConfig.readBatchSize
266271

267272
// parse tableNameFormatVersion
268273
tableNameFormatVersion, ok := onlineStoreConfig["table_name_format_version"]
@@ -867,7 +872,7 @@ func (c *CassandraOnlineStore) OnlineReadRange(
867872
sortKeyFilters []*model.SortKeyFilter,
868873
limit int32,
869874
) ([][]RangeFeatureData, error) {
870-
// If SortKeyFilters are not specified or if keyBatchSize is 1, use unbatched read
875+
// If SortKeyFilters are not specified or if readBatchSize is 1, use unbatched read
871876
if c.KeyBatchSize == 1 || hasUnspecifiedOrder(sortKeyFilters) {
872877
return c.UnbatchedKeysOnlineReadRange(ctx, entityKeys, featureViewNames, featureNames, sortKeyFilters, limit)
873878
} else {

go/internal/feast/registry/repoconfig_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -356,7 +356,7 @@ func TestNewRepoConfigForScyllaDBFromJSON(t *testing.T) {
356356
"online_store": {
357357
"type": "scylladb",
358358
"hosts": ["localhost:9042"],
359-
"key_batch_size": 85,
359+
"read_batch_size": 85,
360360
"table_name_format_version": 2
361361
}
362362
}`
@@ -378,8 +378,8 @@ func TestNewRepoConfigForScyllaDBFromJSON(t *testing.T) {
378378
// assert.Equal(t, "feature_repo", config.Project)
379379
assert.Equal(t, filepath.Join(dir, "data/registry.db"), registryConfig.Path)
380380
assert.Equal(t, "local", config.Provider)
381-
assert.Equal(t, float64(85), config.OnlineStore["key_batch_size"])
381+
assert.Equal(t, float64(85), config.OnlineStore["read_batch_size"])
382382
assert.Equal(t, float64(2), config.OnlineStore["table_name_format_version"])
383-
assert.Equal(t, int(85), int(config.OnlineStore["key_batch_size"].(float64)))
383+
assert.Equal(t, int(85), int(config.OnlineStore["read_batch_size"].(float64)))
384384
assert.Equal(t, int(2), int(config.OnlineStore["table_name_format_version"].(float64)))
385385
}

sdk/python/feast/feature_view.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -291,6 +291,11 @@ def ensure_valid(self):
291291
"""
292292
super().ensure_valid()
293293

294+
if self.ttl < timedelta(days=0):
295+
raise ValueError(
296+
"Feature view ttl cannot be negative. Please set a positive value or 0 for no ttl."
297+
)
298+
294299
if not self.entities:
295300
raise ValueError("Feature view has no entities.")
296301

0 commit comments

Comments
 (0)