
Adds an experimental one-ahead prefetching utility #1525

Open

peterdsharpe wants to merge 3 commits into NVIDIA:main from peterdsharpe:psharpe/add-prefetching-utils

Conversation

peterdsharpe (Collaborator) commented Mar 20, 2026

  • Introduced a new prefetch_map function to optimize data loading by overlapping CPU-bound preparation with GPU-bound processing.
  • Updated CachedPreprocessingDataset to raise a ValueError if sample_paths is empty, ensuring robust error handling.
  • Added prefetch_map to the module's exports for accessibility.
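The intended pattern can be sketched minimally as follows. The prefetch_map body mirrors the snippet reviewed in this PR; prepare_sample and the sleep-based timings are illustrative stand-ins, not code from the PR:

```python
import time
from concurrent.futures import ThreadPoolExecutor

def prefetch_map(iterable, fn):
    # One-ahead prefetch: fn for the next item runs on a background
    # thread while the consumer processes the current result.
    with ThreadPoolExecutor(max_workers=1) as pool:
        it = iter(iterable)
        try:
            future = pool.submit(fn, next(it))
        except StopIteration:
            return
        for item in it:
            next_future = pool.submit(fn, item)
            yield future.result()
            future = next_future
        yield future.result()

def prepare_sample(i):
    time.sleep(0.05)  # stand-in for CPU-bound preparation
    return i

samples = []
start = time.perf_counter()
for sample in prefetch_map(range(8), prepare_sample):
    samples.append(sample)
    time.sleep(0.05)  # stand-in for the GPU-bound step
elapsed = time.perf_counter() - start
# With overlap, total time approaches 8 * 0.05 s rather than 8 * 0.10 s.
```

Because time.sleep releases the GIL, the background preparation genuinely overlaps the consumer's work here, which is the effect the utility targets.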

PhysicsNeMo Pull Request

Description

Used downstream in GLOBE training on DrivAerML (PR to come; this is just setting the stage in nice chunks).

Checklist

Dependencies

Review Process

All PRs are reviewed by the PhysicsNeMo team before merging.

Depending on which files are changed, GitHub may automatically assign a maintainer for review.

We are also testing AI-based code review tools (e.g., Greptile), which may add automated comments with a confidence score.
This score reflects the AI's assessment of merge readiness; it is not a qualitative judgment of your work, nor an indication that the PR will be accepted or rejected.

AI-generated feedback should be reviewed critically for usefulness.
You are not required to respond to every AI comment, but the comments are intended to help both authors and reviewers.
Please react to Greptile comments with 👍 or 👎 to provide feedback on their accuracy.

…ssingDataset

- Introduced a new `prefetch_map` function to optimize data loading by overlapping CPU-bound preparation with GPU-bound processing.
- Updated `CachedPreprocessingDataset` to raise a ValueError if `sample_paths` is empty, ensuring robust error handling.
- Added `prefetch_map` to the module's exports for accessibility.
greptile-apps bot (Contributor) commented Mar 20, 2026

Greptile Summary

This PR adds a prefetch_map utility for one-ahead prefetching using a background thread, and strengthens CachedPreprocessingDataset with an early ValueError for empty sample_paths. The implementation is clean and the core logic is correct for the primary use case (fully-consumed training loops).

  • prefetch.py: The one-ahead prefetch logic is correct — fn(items[i+1]) is submitted to the background thread before the result of fn(items[i]) is yielded to the consumer. Empty-iterable handling (StopIteration guard) and single-element handling are both correct.
  • cached_dataset.py: The ValueError guard is placed before any side-effectful operations (cache directory creation, path existence checks), which is the right ordering.
  • __init__.py: prefetch_map is correctly added to both imports and __all__.
  • No tests: No test files accompany these changes. The PR checklist acknowledges this (checkbox unchecked). Tests for prefetch_map edge cases (empty iterable, single-element iterable, early exit, exception propagation from fn) and the new ValueError in CachedPreprocessingDataset would improve confidence.
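As a sketch of what those edge-case tests might look like (pytest-style; the prefetch_map body replicates the snippet quoted in this review so the example is self-contained):

```python
from concurrent.futures import ThreadPoolExecutor

def prefetch_map(iterable, fn):
    # Replica of the one-ahead prefetch snippet under review,
    # included here so the tests are self-contained.
    with ThreadPoolExecutor(max_workers=1) as pool:
        it = iter(iterable)
        try:
            future = pool.submit(fn, next(it))
        except StopIteration:
            return
        for item in it:
            next_future = pool.submit(fn, item)
            yield future.result()
            future = next_future
        yield future.result()

def test_empty_iterable():
    assert list(prefetch_map([], str)) == []

def test_single_element():
    assert list(prefetch_map([5], lambda x: x * 2)) == [10]

def test_order_preserved():
    assert list(prefetch_map(range(4), lambda x: x + 1)) == [1, 2, 3, 4]

def test_exception_propagates():
    # An exception raised by fn surfaces in the consumer via future.result().
    def boom(x):
        raise RuntimeError("fn failed")
    try:
        list(prefetch_map([1, 2], boom))
        raised = False
    except RuntimeError:
        raised = True
    assert raised
```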

Important Files Changed

  • physicsnemo/experimental/models/globe/utilities/prefetch.py: New one-ahead prefetch utility using a single-worker ThreadPoolExecutor. Logic is correct for the common (fully-consumed) case, but cleanup blocks on early exit without documentation.
  • physicsnemo/experimental/models/globe/utilities/cached_dataset.py: Adds a ValueError guard for empty sample_paths in __init__, placed correctly before any side-effectful operations (cache dir creation). Change is minimal and safe.
  • physicsnemo/experimental/models/globe/utilities/__init__.py: Adds prefetch_map to the module's public imports and __all__. Straightforward bookkeeping change with no issues.

Last reviewed commit: "Add prefetch_map uti..."

Comment on lines +51 to +63
with ThreadPoolExecutor(max_workers=1) as pool:
    it = iter(iterable)
    try:
        future = pool.submit(fn, next(it))
    except StopIteration:
        return

    for item in it:
        next_future = pool.submit(fn, item)
        yield future.result()
        future = next_future

    yield future.result()

P2 Blocking cleanup on early exit is undocumented

When a consumer does not fully exhaust the iterator (e.g., break, or an exception in the consumer), Python calls generator.close(), which throws GeneratorExit at the suspended yield on line 60. This triggers ThreadPoolExecutor.__exit__, which calls shutdown(wait=True) — blocking until the in-flight next_future completes.

Since fn is documented as a potentially expensive CPU-bound operation (subsampling, geometry precomputation, host-to-device transfer), this can cause a silent and surprising stall at loop teardown time. For example:

for item in prefetch_map(huge_dataloader, expensive_fn):  # fn takes ~5 s per item
    if error_condition:
        break  # silently blocks for up to ~5 s during cleanup

It would be worth adding a note to the docstring warning about this behavior, e.g.:

Notes
-----
If the iterator is not fully consumed (e.g., due to an early ``break`` or
an exception in the caller), cleanup will block until the background
thread's in-flight task completes.

…cumentation

- Updated the `prefetch_map` function to ensure the background thread is properly shut down by using a `finally` block for `pool.shutdown()`, allowing for non-blocking cancellation of in-flight tasks.
- Added notes to the docstring to clarify behavior when the iterator is not fully consumed, improving documentation for better user understanding.
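A hedged reconstruction of that revision (the updated file is not shown in this thread, so the exact structure is assumed; Executor.shutdown's cancel_futures parameter requires Python 3.9+):

```python
from concurrent.futures import ThreadPoolExecutor

def prefetch_map(iterable, fn):
    # Assumed shape of the revised utility: a try/finally replaces the
    # with-statement so early exit does not wait on in-flight work.
    pool = ThreadPoolExecutor(max_workers=1)
    try:
        it = iter(iterable)
        try:
            future = pool.submit(fn, next(it))
        except StopIteration:
            return
        for item in it:
            next_future = pool.submit(fn, item)
            yield future.result()
            future = next_future
        yield future.result()
    finally:
        # Cancel the queued task if it has not started, and return
        # immediately instead of blocking on a running one.
        pool.shutdown(wait=False, cancel_futures=True)
```

On an early break, the GeneratorExit raised at the suspended yield reaches the finally block, so teardown no longer stalls for the full duration of an in-flight fn call.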