-
Notifications
You must be signed in to change notification settings - Fork 1.3k
fix: Improve BQ point-in-time joining scalability #3429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -488,10 +488,24 @@ def to_bigquery( | |
| return str(job_config.destination) | ||
|
|
||
| with self._query_generator() as query: | ||
| self._execute_query(query, job_config, timeout) | ||
| dest = job_config.destination | ||
| # because setting destination for scripts is not valid | ||
|
||
| # remove destination attribute if provided | ||
| job_config.destination = None | ||
| bq_job = self._execute_query(query, job_config, timeout) | ||
|
|
||
| print(f"Done writing to '{job_config.destination}'.") | ||
| return str(job_config.destination) | ||
| if not job_config.dry_run: | ||
| config = bq_job.to_api_repr()["configuration"] | ||
| # get temp table created by BQ | ||
| tmp_dest = config["query"]["destinationTable"] | ||
| temp_dest_table = f"{tmp_dest['projectId']}.{tmp_dest['datasetId']}.{tmp_dest['tableId']}" | ||
|
|
||
| # persist temp table | ||
| sql = f"CREATE TABLE {dest} AS SELECT * FROM {temp_dest_table}" | ||
|
||
| self._execute_query(sql, timeout=timeout) | ||
|
|
||
| print(f"Done writing to '{dest}'.") | ||
| return str(dest) | ||
|
|
||
| def _to_arrow_internal(self) -> pyarrow.Table: | ||
| with self._query_generator() as query: | ||
|
|
@@ -777,7 +791,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] | |
| Compute a deterministic hash for the `left_table_query_string` that will be used throughout | ||
| all the logic as the field to GROUP BY the data | ||
| */ | ||
| WITH entity_dataframe AS ( | ||
| CREATE TEMP TABLE entity_dataframe AS ( | ||
|
||
| SELECT *, | ||
| {{entity_df_event_timestamp_col}} AS entity_timestamp | ||
| {% for featureview in featureviews %} | ||
|
|
@@ -793,95 +807,95 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] | |
| {% endif %} | ||
| {% endfor %} | ||
| FROM `{{ left_table_query_string }}` | ||
| ), | ||
| ); | ||
|
|
||
| {% for featureview in featureviews %} | ||
|
|
||
| {{ featureview.name }}__entity_dataframe AS ( | ||
| SELECT | ||
| {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} | ||
| entity_timestamp, | ||
| {{featureview.name}}__entity_row_unique_id | ||
| FROM entity_dataframe | ||
| GROUP BY | ||
| {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} | ||
| entity_timestamp, | ||
| {{featureview.name}}__entity_row_unique_id | ||
| ), | ||
|
|
||
| /* | ||
| This query template performs the point-in-time correctness join for a single feature set table | ||
| to the provided entity table. | ||
|
|
||
| 1. We first join the current feature_view to the entity dataframe that has been passed. | ||
| This JOIN has the following logic: | ||
| - For each row of the entity dataframe, only keep the rows where the `timestamp_field` | ||
| is less than the one provided in the entity dataframe | ||
| - If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field` | ||
| is higher than the one provided minus the TTL | ||
| - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been | ||
| computed previously | ||
|
|
||
| The output of this CTE will contain all the necessary information and already filtered out most | ||
| of the data that is not relevant. | ||
| */ | ||
|
|
||
| {{ featureview.name }}__subquery AS ( | ||
| SELECT | ||
| {{ featureview.timestamp_field }} as event_timestamp, | ||
| {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} | ||
| {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} | ||
| {% for feature in featureview.features %} | ||
| {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} | ||
| {% endfor %} | ||
| FROM {{ featureview.table_subquery }} | ||
| WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' | ||
| {% if featureview.ttl == 0 %}{% else %} | ||
| AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' | ||
| {% endif %} | ||
| ), | ||
|
|
||
| {{ featureview.name }}__base AS ( | ||
| SELECT | ||
| subquery.*, | ||
| entity_dataframe.entity_timestamp, | ||
| entity_dataframe.{{featureview.name}}__entity_row_unique_id | ||
| FROM {{ featureview.name }}__subquery AS subquery | ||
| INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe | ||
| ON TRUE | ||
| AND subquery.event_timestamp <= entity_dataframe.entity_timestamp | ||
|
|
||
| CREATE TEMP TABLE {{ featureview.name }}__cleaned AS ( | ||
| WITH {{ featureview.name }}__entity_dataframe AS ( | ||
| SELECT | ||
| {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} | ||
| entity_timestamp, | ||
| {{featureview.name}}__entity_row_unique_id | ||
| FROM entity_dataframe | ||
| GROUP BY | ||
| {{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %} | ||
| entity_timestamp, | ||
| {{featureview.name}}__entity_row_unique_id | ||
| ), | ||
|
|
||
| /* | ||
| This query template performs the point-in-time correctness join for a single feature set table | ||
| to the provided entity table. | ||
|
|
||
| 1. We first join the current feature_view to the entity dataframe that has been passed. | ||
| This JOIN has the following logic: | ||
| - For each row of the entity dataframe, only keep the rows where the `timestamp_field` | ||
| is less than the one provided in the entity dataframe | ||
| - If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field` | ||
| is higher than the one provided minus the TTL | ||
| - For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been | ||
| computed previously | ||
|
|
||
| The output of this CTE will contain all the necessary information and already filtered out most | ||
| of the data that is not relevant. | ||
| */ | ||
|
|
||
| {{ featureview.name }}__subquery AS ( | ||
| SELECT | ||
| {{ featureview.timestamp_field }} as event_timestamp, | ||
| {{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }} | ||
| {{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %} | ||
| {% for feature in featureview.features %} | ||
| {{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %} | ||
| {% endfor %} | ||
| FROM {{ featureview.table_subquery }} | ||
| WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}' | ||
| {% if featureview.ttl == 0 %}{% else %} | ||
| AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second) | ||
| AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}' | ||
| {% endif %} | ||
| ), | ||
|
|
||
| {{ featureview.name }}__base AS ( | ||
| SELECT | ||
| subquery.*, | ||
| entity_dataframe.entity_timestamp, | ||
| entity_dataframe.{{featureview.name}}__entity_row_unique_id | ||
| FROM {{ featureview.name }}__subquery AS subquery | ||
| INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe | ||
| ON TRUE | ||
| AND subquery.event_timestamp <= entity_dataframe.entity_timestamp | ||
|
|
||
| {% if featureview.ttl == 0 %}{% else %} | ||
| AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second) | ||
| {% endif %} | ||
|
|
||
| {% for entity in featureview.entities %} | ||
| AND subquery.{{ entity }} = entity_dataframe.{{ entity }} | ||
| {% endfor %} | ||
| ), | ||
|
|
||
| /* | ||
| 2. If the `created_timestamp_column` has been set, we need to | ||
| deduplicate the data first. This is done by calculating the | ||
| `MAX(created_at_timestamp)` for each event_timestamp. | ||
| We then join the data on the next CTE | ||
| */ | ||
| {% if featureview.created_timestamp_column %} | ||
| {{ featureview.name }}__dedup AS ( | ||
| SELECT | ||
| {{featureview.name}}__entity_row_unique_id, | ||
| event_timestamp, | ||
| MAX(created_timestamp) as created_timestamp | ||
| FROM {{ featureview.name }}__base | ||
| GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp | ||
| ), | ||
| {% endif %} | ||
| {% for entity in featureview.entities %} | ||
| AND subquery.{{ entity }} = entity_dataframe.{{ entity }} | ||
| {% endfor %} | ||
| ), | ||
|
|
||
| /* | ||
| 2. If the `created_timestamp_column` has been set, we need to | ||
| deduplicate the data first. This is done by calculating the | ||
| `MAX(created_at_timestamp)` for each event_timestamp. | ||
| We then join the data on the next CTE | ||
| */ | ||
| {% if featureview.created_timestamp_column %} | ||
| {{ featureview.name }}__dedup AS ( | ||
| SELECT | ||
| {{featureview.name}}__entity_row_unique_id, | ||
| event_timestamp, | ||
| MAX(created_timestamp) as created_timestamp | ||
| FROM {{ featureview.name }}__base | ||
| GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp | ||
| ), | ||
| {% endif %} | ||
|
|
||
| /* | ||
| 3. The data has been filtered during the first CTE "*__base" | ||
| Thus we only need to compute the latest timestamp of each feature. | ||
| */ | ||
| {{ featureview.name }}__latest AS ( | ||
| /* | ||
| 3. The data has been filtered during the first CTE "*__base" | ||
| Thus we only need to compute the latest timestamp of each feature. | ||
| */ | ||
| {{ featureview.name }}__latest AS ( | ||
| SELECT | ||
| event_timestamp, | ||
| {% if featureview.created_timestamp_column %}created_timestamp,{% endif %} | ||
|
|
@@ -900,13 +914,13 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] | |
| {% endif %} | ||
| ) | ||
| WHERE row_number = 1 | ||
| ), | ||
| ) | ||
|
|
||
| /* | ||
| 4. Once we know the latest value of each feature for a given timestamp, | ||
| we can join again the data back to the original "base" dataset | ||
| */ | ||
| {{ featureview.name }}__cleaned AS ( | ||
|
|
||
| SELECT base.* | ||
| FROM {{ featureview.name }}__base as base | ||
| INNER JOIN {{ featureview.name }}__latest | ||
|
|
@@ -917,7 +931,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField] | |
| ,created_timestamp | ||
| {% endif %} | ||
| ) | ||
| ){% if loop.last %}{% else %}, {% endif %} | ||
|
||
| ); | ||
|
|
||
|
|
||
| {% endfor %} | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Conceptually, the PR seems reasonable (once any doubts about temp table cross-contamination are put to rest). But in terms of code readability, I can't help but notice that if this method is called without a job_config, we first create it containing only the destination at the start of the method, then we remove the destination here. Perhaps this could use a bit of a cleanup where the job_config management is all in one spot?

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Strongly agree! But in this PR I'm trying not to break any other flows. Perhaps some users would prefer passing some of their own job configurations to the to_bigquery method; that's why I only deal with the destination attribute.

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe we shouldn't allow users to pass the destination, there would be some changes needed but let's deal with them in a separate PR