@@ -276,32 +276,65 @@ def _materialize_one(
276276
277277 fv_latest_values_sql = offline_job .to_sql ()
278278
279+ if feature_view .entity_columns :
280+ join_keys = [entity .name for entity in feature_view .entity_columns ]
281+ unique_entities = '"' + '", "' .join (join_keys ) + '"'
282+
283+ query = f"""
284+ SELECT
285+ COUNT(DISTINCT { unique_entities } )
286+ FROM
287+ { feature_view .batch_source .get_table_query_string ()}
288+ """
289+
290+ with GetSnowflakeConnection (self .repo_config .offline_store ) as conn :
291+ entities_to_write = conn .cursor ().execute (query ).fetchall ()[0 ][0 ]
292+ else :
293+ entities_to_write = (
294+ 1 # entityless feature view has a placeholder entity
295+ )
296+
279297 if feature_view .batch_source .field_mapping is not None :
280298 fv_latest_mapped_values_sql = _run_snowflake_field_mapping (
281299 fv_latest_values_sql , feature_view .batch_source .field_mapping
282300 )
283301
284- fv_to_proto_sql = self .generate_snowflake_materialization_query (
285- self .repo_config ,
286- fv_latest_mapped_values_sql ,
287- feature_view ,
288- project ,
289- )
302+ features_full_list = feature_view .features
303+ feature_batches = [
304+ features_full_list [i : i + 100 ]
305+ for i in range (0 , len (features_full_list ), 100 )
306+ ]
290307
291308 if self .repo_config .online_store .type == "snowflake.online" :
292- self .materialize_to_snowflake_online_store (
293- self .repo_config ,
294- fv_to_proto_sql ,
295- feature_view ,
296- project ,
297- )
309+ rows_to_write = entities_to_write * len (features_full_list )
298310 else :
299- self .materialize_to_external_online_store (
300- self .repo_config ,
301- fv_to_proto_sql ,
302- feature_view ,
303- tqdm_builder ,
304- )
311+ rows_to_write = entities_to_write * len (feature_batches )
312+
313+ with tqdm_builder (rows_to_write ) as pbar :
314+ for i , feature_batch in enumerate (feature_batches ):
315+ fv_to_proto_sql = self .generate_snowflake_materialization_query (
316+ self .repo_config ,
317+ fv_latest_mapped_values_sql ,
318+ feature_view ,
319+ feature_batch ,
320+ project ,
321+ )
322+
323+ if self .repo_config .online_store .type == "snowflake.online" :
324+ self .materialize_to_snowflake_online_store (
325+ self .repo_config ,
326+ fv_to_proto_sql ,
327+ feature_view ,
328+ project ,
329+ )
330+ pbar .update (entities_to_write * len (feature_batch ))
331+ else :
332+ self .materialize_to_external_online_store (
333+ self .repo_config ,
334+ fv_to_proto_sql ,
335+ feature_view ,
336+ pbar ,
337+ )
305338
306339 return SnowflakeMaterializationJob (
307340 job_id = job_id , status = MaterializationJobStatus .SUCCEEDED
@@ -316,6 +349,7 @@ def generate_snowflake_materialization_query(
316349 repo_config : RepoConfig ,
317350 fv_latest_mapped_values_sql : str ,
318351 feature_view : Union [BatchFeatureView , FeatureView ],
352+ feature_batch : list ,
319353 project : str ,
320354 ) -> str :
321355
@@ -338,7 +372,7 @@ def generate_snowflake_materialization_query(
338372 UDF serialization function.
339373 """
340374 feature_sql_list = []
341- for feature in feature_view . features :
375+ for feature in feature_batch :
342376 feature_value_type_name = feature .dtype .to_value_type ().name
343377
344378 feature_sql = _convert_value_name_to_snowflake_udf (
@@ -434,19 +468,16 @@ def materialize_to_snowflake_online_store(
434468 """
435469
436470 with GetSnowflakeConnection (repo_config .batch_engine ) as conn :
437- query_id = execute_snowflake_statement (conn , query ).sfqid
471+ execute_snowflake_statement (conn , query ).sfqid
438472
439- click .echo (
440- f"Snowflake Query ID: { Style .BRIGHT + Fore .GREEN } { query_id } { Style .RESET_ALL } "
441- )
442473 return None
443474
444475 def materialize_to_external_online_store (
445476 self ,
446477 repo_config : RepoConfig ,
447478 materialization_sql : str ,
448479 feature_view : Union [StreamFeatureView , FeatureView ],
449- tqdm_builder : Callable [[ int ], tqdm ] ,
480+ pbar : tqdm ,
450481 ) -> None :
451482
452483 feature_names = [feature .name for feature in feature_view .features ]
@@ -455,10 +486,6 @@ def materialize_to_external_online_store(
455486 query = materialization_sql
456487 cursor = execute_snowflake_statement (conn , query )
457488 for i , df in enumerate (cursor .fetch_pandas_batches ()):
458- click .echo (
459- f"Snowflake: Processing Materialization ResultSet Batch #{ i + 1 } "
460- )
461-
462489 entity_keys = (
463490 df ["entity_key" ].apply (EntityKeyProto .FromString ).to_numpy ()
464491 )
@@ -494,11 +521,10 @@ def materialize_to_external_online_store(
494521 )
495522 )
496523
497- with tqdm_builder (len (rows_to_write )) as pbar :
498- self .online_store .online_write_batch (
499- repo_config ,
500- feature_view ,
501- rows_to_write ,
502- lambda x : pbar .update (x ),
503- )
524+ self .online_store .online_write_batch (
525+ repo_config ,
526+ feature_view ,
527+ rows_to_write ,
528+ lambda x : pbar .update (x ),
529+ )
504530 return None
0 commit comments