Conversation

@julian-elastic (Contributor) commented Dec 11, 2025:

This PR is a refactoring step towards "Init Lookup Driver Just Once".

Currently, AbstractLookupService::doLookup() creates operators directly from the request and input page. Since operators are tightly coupled with the driver and input pages, they cannot be reused across multiple pages. We plan to add local logical and physical planning; however, we cannot do that per page, as it would add too much overhead. We need to perform planning once during session initialization rather than for every page. This PR takes a step in that direction by first generating a physical plan that can be shared across multiple pages. Main changes include:
1. Refactor AbstractLookupService::doLookup(): instead of creating operators directly, we now create a PhysicalPlan, then convert PhysicalPlan -> Operator Factories -> Operators.
This separation allows the PhysicalPlan to be generated once and cached in a future PR, since it doesn't depend on the input page data.
2. QueryLists no longer depend on a particular page and are stateless with respect to page contents: they use a channelOffset instead of blocks (see the sketch after this list). Since QueryLists are now created during planning (before we have input pages), they can no longer store blocks directly. Instead, each QueryList stores a channelOffset (the index of the block within a page). Because the page structure is consistent across all pages in a session, the channelOffset remains the same. At runtime, when getQuery() is called, the QueryList extracts the appropriate block from the current page via inputPage.getBlock(channelOffset).
3. New physical plan nodes: LookupMergeDropExec and ParameterizedQueryExec.
4. New LookupExecutionMapper: converts a physical plan to operators for the lookup node and handles the dictionary-encoding optimization for enrich (and possibly lookup join in the future).
5. Add unit tests for the refactor.
6. Add LookupJoinIT, which allows fast debugging of failing csv-spec IT tests by loading only the required indexes.
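
A minimal sketch of the channelOffset idea from point 2, with stand-in types (Page, Block, and Query here are simplified placeholders for the real compute-engine types, and the getQuery signature is illustrative, not the PR's exact API):

```java
// Sketch only: stand-in types, not the actual ES|QL classes.
interface Block {}
interface Query {}
interface Page {
    Block getBlock(int channelIndex);
}

abstract class ChannelOffsetQueryList {
    // Index of the match-field block within every page; fixed at planning time
    // and valid for the whole session because the page layout doesn't change.
    private final int channelOffset;

    ChannelOffsetQueryList(int channelOffset) {
        this.channelOffset = channelOffset;
    }

    // Resolve the block lazily, per page, instead of storing it, so the same
    // QueryList instance can be reused for every incoming page.
    final Query getQuery(Page inputPage, int position) {
        Block block = inputPage.getBlock(channelOffset);
        return buildQuery(block, position);
    }

    // Subclasses turn the block's value at the given position into a query.
    protected abstract Query buildQuery(Block block, int position);
}
```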

@elasticsearchmachine (Collaborator):
Hi @julian-elastic, I've created a changelog YAML for you.

@julian-elastic marked this pull request as ready for review December 15, 2025 21:38
@elasticsearchmachine added the Team:Analytics label Dec 15, 2025
@elasticsearchmachine (Collaborator):
Pinging @elastic/es-analytical-engine (Team:Analytics)


@alex-spies (Contributor) left a comment:

I did a first pass. Thanks a lot @julian-elastic, this is going in the right direction and the AbstractLookupService#doLookup method is starting to look like a nice planning pipeline.

However, the main thing I noticed is that the physical plan abstraction is a bit broken in places, because we include operator-level concepts in the physical plans. I left some remarks about that.

As I continued reviewing, I realized that some of the complexity, and some of the places where the border between physical plan and operator is crossed, are probably due to AbstractLookupService being used for both enrich and lookup join.

In particular, LookupMergeDropExec confused me and is crossing the border into operator territory (see below). However, I think we can simplify that if we separate the code paths for enrich and lookup join. If I understand correctly, LookupMergeDropExec always maps to a project operator for lookup joins because for joins, we have mergePages = false.

If you agree with that (please see if you can confirm my observation - it's late in my day and I might have missed something), then my comments below are a bit outdated and the conclusion is rather that we shouldn't have LookupMergeDropExec at all, and the doLookup method should be refactored for the lookup join case, only, leaving the code for enrich alone for the most part.

It's perfectly fine to have the code for enrich and lookup join diverge, even to the point that we get rid of AbstractLookupService altogether and only have the separate EnrichLookupService and LookupFromIndexService. The code paths will necessarily diverge as we continue improving the planning and execution of lookup joins.

```java
}
Class<?> argClass = (Class<?>) argType;

// Handle array types - can't mock arrays, so create them directly
```
@alex-spies (Contributor):

I just realized that our query plans never really use arrays, but generally lists.

consistency nit: any reason to use arrays in LookupMergeDropExec (which necessitates this test change)? (Except that it's admittedly weird to not use int[].)

```java
public class LookupMergeDropExec extends UnaryExec {
    private final List<NamedExpression> extractFields;
    private final ElementType[] mergingTypes;
    private final int[] mergingChannels;
```
@alex-spies (Contributor):

Other PhysicalPlans are not aware of channels - channels are an operator concept. (I know, in other query planning frameworks, the physical plans would already be aware of the physical layout of columnar data.)

Normally, we'd use attributes to refer to specific columns of the physical plan. Can't we do the same here?

@julian-elastic (Contributor, Author):

The problem is that the attributes are different in subsequent requests, I think (they would have different ids due to deserialization). Finding them by name might lead to performance issues. Any ideas how to get around that? Do you still think it is a bad idea to go with channels for now?

@alex-spies (Contributor):

Looks like we only use mergingChannels in the mergePages == true case, that is, for ENRICH. Same for mergingTypes.

I think separating the enrich and lookup join code paths should get rid of this altogether, because we won't need them?

Comment on lines +28 to +29

```java
 * This handles either merging multiple result pages into one (via MergePositionsOperator)
 * or dropping the doc block (via ProjectOperator).
```
@alex-spies (Contributor):

This is confusing. The MergePositionsOperator and ProjectOperator are quite different. Shouldn't/can't they be represented using different physical plans?

@julian-elastic (Contributor, Author):

Well, we decide which one to use dynamically, depending on the page contents and the optimization value. So for one page we might use MergePositionsOperator, for another ProjectOperator. We cannot make that decision during physical planning because we don't have the page contents yet; the decision has to be made during execution planning.

@alex-spies (Contributor):

I think MergePositionsOperator is only required for ENRICH, when we merge multiple matches into multivalues, no?

Looking through the code, this depends on mergePages, which is set during instantiation of the AbstractLookupService. For lookup join, we instantiate it with mergePages == false. Which means that the LookupMergeDropExec could just be a ProjectExec every time, no?
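
If that observation holds, the mapping for this node reduces to a single branch. A hedged sketch of that branch (all names here are placeholders, not the PR's API):

```java
// Sketch only: placeholder types illustrating the branch described above.
interface OperatorFactory {}

final class MergeDropMapperSketch {
    OperatorFactory map(boolean mergePages) {
        if (mergePages) {
            // ENRICH: fold multiple matches per position into multivalues.
            return mergePositionsFactory();
        }
        // Lookup join: mergePages is always false, so this node is only ever
        // a projection that drops the doc block, i.e. a plain ProjectExec.
        return projectFactory();
    }

    private OperatorFactory mergePositionsFactory() {
        return new OperatorFactory() {}; // stub
    }

    private OperatorFactory projectFactory() {
        return new OperatorFactory() {}; // stub
    }
}
```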

```java
/**
 * Physical plan node representing a lookup source operation.
 * This represents the source of a lookup query before conversion to operators.
 * The QueryList is created during physical plan creation and will receive the Block at runtime.
```
@alex-spies (Contributor):

What is "the Block" in this context? Can we extend this javadoc, maybe also add an example?

@julian-elastic (Contributor, Author):

I will add better comments. It is the Page that is passed to doQuery() at runtime.

```java
Releasables.wrap(shardContext.release, localBreaker)

// Phase 2: Build PhysicalOperation, a factory for Operators needed
PhysicalOperation physicalOperation = executionMapper.buildOperatorFactories(
```
@alex-spies (Contributor):

In LocalExecutionPlanner, there's a little more machinery: we're building a LocalExecutionPlan there, which has driver factories, which in turn have driver suppliers, which then contain physical operations.

That doesn't apply to us because that machinery is aimed at building drivers based on shards, correct?

```java
AliasFilter aliasFilter,
Block block,
@Nullable DataType inputDataType
int channelOffset,
```
@alex-spies (Contributor):

The query list requires the offset, which makes it pretty much operator level. But ParameterizedQueryExec contains a query list, which means we need to think about channels to make a ParameterizedQueryExec.

Is there a way to postpone the instantiation of the query list to the point where we perform the mapping to operators? Or another way to abstract this so that we can forget about channels until we hit the actual LookupExecutionMapper?

@julian-elastic (Contributor, Author):

Yes, but the idea was that we do as much work as possible during Logical and Physical planning so it is done once and shared across requests. Also, during instantiation we decide which filters to push to Lucene and which to apply afterwards. You need the filters that are applied afterwards for your Local Logical/Physical Optimization Rules. I am open to ideas, though.

@alex-spies (Contributor):

> Yes, but the idea was that we do as much work as possible during Logical and Physical planning so it is done once and shared across requests.

We're on the same page here! Assuming that, in the future, we'll be at a point where we stream pages to the driver(s), there should be an operator that has a QueryList, which has a channel offset and is re-used for each incoming page. 100%!

What I'm criticizing: This means QueryList is something that should exist at operator level, not inside a physical plan node. The ParameterizedQueryExec should not have to know anything about channels - it's the mapper's job to determine which input channel to use.

If we start making physical plans aware of channels, I'm pretty sure we'll end up with some weirdnesses that will start to proliferate and require workarounds. For instance, I won't be able to rewrite the query plan upstream from ParameterizedQueryExec (injecting an EvalExec, for instance, or projecting away a column that we realize we no longer need) without having to also update ParameterizedQueryExec because the upstream channels may change. This means that suddenly, the lookup physical optimizer will need to be aware of channels, which are supposed to be still abstracted away at this point :(

It's actually worse than that, because the query list not only is channel-aware, but it also has an alias filter which requires the indices service. It has shard contexts. And a Warnings object!

All of that is something that an operator would have, but not a physical plan.

In practical terms, I think ParameterizedQueryExec shouldn't contain a QueryList, but instead all the things that the lookup mapper needs to create the query list from.
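
A sketch of the separation proposed here, with hypothetical names (the record, the layout map, and newQueryList are all illustrative, not actual PR code): the plan node carries only declarative inputs, and the mapper resolves the match-field attribute to a channel when it builds the operator-level QueryList.

```java
import java.util.Map;

// Sketch only: hypothetical types. The plan node stays attribute-level and
// never sees channels; channel resolution happens in the mapper.
record ParameterizedQuerySketch(String matchFieldName, String matchFieldType) {}

interface QueryListStub {}

final class LookupMapperSketch {
    // layout maps attribute names to channel indices and is only known at
    // operator-mapping time.
    QueryListStub map(ParameterizedQuerySketch node, Map<String, Integer> layout) {
        int channelOffset = layout.get(node.matchFieldName());
        return newQueryList(node.matchFieldType(), channelOffset);
    }

    private QueryListStub newQueryList(String type, int channelOffset) {
        // Real code would pick a term/range/etc. query list here.
        return new QueryListStub() {};
    }
}
```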

```java
}
releasables.add(finishPages);

var warnings = Warnings.createWarnings(
```
@alex-spies (Contributor) commented Dec 19, 2025:

Huh, do we need to create the warnings object in multiple places? There's another one that we make in createLookupPhysicalPlan.

Maybe we want to limit it to one warnings object that we pass around - or, even better, can we avoid needing the warnings object in createLookupPhysicalPlan if we manage to separate physical stuff out further from the query plan? This'd be the first time that a physical plan internally carries around a warnings object. Creating warnings is normally something that happens during runtime.

@julian-elastic (Contributor, Author):

I will look into refactoring. It works right now, though: the CSV tests pass with all warnings correct even though the object is created in multiple places.

```java
);

// Phase 1: Physical Planning
LookupShardContext shardContext = lookupShardContextFactory.create(request.shardId);
```
@alex-spies (Contributor):

The fact that we need a shard context to create the physical plan also looks like operator-level objects seep into the planning layer. I'd expect shard contexts to only be needed once we map to operators.

If you compare with e.g. EsQueryExec, the physical plan that corresponds to running a Lucene query, that one is completely oblivious of shards.

@julian-elastic (Contributor, Author):

This will need refactoring. I will work on it.

```java
 * because the list will never grow mega large.
 */
// Determine optimization state
BlockOptimization blockOptimization = executionMapper.determineOptimization(request.inputPage);
```
@alex-spies (Contributor):

Do the dictionary/range block optimizations apply to lookup joins? Or just to enrich?

As it stands, the actual physical operators depend on the page that we perform the lookup for. Being per-page, this check would normally live inside an operator, not in the mapping stage.

Update: I see that mergePages is the differentiator between enrich and lookup join - being always false for the latter. So, for lookup joins, this step doesn't matter and we don't actually have to look into each page.

Since enrich doesn't need a proper planning stage and only lookup join does: how about we separate the code for the two more? Enforcing so much code sharing will become increasingly hard as we re-architect lookup join's planning and execution flow. If needed, they honestly don't even have to inherit from the same superclass. That was useful a while ago, because lookup join re-used a ton of enrich code, but that's bound to change.

@julian-elastic (Contributor, Author):

Right now the dictionary/range optimizations apply only to enrich, but I would like to take advantage of them for lookup join too in the future. So I did not want to rip this code out completely and then have to put it back in when we apply the optimization to lookup join as well.

@alex-spies (Contributor) commented Dec 22, 2025:

When we apply the dictionary/range optimizations, I'm pretty sure the code won't be transferable, anyway, because for ENRICH, the matching values get stuffed into multivalues, while for lookup join, multiple matching values become multiple new rows.

Let's separate the code paths. I think this refactor is currently quite complicated because it also refactors ENRICH code that's supposed to stay the way it is.

I think in the mergePages == false case, a lot of things simplify.
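
To make the ENRICH vs. lookup join difference concrete, a toy example (hypothetical data, not from the PR):

```
Input row:             { key = "a" }
Matching lookup rows:  { key = "a", v = 1 }, { key = "a", v = 2 }

ENRICH (mergePages == true) emits one row, folding matches into a multivalue:
  { key = "a", v = [1, 2] }

LOOKUP JOIN (mergePages == false) emits one row per match:
  { key = "a", v = 1 }
  { key = "a", v = 2 }
```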


Labels

:Analytics/ES|QL, Team:Analytics, >tech debt, v9.4.0
