Skip to content

Commit 91ca9a2

Browse files
committed
Add support for database schema def in RedshiftSource
1 parent 8ef2053 commit 91ca9a2

4 files changed

Lines changed: 59 additions & 12 deletions

File tree

protos/feast/core/DataSource.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ message DataSource {
118118
// SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective
119119
// entity columns
120120
string query = 2;
121+
122+
// Redshift table schema name
123+
string schema = 3;
121124
}
122125

123126
// Defines configuration for custom third-party data sources.

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
3838
database: StrictStr
3939
""" Redshift database name """
4040

41+
temp_schema_name: StrictStr
42+
""" Redshift schema name to offload temporary tables """
43+
4144
s3_staging_location: StrictStr
4245
""" S3 path for importing & exporting data to Redshift """
4346

@@ -237,6 +240,7 @@ def to_arrow(self) -> pa.Table:
237240
self._config.offline_store.iam_role,
238241
query,
239242
self._drop_columns,
243+
self._config.offline_store.temp_schema_name,
240244
)
241245

242246
def to_s3(self) -> str:
@@ -254,13 +258,15 @@ def to_s3(self) -> str:
254258
)
255259
return self._s3_path
256260

257-
def to_redshift(self, table_name: str) -> None:
261+
def to_redshift(self, table_name: str, schema: Optional[str] = None) -> None:
258262
""" Save dataset as a new Redshift table """
259263
with self._query_generator() as query:
260-
query = f'CREATE TABLE "{table_name}" AS ({query});\n'
264+
schema_prefix = f'"{schema}".' if schema is not None else ''
265+
full_table_name = f'{schema_prefix}"{table_name}"'
266+
query = f'CREATE TABLE {full_table_name} AS ({query});\n'
261267
if self._drop_columns is not None:
262268
for column in self._drop_columns:
263-
query += f"ALTER TABLE {table_name} DROP COLUMN {column};\n"
269+
query += f"ALTER TABLE {full_table_name} DROP COLUMN {column};\n"
264270

265271
aws_utils.execute_redshift_statement(
266272
self._redshift_client,
@@ -291,20 +297,22 @@ def _upload_entity_df_and_get_entity_schema(
291297
config.offline_store.iam_role,
292298
table_name,
293299
entity_df,
300+
config.offline_store.temp_schema_name,
294301
)
295302
return dict(zip(entity_df.columns, entity_df.dtypes))
296303
elif isinstance(entity_df, str):
297304
# If the entity_df is a string (SQL query), create a Redshift table out of it,
298305
# get pandas dataframe consisting of 1 row (LIMIT 1) and generate the schema out of it
306+
full_table_name = f'{config.offline_store.temp_schema_name}.{table_name}'
299307
aws_utils.execute_redshift_statement(
300308
redshift_client,
301309
config.offline_store.cluster_id,
302310
config.offline_store.database,
303311
config.offline_store.user,
304-
f"CREATE TABLE {table_name} AS ({entity_df})",
312+
f"CREATE TABLE {full_table_name} AS ({entity_df})",
305313
)
306314
limited_entity_df = RedshiftRetrievalJob(
307-
f"SELECT * FROM {table_name} LIMIT 1", redshift_client, s3_resource, config
315+
f"SELECT * FROM {full_table_name} LIMIT 1", redshift_client, s3_resource, config
308316
).to_df()
309317
return dict(zip(limited_entity_df.columns, limited_entity_df.dtypes))
310318
else:

sdk/python/feast/infra/offline_stores/redshift_source.py

Lines changed: 33 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ def __init__(
1313
self,
1414
event_timestamp_column: Optional[str] = "",
1515
table: Optional[str] = None,
16+
schema: Optional[str] = None,
1617
created_timestamp_column: Optional[str] = "",
1718
field_mapping: Optional[Dict[str, str]] = None,
1819
date_partition_column: Optional[str] = "",
@@ -25,7 +26,7 @@ def __init__(
2526
date_partition_column,
2627
)
2728

28-
self._redshift_options = RedshiftOptions(table=table, query=query)
29+
self._redshift_options = RedshiftOptions(table=table, schema=schema, query=query)
2930

3031
@staticmethod
3132
def from_proto(data_source: DataSourceProto):
@@ -95,7 +96,8 @@ def validate(self, config: RepoConfig):
9596
def get_table_query_string(self) -> str:
9697
"""Returns a string that can directly be used to reference this table in SQL"""
9798
if self.table:
98-
return f'"{self.table}"'
99+
schema_prefix = f'"{self.schema}".' if self.schema is not None else ''
100+
return f'{schema_prefix}"{self.table}"'
99101
else:
100102
return f"({self.query})"
101103

@@ -153,9 +155,19 @@ class RedshiftOptions:
153155
DataSource Redshift options used to source features from Redshift query
154156
"""
155157

156-
def __init__(self, table: Optional[str], query: Optional[str]):
158+
def __init__(self, table: Optional[str], query: Optional[str], schema: Optional[str]):
159+
"""Redshift options to encapsulate logic for parsing and working with 2 kinds of source creation
160+
table + schema or query
161+
162+
Args:
163+
table (Optional[str]): Redshift table to be looked for in redshift cluster to form datasource
164+
query (Optional[str]): Query to run to gather datasource
165+
schema (Optional[str]): Schema in the Redshift cluster in which to look up the table.
166+
Must be provided when tables with the same name exist in different schemas.
167+
"""
157168
self._table = table
158169
self._query = query
170+
self._schema = schema
159171

160172
@property
161173
def query(self):
@@ -185,6 +197,20 @@ def table(self, table_name):
185197
"""
186198
self._table = table_name
187199

200+
@property
201+
def schema(self):
202+
"""
203+
Returns the schema name of this Redshift table
204+
"""
205+
return self._schema
206+
207+
@schema.setter
208+
def schema(self, schema_name):
209+
"""
210+
Sets the schema name of this Redshift table
211+
"""
212+
self._schema = schema_name
213+
188214
@classmethod
189215
def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
190216
"""
@@ -198,7 +224,9 @@ def from_proto(cls, redshift_options_proto: DataSourceProto.RedshiftOptions):
198224
"""
199225

200226
redshift_options = cls(
201-
table=redshift_options_proto.table, query=redshift_options_proto.query,
227+
table=redshift_options_proto.table,
228+
query=redshift_options_proto.query,
229+
schema=redshift_options_proto.schema
202230
)
203231

204232
return redshift_options
@@ -212,7 +240,7 @@ def to_proto(self) -> DataSourceProto.RedshiftOptions:
212240
"""
213241

214242
redshift_options_proto = DataSourceProto.RedshiftOptions(
215-
table=self.table, query=self.query,
243+
table=self.table, query=self.query, schema=self.schema
216244
)
217245

218246
return redshift_options_proto

sdk/python/feast/infra/utils/aws_utils.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ def upload_df_to_redshift(
146146
iam_role: str,
147147
table_name: str,
148148
df: pd.DataFrame,
149+
schema_name: Optional[str] = None,
149150
) -> None:
150151
"""Uploads a Pandas DataFrame to Redshift as a new table.
151152
@@ -204,9 +205,11 @@ def upload_df_to_redshift(
204205

205206
# Create the table with the desired schema and
206207
# copy the Parquet file contents to the Redshift table
208+
schema_prefix = f'{schema_name}.' if schema_name is not None else ''
209+
full_table_name = f'{schema_prefix}{table_name}'
207210
create_and_copy_query = (
208-
f"CREATE TABLE {table_name}({column_query_list}); "
209-
+ f"COPY {table_name} FROM '{s3_path}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
211+
f"CREATE TABLE {full_table_name}({column_query_list}); "
212+
+ f"COPY {full_table_name} FROM '{s3_path}' IAM_ROLE '{iam_role}' FORMAT AS PARQUET"
210213
)
211214
execute_redshift_statement(
212215
redshift_data_client, cluster_id, database, user, create_and_copy_query
@@ -227,6 +230,7 @@ def temporarily_upload_df_to_redshift(
227230
iam_role: str,
228231
table_name: str,
229232
df: pd.DataFrame,
233+
schema_name: Optional[str] = None
230234
) -> Iterator[None]:
231235
"""Uploads a Pandas DataFrame to Redshift as a new table with cleanup logic.
232236
@@ -249,6 +253,7 @@ def temporarily_upload_df_to_redshift(
249253
iam_role,
250254
table_name,
251255
df,
256+
schema_name
252257
)
253258

254259
yield
@@ -325,6 +330,7 @@ def unload_redshift_query_to_pa(
325330
iam_role: str,
326331
query: str,
327332
drop_columns: Optional[List[str]] = None,
333+
temp_schema_name: Optional[str] = None,
328334
) -> pa.Table:
329335
""" Unload Redshift Query results to S3 and get the results in PyArrow Table format """
330336
bucket, key = get_bucket_and_key(s3_path)
@@ -356,6 +362,7 @@ def unload_redshift_query_to_df(
356362
iam_role: str,
357363
query: str,
358364
drop_columns: Optional[List[str]] = None,
365+
schema: Optional[str] = None,
359366
) -> pd.DataFrame:
360367
""" Unload Redshift Query results to S3 and get the results in Pandas DataFrame format """
361368
table = unload_redshift_query_to_pa(
@@ -368,5 +375,6 @@ def unload_redshift_query_to_df(
368375
iam_role,
369376
query,
370377
drop_columns,
378+
schema,
371379
)
372380
return table.to_pandas()

0 commit comments

Comments
 (0)