From fb03dda1a10302ecc781314d05a0df1eb683c0b0 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 29 Dec 2025 17:53:59 +0800 Subject: [PATCH 1/7] try fix --- .../operator/source/SeriesScanUtil.java | 108 +++++++++++++----- 1 file changed, 77 insertions(+), 31 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index e2fbb4be4eca..5da3a9c315fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -890,7 +890,17 @@ private boolean hasNextOverlappedPage() throws IOException { return true; } - tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); + // init the merge reader for current call + // The original process is changed to lazy loading because different mem page readers + // belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is + // necessary to ensure that these mem page readers cannot coexist in the mergeReader at the + // same time. + // The initial endPointTime is calculated as follows: + // 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped + // unseq pages and take the end point. + // 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping + // unseq pages and take the end point. + long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); while (true) { @@ -898,7 +908,8 @@ private boolean hasNextOverlappedPage() throws IOException { if (mergeReader.hasNextTimeValuePair()) { TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); - long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); + long currentPageEndPointTime = + Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime); while (mergeReader.hasNextTimeValuePair()) { /* @@ -928,7 +939,7 @@ private boolean hasNextOverlappedPage() throws IOException { unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( timeValuePair.getTimestamp(), false); unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); - unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); + unpackAllOverlappedUnseqPageReadersToMergeReader(); // update if there are unpacked unSeqPageReaders timeValuePair = mergeReader.currentTimeValuePair(); @@ -1017,33 +1028,59 @@ private long updateEndPointTime(long currentPageEndPointTime, IVersionPageReader } } - private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { + private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { + do { + /* + * no cached page readers + */ + if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { + return mergeReader.getCurrentReadStopTime(); + } - /* - * no cached page readers - */ - if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { - return; - } + /* + * init firstPageReader + */ + if (firstPageReader == null) { + initFirstPageReader(); + } + putPageReaderToMergeReader(firstPageReader); + firstPageReader = null; + } while (!mergeReader.hasNextTimeValuePair()); /* - * init firstPageReader + * put all currently directly overlapped unseq page reader to merge reader */ - if (firstPageReader == null) { - initFirstPageReader(); - } + long mergeReaderStopTime = mergeReader.getCurrentReadStopTime(); + unpackAllOverlappedUnseqPageReadersToMergeReader(); - long currentPageEndpointTime; - if (mergeReader.hasNextTimeValuePair()) { - currentPageEndpointTime = mergeReader.getCurrentReadStopTime(); - } else { - currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); - } + return calculateInitialEndPointTime(mergeReaderStopTime); + } - /* - * put all currently directly overlapped unseq page reader to merge reader - */ - unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime); + private long calculateInitialEndPointTime(long currentReadStopTime) { + if (firstPageReader != null + && !firstPageReader.isSeq() + && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) { + if (orderUtils.getAscending()) { + currentReadStopTime = + Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } else { + currentReadStopTime = + Math.min(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } + } + for (IVersionPageReader unSeqPageReader : unSeqPageReaders) { + if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) { + if (orderUtils.getAscending()) { + currentReadStopTime = + Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } else { + currentReadStopTime = + Math.min(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + } + } + break; + } + return currentReadStopTime; } private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) { @@ -1135,17 +1172,26 @@ private IVersionPageReader getFirstPageReaderFromCachedReaders() { return firstPageReader; } - private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime) - throws IOException { - while (!unSeqPageReaders.isEmpty() - && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) { - putPageReaderToMergeReader(unSeqPageReaders.poll()); - } + // This process loads overlapped unseq pages based on the current time value pair of the + // mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq + // page is added. + // The current time obtained from mergeReader each time is not necessarily the minimum among all + // the actual unseq data, so it is necessary to repeatedly calculate and include potentially + // overlapping unseq pages. + private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException { + long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); if (firstPageReader != null && !firstPageReader.isSeq() - && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) { + && orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) { putPageReaderToMergeReader(firstPageReader); firstPageReader = null; + actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); + } + while (!unSeqPageReaders.isEmpty() + && orderUtils.isOverlapped( + actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) { + putPageReaderToMergeReader(unSeqPageReaders.poll()); + actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); } } From 994459b40f65d43e14d55b2774faf1488d4ed987 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Mon, 29 Dec 2025 18:45:35 +0800 Subject: [PATCH 2/7] fix --- .../execution/operator/source/SeriesScanUtil.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 5da3a9c315fb..35059d37f704 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -1043,8 +1043,10 @@ private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() thro if (firstPageReader == null) { initFirstPageReader(); } - putPageReaderToMergeReader(firstPageReader); - firstPageReader = null; + if (!mergeReader.hasNextTimeValuePair()) { + putPageReaderToMergeReader(firstPageReader); + firstPageReader = null; + } } while (!mergeReader.hasNextTimeValuePair()); /* @@ -1072,10 +1074,10 @@ private long calculateInitialEndPointTime(long currentReadStopTime) { if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) { if (orderUtils.getAscending()) { currentReadStopTime = - Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + Math.max(currentReadStopTime, unSeqPageReader.getStatistics().getEndTime()); } else { currentReadStopTime = - Math.min(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + Math.min(currentReadStopTime, unSeqPageReader.getStatistics().getEndTime()); } } break; From ca6e754cb29569fcb0b2c0000bd3351ca4ab20c2 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 30 Dec 2025 09:33:03 +0800 Subject: [PATCH 3/7] add it --- .../apache/iotdb/db/it/IoTDBFlushQueryIT.java | 55 +++++++++++++++++++ 1 file changed, 55 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java index 6253e618edac..ed8db3de6f97 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java @@ -19,12 +19,21 @@ package org.apache.iotdb.db.it; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -34,6 +43,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; +import java.util.List; import java.util.Locale; import static org.junit.Assert.assertEquals; @@ -181,4 +192,48 @@ public void testFlushNotExistGroupNoData() { fail(e.getMessage()); } } + + @Test + public void testStreamingQueryMemTableWithOverlappedData() + throws IoTDBConnectionException, StatementExecutionException { + String device = "root.stream.d1"; + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.open(); + generateTimeRangeWithTimestamp(session, device, 1, 10); + + generateTimeRangeWithTimestamp(session, device, 500000, 520000); + session.executeNonQueryStatement("flush"); + generateTimeRangeWithTimestamp(session, device, 100000, 350000); + + SessionDataSet sessionDataSet = + session.executeQueryStatement("select count(*) from root.stream.d1"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + long count = 0; + while (iterator.next()) { + count = iterator.getLong(1); + } + Assert.assertEquals(10 + 20001 + 250001, count); + } + } + + private static void generateTimeRangeWithTimestamp( + ISession session, String device, long start, long end) + throws IoTDBConnectionException, StatementExecutionException { + List measurementSchemas = + Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64)); + Tablet tablet = new Tablet(device, measurementSchemas); + for (long currentTime = start; currentTime <= end; currentTime++) { + int rowIndex = tablet.getRowSize(); + if (rowIndex == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + rowIndex = 0; + } + tablet.addTimestamp(rowIndex, currentTime); + tablet.addValue(rowIndex, 0, currentTime); + } + if (tablet.getRowSize() > 0) { + session.insertTablet(tablet); + } + } } From a898a1c1ac366eb01879b811c8d72e1b8f25c83f Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 30 Dec 2025 09:55:45 +0800 Subject: [PATCH 4/7] fix --- .../apache/iotdb/db/it/IoTDBFlushQueryIT.java | 35 ++++++++++++++++--- .../operator/source/SeriesScanUtil.java | 3 +- 2 files changed, 33 insertions(+), 5 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java index ed8db3de6f97..b1a5a3d1562c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java @@ -196,23 +196,50 @@ public void testFlushNotExistGroupNoData() { @Test public void testStreamingQueryMemTableWithOverlappedData() throws IoTDBConnectionException, StatementExecutionException { - String device = "root.stream.d1"; + String device = "root.stream1.d1"; try (ISession session = EnvFactory.getEnv().getSessionConnection()) { session.open(); generateTimeRangeWithTimestamp(session, device, 1, 10); - generateTimeRangeWithTimestamp(session, device, 500000, 520000); + generateTimeRangeWithTimestamp(session, device, 500000, 510000); session.executeNonQueryStatement("flush"); generateTimeRangeWithTimestamp(session, device, 100000, 350000); SessionDataSet sessionDataSet = - session.executeQueryStatement("select count(*) from root.stream.d1"); + session.executeQueryStatement("select count(*) from root.stream1.d1"); SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); long count = 0; while (iterator.next()) { count = iterator.getLong(1); } - Assert.assertEquals(10 + 20001 + 250001, count); + Assert.assertEquals(10 + 10001 + 250001, count); + } + } + + @Test + public void testStreamingQueryMemTableWithOverlappedData2() + throws IoTDBConnectionException, StatementExecutionException { + String device = "root.stream2.d1"; + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.open(); + generateTimeRangeWithTimestamp(session, device, 1, 10); + + generateTimeRangeWithTimestamp(session, device, 500000, 510000); + session.executeNonQueryStatement("flush"); + generateTimeRangeWithTimestamp(session, device, 1, 20); + generateTimeRangeWithTimestamp(session, device, 100000, 210000); + session.executeNonQueryStatement("flush"); + + generateTimeRangeWithTimestamp(session, device, 150000, 450000); + + SessionDataSet sessionDataSet = + session.executeQueryStatement("select count(*) from root.stream2.d1"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + long count = 0; + while (iterator.next()) { + count = iterator.getLong(1); + } + Assert.assertEquals(20 + 10001 + 350001, count); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 35059d37f704..511d368365df 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -1079,8 +1079,9 @@ private long calculateInitialEndPointTime(long currentReadStopTime) { currentReadStopTime = Math.min(currentReadStopTime, unSeqPageReader.getStatistics().getEndTime()); } + } else { + break; } - break; } return currentReadStopTime; } From 646695b275a501801fc3854934ac638afef0b407 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 30 Dec 2025 10:18:06 +0800 Subject: [PATCH 5/7] fix bug --- .../queryengine/execution/operator/source/SeriesScanUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 511d368365df..9261e9c05a3c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -1067,7 +1067,7 @@ private long calculateInitialEndPointTime(long currentReadStopTime) { Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); } else { currentReadStopTime = - Math.min(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + Math.min(currentReadStopTime, firstPageReader.getStatistics().getStartTime()); } } for (IVersionPageReader unSeqPageReader : unSeqPageReaders) { @@ -1077,7 +1077,7 @@ private long calculateInitialEndPointTime(long currentReadStopTime) { Math.max(currentReadStopTime, unSeqPageReader.getStatistics().getEndTime()); } else { currentReadStopTime = - Math.min(currentReadStopTime, unSeqPageReader.getStatistics().getEndTime()); + Math.min(currentReadStopTime, unSeqPageReader.getStatistics().getStartTime()); } } else { break; From c765e88303c9ab964cda544038902b1f6f602505 Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 30 Dec 2025 10:48:39 +0800 Subject: [PATCH 6/7] fix bug --- .../operator/source/SeriesScanUtil.java | 20 ++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 9261e9c05a3c..0c349b082af5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -909,7 +909,9 @@ private boolean hasNextOverlappedPage() throws IOException { TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); long currentPageEndPointTime = - Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime); + orderUtils.getAscending() + ? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime) + : Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime); while (mergeReader.hasNextTimeValuePair()) { /* @@ -1064,20 +1066,28 @@ private long calculateInitialEndPointTime(long currentReadStopTime) { && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) { if (orderUtils.getAscending()) { currentReadStopTime = - Math.max(currentReadStopTime, firstPageReader.getStatistics().getEndTime()); + Math.max( + currentReadStopTime, + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); } else { currentReadStopTime = - Math.min(currentReadStopTime, firstPageReader.getStatistics().getStartTime()); + Math.min( + currentReadStopTime, + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); } } for (IVersionPageReader unSeqPageReader : unSeqPageReaders) { if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) { if (orderUtils.getAscending()) { currentReadStopTime = - Math.max(currentReadStopTime, unSeqPageReader.getStatistics().getEndTime()); + Math.max( + currentReadStopTime, + orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics())); } else { currentReadStopTime = - Math.min(currentReadStopTime, unSeqPageReader.getStatistics().getStartTime()); + Math.min( + currentReadStopTime, + orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics())); } } else { break; From 59cee446dfc4e6be9c79994af30f47eb99ffbb2e Mon Sep 17 00:00:00 2001 From: shuwenwei Date: Tue, 30 Dec 2025 10:57:41 +0800 Subject: [PATCH 7/7] fix --- .../operator/source/SeriesScanUtil.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index 0c349b082af5..741961961045 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -1060,40 +1060,41 @@ private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() thro return calculateInitialEndPointTime(mergeReaderStopTime); } - private long calculateInitialEndPointTime(long currentReadStopTime) { + private long calculateInitialEndPointTime(final long currentReadStopTime) { + long initialReadStopTime = currentReadStopTime; if (firstPageReader != null && !firstPageReader.isSeq() && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) { if (orderUtils.getAscending()) { - currentReadStopTime = + initialReadStopTime = Math.max( - currentReadStopTime, + initialReadStopTime, orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); } else { - currentReadStopTime = + initialReadStopTime = Math.min( - currentReadStopTime, + initialReadStopTime, orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); } } for (IVersionPageReader unSeqPageReader : unSeqPageReaders) { if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) { if (orderUtils.getAscending()) { - currentReadStopTime = + initialReadStopTime = Math.max( - currentReadStopTime, + initialReadStopTime, orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics())); } else { - currentReadStopTime = + initialReadStopTime = Math.min( - currentReadStopTime, + initialReadStopTime, orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics())); } } else { break; } } - return currentReadStopTime; + return initialReadStopTime; } private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) {