Skip to content

Commit 3d9bafd

Browse files
author
Chen Zhiling
authored
Add unique ingestion id for all batch ingestions (#656)
* Add unique dataset id for all batch ingestions
* Rename to ingestion_id
1 parent 7a0ff91 commit 3d9bafd

File tree

6 files changed

+48
-6
lines changed

6 files changed

+48
-6
lines changed

protos/feast/core/Store.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ message Store {
6969
// ====================|==================|================================
7070
// - event_timestamp | TIMESTAMP | event time of the FeatureRow
7171
// - created_timestamp | TIMESTAMP | processing time of the ingestion of the FeatureRow
72+
// - ingestion_id | STRING | unique id identifying groups of rows that have been ingested together
7273
// - job_id | STRING | identifier for the job that writes the FeatureRow to the corresponding BigQuery table
7374
//
7475
// BigQuery table created will be partitioned by the field "event_timestamp"

protos/feast/types/FeatureRow.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,4 +39,7 @@ message FeatureRow {
3939
// <project>/<feature-set-name>:<version>. This value will be used by the feast ingestion job to filter
4040
// rows, and write the values to the correct tables.
4141
string feature_set = 6;
42+
43+
// Identifier tying this feature row to a specific ingestion job.
44+
string ingestion_id = 7;
4245
}

sdk/python/feast/client.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import shutil
1919
import tempfile
2020
import time
21+
import uuid
2122
from collections import OrderedDict
2223
from math import ceil
2324
from typing import Dict, List, Optional, Tuple, Union
@@ -825,13 +826,15 @@ def ingest(
825826
# Loop optimization declarations
826827
produce = producer.produce
827828
flush = producer.flush
829+
ingestion_id = _generate_ingestion_id(feature_set)
828830

829831
# Transform and push data to Kafka
830832
if feature_set.source.source_type == "Kafka":
831833
for chunk in get_feature_row_chunks(
832834
file=dest_path,
833835
row_groups=list(range(pq_file.num_row_groups)),
834836
fs=feature_set,
837+
ingestion_id=ingestion_id,
835838
max_workers=max_workers,
836839
):
837840

@@ -916,6 +919,20 @@ def _build_feature_references(
916919
return features
917920

918921

922+
def _generate_ingestion_id(feature_set: FeatureSet) -> str:
923+
"""
924+
Generates a UUID from the feature set name, version, and the current time.
925+
926+
Args:
927+
feature_set: Feature set of the dataset to be ingested.
928+
929+
Returns:
930+
UUID unique to current time and the feature set provided.
931+
"""
932+
uuid_str = f"{feature_set.name}_{feature_set.version}_{int(time.time())}"
933+
return str(uuid.uuid3(uuid.NAMESPACE_DNS, uuid_str))
934+
935+
919936
def _read_table_from_source(
920937
source: Union[pd.DataFrame, str], chunk_size: int, max_workers: int
921938
) -> Tuple[str, str]:

sdk/python/feast/loaders/ingest.py

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626

2727

2828
def _encode_pa_tables(
29-
file: str, feature_set: str, fields: dict, row_group_idx: int
29+
file: str, feature_set: str, fields: dict, ingestion_id: str, row_group_idx: int
3030
) -> List[bytes]:
3131
"""
3232
Helper function to encode a PyArrow table(s) read from parquet file(s) into
@@ -49,6 +49,9 @@ def _encode_pa_tables(
4949
fields (dict[str, enum.Enum.ValueType]):
5050
A mapping of field names to their value types.
5151
52+
ingestion_id (str):
53+
UUID unique to this ingestion job.
54+
5255
row_group_idx(int):
5356
Row group index to read and encode into byte like FeatureRow
5457
protobuf objects.
@@ -81,7 +84,9 @@ def _encode_pa_tables(
8184
# Iterate through the rows
8285
for row_idx in range(table.num_rows):
8386
feature_row = FeatureRow(
84-
event_timestamp=datetime_col[row_idx], feature_set=feature_set
87+
event_timestamp=datetime_col[row_idx],
88+
feature_set=feature_set,
89+
ingestion_id=ingestion_id,
8590
)
8691
# Loop optimization declaration
8792
ext = feature_row.fields.extend
@@ -97,7 +102,11 @@ def _encode_pa_tables(
97102

98103

99104
def get_feature_row_chunks(
100-
file: str, row_groups: List[int], fs: FeatureSet, max_workers: int
105+
file: str,
106+
row_groups: List[int],
107+
fs: FeatureSet,
108+
ingestion_id: str,
109+
max_workers: int,
101110
) -> Iterable[List[bytes]]:
102111
"""
103112
Iterator function to encode a PyArrow table read from a parquet file to
@@ -115,6 +124,9 @@ def get_feature_row_chunks(
115124
fs (feast.feature_set.FeatureSet):
116125
FeatureSet describing parquet files.
117126
127+
ingestion_id (str):
128+
UUID unique to this ingestion job.
129+
118130
max_workers (int):
119131
Maximum number of workers to spawn.
120132
@@ -128,7 +140,7 @@ def get_feature_row_chunks(
128140
field_map = {field.name: field.dtype for field in fs.fields.values()}
129141

130142
pool = Pool(max_workers)
131-
func = partial(_encode_pa_tables, file, feature_set, field_map)
143+
func = partial(_encode_pa_tables, file, feature_set, field_map, ingestion_id)
132144
for chunk in pool.imap(func, row_groups):
133145
yield chunk
134146
return

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/BigQueryFeatureSink.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@ public abstract class BigQueryFeatureSink implements FeatureSink {
4242
"Event time for the FeatureRow";
4343
public static final String BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION =
4444
"Processing time of the FeatureRow ingestion in Feast\"";
45+
public static final String BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION =
46+
"Unique id identifying groups of rows that have been ingested together";
4547
public static final String BIGQUERY_JOB_ID_FIELD_DESCRIPTION =
4648
"Feast import job ID for the FeatureRow";
4749

@@ -108,10 +110,13 @@ public void prepareWrite(FeatureSetProto.FeatureSet featureSet) {
108110
Table table = bigquery.getTable(tableId);
109111
if (table != null) {
110112
log.info(
111-
"Writing to existing BigQuery table '{}:{}.{}'",
112-
getProjectId(),
113+
"Updating and writing to existing BigQuery table '{}:{}.{}'",
114+
datasetId.getProject(),
113115
datasetId.getDataset(),
114116
tableName);
117+
TableDefinition tableDefinition = createBigQueryTableDefinition(featureSet.getSpec());
118+
TableInfo tableInfo = TableInfo.of(tableId, tableDefinition);
119+
bigquery.update(tableInfo);
115120
return;
116121
}
117122

@@ -166,6 +171,8 @@ private TableDefinition createBigQueryTableDefinition(FeatureSetProto.FeatureSet
166171
"created_timestamp",
167172
Pair.of(
168173
StandardSQLTypeName.TIMESTAMP, BIGQUERY_CREATED_TIMESTAMP_FIELD_DESCRIPTION),
174+
"ingestion_id",
175+
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_INGESTION_ID_FIELD_DESCRIPTION),
169176
"job_id",
170177
Pair.of(StandardSQLTypeName.STRING, BIGQUERY_JOB_ID_FIELD_DESCRIPTION));
171178
for (Map.Entry<String, Pair<StandardSQLTypeName, String>> entry :

storage/connectors/bigquery/src/main/java/feast/storage/connectors/bigquery/writer/FeatureRowToTableRow.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
public class FeatureRowToTableRow implements SerializableFunction<FeatureRow, TableRow> {
3232
private static final String EVENT_TIMESTAMP_COLUMN = "event_timestamp";
3333
private static final String CREATED_TIMESTAMP_COLUMN = "created_timestamp";
34+
private static final String INGESTION_ID_COLUMN = "ingestion_id";
3435
private static final String JOB_ID_COLUMN = "job_id";
3536
private final String jobId;
3637

@@ -47,6 +48,7 @@ public TableRow apply(FeatureRow featureRow) {
4748
TableRow tableRow = new TableRow();
4849
tableRow.set(EVENT_TIMESTAMP_COLUMN, Timestamps.toString(featureRow.getEventTimestamp()));
4950
tableRow.set(CREATED_TIMESTAMP_COLUMN, Instant.now().toString());
51+
tableRow.set(INGESTION_ID_COLUMN, featureRow.getIngestionId());
5052
tableRow.set(JOB_ID_COLUMN, jobId);
5153

5254
for (Field field : featureRow.getFieldsList()) {

0 commit comments

Comments (0)