Skip to content

Commit 88489d9

Browse files
authored
Cancel BigQuery job if block_until_done call times out or is interrupted (feast-dev#1699)
* Cancel job if to_bigquery is cancelled by user Signed-off-by: Cody Lin <[email protected]> * cancel job in _upload_entity_df_into_bq as well Signed-off-by: Cody Lin <[email protected]> * Fix _is_done logic? Signed-off-by: Cody Lin <[email protected]> * make cancel job code more readable Signed-off-by: Cody Lin <[email protected]> * move KeyboardInterrupt catch outside retry logic; fix retry logic Signed-off-by: Cody Lin <[email protected]> * make block_until_done public; add custom exception for BQJobStillRunning Signed-off-by: Cody Lin <[email protected]> * fix retry logic to catch specific exception Signed-off-by: Cody Lin <[email protected]> * Make retry params configurable; use finally clause to catch more cancellation cases Signed-off-by: Cody Lin <[email protected]> * Modify docstring Signed-off-by: Cody Lin <[email protected]> * Typo in docstring Signed-off-by: Cody Lin <[email protected]> * Fix lint Signed-off-by: Cody Lin <[email protected]>
1 parent 703c4be commit 88489d9

2 files changed

Lines changed: 58 additions & 20 deletions

File tree

sdk/python/feast/errors.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,11 @@ def __init__(self, repo_obj_type: str, specific_issue: str):
157157
)
158158

159159

160+
class BigQueryJobStillRunning(Exception):
    """Signals that a polled BigQuery job has not yet finished (PENDING/RUNNING)."""

    def __init__(self, job_id):
        message = f"The BigQuery job with ID '{job_id}' is still running."
        super().__init__(message)
160165
class BigQueryJobCancelled(Exception):
    """Signals that a BigQuery job was cancelled (e.g. on timeout or interrupt)."""

    def __init__(self, job_id):
        message = f"The BigQuery job with ID '{job_id}' was cancelled"
        super().__init__(message)

sdk/python/feast/infra/offline_stores/bigquery.py

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,15 @@
1010
from pandas import Timestamp
1111
from pydantic import StrictStr
1212
from pydantic.typing import Literal
13-
from tenacity import retry, stop_after_delay, wait_fixed
13+
from tenacity import Retrying, retry_if_exception_type, stop_after_delay, wait_fixed
1414

1515
from feast import errors
1616
from feast.data_source import BigQuerySource, DataSource
17-
from feast.errors import BigQueryJobCancelled, FeastProviderLoginError
17+
from feast.errors import (
18+
BigQueryJobCancelled,
19+
BigQueryJobStillRunning,
20+
FeastProviderLoginError,
21+
)
1822
from feast.feature_view import FeatureView
1923
from feast.infra.offline_stores.offline_store import OfflineStore, RetrievalJob
2024
from feast.infra.provider import (
@@ -249,12 +253,20 @@ def to_sql(self) -> str:
249253
"""
250254
return self.query
251255

252-
def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[str]:
256+
def to_bigquery(
257+
self,
258+
job_config: bigquery.QueryJobConfig = None,
259+
timeout: int = 1800,
260+
retry_cadence: int = 10,
261+
) -> Optional[str]:
253262
"""
254263
Triggers the execution of a historical feature retrieval query and exports the results to a BigQuery table.
264+
Runs for a maximum amount of time specified by the timeout parameter (defaulting to 30 minutes).
255265
256266
Args:
257267
job_config: An optional bigquery.QueryJobConfig to specify options like destination table, dry run, etc.
268+
timeout: An optional number of seconds for setting the time limit of the QueryJob.
269+
retry_cadence: An optional number of seconds for setting how often the job should be checked for completion.
258270
259271
Returns:
260272
Returns the destination table name or returns None if job_config.dry_run is True.
@@ -274,10 +286,7 @@ def to_bigquery(self, job_config: bigquery.QueryJobConfig = None) -> Optional[st
274286
)
275287
return None
276288

277-
block_until_done(client=self.client, bq_job=bq_job)
278-
279-
if bq_job.exception():
280-
raise bq_job.exception()
289+
block_until_done(client=self.client, bq_job=bq_job, timeout=timeout)
281290

282291
print(f"Done writing to '{job_config.destination}'.")
283292
return str(job_config.destination)
@@ -286,23 +295,47 @@ def to_arrow(self) -> pyarrow.Table:
286295
return self.client.query(self.query).to_arrow()
287296

288297

289-
def block_until_done(client, bq_job):
290-
def _is_done(job_id):
291-
return client.get_job(job_id).state in ["PENDING", "RUNNING"]
298+
def block_until_done(
    client: Client,
    bq_job: Union[bigquery.job.query.QueryJob, bigquery.job.load.LoadJob],
    timeout: int = 1800,
    retry_cadence: int = 10,
):
    """
    Waits for bq_job to finish running, up to a maximum amount of time specified
    by the timeout parameter (defaulting to 30 minutes).

    Args:
        client: A bigquery.client.Client used to monitor the bq_job.
        bq_job: The bigquery.job.QueryJob or bigquery.job.LoadJob to block on
            until it is done running.
        timeout: An optional number of seconds for setting the time limit of the job.
        retry_cadence: An optional number of seconds for setting how often the job
            should be checked for completion.

    Raises:
        BigQueryJobCancelled: If the job was cancelled — because it exceeded
            `timeout`, or because the wait was interrupted (e.g. KeyboardInterrupt).
            The still-running job is cancelled server-side before this is raised.
        Exception: Any error the finished job itself reported via `bq_job.exception()`.
    """

    def _wait_until_done(job_id):
        # Raising BigQueryJobStillRunning tells the retryer to keep polling;
        # returning normally means the job reached a terminal state.
        if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
            raise BigQueryJobStillRunning(job_id=job_id)

    job_id = bq_job.job_id
    try:
        retryer = Retrying(
            wait=wait_fixed(retry_cadence),
            stop=stop_after_delay(timeout),
            retry=retry_if_exception_type(BigQueryJobStillRunning),
            reraise=True,
        )
        retryer(_wait_until_done, job_id)

    finally:
        # Runs on success, timeout, and interruption alike: if the job is still
        # going at this point, cancel it so no orphaned job keeps running (and
        # billing) server-side. Note this deliberately replaces any in-flight
        # exception (including the reraised BigQueryJobStillRunning on timeout)
        # with BigQueryJobCancelled.
        if client.get_job(job_id).state in ["PENDING", "RUNNING"]:
            client.cancel_job(job_id)
            raise BigQueryJobCancelled(job_id=job_id)

    # The job completed on its own; surface any error it recorded.
    if bq_job.exception():
        raise bq_job.exception()
306339

307340

308341
@dataclass(frozen=True)
@@ -354,7 +387,7 @@ def _upload_entity_df_into_bigquery(
354387

355388
if type(entity_df) is str:
356389
job = client.query(f"CREATE TABLE {table_id} AS ({entity_df})")
357-
job.result()
390+
block_until_done(client, job)
358391
elif isinstance(entity_df, pandas.DataFrame):
359392
# Drop the index so that we dont have unnecessary columns
360393
entity_df.reset_index(drop=True, inplace=True)
@@ -364,7 +397,7 @@ def _upload_entity_df_into_bigquery(
364397
job = client.load_table_from_dataframe(
365398
entity_df, table_id, job_config=job_config
366399
)
367-
job.result()
400+
block_until_done(client, job)
368401
else:
369402
raise ValueError(
370403
f"The entity dataframe you have provided must be a Pandas DataFrame or BigQuery SQL query, "

0 commit comments

Comments
 (0)