1818import sys
1919from datetime import date , datetime
2020from pathlib import Path
21- from typing import Any , Callable , Dict , List , Literal , Optional , Sequence , Tuple
21+ from typing import Any , Callable , Dict , List , Literal , Optional , Sequence , Tuple , Union
2222
2323from pydantic import StrictStr
2424
2525from feast import Entity
2626from feast .feature_view import FeatureView
27+ from feast .field import Field
2728from feast .infra .infra_object import SQLITE_INFRA_OBJECT_CLASS_TYPE , InfraObject
2829from feast .infra .key_encoding_utils import (
2930 deserialize_entity_key ,
3839from feast .protos .feast .types .EntityKey_pb2 import EntityKey as EntityKeyProto
3940from feast .protos .feast .types .Value_pb2 import Value as ValueProto
4041from feast .repo_config import FeastConfigBaseModel , RepoConfig
41- from feast .utils import _build_retrieve_online_document_record , to_naive_utc
42+ from feast .type_map import feast_value_type_to_python_type
43+ from feast .types import FEAST_VECTOR_TYPES
44+ from feast .utils import (
45+ _build_retrieve_online_document_record ,
46+ _serialize_vector_to_float_list ,
47+ to_naive_utc ,
48+ )
4249
4350
4451def adapt_date_iso (val : date ):
@@ -94,6 +101,7 @@ class SqliteOnlineStoreConfig(FeastConfigBaseModel, VectorStoreConfig):
94101
95102 vector_enabled : bool = False
96103 vector_len : Optional [int ] = None
104+ text_search_enabled : bool = False
97105
98106
99107class SqliteOnlineStore (OnlineStore ):
@@ -144,9 +152,8 @@ def online_write_batch(
144152 progress : Optional [Callable [[int ], Any ]],
145153 ) -> None :
146154 conn = self ._get_conn (config )
147-
148155 project = config .project
149-
156+ feature_type_dict = { f . name : f . dtype for f in table . features }
150157 with conn :
151158 for entity_key , values , timestamp , created_ts in data :
152159 entity_key_bin = serialize_entity_key (
@@ -160,71 +167,51 @@ def online_write_batch(
160167 table_name = _table_id (project , table )
161168 for feature_name , val in values .items ():
162169 if config .online_store .vector_enabled :
163- vector_bin = serialize_f32 (
164- val .float_list_val .val , config .online_store .vector_len
165- ) # type: ignore
166- conn .execute (
167- f"""
168- UPDATE { table_name }
169- SET value = ?, vector_value = ?, event_ts = ?, created_ts = ?
170- WHERE (entity_key = ? AND feature_name = ?)
171- """ ,
172- (
173- # SET
174- val .SerializeToString (),
175- vector_bin ,
176- timestamp ,
177- created_ts ,
178- # WHERE
179- entity_key_bin ,
180- feature_name ,
181- ),
182- )
170+ if feature_type_dict [feature_name ] in FEAST_VECTOR_TYPES :
171+ val_bin = serialize_f32 (
172+ val .float_list_val .val , config .online_store .vector_len
173+ ) # type: ignore
183174
175+ else :
176+ val_bin = feast_value_type_to_python_type (val )
184177 conn .execute (
185- f"""INSERT OR IGNORE INTO { table_name }
186- (entity_key, feature_name, value, vector_value, event_ts, created_ts)
187- VALUES (?, ?, ?, ?, ?, ?)""" ,
178+ f"""
179+ INSERT INTO { table_name } (entity_key, feature_name, value, vector_value, event_ts, created_ts)
180+ VALUES (?, ?, ?, ?, ?, ?)
181+ ON CONFLICT(entity_key, feature_name) DO UPDATE SET
182+ value = excluded.value,
183+ vector_value = excluded.vector_value,
184+ event_ts = excluded.event_ts,
185+ created_ts = excluded.created_ts;
186+ """ ,
188187 (
189- entity_key_bin ,
190- feature_name ,
191- val .SerializeToString (),
192- vector_bin ,
193- timestamp ,
194- created_ts ,
188+ entity_key_bin , # entity_key
189+ feature_name , # feature_name
190+ val .SerializeToString (), # value
191+ val_bin , # vector_value
192+ timestamp , # event_ts
193+ created_ts , # created_ts
195194 ),
196195 )
197-
198196 else :
199197 conn .execute (
200198 f"""
201- UPDATE { table_name }
202- SET value = ?, event_ts = ?, created_ts = ?
203- WHERE (entity_key = ? AND feature_name = ?)
199+ INSERT INTO { table_name } (entity_key, feature_name, value, event_ts, created_ts)
200+ VALUES (?, ?, ?, ?, ?)
201+ ON CONFLICT(entity_key, feature_name) DO UPDATE SET
202+ value = excluded.value,
203+ event_ts = excluded.event_ts,
204+ created_ts = excluded.created_ts;
204205 """ ,
205206 (
206- # SET
207- val .SerializeToString (),
208- timestamp ,
209- created_ts ,
210- # WHERE
211- entity_key_bin ,
212- feature_name ,
207+ entity_key_bin , # entity_key
208+ feature_name , # feature_name
209+ val .SerializeToString (), # value
210+ timestamp , # event_ts
211+ created_ts , # created_ts
213212 ),
214213 )
215214
216- conn .execute (
217- f"""INSERT OR IGNORE INTO { table_name }
218- (entity_key, feature_name, value, event_ts, created_ts)
219- VALUES (?, ?, ?, ?, ?)""" ,
220- (
221- entity_key_bin ,
222- feature_name ,
223- val .SerializeToString (),
224- timestamp ,
225- created_ts ,
226- ),
227- )
228215 if progress :
229216 progress (1 )
230217
@@ -482,13 +469,21 @@ def retrieve_online_documents_v2(
482469 conn = self ._get_conn (config )
483470 cur = conn .cursor ()
484471
485- online_store = config .online_store
486- if not isinstance (online_store , SqliteOnlineStoreConfig ):
487- raise ValueError ("online_store must be SqliteOnlineStoreConfig" )
488472 if not online_store .vector_len :
489473 raise ValueError ("vector_len is not configured in the online store config" )
474+
490475 query_embedding_bin = serialize_f32 (query , online_store .vector_len ) # type: ignore
491476 table_name = _table_id (config .project , table )
477+ vector_fields : List [Field ] = [
478+ f for f in table .features if getattr (f , "vector_index" , None )
479+ ]
480+ assert len (vector_fields ) > 0 , (
481+ f"No vector field found, please update feature view = { table .name } to declare a vector field"
482+ )
483+ assert len (vector_fields ) < 2 , (
484+ "Only one vector field is supported, please update feature view = {table.name} to declare one vector field"
485+ )
486+ vector_field : str = vector_fields [0 ].name
492487
493488 cur .execute (
494489 f"""
@@ -500,17 +495,19 @@ def retrieve_online_documents_v2(
500495
501496 cur .execute (
502497 f"""
503- INSERT INTO vec_table(rowid, vector_value)
498+ INSERT INTO vec_table (rowid, vector_value)
504499 select rowid, vector_value from { table_name }
500+ where feature_name = "{ vector_field } "
505501 """
506502 )
507503
508504 cur .execute (
509505 f"""
510506 select
511- fv.entity_key,
512- fv.feature_name,
513- fv.value,
507+ fv2.entity_key,
508+ fv2.feature_name,
509+ fv2.value,
510+ fv.vector_value,
514511 f.distance,
515512 fv.event_ts,
516513 fv.created_ts
@@ -526,38 +523,80 @@ def retrieve_online_documents_v2(
526523 ) f
527524 left join { table_name } fv
528525 on f.rowid = fv.rowid
529- where fv.feature_name in ({ "," .join (["?" for _ in requested_features ])} )
526+ left join { table_name } fv2
527+ on fv.entity_key = fv2.entity_key
528+ where fv2.feature_name != "{ vector_field } "
530529 """ ,
531530 (
532531 query_embedding_bin ,
533532 top_k ,
534- * [f .split (":" )[- 1 ] for f in requested_features ],
535533 ),
536534 )
537535
538536 rows = cur .fetchall ()
539- result : List [
537+ results : List [
540538 Tuple [
541539 Optional [datetime ],
542540 Optional [EntityKeyProto ],
543541 Optional [Dict [str , ValueProto ]],
544542 ]
545543 ] = []
546544
547- for entity_key , feature_name , value_bin , distance , event_ts , created_ts in rows :
548- val = ValueProto ()
549- val .ParseFromString (value_bin )
550- entity_key_proto = None
551- if entity_key :
552- entity_key_proto = deserialize_entity_key (
553- entity_key ,
554- entity_key_serialization_version = config .entity_key_serialization_version ,
545+ entity_dict : Dict [
546+ str , Dict [str , Union [str , ValueProto , EntityKeyProto , datetime ]]
547+ ] = {}
548+ for (
549+ entity_key ,
550+ feature_name ,
551+ value_bin ,
552+ vector_value ,
553+ distance ,
554+ event_ts ,
555+ created_ts ,
556+ ) in rows :
557+ entity_key_proto = deserialize_entity_key (
558+ entity_key ,
559+ entity_key_serialization_version = config .entity_key_serialization_version ,
560+ )
561+ if entity_key not in entity_dict :
562+ entity_dict [entity_key ] = {}
563+
564+ feature_val = ValueProto ()
565+ feature_val .ParseFromString (value_bin )
566+ entity_dict [entity_key ]["entity_key_proto" ] = entity_key_proto
567+ entity_dict [entity_key ][feature_name ] = feature_val
568+ entity_dict [entity_key ][vector_field ] = _serialize_vector_to_float_list (
569+ vector_value
570+ )
571+ entity_dict [entity_key ]["distance" ] = ValueProto (float_val = distance )
572+ entity_dict [entity_key ]["event_ts" ] = event_ts
573+ entity_dict [entity_key ]["created_ts" ] = created_ts
574+
575+ for entity_key_value in entity_dict :
576+ res_event_ts : Optional [datetime ] = None
577+ res_entity_key_proto : Optional [EntityKeyProto ] = None
578+ if isinstance (entity_dict [entity_key_value ]["event_ts" ], datetime ):
579+ res_event_ts = entity_dict [entity_key_value ]["event_ts" ] # type: ignore[assignment]
580+
581+ if isinstance (
582+ entity_dict [entity_key_value ]["entity_key_proto" ], EntityKeyProto
583+ ):
584+ res_entity_key_proto = entity_dict [entity_key_value ]["entity_key_proto" ] # type: ignore[assignment]
585+
586+ res_dict : Dict [str , ValueProto ] = {
587+ k : v
588+ for k , v in entity_dict [entity_key_value ].items ()
589+ if isinstance (v , ValueProto ) and isinstance (k , str )
590+ }
591+
592+ results .append (
593+ (
594+ res_event_ts ,
595+ res_entity_key_proto ,
596+ res_dict ,
555597 )
556- res = {feature_name : val }
557- res ["distance" ] = ValueProto (float_val = distance )
558- result .append ((event_ts , entity_key_proto , res ))
559-
560- return result
598+ )
599+ return results
561600
562601
563602def _initialize_conn (
@@ -640,7 +679,17 @@ def update(self):
640679 except ModuleNotFoundError :
641680 logging .warning ("Cannot use sqlite_vec for vector search" )
642681 self .conn .execute (
643- f"CREATE TABLE IF NOT EXISTS { self .name } (entity_key BLOB, feature_name TEXT, value BLOB, vector_value BLOB, event_ts timestamp, created_ts timestamp, PRIMARY KEY(entity_key, feature_name))"
682+ f"""
683+ CREATE TABLE IF NOT EXISTS { self .name } (
684+ entity_key BLOB,
685+ feature_name TEXT,
686+ value BLOB,
687+ vector_value BLOB,
688+ event_ts timestamp,
689+ created_ts timestamp,
690+ PRIMARY KEY(entity_key, feature_name)
691+ )
692+ """
644693 )
645694 self .conn .execute (
646695 f"CREATE INDEX IF NOT EXISTS { self .name } _ek ON { self .name } (entity_key);"
0 commit comments