Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -148,39 +148,24 @@ public Iterable<TabletInsertionEvent> toTabletInsertionEvents() {
new Iterator<TabletInsertionEvent>() {

private TsFileInsertionEventTableParserTabletIterator tabletIterator;
private PipeRawTabletInsertionEvent nextEvent;
private Tablet bufferedTablet;
private boolean iterationClosed = false;

@Override
public boolean hasNext() {
try {
if (tabletIterator == null) {
tabletIterator =
new TsFileInsertionEventTableParserTabletIterator(
tsFileSequenceReader,
entry ->
(Objects.isNull(tablePattern)
|| tablePattern.matchesTable(entry.getKey()))
&& hasTablePrivilege(entry.getKey()),
allocatedMemoryBlockForTablet,
allocatedMemoryBlockForBatchData,
allocatedMemoryBlockForChunk,
allocatedMemoryBlockForChunkMeta,
allocatedMemoryBlockForTableSchemas,
currentModifications,
startTime,
endTime);
if (nextEvent != null) {
return true;
}
final boolean hasNext = tabletIterator.hasNext();
if (hasNext && !parseStartTimeRecorded) {
// Record start time on first hasNext() that returns true
recordParseStartTime();
} else if (!hasNext && parseStartTimeRecorded && !parseEndTimeRecorded) {
// Record end time on last hasNext() that returns false
recordParseEndTime();
close();
} else if (!hasNext) {
close();

final Tablet tablet = pollNextNonEmptyTablet();
if (tablet == null) {
return false;
}
return hasNext;

nextEvent = buildTabletInsertionEvent(tablet, !prepareNextNonEmptyTablet());
return true;
} catch (Exception e) {
close();
throw new PipeException(
Expand Down Expand Up @@ -211,74 +196,108 @@ private boolean hasTablePrivilege(final String tableName) {
return false;
}

private Tablet pollNextNonEmptyTablet() throws Exception {
if (!prepareNextNonEmptyTablet()) {
return null;
}

final Tablet tablet = bufferedTablet;
bufferedTablet = null;
return tablet;
}

private boolean prepareNextNonEmptyTablet() throws Exception {
if (bufferedTablet != null) {
return true;
}
if (iterationClosed) {
return false;
}

if (tabletIterator == null) {
tabletIterator =
new TsFileInsertionEventTableParserTabletIterator(
tsFileSequenceReader,
entry ->
(Objects.isNull(tablePattern)
|| tablePattern.matchesTable(entry.getKey()))
&& hasTablePrivilege(entry.getKey()),
allocatedMemoryBlockForTablet,
allocatedMemoryBlockForBatchData,
allocatedMemoryBlockForChunk,
allocatedMemoryBlockForChunkMeta,
allocatedMemoryBlockForTableSchemas,
currentModifications,
startTime,
endTime);
}

while (tabletIterator.hasNext()) {
if (!parseStartTimeRecorded) {
recordParseStartTime();
}

final Tablet tablet = tabletIterator.next();
recordTabletMetrics(tablet);
if (!PipeRawTabletInsertionEvent.isTabletEmpty(tablet)) {
bufferedTablet = tablet;
return true;
}
}

closeIteration();
return false;
}

private void closeIteration() {
if (iterationClosed) {
return;
}

if (parseStartTimeRecorded && !parseEndTimeRecorded) {
recordParseEndTime();
}
close();
iterationClosed = true;
}

private PipeRawTabletInsertionEvent buildTabletInsertionEvent(
final Tablet tablet, final boolean needToReport) {
return sourceEvent == null
? new PipeRawTabletInsertionEvent(
Boolean.TRUE,
null,
null,
null,
tablet,
true,
null,
0,
pipeTaskMeta,
sourceEvent,
needToReport)
: new PipeRawTabletInsertionEvent(
Boolean.TRUE,
sourceEvent.getSourceDatabaseNameFromDataRegion(),
sourceEvent.getRawTableModelDataBase(),
sourceEvent.getRawTreeModelDataBase(),
tablet,
true,
sourceEvent.getPipeName(),
sourceEvent.getCreationTime(),
pipeTaskMeta,
sourceEvent,
needToReport);
}

@Override
public TabletInsertionEvent next() {
if (!hasNext()) {
close();
throw new NoSuchElementException();
}

final Tablet tablet = tabletIterator.next();
// Record tablet metrics
recordTabletMetrics(tablet);

final TabletInsertionEvent next;
if (!hasNext()) {
next =
sourceEvent == null
? new PipeRawTabletInsertionEvent(
Boolean.TRUE,
null,
null,
null,
tablet,
true,
null,
0,
pipeTaskMeta,
sourceEvent,
true)
: new PipeRawTabletInsertionEvent(
Boolean.TRUE,
sourceEvent.getSourceDatabaseNameFromDataRegion(),
sourceEvent.getRawTableModelDataBase(),
sourceEvent.getRawTreeModelDataBase(),
tablet,
true,
sourceEvent.getPipeName(),
sourceEvent.getCreationTime(),
pipeTaskMeta,
sourceEvent,
true);
close();
} else {
next =
sourceEvent == null
? new PipeRawTabletInsertionEvent(
Boolean.TRUE,
null,
null,
null,
tablet,
true,
null,
0,
pipeTaskMeta,
sourceEvent,
false)
: new PipeRawTabletInsertionEvent(
Boolean.TRUE,
sourceEvent.getSourceDatabaseNameFromDataRegion(),
sourceEvent.getRawTableModelDataBase(),
sourceEvent.getRawTreeModelDataBase(),
tablet,
true,
sourceEvent.getPipeName(),
sourceEvent.getCreationTime(),
pipeTaskMeta,
sourceEvent,
false);
}
final TabletInsertionEvent next = nextEvent;
nextEvent = null;
return next;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeSinkNonReportTimeConfigurableException;
import org.apache.iotdb.commons.pipe.resource.log.PipeLogger;
import org.apache.iotdb.commons.queryengine.common.SqlDialect;
import org.apache.iotdb.commons.utils.StatusUtils;
Expand Down Expand Up @@ -284,7 +285,8 @@ private void doTransfer(
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& !(skipIfNoPrivileges
&& status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode())) {
throw new PipeException(
throwWriteBackExceptionIfNecessary(
status,
String.format(
"Write back PipeInsertNodeTabletInsertionEvent %s error, result status %s",
pipeInsertNodeTabletInsertionEvent, status));
Expand Down Expand Up @@ -328,7 +330,8 @@ private void doTransfer(final PipeRawTabletInsertionEvent pipeRawTabletInsertion
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& !(skipIfNoPrivileges
&& status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode())) {
throw new PipeException(
throwWriteBackExceptionIfNecessary(
status,
String.format(
"Write back PipeRawTabletInsertionEvent %s error, result status %s",
pipeRawTabletInsertionEvent, status));
Expand Down Expand Up @@ -373,13 +376,23 @@ private void doTransfer(final PipeStatementInsertionEvent pipeStatementInsertion
&& status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()
&& !(skipIfNoPrivileges
&& status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode())) {
throw new PipeException(
throwWriteBackExceptionIfNecessary(
status,
String.format(
"Write back PipeStatementInsertionEvent %s error, result status %s",
pipeStatementInsertionEvent, status));
}
}

private static void throwWriteBackExceptionIfNecessary(
final TSStatus status, final String exceptionMessage) {
if (status.getCode() == TSStatusCode.NO_PERMISSION.getStatusCode()) {
throw new PipeRuntimeSinkNonReportTimeConfigurableException(exceptionMessage, Long.MAX_VALUE);
}

throw new PipeException(exceptionMessage);
}

@Override
public void close() throws Exception {
if (session != null) {
Expand Down Expand Up @@ -410,7 +423,7 @@ private TSStatus executeStatementForTableModel(
.status;
} catch (final AccessDeniedException e) {
if (!skipIfNoPrivileges) {
throw e;
throw new PipeRuntimeSinkNonReportTimeConfigurableException(e.getMessage(), Long.MAX_VALUE);
}
LOGGER.debug(
DataNodePipeMessages.EXECUTE_STATEMENT_TO_DATABASE_SKIP_BECAUSE_NO,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,77 @@ public void testTableParserSkipsUnnecessaryBitMaps() throws Exception {
}
}

@Test
public void testTableParserWithTablePatternReportsLastNonEmptyTablet() throws Exception {
final int originalPipeDataStructureTabletRowSize =
PipeConfig.getInstance().getPipeDataStructureTabletRowSize();
CommonDescriptor.getInstance().getConfig().setPipeDataStructureTabletRowSize(2);

try {
alignedTsFile = new File("table-parser-table-pattern.tsfile");
if (alignedTsFile.exists()) {
Assert.assertTrue(alignedTsFile.delete());
}

final List<IMeasurementSchema> schemaList =
Arrays.asList(
new MeasurementSchema("tag0", TSDataType.STRING),
new MeasurementSchema("s0", TSDataType.INT64));
final List<String> columnNameList = Arrays.asList("tag0", "s0");
final List<TSDataType> dataTypeList = Arrays.asList(TSDataType.STRING, TSDataType.INT64);
final List<ColumnCategory> columnCategoryList =
Arrays.asList(ColumnCategory.TAG, ColumnCategory.FIELD);

try (final TsFileWriter writer = new TsFileWriter(alignedTsFile)) {
writer.registerTableSchema(new TableSchema("test", schemaList, columnCategoryList));
writer.registerTableSchema(new TableSchema("test1", schemaList, columnCategoryList));
writer.writeTable(
generateSimpleTableTablet(
"test", columnNameList, dataTypeList, columnCategoryList, "ignored", 0, 2));
writer.writeTable(
generateSimpleTableTablet(
"test1", columnNameList, dataTypeList, columnCategoryList, "matched", 3, 4));
writer.writeTable(
generateSimpleTableTablet(
"test1", columnNameList, dataTypeList, columnCategoryList, "unmatched", 2, 10));
}

try (final TsFileInsertionEventTableParser parser =
new TsFileInsertionEventTableParser(
alignedTsFile,
new TablePattern(true, null, "test1"),
3,
5,
null,
null,
null,
false)) {
final Iterator<TabletInsertionEvent> iterator = parser.toTabletInsertionEvents().iterator();
int rowCount = 0;
PipeRawTabletInsertionEvent lastEvent = null;
while (iterator.hasNext()) {
final PipeRawTabletInsertionEvent event = (PipeRawTabletInsertionEvent) iterator.next();
final Tablet tablet = event.convertToTablet();
Assert.assertEquals("test1", tablet.getTableName());
Assert.assertFalse(PipeRawTabletInsertionEvent.isTabletEmpty(tablet));
rowCount += tablet.getRowSize();
if (lastEvent != null) {
Assert.assertFalse(lastEvent.isNeedToReport());
}
lastEvent = event;
}

Assert.assertEquals(2, rowCount);
Assert.assertNotNull(lastEvent);
Assert.assertTrue(lastEvent.isNeedToReport());
}
} finally {
CommonDescriptor.getInstance()
.getConfig()
.setPipeDataStructureTabletRowSize(originalPipeDataStructureTabletRowSize);
}
}

@Test
public void manualTestScanParserSplitPerformance() throws Exception {
Assume.assumeTrue(
Expand Down Expand Up @@ -1343,6 +1414,23 @@ private void generateLargeAlignedTsFile(
}
}

private Tablet generateSimpleTableTablet(
final String tableName,
final List<String> columnNameList,
final List<TSDataType> dataTypeList,
final List<ColumnCategory> columnCategoryList,
final String tagValue,
final long... timestamps) {
final Tablet tablet =
new Tablet(tableName, columnNameList, dataTypeList, columnCategoryList, timestamps.length);
for (int rowIndex = 0; rowIndex < timestamps.length; ++rowIndex) {
tablet.addTimestamp(rowIndex, timestamps[rowIndex]);
tablet.addValue(rowIndex, 0, tagValue);
tablet.addValue(rowIndex, 1, (long) rowIndex);
}
return tablet;
}

private void generateLargeTableTsFile(
final File tsFile,
final int tableCount,
Expand Down
Loading