forked from alibaba/AliSQL
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcomposite_iterators.cc
More file actions
2290 lines (2033 loc) · 80.4 KB
/
composite_iterators.cc
File metadata and controls
2290 lines (2033 loc) · 80.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
/* Copyright (c) 2018, 2025, Oracle and/or its affiliates.
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License, version 2.0,
as published by the Free Software Foundation.
This program is designed to work with certain software (including
but not limited to OpenSSL) that is licensed under separate terms,
as designated in a particular file or component or in included license
documentation. The authors of MySQL hereby grant you an additional
permission to link the program and your derivative works with the
separately licensed software that they have either included with
the program or referenced in the documentation.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License, version 2.0, for more details.
You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
#include "sql/iterators/composite_iterators.h"
#include <limits.h>
#include <string.h>
#include <atomic>
#include <list>
#include <string>
#include <vector>
#include "extra/xxhash/my_xxhash.h"
#include "field_types.h"
#include "mem_root_deque.h"
#include "my_dbug.h"
#include "my_inttypes.h"
#include "my_sys.h"
#include "mysqld_error.h"
#include "prealloced_array.h"
#include "scope_guard.h"
#include "sql/debug_sync.h"
#include "sql/error_handler.h"
#include "sql/field.h"
#include "sql/handler.h"
#include "sql/item.h"
#include "sql/item_func.h"
#include "sql/item_sum.h"
#include "sql/iterators/basic_row_iterators.h"
#include "sql/iterators/timing_iterator.h"
#include "sql/join_optimizer/access_path.h"
#include "sql/join_optimizer/materialize_path_parameters.h"
#include "sql/key.h"
#include "sql/opt_trace.h"
#include "sql/opt_trace_context.h"
#include "sql/pfs_batch_mode.h"
#include "sql/sql_base.h"
#include "sql/sql_class.h"
#include "sql/sql_executor.h"
#include "sql/sql_lex.h"
#include "sql/sql_list.h"
#include "sql/sql_optimizer.h"
#include "sql/sql_show.h"
#include "sql/sql_tmp_table.h"
#include "sql/table.h"
#include "sql/table_function.h" // Table_function
#include "sql/temp_table_param.h"
#include "sql/window.h"
#include "template_utils.h"
using pack_rows::TableCollection;
using std::any_of;
using std::string;
using std::swap;
using std::vector;
// Pull rows from the child iterator until one satisfies the filter
// condition. Returns 0 on a matching row, -1 at EOF, 1 on error.
int FilterIterator::Read() {
  for (;;) {
    const int err = m_source->Read();
    if (err != 0) return err;  // Propagate error (1) or EOF (-1).

    const bool condition_held = m_condition->val_int() != 0;

    // The statement may have been killed while we were evaluating
    // the condition.
    if (thd()->killed) {
      thd()->send_kill_message();
      return 1;
    }

    // Evaluating the condition itself may have raised an error.
    if (thd()->is_error()) return 1;

    if (condition_held) {
      // Row passed the filter.
      return 0;
    }

    // Row rejected; release any row lock held for it and keep scanning.
    m_source->UnlockRow();
  }
}
bool LimitOffsetIterator::Init() {
if (m_source->Init()) {
return true;
}
if (m_offset > 0) {
m_seen_rows = m_limit;
m_needs_offset = true;
} else {
m_seen_rows = 0;
m_needs_offset = false;
}
return false;
}
// Return the next row within the [OFFSET, LIMIT) window.
// Follows the RowIterator convention: 0 = row, -1 = EOF, 1 = error.
// OFFSET rows are skipped lazily on the first Read() call, not in Init().
int LimitOffsetIterator::Read() {
  if (m_seen_rows >= m_limit) {
    // We either have hit our LIMIT, or we need to skip OFFSET rows.
    // Check which one.
    if (m_needs_offset) {
      // We skip OFFSET rows here and not in Init(), since performance schema
      // batch mode may not be set up by the executor before the first Read().
      // This makes sure that
      //
      //   a) we get the performance benefits of batch mode even when reading
      //      OFFSET rows, and
      //   b) we don't inadvertedly enable batch mode (e.g. through the
      //      NestedLoopIterator) during Init(), since the executor may not
      //      be ready to _disable_ it if it gets an error before first Read().
      for (ha_rows row_idx = 0; row_idx < m_offset; ++row_idx) {
        int err = m_source->Read();
        if (err != 0) {
          // Note that we'll go back into this loop if Init() is called again,
          // and return the same error/EOF status.
          return err;
        }
        if (m_skipped_rows != nullptr) {
          ++*m_skipped_rows;
        }
        // Skipped rows were never delivered upstream; release their locks.
        m_source->UnlockRow();
      }
      m_seen_rows = m_offset;
      m_needs_offset = false;
      // Fall through to LIMIT testing.
    }
    if (m_seen_rows >= m_limit) {
      // We really hit LIMIT (or hit LIMIT immediately after OFFSET finished),
      // so EOF.
      if (m_count_all_rows) {
        // Count rows until the end or error (ignore the error if any).
        // NOTE(review): this increments *m_skipped_rows without the null
        // check used in the offset loop above — presumably m_count_all_rows
        // implies m_skipped_rows != nullptr; confirm against the constructor.
        while (m_source->Read() == 0) {
          ++*m_skipped_rows;
        }
      }
      return -1;
    }
  }
  const int result = m_source->Read();
  if (m_reject_multiple_rows) {
    // Scalar-subquery mode: at most one row may be produced past the offset.
    if (result != 0) {
      ++m_seen_rows;
      return result;
    }
    // We read a row. Check for scalar subquery cardinality violation
    if (m_seen_rows - m_offset > 0) {
      my_error(ER_SUBQUERY_NO_1_ROW, MYF(0));
      return 1;
    }
  }
  ++m_seen_rows;
  return result;
}
// Construct an aggregation iterator over the given source.
// 'tables' describes the row buffers whose contents define group
// membership; 'rollup' enables WITH ROLLUP processing.
AggregateIterator::AggregateIterator(
    THD *thd, unique_ptr_destroy_only<RowIterator> source, JOIN *join,
    TableCollection tables, bool rollup)
    : RowIterator(thd),
      m_source(std::move(source)),
      m_join(join),
      m_rollup(rollup),
      m_tables(std::move(tables)) {
  // Preallocate both saved-row buffers to the largest row image we
  // could ever need to store, so that Read() never has to grow them
  // while streaming groups.
  const size_t max_row_size = ComputeRowSizeUpperBound(m_tables);
  m_first_row_this_group.reserve(max_row_size);
  m_first_row_next_group.reserve(max_row_size);
}
// Prepare for (re)execution: reset rollup state, restore any saved row
// buffers from a previous execution, initialize the source, and capture
// the output ref-item slice. Returns true on error.
bool AggregateIterator::Init() {
  // This iterator handles aggregation itself; precomputed GROUP BY
  // (where a temporary table already did the work) must not reach here.
  assert(!m_join->tmp_table_param.precomputed_group_by);

  // Disable any leftover rollup items used in children.
  m_current_rollup_position = -1;
  SetRollupLevel(INT_MAX);

  // If the iterator has been executed before, restore the state of
  // the table buffers. This is needed for correctness if there is an
  // EQRefIterator below this iterator, as the restoring of the
  // previous group in Read() may have disturbed the cache in
  // EQRefIterator. (Must happen before m_source->Init() below.)
  if (!m_first_row_next_group.is_empty()) {
    LoadIntoTableBuffers(
        m_tables, pointer_cast<const uchar *>(m_first_row_next_group.ptr()));
    m_first_row_next_group.length(0);
  }

  if (m_source->Init()) {
    return true;
  }

  // If we have a HAVING after us, it needs to be evaluated within the context
  // of the slice we're in (unless we're in the hypergraph optimizer, which
  // doesn't use slices). However, we might have a sort before us, and
  // SortingIterator doesn't set the slice except on Init(); it just keeps
  // whatever was already set. When there is a temporary table after the HAVING,
  // the slice coming from there might be wrongly set on Read(), and thus,
  // we need to properly restore it before returning any rows.
  //
  // This is a hack. It would be good to get rid of the slice system altogether
  // (the hypergraph join optimizer does not use it).
  if (!(m_join->implicit_grouping || m_join->group_optimized_away) &&
      !thd()->lex->using_hypergraph_optimizer()) {
    m_output_slice = m_join->get_ref_item_slice();
  }

  m_seen_eof = false;
  m_save_nullinfo = 0;

  // Not really used, just to be sure.
  m_last_unchanged_group_item_idx = 0;

  m_state = READING_FIRST_ROW;
  return false;
}
// State machine driving streaming aggregation: read rows from the
// (group-sorted) source, detect group boundaries via the group-field
// caches, and emit one output row per group (plus rollup rows when
// WITH ROLLUP is active).
//
// Returns 0 on a produced row, -1 at EOF, 1 on error.
//
// Fix: two error paths below previously used `return true;` in this
// int-returning function; while that implicitly converts to 1, it broke
// the 0/-1/1 convention used everywhere else here. Changed to `return 1;`
// (behavior unchanged).
int AggregateIterator::Read() {
  switch (m_state) {
    case READING_FIRST_ROW: {
      // Start the first group, if possible. (If we're not at the first row,
      // we already saw the first row in the new group at the previous Read().)
      int err = m_source->Read();
      if (err == -1) {
        m_seen_eof = true;
        m_state = DONE_OUTPUTTING_ROWS;
        if (m_join->grouped || m_join->group_optimized_away) {
          // Grouped query with no input rows: no output rows either.
          SetRollupLevel(m_join->send_group_parts);
          return -1;
        } else {
          // If there's no GROUP BY, we need to output a row even if there are
          // no input rows.

          // Calculate aggregate functions for no rows
          for (Item *item : *m_join->get_current_fields()) {
            if (!item->hidden ||
                (item->type() == Item::SUM_FUNC_ITEM &&
                 down_cast<Item_sum *>(item)->aggr_query_block ==
                     m_join->query_block)) {
              item->no_rows_in_result();
            }
          }

          /*
            Mark tables as containing only NULL values for ha_write_row().
            Calculate a set of tables for which NULL values need to
            be restored after sending data.
          */
          if (m_join->clear_fields(&m_save_nullinfo)) {
            return 1;
          }
          for (Item_sum **item = m_join->sum_funcs; *item != nullptr; ++item) {
            (*item)->clear();
          }
          if (m_output_slice != -1) {
            m_join->set_ref_item_slice(m_output_slice);
          }
          return 0;
        }
      }
      if (err != 0) return err;

      // Set the initial value of the group fields.
      (void)update_item_cache_if_changed(m_join->group_fields);

      StoreFromTableBuffers(m_tables, &m_first_row_next_group);
      m_last_unchanged_group_item_idx = 0;
    }
      [[fallthrough]];
    case LAST_ROW_STARTED_NEW_GROUP:
      SetRollupLevel(m_join->send_group_parts);

      // We don't need m_first_row_this_group for the old group anymore,
      // but we'd like to reuse its buffer, so swap instead of std::move.
      // (Testing for state == READING_FIRST_ROW and avoiding the swap
      // doesn't seem to give any speed gains.)
      swap(m_first_row_this_group, m_first_row_next_group);
      LoadIntoTableBuffers(
          m_tables, pointer_cast<const uchar *>(m_first_row_this_group.ptr()));

      // Reset all aggregates and fold in the first row of the new group.
      for (Item_sum **item = m_join->sum_funcs; *item != nullptr; ++item) {
        if (m_rollup) {
          if (down_cast<Item_rollup_sum_switcher *>(*item)
                  ->reset_and_add_for_rollup(m_last_unchanged_group_item_idx))
            return 1;  // Was `return true;` — same value, correct type.
        } else {
          if ((*item)->reset_and_add()) return 1;  // Was `return true;`.
        }
      }

      // Keep reading rows as long as they are part of the existing group.
      for (;;) {
        int err = m_source->Read();
        if (err == 1) return 1;  // Error.

        if (err == -1) {
          m_seen_eof = true;

          // We need to be able to restore the table buffers in Init()
          // if the iterator is reexecuted (can happen if it's inside
          // a correlated subquery).
          StoreFromTableBuffers(m_tables, &m_first_row_next_group);

          // End of input rows; return the last group. (One would think this
          // LoadIntoTableBuffers() call is unneeded, since the last row read
          // would be from the last group, but there may be filters in-between
          // us and whatever put data into the row buffers, and those filters
          // may have caused other rows to be loaded before discarding them.)
          LoadIntoTableBuffers(m_tables, pointer_cast<const uchar *>(
                                             m_first_row_this_group.ptr()));

          if (m_rollup && m_join->send_group_parts > 0) {
            // Also output the final groups, including the total row
            // (with NULLs in all fields).
            SetRollupLevel(m_join->send_group_parts);
            m_last_unchanged_group_item_idx = 0;
            m_state = OUTPUTTING_ROLLUP_ROWS;
          } else {
            SetRollupLevel(m_join->send_group_parts);
            m_state = DONE_OUTPUTTING_ROWS;
          }
          if (m_output_slice != -1) {
            m_join->set_ref_item_slice(m_output_slice);
          }
          return 0;
        }

        int first_changed_idx =
            update_item_cache_if_changed(m_join->group_fields);
        if (first_changed_idx >= 0) {
          // The group changed. Store the new row (we can't really use it yet;
          // next Read() will deal with it), then load back the group values
          // so that we can output a row for the current group.
          // NOTE: This does not save and restore FTS information,
          // so evaluating MATCH() on these rows may give the wrong result.
          // (Storing the row ID and repositioning it with ha_rnd_pos()
          // would, but we can't do the latter without disturbing
          // ongoing scans. See bug #32565923.) For the old join optimizer,
          // we generally solve this by inserting temporary tables or sorts
          // (both of which restore the information correctly); for the
          // hypergraph join optimizer, we add a special streaming step
          // for MATCH columns.
          StoreFromTableBuffers(m_tables, &m_first_row_next_group);
          LoadIntoTableBuffers(m_tables, pointer_cast<const uchar *>(
                                             m_first_row_this_group.ptr()));

          // If we have rollup, we may need to output more than one row.
          // Mark so that the next calls to Read() will return those rows.
          //
          // NOTE: first_changed_idx is the first group value that _changed_,
          // while what we store is the last item that did _not_ change.
          if (m_rollup) {
            m_last_unchanged_group_item_idx = first_changed_idx + 1;
            if (static_cast<unsigned>(first_changed_idx) <
                m_join->send_group_parts - 1) {
              SetRollupLevel(m_join->send_group_parts);
              m_state = OUTPUTTING_ROLLUP_ROWS;
            } else {
              SetRollupLevel(m_join->send_group_parts);
              m_state = LAST_ROW_STARTED_NEW_GROUP;
            }
          } else {
            m_last_unchanged_group_item_idx = 0;
            m_state = LAST_ROW_STARTED_NEW_GROUP;
          }
          if (m_output_slice != -1) {
            m_join->set_ref_item_slice(m_output_slice);
          }
          return 0;
        }

        // Give the new values to all the new aggregate functions.
        for (Item_sum **item = m_join->sum_funcs; *item != nullptr; ++item) {
          if (m_rollup) {
            if (down_cast<Item_rollup_sum_switcher *>(*item)
                    ->aggregator_add_all()) {
              return 1;
            }
          } else {
            if ((*item)->aggregator_add()) {
              return 1;
            }
          }
        }

        // We're still in the same group, so just loop back.
      }

    case OUTPUTTING_ROLLUP_ROWS:
      // Emit the next (one level higher) rollup row for the finished group.
      SetRollupLevel(m_current_rollup_position - 1);

      if (m_current_rollup_position <= m_last_unchanged_group_item_idx) {
        // Done outputting rollup rows; on next Read() call, deal with the new
        // group instead.
        if (m_seen_eof) {
          m_state = DONE_OUTPUTTING_ROWS;
        } else {
          m_state = LAST_ROW_STARTED_NEW_GROUP;
        }
      }

      if (m_output_slice != -1) {
        m_join->set_ref_item_slice(m_output_slice);
      }
      return 0;

    case DONE_OUTPUTTING_ROWS:
      if (m_save_nullinfo != 0) {
        // Undo the NULL-marking done by clear_fields() in the
        // no-rows/no-GROUP-BY path above.
        m_join->restore_fields(m_save_nullinfo);
        m_save_nullinfo = 0;
      }
      SetRollupLevel(INT_MAX);  // Higher-level iterators up above should not
                                // activate any rollup.
      return -1;
  }

  assert(false);
  return 1;
}
void AggregateIterator::SetRollupLevel(int level) {
if (m_rollup && m_current_rollup_position != level) {
m_current_rollup_position = level;
for (Item_rollup_group_item *item : m_join->rollup_group_items) {
item->set_current_rollup_level(level);
}
for (Item_rollup_sum_switcher *item : m_join->rollup_sums) {
item->set_current_rollup_level(level);
}
}
}
// Initialize the outer child and reset the loop state. The inner child
// is (re-)initialized lazily in Read(), once per outer row.
bool NestedLoopIterator::Init() {
  if (m_source_outer->Init()) return true;
  m_state = NEEDS_OUTER_ROW;
  // If we manage performance schema batch mode for the inner child,
  // make sure any batch left over from a previous execution is ended.
  if (m_pfs_batch_mode) m_source_inner->EndPSIBatchModeIfStarted();
  return false;
}
// Produce the next joined row for a nested-loop join. For each outer
// row, the inner child is re-initialized and scanned; m_join_type
// decides what happens at inner EOF (outer join: null-complemented row;
// anti join: emit the outer row only if no inner row matched; semijoin:
// emit at most one row per outer row).
// Returns 0 on a produced row, -1 at EOF, 1 on error.
int NestedLoopIterator::Read() {
  if (m_state == END_OF_ROWS) {
    return -1;
  }

  for (;;) {  // Termination condition within loop.
    if (m_state == NEEDS_OUTER_ROW) {
      int err = m_source_outer->Read();
      if (err == 1) {
        return 1;  // Error.
      }
      if (err == -1) {
        m_state = END_OF_ROWS;
        return -1;
      }
      if (m_pfs_batch_mode) {
        m_source_inner->StartPSIBatchMode();
      }

      // Init() could read the NULL row flags (e.g., when building a hash
      // table), so unset them before instead of after.
      m_source_inner->SetNullRowFlag(false);

      if (m_source_inner->Init()) {
        return 1;
      }
      m_state = READING_FIRST_INNER_ROW;
    }
    assert(m_state == READING_INNER_ROWS || m_state == READING_FIRST_INNER_ROW);

    int err = m_source_inner->Read();
    // End batch mode on error/EOF so the executor sees a consistent state.
    if (err != 0 && m_pfs_batch_mode) {
      m_source_inner->EndPSIBatchModeIfStarted();
    }
    if (err == 1) {
      return 1;  // Error.
    }
    if (thd()->killed) {  // Aborted by user.
      thd()->send_kill_message();
      return 1;
    }

    if (err == -1) {
      // Out of inner rows for this outer row. If we are an outer join
      // and never found any inner rows, return a null-complemented row.
      // If not, skip that and go straight to reading a new outer row.
      if ((m_join_type == JoinType::OUTER &&
           m_state == READING_FIRST_INNER_ROW) ||
          m_join_type == JoinType::ANTI) {
        m_source_inner->SetNullRowFlag(true);
        m_state = NEEDS_OUTER_ROW;
        return 0;
      } else {
        m_state = NEEDS_OUTER_ROW;
        continue;
      }
    }

    // An inner row has been found.

    if (m_join_type == JoinType::ANTI) {
      // Anti-joins should stop scanning the inner side as soon as we see
      // a row, without returning that row.
      m_state = NEEDS_OUTER_ROW;
      continue;
    }

    // We have a new row. Semijoins should stop after the first row;
    // regular joins (inner and outer) should go on to scan the rest.
    if (m_join_type == JoinType::SEMI) {
      m_state = NEEDS_OUTER_ROW;
    } else {
      m_state = READING_INNER_ROWS;
    }
    return 0;
  }
}
/**
  A do-nothing stand-in with the same public surface as
  IteratorProfilerImpl. Iterators that keep their own timing data (such as
  MaterializeIterator) can thus be written once and instantiated either
  with real profiling or with this class; since every mutator here is an
  inlinable no-op, the unprofiled instantiation carries no runtime cost.
*/
class DummyIteratorProfiler final : public IteratorProfiler {
 public:
  struct TimeStamp {};

  static TimeStamp Now() { return {}; }

  // The accessors below must never be called on the dummy profiler;
  // they exist only to satisfy the IteratorProfiler interface.
  double GetFirstRowMs() const override {
    assert(false);
    return 0.0;
  }

  double GetLastRowMs() const override {
    assert(false);
    return 0.0;
  }

  uint64_t GetNumInitCalls() const override {
    assert(false);
    return 0;
  }

  uint64_t GetNumRows() const override {
    assert(false);
    return 0;
  }

  /*
    Non-virtual no-op mutators, with names and signatures matching those
    of IteratorProfilerImpl. Calls on iterators instantiated without
    profiling should therefore be optimized away entirely.
  */
  void StopInit([[maybe_unused]] TimeStamp start_time) {}

  void IncrementNumRows([[maybe_unused]] uint64_t materialized_rows) {}

  void StopRead([[maybe_unused]] TimeStamp start_time,
                [[maybe_unused]] bool read_ok) {}
};
/**
  Handles materialization; the first call to Init() will scan the given iterator
  to the end, store the results in a temporary table (optionally with
  deduplication), and then Read() will allow you to read that table repeatedly
  without the cost of executing the given subquery many times (unless you ask
  for rematerialization).

  When materializing, MaterializeIterator takes care of evaluating any items
  that need so, and storing the results in the fields of the outgoing table --
  which items is governed by the temporary table parameters.

  Conceptually (although not performance-wise!), the MaterializeIterator is a
  no-op if you don't ask for deduplication, and in some cases (e.g. when
  scanning a table only once), we elide it. However, it's not necessarily
  straightforward to do so by just not inserting the iterator, as the optimizer
  will have set up everything (e.g., read sets, or what table upstream items
  will read from) assuming the materialization will happen, so the realistic
  option is setting up everything as if materialization would happen but not
  actually write to the table; see StreamingIterator for details.

  MaterializeIterator conceptually materializes iterators, not JOINs or
  Query_expressions. However, there are many details that leak out
  (e.g., setting performance schema batch mode, slices, reusing CTEs,
  etc.), so we need to send them in anyway.

  'Profiler' should be 'IteratorProfilerImpl' for 'EXPLAIN ANALYZE' and
  'DummyIteratorProfiler' otherwise. It is implemented as a template
  parameter rather than a pointer to a base class in order to minimize
  the impact this probe has on normal query execution.
*/
template <typename Profiler>
class MaterializeIterator final : public TableRowIterator {
 public:
  /**
    @param thd Thread handler.
    @param query_blocks_to_materialize List of query blocks to materialize.
    @param path_params MaterializePath settings.
    @param table_iterator Iterator used for scanning the temporary table
      after materialization.
    @param join
      When materializing within the same JOIN (e.g., into a temporary table
      before sorting), as opposed to a derived table or a CTE, we may need
      to change the slice on the join before returning rows from the result
      table. If so, join and ref_slice would need to be set, and
      query_blocks_to_materialize should contain only one member, with the same
      join.
  */
  MaterializeIterator(THD *thd,
                      Mem_root_array<materialize_iterator::QueryBlock>
                          query_blocks_to_materialize,
                      const MaterializePathParameters *path_params,
                      unique_ptr_destroy_only<RowIterator> table_iterator,
                      JOIN *join);

  bool Init() override;
  int Read() override;

  void SetNullRowFlag(bool is_null_row) override {
    m_table_iterator->SetNullRowFlag(is_null_row);
  }

  void StartPSIBatchMode() override { m_table_iterator->StartPSIBatchMode(); }
  void EndPSIBatchModeIfStarted() override;

  // The temporary table is private to us, so there's no need to worry about
  // locks to other transactions.
  void UnlockRow() override {}

  const IteratorProfiler *GetProfiler() const override {
    // Profiling data is only collected (and should only be asked for)
    // under EXPLAIN ANALYZE.
    assert(thd()->lex->is_explain_analyze);
    return &m_profiler;
  }

  const Profiler *GetTableIterProfiler() const {
    return &m_table_iter_profiler;
  }

 private:
  Mem_root_array<materialize_iterator::QueryBlock>
      m_query_blocks_to_materialize;
  unique_ptr_destroy_only<RowIterator> m_table_iterator;

  /// If we are materializing a CTE, points to it (otherwise nullptr).
  /// Used so that we see if some other iterator already materialized the table,
  /// avoiding duplicate work.
  Common_table_expr *m_cte;

  /// The query expression we are materializing. For derived tables,
  /// we materialize the entire query expression; for materialization within
  /// a query expression (e.g. for sorting or for windowing functions),
  /// we materialize only parts of it. Used to clear correlated CTEs within
  /// the unit when we rematerialize, since they depend on values from
  /// outside the query expression, and those values may have changed
  /// since last materialization.
  Query_expression *m_query_expression;

  /// See constructor.
  JOIN *const m_join;

  /// The slice to set when accessing temporary table; used if anything upstream
  /// (e.g. WHERE, HAVING) wants to evaluate values based on its contents.
  /// See constructor.
  const int m_ref_slice;

  /// If true, we need to materialize anew for each Init() (because the contents
  /// of the table will depend on some outer non-constant value).
  const bool m_rematerialize;

  /// See constructor.
  const bool m_reject_multiple_rows;

  /// See constructor.
  const ha_rows m_limit_rows;

  /// One entry per dependency whose change forces a rematerialization;
  /// we remember the generation we saw at the last materialization so
  /// Init() can detect staleness.
  struct Invalidator {
    const CacheInvalidatorIterator *iterator;
    int64_t generation_at_last_materialize;
  };
  Mem_root_array<Invalidator> m_invalidators;

  /**
    Profiling data for this iterator. Used for 'EXPLAIN ANALYZE'.

    Note that MaterializeIterator merely (re)materializes a set of rows.
    It delegates the task of iterating over those rows to m_table_iterator.
    m_profiler thus records:
    - The total number of rows materialized (for the initial
      materialization and any subsequent rematerialization).
    - The total time spent on all materializations.

    It does not measure the time spent accessing the materialized rows.
    That is handled by m_table_iter_profiler. The example below illustrates
    what 'EXPLAIN ANALYZE' output will be like. (Cost-data has been removed
    for the sake of simplicity.) The second line represents the
    MaterializeIterator that materializes x1, and the first line represents
    m_table_iterator, which is a TableScanIterator in this example.

    -> Table scan on x1 (actual time=t1..t2 rows=r1 loops=l1)
       -> Materialize CTE x1 if needed (actual time=t3..t4 rows=r2 loops=l2)

    t3 is the average time (across l2 materializations) spent materializing x1.
    Since MaterializeIterator does no iteration, we always set t3=t4.
    'actual time' is cumulative, so that the values for an iterator should
    include the time spent in all its descendants. Therefore we know that
    t1*l1>=t3*l2 . (Note that t1 may be smaller than t3. We may re-scan x1
    repeatedly without rematerializing it. Restarting a scan is quick, bringing
    the average time for fetching the first row (t1) down.)
  */
  Profiler m_profiler;

  /**
    Profiling data for m_table_iterator. 'this' is a descendant of
    m_table_iterator in 'EXPLAIN ANALYZE' output, and 'elapsed time'
    should be cumulative. Therefore, m_table_iter_profiler will measure
    the sum of the time spent materializing the result rows and iterating
    over those rows.
  */
  Profiler m_table_iter_profiler;

  /// Whether we are deduplicating using a hash field on the temporary
  /// table. (This condition mirrors check_unique_constraint().)
  /// If so, we compute a hash value for every row, look up all rows with
  /// the same hash and manually compare them to the row we are trying to
  /// insert.
  ///
  /// Note that this is _not_ the common way of deduplicating as we go.
  /// The common method is to have a regular index on the table
  /// over the right columns, and in that case, ha_write_row() will fail
  /// with an ignorable error, so that the row is ignored even though
  /// check_unique_constraint() is not called. However, B-tree indexes
  /// have limitations, in particular on length, that sometimes require us
  /// to do this instead. See create_tmp_table() for details.
  bool doing_hash_deduplication() const { return table()->hash_field; }

  /// Whether we are deduplicating, whether through a hash field
  /// or a regular unique index.
  bool doing_deduplication() const;

  bool MaterializeRecursive();
  bool MaterializeQueryBlock(
      const materialize_iterator::QueryBlock &query_block,
      ha_rows *stored_rows);
};
// Constructor: wires up the materialization parameters from path_params
// and registers any cache invalidators that can force rematerialization.
// See the class declaration for the meaning of the individual members.
template <typename Profiler>
MaterializeIterator<Profiler>::MaterializeIterator(
    THD *thd,
    Mem_root_array<materialize_iterator::QueryBlock>
        query_blocks_to_materialize,
    const MaterializePathParameters *path_params,
    unique_ptr_destroy_only<RowIterator> table_iterator, JOIN *join)
    : TableRowIterator(thd, path_params->table),
      m_query_blocks_to_materialize(std::move(query_blocks_to_materialize)),
      m_table_iterator(std::move(table_iterator)),
      m_cte(path_params->cte),
      m_query_expression(path_params->unit),
      m_join(join),
      m_ref_slice(path_params->ref_slice),
      m_rematerialize(path_params->rematerialize),
      m_reject_multiple_rows(path_params->reject_multiple_rows),
      m_limit_rows(path_params->limit_rows),
      m_invalidators(thd->mem_root) {
  // A row limit is only supported for UNION/plain tables; EXCEPT/INTERSECT
  // operands must not carry one.
  assert(m_limit_rows == HA_POS_ERROR /* EXCEPT, INTERSECT */ ||
         path_params->table->is_union_or_table());
  if (m_ref_slice != -1) {
    // Slice switching only makes sense when materializing within a JOIN.
    assert(m_join != nullptr);
  }
  if (m_join != nullptr) {
    assert(m_query_blocks_to_materialize.size() == 1);
    assert(m_query_blocks_to_materialize[0].join == m_join);
  }
  if (path_params->invalidators != nullptr) {
    for (const AccessPath *invalidator_path : *path_params->invalidators) {
      // We create iterators left-to-right, so we should have created the
      // invalidators before this.
      assert(invalidator_path->iterator != nullptr);
      /*
        Add a cache invalidator that must be checked on every Init().
        If its generation has increased since last materialize, we need to
        rematerialize even if m_rematerialize is false.
      */
      m_invalidators.push_back(
          Invalidator{down_cast<CacheInvalidatorIterator *>(
                          invalidator_path->iterator->real_iterator()),
                      /*generation_at_last_materialize=*/-1});

      // If we're invalidated, the join also needs to invalidate all of its
      // own materialization operations, but it will automatically do so by
      // virtue of the Query_block being marked as uncachable
      // (create_iterators() always sets rematerialize=true for such cases).
    }
  }
}
// Materialize the result of the underlying query block(s) into the tmp
// table, then initialize m_table_iterator for reading it back. Returns
// true on error (MySQL convention). Handles four distinct situations:
// first-time creation of a view/derived tmp table, reuse of a shared CTE
// materialization, a plain rescan of an already-materialized table, and a
// full (re)materialization.
template <typename Profiler>
bool MaterializeIterator<Profiler>::Init() {
  // Timestamp for the profiler; paired with StopInit() calls near the end.
  const typename Profiler::TimeStamp start_time = Profiler::Now();
  if (!table()->materialized && table()->pos_in_table_list != nullptr &&
      table()->pos_in_table_list->is_view_or_derived()) {
    // Create the table if it's the very first time.
    //
    // TODO(sgunders): create_materialized_table() calls
    // instantiate_tmp_table(), and then has some logic to deal with more
    // complicated cases like multiple reference to the same CTE.
    // Consider unifying this with the instantiate_tmp_table() case below
    // (which is used for e.g. materialization for sorting).
    if (table()->pos_in_table_list->create_materialized_table(thd())) {
      return true;
    }
  }
  // If this is a CTE, it could be referred to multiple times in the same query.
  // If so, check if we have already been materialized through any of our alias
  // tables.
  //
  // Only applicable when we are not forced to rematerialize on every Init()
  // (m_rematerialize), and our own table is not yet materialized.
  const bool use_shared_cte_materialization =
      !table()->materialized && m_cte != nullptr && !m_rematerialize &&
      any_of(m_cte->tmp_tables.begin(), m_cte->tmp_tables.end(),
             [](const Table_ref *table_ref) {
               return table_ref->table != nullptr &&
                      table_ref->table->materialized;
             });

  if (use_shared_cte_materialization) {
    // If using an already materialized shared CTE table, update the
    // invalidators with the latest generation.
    for (Invalidator &invalidator : m_invalidators) {
      invalidator.generation_at_last_materialize =
          invalidator.iterator->generation();
    }
    // Mark our own alias as materialized so the rescan path below is taken.
    table()->materialized = true;
  }

  if (table()->materialized) {
    bool rematerialize = m_rematerialize;

    // A freshly adopted shared CTE materialization is current by
    // construction (we just synced the invalidator generations above), so
    // the staleness check is skipped in that case.
    if (!rematerialize && !use_shared_cte_materialization) {
      // See if any lateral tables that we depend on have changed since
      // last time (which would force a rematerialization).
      //
      // TODO: It would be better, although probably much harder, to check
      // the actual column values instead of just whether we've seen any
      // new rows.
      for (const Invalidator &invalidator : m_invalidators) {
        if (invalidator.iterator->generation() !=
            invalidator.generation_at_last_materialize) {
          rematerialize = true;
          break;
        }
      }
    }

    if (!rematerialize) {
      // Just a rescan of the same table.
      const bool err = m_table_iterator->Init();
      m_table_iter_profiler.StopInit(start_time);
      return err;
    }
  }
  table()->set_not_started();

  if (!table()->is_created()) {
    // Materialization for e.g. sorting: the tmp table was never
    // instantiated in the storage engine yet.
    if (instantiate_tmp_table(thd(), table())) {
      return true;
    }
    empty_record(table());
  } else {
    // Rematerialization: drop all previously stored rows before refilling.
    table()->file->ha_index_or_rnd_end();  // @todo likely unneeded => remove
    table()->file->ha_delete_all_rows();
  }

  if (m_query_expression != nullptr)
    if (m_query_expression->clear_correlated_query_blocks()) return true;

  if (m_cte != nullptr) {
    // This is needed in a special case. Consider:
    // SELECT FROM ot WHERE EXISTS(WITH RECURSIVE cte (...)
    //                             SELECT * FROM cte)
    // and assume that the CTE is outer-correlated. When EXISTS is
    // evaluated, Query_expression::ClearForExecution() calls
    // clear_correlated_query_blocks(), which scans the WITH clause and clears
    // the CTE, including its references to itself in its recursive definition.
    // But, if the query expression owning WITH is merged up, e.g. like this:
    //   FROM ot SEMIJOIN cte ON TRUE,
    // then there is no Query_expression anymore, so its WITH clause is
    // not reached. But this "lateral CTE" still needs comprehensive resetting.
    // That's done here.
    if (m_cte->clear_all_references()) return true;
  }

  // If we are removing duplicates by way of a hash field
  // (see doing_hash_deduplication() for an explanation), we need to
  // initialize scanning of the index over that hash field. (This is entirely
  // separate from any index usage when reading back the materialized table;
  // m_table_iterator will do that for us.)
  //
  // The scope guard ensures ha_index_end() runs on every exit path (error
  // or success) once the index was opened; see reset()/release() below.
  auto end_unique_index = create_scope_guard([&] {
    if (table()->file->inited == handler::INDEX) table()->file->ha_index_end();
  });
  if (doing_hash_deduplication()) {
    if (table()->file->ha_index_init(0, /*sorted=*/false)) {
      return true;
    }
  } else {
    // We didn't open the index, so we don't need to close it.
    end_unique_index.release();
  }

  // Fill the table, either via the recursive-CTE fixed-point loop or by
  // materializing each query block in turn.
  ha_rows stored_rows = 0;

  if (m_query_expression != nullptr && m_query_expression->is_recursive()) {
    if (MaterializeRecursive()) return true;
  } else {
    for (const materialize_iterator::QueryBlock &query_block :
         m_query_blocks_to_materialize) {
      if (MaterializeQueryBlock(query_block, &stored_rows)) return true;
      if (table()->is_union_or_table()) {
        // For INTERSECT and EXCEPT, this is done in TableScanIterator
        //
        // m_reject_multiple_rows implements the scalar-subquery cardinality
        // check; m_limit_rows lets us stop early once LIMIT is satisfied.
        if (m_reject_multiple_rows && stored_rows > 1) {
          my_error(ER_SUBQUERY_NO_1_ROW, MYF(0));
          return true;
        } else if (stored_rows >= m_limit_rows) {
          break;
        }
      }
    }
  }

  // Close the deduplication index now (if open), before reading back rows.
  end_unique_index.reset();
  table()->materialized = true;

  if (!m_rematerialize) {
    DEBUG_SYNC(thd(), "after_materialize_derived");
  }

  // Snapshot the invalidator generations that this materialization was
  // based on, so a later Init() can detect staleness (see loop above).
  for (Invalidator &invalidator : m_invalidators) {
    invalidator.generation_at_last_materialize =
        invalidator.iterator->generation();
  }

  m_profiler.StopInit(start_time);
  const bool err = m_table_iterator->Init();
  m_table_iter_profiler.StopInit(start_time);

  /*
    MaterializeIterator reads all rows during Init(), so we do not measure
    the time spent on individual read operations.
  */
  m_profiler.IncrementNumRows(stored_rows);

  return err;
}
/**
Recursive materialization happens much like regular materialization,
but some steps are repeated multiple times. Our general strategy is:
1. Materialize all non-recursive query blocks, once.
2. Materialize all recursive query blocks in turn.
3. Repeat #2 until no query block writes any more rows (ie., we have
converged) -- for UNION DISTINCT queries, rows removed by deduplication
do not count. Each materialization sees only rows that were newly added
since the previous iteration; see FollowTailIterator for more details
on the implementation.
Note that the result table is written to while other iterators are still
reading from it; again, see FollowTailIterator. This means that each run
of #2 can potentially run many actual CTE iterations -- possibly the entire
query to completion if we have only one query block.
This is not how the SQL standard specifies recursive CTE execution
(it assumes building up the new result set from scratch for each iteration,