Skip to content

Commit 8287b56

Browse files
author
Tsotne Tabidze
authored
Implement materialization for RedshiftOfflineStore & RedshiftRetrievalJob (feast-dev#1680)
* Implement materialization for RedshiftOfflineStore
  Signed-off-by: Tsotne Tabidze <[email protected]>
* Address Willem's comments
  Signed-off-by: Tsotne Tabidze <[email protected]>
* Rename method
  Signed-off-by: Tsotne Tabidze <[email protected]>
1 parent 6d07767 commit 8287b56

11 files changed

Lines changed: 616 additions & 300 deletions

File tree

dump.rdb

1015 Bytes
Binary file not shown.

sdk/python/feast/data_source.py

Lines changed: 25 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,10 @@
1717
from typing import Callable, Dict, Iterable, Optional, Tuple
1818

1919
from pyarrow.parquet import ParquetFile
20-
from tenacity import retry, retry_unless_exception_type, wait_exponential
2120

2221
from feast import type_map
2322
from feast.data_format import FileFormat, StreamFormat
24-
from feast.errors import (
25-
DataSourceNotFoundException,
26-
RedshiftCredentialsError,
27-
RedshiftQueryError,
28-
)
23+
from feast.errors import DataSourceNotFoundException, RedshiftCredentialsError
2924
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
3025
from feast.repo_config import RepoConfig
3126
from feast.value_type import ValueType
@@ -1062,7 +1057,7 @@ def validate(self, config: RepoConfig):
10621057
def get_table_query_string(self) -> str:
10631058
"""Returns a string that can directly be used to reference this table in SQL"""
10641059
if self.table:
1065-
return f"`{self.table}`"
1060+
return f'"{self.table}"'
10661061
else:
10671062
return f"({self.query})"
10681063

@@ -1073,62 +1068,43 @@ def source_datatype_to_feast_value_type() -> Callable[[str], ValueType]:
10731068
def get_table_column_names_and_types(
10741069
self, config: RepoConfig
10751070
) -> Iterable[Tuple[str, str]]:
1076-
import boto3
1077-
from botocore.config import Config
10781071
from botocore.exceptions import ClientError
10791072

10801073
from feast.infra.offline_stores.redshift import RedshiftOfflineStoreConfig
1074+
from feast.infra.utils import aws_utils
10811075

10821076
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
10831077

1084-
client = boto3.client(
1085-
"redshift-data", config=Config(region_name=config.offline_store.region)
1086-
)
1078+
client = aws_utils.get_redshift_data_client(config.offline_store.region)
10871079

1088-
try:
1089-
if self.table is not None:
1080+
if self.table is not None:
1081+
try:
10901082
table = client.describe_table(
10911083
ClusterIdentifier=config.offline_store.cluster_id,
10921084
Database=config.offline_store.database,
10931085
DbUser=config.offline_store.user,
10941086
Table=self.table,
10951087
)
1096-
# The API returns valid JSON with empty column list when the table doesn't exist
1097-
if len(table["ColumnList"]) == 0:
1098-
raise DataSourceNotFoundException(self.table)
1088+
except ClientError as e:
1089+
if e.response["Error"]["Code"] == "ValidationException":
1090+
raise RedshiftCredentialsError() from e
1091+
raise
10991092

1100-
columns = table["ColumnList"]
1101-
else:
1102-
statement = client.execute_statement(
1103-
ClusterIdentifier=config.offline_store.cluster_id,
1104-
Database=config.offline_store.database,
1105-
DbUser=config.offline_store.user,
1106-
Sql=f"SELECT * FROM ({self.query}) LIMIT 1",
1107-
)
1093+
# The API returns valid JSON with empty column list when the table doesn't exist
1094+
if len(table["ColumnList"]) == 0:
1095+
raise DataSourceNotFoundException(self.table)
11081096

1109-
# Need to retry client.describe_statement(...) until the task is finished. We don't want to bombard
1110-
# Redshift with queries, and neither do we want to wait for a long time on the initial call.
1111-
# The solution is exponential backoff. The backoff starts with 0.1 seconds and doubles exponentially
1112-
# until reaching 30 seconds, at which point the backoff is fixed.
1113-
@retry(
1114-
wait=wait_exponential(multiplier=0.1, max=30),
1115-
retry=retry_unless_exception_type(RedshiftQueryError),
1116-
)
1117-
def wait_for_statement():
1118-
desc = client.describe_statement(Id=statement["Id"])
1119-
if desc["Status"] in ("SUBMITTED", "STARTED", "PICKED"):
1120-
raise Exception # Retry
1121-
if desc["Status"] != "FINISHED":
1122-
raise RedshiftQueryError(desc) # Don't retry. Raise exception.
1123-
1124-
wait_for_statement()
1125-
1126-
result = client.get_statement_result(Id=statement["Id"])
1127-
1128-
columns = result["ColumnMetadata"]
1129-
except ClientError as e:
1130-
if e.response["Error"]["Code"] == "ValidationException":
1131-
raise RedshiftCredentialsError() from e
1132-
raise
1097+
columns = table["ColumnList"]
1098+
else:
1099+
statement_id = aws_utils.execute_redshift_statement(
1100+
client,
1101+
config.offline_store.cluster_id,
1102+
config.offline_store.database,
1103+
config.offline_store.user,
1104+
f"SELECT * FROM ({self.query}) LIMIT 1",
1105+
)
1106+
columns = aws_utils.get_redshift_statement_result(client, statement_id)[
1107+
"ColumnMetadata"
1108+
]
11331109

11341110
return [(column["name"], column["typeName"].upper()) for column in columns]

sdk/python/feast/infra/offline_stores/redshift.py

Lines changed: 115 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
1+
import uuid
12
from datetime import datetime
23
from typing import List, Optional, Union
34

45
import pandas as pd
6+
import pyarrow as pa
57
from pydantic import StrictStr
68
from pydantic.typing import Literal
79

8-
from feast.data_source import DataSource
10+
from feast.data_source import DataSource, RedshiftSource
911
from feast.feature_view import FeatureView
1012
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
13+
from feast.infra.utils import aws_utils
1114
from feast.registry import Registry
1215
from feast.repo_config import FeastConfigBaseModel, RepoConfig
1316

@@ -30,9 +33,12 @@ class RedshiftOfflineStoreConfig(FeastConfigBaseModel):
3033
database: StrictStr
3134
""" Redshift database name """
3235

33-
s3_path: StrictStr
36+
s3_staging_location: StrictStr
3437
""" S3 path for importing & exporting data to Redshift """
3538

39+
iam_role: StrictStr
40+
""" IAM Role for Redshift, granting it access to S3 """
41+
3642

3743
class RedshiftOfflineStore(OfflineStore):
3844
@staticmethod
@@ -46,7 +52,45 @@ def pull_latest_from_table_or_query(
4652
start_date: datetime,
4753
end_date: datetime,
4854
) -> RetrievalJob:
49-
pass
55+
assert isinstance(data_source, RedshiftSource)
56+
assert isinstance(config.offline_store, RedshiftOfflineStoreConfig)
57+
58+
from_expression = data_source.get_table_query_string()
59+
60+
partition_by_join_key_string = ", ".join(join_key_columns)
61+
if partition_by_join_key_string != "":
62+
partition_by_join_key_string = (
63+
"PARTITION BY " + partition_by_join_key_string
64+
)
65+
timestamp_columns = [event_timestamp_column]
66+
if created_timestamp_column:
67+
timestamp_columns.append(created_timestamp_column)
68+
timestamp_desc_string = " DESC, ".join(timestamp_columns) + " DESC"
69+
field_string = ", ".join(
70+
join_key_columns + feature_name_columns + timestamp_columns
71+
)
72+
73+
redshift_client = aws_utils.get_redshift_data_client(
74+
config.offline_store.region
75+
)
76+
s3_resource = aws_utils.get_s3_resource(config.offline_store.region)
77+
78+
query = f"""
79+
SELECT {field_string}
80+
FROM (
81+
SELECT {field_string},
82+
ROW_NUMBER() OVER({partition_by_join_key_string} ORDER BY {timestamp_desc_string}) AS _feast_row
83+
FROM {from_expression}
84+
WHERE {event_timestamp_column} BETWEEN TIMESTAMP '{start_date}' AND TIMESTAMP '{end_date}'
85+
)
86+
WHERE _feast_row = 1
87+
"""
88+
return RedshiftRetrievalJob(
89+
query=query,
90+
redshift_client=redshift_client,
91+
s3_resource=s3_resource,
92+
config=config,
93+
)
5094

5195
@staticmethod
5296
def get_historical_features(
@@ -59,3 +103,71 @@ def get_historical_features(
59103
full_feature_names: bool = False,
60104
) -> RetrievalJob:
61105
pass
106+
107+
108+
class RedshiftRetrievalJob(RetrievalJob):
    """Retrieval job wrapping a Redshift SQL query and its export targets.

    A single S3 prefix of the form ``<s3_staging_location>/unload/<uuid4>`` is
    chosen when the job is constructed. ``to_df``, ``to_arrow`` and ``to_s3``
    all pass that same prefix to the ``aws_utils`` unload helpers.
    """

    def __init__(self, query: str, redshift_client, s3_resource, config: RepoConfig):
        """Initialize RedshiftRetrievalJob object.

        Args:
            query: Redshift SQL query to execute.
            redshift_client: boto3 redshift-data client
            s3_resource: boto3 s3 resource object
            config: Feast repo config
        """
        self.query = query
        self._redshift_client = redshift_client
        self._s3_resource = s3_resource
        self._config = config
        # Drop trailing slashes from the configured staging location so the
        # resulting prefix never contains an empty "//" path segment.
        staging_location = self._config.offline_store.s3_staging_location.rstrip("/")
        self._s3_path = f"{staging_location}/unload/{uuid.uuid4()}"

    def to_df(self) -> pd.DataFrame:
        """Return the query result as a pandas DataFrame.

        Delegates to ``aws_utils.unload_redshift_query_to_df`` with this job's
        client, cluster settings, S3 resource, S3 prefix, IAM role and query.
        """
        store = self._config.offline_store
        return aws_utils.unload_redshift_query_to_df(
            self._redshift_client,
            store.cluster_id,
            store.database,
            store.user,
            self._s3_resource,
            self._s3_path,
            store.iam_role,
            self.query,
        )

    def to_arrow(self) -> pa.Table:
        """Return the query result as a pyarrow Table.

        Delegates to ``aws_utils.unload_redshift_query_to_pa`` with the same
        arguments that ``to_df`` passes to its helper.
        """
        store = self._config.offline_store
        return aws_utils.unload_redshift_query_to_pa(
            self._redshift_client,
            store.cluster_id,
            store.database,
            store.user,
            self._s3_resource,
            self._s3_path,
            store.iam_role,
            self.query,
        )

    def to_s3(self) -> str:
        """ Export dataset to S3 in Parquet format and return path """
        store = self._config.offline_store
        aws_utils.execute_redshift_query_and_unload_to_s3(
            self._redshift_client,
            store.cluster_id,
            store.database,
            store.user,
            self._s3_path,
            store.iam_role,
            self.query,
        )
        return self._s3_path

    def to_redshift(self, table_name: str) -> None:
        """ Save dataset as a new Redshift table

        Args:
            table_name: Name of the table to create. It is emitted as a
                double-quoted (delimited) identifier.
        """
        store = self._config.offline_store
        # Double any embedded quotation mark so the name stays one delimited
        # identifier and cannot terminate the quoting early.
        quoted_table = '"' + table_name.replace('"', '""') + '"'
        aws_utils.execute_redshift_statement(
            self._redshift_client,
            store.cluster_id,
            store.database,
            store.user,
            f"CREATE TABLE {quoted_table} AS ({self.query})",
        )

sdk/python/feast/infra/utils/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)