Merged
194 changes: 104 additions & 90 deletions sdk/python/feast/infra/offline_stores/bigquery.py
@@ -488,10 +488,24 @@ def to_bigquery(
return str(job_config.destination)

with self._query_generator() as query:
self._execute_query(query, job_config, timeout)
dest = job_config.destination
Collaborator
Conceptually, the PR seems reasonable (once any doubts about temp table cross-contamination are put to rest). But in terms of code readability, I can't help but notice that if this method is called without job_config, we first create it containing only the destination at the start of the method, then we remove the destination here. Perhaps this could use a bit of a cleanup where the job_config management is all in one spot?
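One way the suggested cleanup could look is to resolve the job config and destination in a single helper at the top of the method. This is a hypothetical sketch, not code from the PR; `resolve_job_config` and the dict-based config are stand-ins for the real `google.cloud.bigquery.QueryJobConfig` handling:

```python
from typing import Optional, Tuple

# Hypothetical sketch of the suggested cleanup: handle the job config in one
# place, so the destination is never set at the top of the method and then
# unset again further down. A plain dict stands in for QueryJobConfig.

def resolve_job_config(user_config: Optional[dict]) -> Tuple[dict, str]:
    """Return (config without destination, destination table name)."""
    config = dict(user_config or {})
    # Scripts reject an explicit destination, so strip it up front and keep
    # it only as the name under which the temp result is later persisted.
    dest = config.pop("destination", "project.dataset.historical_generated")
    return config, dest

config, dest = resolve_job_config({"destination": "p.d.historical_x", "dry_run": False})
```

With this shape, the later "remove destination attribute if provided" step disappears, because the destination never enters the config that is sent to BigQuery.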

Collaborator Author
Strongly agree! But in this PR I'm trying not to break any other flows. Some users may prefer passing their own job configuration to the to_bigquery method, which is why I only deal with the destination attribute.

Collaborator Author
I believe we shouldn't allow users to pass the destination at all. That would require some further changes, but let's deal with them in a separate PR.

# because setting destination for scripts is not valid
Collaborator
Was this just not working before? It seems like it was using job_config.destination before?

Collaborator Author
When we run multiple statements separated by semicolons, BigQuery doesn't accept an explicitly set destination and throws an error:
google.api_core.exceptions.BadRequest: 400 configuration.query.destinationTable cannot be set for scripts

That's why I added this workaround.

Collaborator Author
We simply retrieve the BQ-generated temporary table after the script has executed (it should look like <project_id>._08dc845331ibb01824ea0f3c3e079e93b3a076.anon933abd7cb6950c3c488af61fe94ad1713b6820cd1115fddd0c09cde32ec12823),

then persist it under our naming convention with the historical_ prefix.
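The extraction step described above can be sketched as follows. The `api_repr` payload here is a hand-written stand-in for what a finished `QueryJob.to_api_repr()` returns (field names follow the BigQuery REST API), and the table names are illustrative:

```python
# Sketch: pull the BQ-generated anonymous destination table out of a query
# job's API representation, then build the statement that persists it under
# the chosen name. `api_repr` below is a hand-written stand-in payload.

def persist_script_destination_sql(api_repr: dict, dest: str) -> str:
    """Build a CREATE TABLE AS SELECT that copies the script's temp output."""
    tmp = api_repr["configuration"]["query"]["destinationTable"]
    temp_table = f"{tmp['projectId']}.{tmp['datasetId']}.{tmp['tableId']}"
    return f"CREATE TABLE {dest} AS SELECT * FROM {temp_table}"

api_repr = {
    "configuration": {
        "query": {
            "destinationTable": {
                "projectId": "my-project",
                "datasetId": "_08dc8453",  # hidden dataset created by BigQuery
                "tableId": "anon933abd",   # anonymous temp table
            }
        }
    }
}

sql = persist_script_destination_sql(api_repr, "my-project.feast.historical_abc123")
print(sql)
```

This mirrors the shape of the change in the diff below: read `configuration.query.destinationTable` from the job, then run one more query to copy it into a regular table.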

# remove destination attribute if provided
job_config.destination = None
bq_job = self._execute_query(query, job_config, timeout)

print(f"Done writing to '{job_config.destination}'.")
return str(job_config.destination)
if not job_config.dry_run:
config = bq_job.to_api_repr()["configuration"]
# get temp table created by BQ
tmp_dest = config["query"]["destinationTable"]
temp_dest_table = f"{tmp_dest['projectId']}.{tmp_dest['datasetId']}.{tmp_dest['tableId']}"

# persist temp table
sql = f"CREATE TABLE {dest} AS SELECT * FROM {temp_dest_table}"
Collaborator
Similarly, I'm confused about why this is newly needed when it seemed to work before?

self._execute_query(sql, timeout=timeout)

print(f"Done writing to '{dest}'.")
return str(dest)

def _to_arrow_internal(self) -> pyarrow.Table:
with self._query_generator() as query:
@@ -777,7 +791,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
Compute a deterministic hash for the `left_table_query_string` that will be used throughout
all the logic as the field to GROUP BY the data
*/
WITH entity_dataframe AS (
CREATE TEMP TABLE entity_dataframe AS (
Collaborator
Is the main change here to create temp tables for everything (and everything else got indented)?

Collaborator Author
Yes, we create a temp table for the entity dataframe and for every joined feature view.

Collaborator Author
And the temp tables only exist within the current execution session.

Collaborator
Are you able to confirm what the scope of the session is? Does each invocation of self._execute_query create a new session? At a cursory glance, I see that we're not explicitly passing create_session=True in the QueryJobConfig instance. I think it would be very helpful to have a confirmation that there isn't going to be any cross contamination between concurrent historical fetches.

Collaborator Author (@sudohainguyen), Jan 10, 2023
Does each invocation of self._execute_query create a new session? Yes, it does.

As for not explicitly passing create_session=True in the QueryJobConfig instance: we only need that config when we want to reuse the session from a different client code scope. In this case we gather all the feature views into a single multi-statement script and execute it once. I can confirm that after the script is executed, the temp tables do not exist anywhere else.

SELECT *,
{{entity_df_event_timestamp_col}} AS entity_timestamp
{% for featureview in featureviews %}
@@ -793,95 +807,95 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
{% endif %}
{% endfor %}
FROM `{{ left_table_query_string }}`
),
);

{% for featureview in featureviews %}

{{ featureview.name }}__entity_dataframe AS (
SELECT
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
entity_timestamp,
{{featureview.name}}__entity_row_unique_id
FROM entity_dataframe
GROUP BY
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
entity_timestamp,
{{featureview.name}}__entity_row_unique_id
),

/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.

1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher than the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously

The output of this CTE will contain all the necessary information and already filtered out most
of the data that is not relevant.
*/

{{ featureview.name }}__subquery AS (
SELECT
{{ featureview.timestamp_field }} as event_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),

{{ featureview.name }}__base AS (
SELECT
subquery.*,
entity_dataframe.entity_timestamp,
entity_dataframe.{{featureview.name}}__entity_row_unique_id
FROM {{ featureview.name }}__subquery AS subquery
INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
ON TRUE
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp

CREATE TEMP TABLE {{ featureview.name }}__cleaned AS (
WITH {{ featureview.name }}__entity_dataframe AS (
SELECT
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
entity_timestamp,
{{featureview.name}}__entity_row_unique_id
FROM entity_dataframe
GROUP BY
{{ featureview.entities | join(', ')}}{% if featureview.entities %},{% else %}{% endif %}
entity_timestamp,
{{featureview.name}}__entity_row_unique_id
),

/*
This query template performs the point-in-time correctness join for a single feature set table
to the provided entity table.

1. We first join the current feature_view to the entity dataframe that has been passed.
This JOIN has the following logic:
- For each row of the entity dataframe, only keep the rows where the `timestamp_field`
is less than the one provided in the entity dataframe
- If there is a TTL for the current feature_view, also keep the rows where the `timestamp_field`
is higher than the one provided minus the TTL
- For each row, Join on the entity key and retrieve the `entity_row_unique_id` that has been
computed previously

The output of this CTE will contain all the necessary information and already filtered out most
of the data that is not relevant.
*/

{{ featureview.name }}__subquery AS (
SELECT
{{ featureview.timestamp_field }} as event_timestamp,
{{ featureview.created_timestamp_column ~ ' as created_timestamp,' if featureview.created_timestamp_column else '' }}
{{ featureview.entity_selections | join(', ')}}{% if featureview.entity_selections %},{% else %}{% endif %}
{% for feature in featureview.features %}
{{ feature }} as {% if full_feature_names %}{{ featureview.name }}__{{featureview.field_mapping.get(feature, feature)}}{% else %}{{ featureview.field_mapping.get(feature, feature) }}{% endif %}{% if loop.last %}{% else %}, {% endif %}
{% endfor %}
FROM {{ featureview.table_subquery }}
WHERE {{ featureview.timestamp_field }} <= '{{ featureview.max_event_timestamp }}'
{% if featureview.ttl == 0 %}{% else %}
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
AND {{ featureview.timestamp_field }} >= '{{ featureview.min_event_timestamp }}'
{% endif %}
),

{{ featureview.name }}__base AS (
SELECT
subquery.*,
entity_dataframe.entity_timestamp,
entity_dataframe.{{featureview.name}}__entity_row_unique_id
FROM {{ featureview.name }}__subquery AS subquery
INNER JOIN {{ featureview.name }}__entity_dataframe AS entity_dataframe
ON TRUE
AND subquery.event_timestamp <= entity_dataframe.entity_timestamp

{% if featureview.ttl == 0 %}{% else %}
AND subquery.event_timestamp >= Timestamp_sub(entity_dataframe.entity_timestamp, interval {{ featureview.ttl }} second)
{% endif %}

{% for entity in featureview.entities %}
AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
{% endfor %}
),

/*
2. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_at_timestamp)` for each event_timestamp.
We then join the data on the next CTE
*/
{% if featureview.created_timestamp_column %}
{{ featureview.name }}__dedup AS (
SELECT
{{featureview.name}}__entity_row_unique_id,
event_timestamp,
MAX(created_timestamp) as created_timestamp
FROM {{ featureview.name }}__base
GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
),
{% endif %}
{% for entity in featureview.entities %}
AND subquery.{{ entity }} = entity_dataframe.{{ entity }}
{% endfor %}
),

/*
2. If the `created_timestamp_column` has been set, we need to
deduplicate the data first. This is done by calculating the
`MAX(created_at_timestamp)` for each event_timestamp.
We then join the data on the next CTE
*/
{% if featureview.created_timestamp_column %}
{{ featureview.name }}__dedup AS (
SELECT
{{featureview.name}}__entity_row_unique_id,
event_timestamp,
MAX(created_timestamp) as created_timestamp
FROM {{ featureview.name }}__base
GROUP BY {{featureview.name}}__entity_row_unique_id, event_timestamp
),
{% endif %}

/*
3. The data has been filtered during the first CTE "*__base"
Thus we only need to compute the latest timestamp of each feature.
*/
{{ featureview.name }}__latest AS (
/*
3. The data has been filtered during the first CTE "*__base"
Thus we only need to compute the latest timestamp of each feature.
*/
{{ featureview.name }}__latest AS (
SELECT
event_timestamp,
{% if featureview.created_timestamp_column %}created_timestamp,{% endif %}
@@ -900,13 +914,13 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
{% endif %}
)
WHERE row_number = 1
),
)

/*
4. Once we know the latest value of each feature for a given timestamp,
we can join again the data back to the original "base" dataset
*/
{{ featureview.name }}__cleaned AS (

SELECT base.*
FROM {{ featureview.name }}__base as base
INNER JOIN {{ featureview.name }}__latest
@@ -917,7 +931,7 @@ def arrow_schema_to_bq_schema(arrow_schema: pyarrow.Schema) -> List[SchemaField]
,created_timestamp
{% endif %}
)
){% if loop.last %}{% else %}, {% endif %}
Collaborator
I haven't taken a close look, but it seems like you removed this line even though there's a loop. Was that intended?

Collaborator Author
We don't need a trailing comma when using temp tables. Each loop iteration is now a separate statement in one script, terminated by a semicolon instead.
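The comma-versus-semicolon point can be sketched outside the template: with CTEs, the per-feature-view blocks are comma-separated parts of one WITH clause, while with temp tables each block is a standalone statement. The feature view and column names here are illustrative, not from the PR:

```python
# Sketch: rendering one CREATE TEMP TABLE statement per feature view and
# joining them with semicolons yields a single multi-statement script,
# which BigQuery runs as one job (the case where destinationTable cannot
# be set). Names are illustrative placeholders.

feature_views = ["driver_stats", "customer_profile"]

statements = [
    f"CREATE TEMP TABLE {fv}__cleaned AS (SELECT 1 AS x)" for fv in feature_views
]
# No comma handling needed, and no special-casing of the last loop
# iteration: every statement simply ends with a semicolon.
script = ";\n".join(statements) + ";"
print(script)
```

This is why the `{% if loop.last %}` comma logic could be dropped from the template.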

);


{% endfor %}