[BSE-3976] Disable SF Iceberg Prefetching Behind a Feature Flag (#8355)
srilman authored Oct 7, 2024
1 parent cedf05c commit d7d30ba
Showing 10 changed files with 99 additions and 27 deletions.
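For context, this commit puts the Snowflake-managed Iceberg metadata prefetch behind a flag, bodo.prefetch_sf_iceberg, which now defaults to off and can be enabled via the BODO_PREFETCH_SF_ICEBERG environment variable or at runtime. A minimal usage sketch, illustrative only; the catalog settings mirror the test added below:

import os
import bodo
import bodosql

# Opt back in before building the context / compiling any queries
# (the flag defaults to False after this commit).
bodo.prefetch_sf_iceberg = True

# Catalog settings mirror the new test; credentials come from the environment.
catalog = bodosql.SnowflakeCatalog(
    os.environ["SF_USERNAME"],
    os.environ["SF_PASSWORD"],
    "bodopartner.us-east-1",
    "DEMO_WH",
    "TEST_DB",
    connection_params={"schema": "PUBLIC", "role": "ACCOUNTADMIN"},
    iceberg_volume="exvol",
)
bc = bodosql.BodoSQLContext(catalog=catalog)
df = bc.sql("SELECT A, B, C FROM BODOSQL_ICEBERG_READ_TEST")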
2 changes: 2 additions & 0 deletions BodoSQL/bodosql/context.py
@@ -1198,6 +1198,7 @@ def _create_generator(self, hide_credentials: bool):
bodo.enable_streaming_sort_limit_offset,
bodo.bodo_sql_style,
bodo.bodosql_full_caching,
bodo.prefetch_sf_iceberg,
)
extra_args = () if self.default_tz is None else (self.default_tz,)
generator = RelationalAlgebraGeneratorClass(
@@ -1214,6 +1215,7 @@ def _create_generator(self, hide_credentials: bool):
bodo.enable_streaming_sort_limit_offset,
bodo.bodo_sql_style,
bodo.bodosql_full_caching,
bodo.prefetch_sf_iceberg,
*extra_args,
)
return generator
2 changes: 2 additions & 0 deletions BodoSQL/bodosql/context_ext.py
@@ -454,6 +454,7 @@ def _gen_sql_plan_pd_func_text_and_lowered_globals(
bodo.enable_streaming_sort_limit_offset,
bodo.bodo_sql_style,
bodo.bodosql_full_caching,
bodo.prefetch_sf_iceberg,
)
else:
extra_args = (
@@ -476,6 +477,7 @@ def _gen_sql_plan_pd_func_text_and_lowered_globals(
bodo.enable_streaming_sort_limit_offset,
bodo.bodo_sql_style,
bodo.bodosql_full_caching,
bodo.prefetch_sf_iceberg,
*extra_args,
)
except Exception as e:
77 changes: 63 additions & 14 deletions BodoSQL/bodosql/tests/test_types/test_snowflake_iceberg_catalog.py
@@ -57,20 +57,13 @@ def impl(bc, query):
)

query = "SELECT A, B, C FROM BODOSQL_ICEBERG_READ_TEST"
stream = StringIO()
logger = create_string_io_logger(stream)
with set_logging_stream(logger, 1):
check_func(
impl,
(bc, query),
py_output=py_out,
sort_output=True,
reset_index=True,
)
check_logger_msg(
stream,
'Execution time for prefetching SF-managed Iceberg metadata "TEST_DB"."PUBLIC"."BODOSQL_ICEBERG_READ_TEST"',
)
check_func(
impl,
(bc, query),
py_output=py_out,
sort_output=True,
reset_index=True,
)


@temp_env_override({"AWS_REGION": "us-east-1"})
@@ -667,3 +660,59 @@ def check_table_type():
), f"Table type is not as expected. Expected MANAGED but found {output_table_type}"
finally:
drop_snowflake_table(table_name, db, schema, user=3)


@temp_env_override({"AWS_REGION": "us-east-1"})
def test_prefetch_flag(memory_leak_check):
"""
Test that if the prefetch flag is set, a prefetch occurs
TODO: [BSE-3977] There is a bug that causes the prefetch
to not be used when the query is compiled in the same process
"""

old_prefetch_flag = bodo.prefetch_sf_iceberg

try:
bodo.prefetch_sf_iceberg = True

catalog = bodosql.SnowflakeCatalog(
os.environ["SF_USERNAME"],
os.environ["SF_PASSWORD"],
"bodopartner.us-east-1",
"DEMO_WH",
"TEST_DB",
connection_params={"schema": "PUBLIC", "role": "ACCOUNTADMIN"},
iceberg_volume="exvol",
)
bc = bodosql.BodoSQLContext(catalog=catalog)

def impl(bc, query):
return bc.sql(query)

py_out = pd.DataFrame(
{
"A": ["ally", "bob", "cassie", "david", pd.NA],
"B": [10.5, -124.0, 11.11, 456.2, -8e2],
"C": [True, pd.NA, False, pd.NA, pd.NA],
}
)

query = "SELECT A, B, C FROM BODOSQL_ICEBERG_READ_TEST"
stream = StringIO()
logger = create_string_io_logger(stream)
with set_logging_stream(logger, 1):
check_func(
impl,
(bc, query),
py_output=py_out,
sort_output=True,
reset_index=True,
)

check_logger_msg(
stream,
'Execution time for prefetching SF-managed Iceberg metadata "TEST_DB"."PUBLIC"."BODOSQL_ICEBERG_READ_TEST"',
)

finally:
bodo.prefetch_sf_iceberg = old_prefetch_flag
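
The first hunk above drops the prefetch-log assertion from the default-path test, since with the flag off no prefetch is generated. If the negative case should be asserted explicitly, a sketch along these lines would do (illustrative, not part of this commit; it reuses the helpers already imported in this module):

stream = StringIO()
logger = create_string_io_logger(stream)
with set_logging_stream(logger, 1):
    check_func(
        impl,
        (bc, query),
        py_output=py_out,
        sort_output=True,
        reset_index=True,
    )
# With bodo.prefetch_sf_iceberg left at its default (False), the prefetch
# timer message should not appear in the verbose log.
assert "prefetching SF-managed Iceberg metadata" not in stream.getvalue()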
@@ -1,3 +1,3 @@
package com.bodosql.calcite.adapter.bodo

data class StreamingOptions(val chunkSize: Int)
data class StreamingOptions(val chunkSize: Int, val prefetchSFIceberg: Boolean)
@@ -165,11 +165,13 @@ class IcebergToBodoPhysicalConverter(cluster: RelOptCluster, traits: RelTraitSet

// Store table name and base connection string in builder for prefetch gen
// Note: Only when reading from a Snowflake-managed Iceberg table (using Snowflake catalog)
if (tableScanNode.getCatalogTable().getCatalog() is SnowflakeCatalog) {
var staticConExpr = relInput.generatePythonConnStr(ImmutableList.of("", ""))
staticConExpr =
if (staticConExpr is StringLiteral) StringLiteral("iceberg+${staticConExpr.arg}") else staticConExpr
ctx.builder().addSfIcebergTablePath(staticConExpr, tableScanNode.getCatalogTable().getQualifiedName())
if (ctx.streamingOptions().prefetchSFIceberg) {
if (tableScanNode.getCatalogTable().getCatalog() is SnowflakeCatalog) {
var staticConExpr = relInput.generatePythonConnStr(ImmutableList.of("", ""))
staticConExpr =
if (staticConExpr is StringLiteral) StringLiteral("iceberg+${staticConExpr.arg}") else staticConExpr
ctx.builder().addSfIcebergTablePath(staticConExpr, tableScanNode.getCatalogTable().getQualifiedName())
}
}

val columnsArg = Expr.List(cols.map { v -> StringLiteral(v) })
@@ -197,7 +197,8 @@ public BodoCodeGenVisitor(
List<RelDataType> dynamicParamTypes,
Map<String, RelDataType> namedParamTypeMap,
Map<Integer, Integer> idMapping,
boolean hideOperatorIDs) {
boolean hideOperatorIDs,
boolean prefetchSFIceberg) {
super();
this.loweredGlobals = loweredGlobalVariablesMap;
this.originalSQLQuery = originalSQLQuery;
@@ -209,7 +210,7 @@ public BodoCodeGenVisitor(
this.tracingLevel = tracingLevel;
this.generatedCode = new Module.Builder(verboseLevel);
this.generatedCode.setHideOperatorIDs(hideOperatorIDs);
this.streamingOptions = new StreamingOptions(batchSize);
this.streamingOptions = new StreamingOptions(batchSize, prefetchSFIceberg);
this.dynamicParamTypes = dynamicParamTypes;
this.namedParamTypeMap = namedParamTypeMap;
this.generatedCode.setIDMapping(idMapping);
@@ -143,6 +143,9 @@ public class RelationalAlgebraGenerator {
/** Should we use the covering expression approach to cache or only exact matches. */
public static boolean coveringExpressionCaching = false;

/** Should we prefetch metadata location for Snowflake-managed Iceberg tables. */
public static boolean prefetchSFIceberg = false;

/**
* Helper method for RelationalAlgebraGenerator constructors to create a SchemaPlus object from a
* list of BodoSqlSchemas.
@@ -195,7 +198,8 @@ public RelationalAlgebraGenerator(
boolean enableStreamingSort,
boolean enableStreamingSortLimitOffset,
String sqlStyle,
boolean coveringExpressionCaching) {
boolean coveringExpressionCaching,
boolean prefetchSFIceberg) {
this.catalog = null;
this.plannerType = choosePlannerType(plannerType);
this.verboseLevel = verboseLevel;
@@ -217,6 +221,7 @@ public RelationalAlgebraGenerator(
this.enableTimestampTz = enableTimestampTz;
this.enableRuntimeJoinFilters = enableRuntimeJoinFilters;
this.coveringExpressionCaching = coveringExpressionCaching;
this.prefetchSFIceberg = prefetchSFIceberg;
}

/** Constructor for the relational algebra generator class that takes in the default timezone. */
@@ -234,6 +239,7 @@ public RelationalAlgebraGenerator(
boolean enableStreamingSortLimitOffset,
String sqlStyle,
boolean coveringExpressionCaching,
boolean prefetchSFIceberg,
String defaultTz) {
this.catalog = null;
this.plannerType = choosePlannerType(plannerType);
@@ -258,6 +264,7 @@ public RelationalAlgebraGenerator(
this.enableTimestampTz = enableTimestampTz;
this.enableRuntimeJoinFilters = enableRuntimeJoinFilters;
this.coveringExpressionCaching = coveringExpressionCaching;
this.prefetchSFIceberg = prefetchSFIceberg;
}

/**
@@ -283,7 +290,8 @@ public RelationalAlgebraGenerator(
boolean enableStreamingSort,
boolean enableStreamingSortLimitOffset,
String sqlStyle,
boolean coveringExpressionCaching) {
boolean coveringExpressionCaching,
boolean prefetchSFIceberg) {
this.catalog = catalog;
this.plannerType = choosePlannerType(plannerType);
this.verboseLevel = verboseLevel;
@@ -295,6 +303,7 @@ public RelationalAlgebraGenerator(
this.enableRuntimeJoinFilters = enableRuntimeJoinFilters;
this.sqlStyle = sqlStyle;
this.coveringExpressionCaching = coveringExpressionCaching;
this.prefetchSFIceberg = prefetchSFIceberg;
System.setProperty("calcite.default.charset", "UTF-8");
List<String> catalogDefaultSchema = catalog.getDefaultSchema(0);
final @Nullable String currentDatabase;
@@ -557,7 +566,8 @@ private String getDDLPandasString(
dynamicTypes,
namedParamTypes,
Map.of(),
this.hideOperatorIDs);
this.hideOperatorIDs,
this.prefetchSFIceberg);
codegen.generateDDLCode(ddlNode, new GenerateDDLTypes(this.planner.getTypeFactory()));
return codegen.getGeneratedCode();
}
@@ -601,7 +611,8 @@ private String getPandasStringFromPlan(
dynamicTypes,
namedParamTypes,
v.getIDMapping(),
this.hideOperatorIDs);
this.hideOperatorIDs,
this.prefetchSFIceberg);
codegen.go(rel);
return codegen.getGeneratedCode();
}
@@ -46,6 +46,7 @@ abstract class GenTestFixture {
// Maintain case sensitivity in the Snowflake style by default
"SNOWFLAKE",
false,
false,
)

println("SQL query:")
@@ -102,7 +102,8 @@ public static void main(String[] args) throws Exception {
true, // Disable Streaming Sort for Testing
false, // Disable Streaming Sort Limit Offset for Testing
"SNOWFLAKE", // Maintain case sensitivity in the Snowflake style by default
false // Only cache identical nodes
false, // Only cache identical nodes
false // Generate a prefetch call at the beginning of SQL queries
);
List<ColumnDataTypeInfo> paramTypes =
List.of(new ColumnDataTypeInfo(BodoSQLColumnDataType.INT64, false));
3 changes: 3 additions & 0 deletions bodo/__init__.py
@@ -145,6 +145,9 @@ def _global_except_hook(exctype, value, traceback):
bodo_disable_streaming_window_sort = (
os.environ.get("BODO_DISABLE_STREAMING_WINDOW_SORT", "0") != "0"
)
# If enabled, generate a prefetch function call to load metadata paths for
# Snowflake-managed Iceberg tables in the BodoSQL plan.
prefetch_sf_iceberg = os.environ.get("BODO_PREFETCH_SF_ICEBERG", "0") != "0"


def get_sql_config_str() -> str:
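
Since prefetch_sf_iceberg is computed once at import time, the environment variable only has an effect if it is set before bodo is imported; after that, the module attribute itself is the switch (which is why the new test toggles and restores it directly). A small sketch of both routes, illustrative only:

# Route 1: environment variable, read once when bodo is first imported.
import os

os.environ["BODO_PREFETCH_SF_ICEBERG"] = "1"
import bodo  # must happen after the variable is set

assert bodo.prefetch_sf_iceberg

# Route 2: flip the attribute at runtime and restore it afterwards.
old_flag = bodo.prefetch_sf_iceberg
bodo.prefetch_sf_iceberg = True
try:
    ...  # compile and run queries here
finally:
    bodo.prefetch_sf_iceberg = old_flag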
