diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java index 6253e618edac..b1a5a3d1562c 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBFlushQueryIT.java @@ -19,12 +19,21 @@ package org.apache.iotdb.db.it; +import org.apache.iotdb.isession.ISession; +import org.apache.iotdb.isession.SessionDataSet; import org.apache.iotdb.it.env.EnvFactory; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; +import org.apache.iotdb.rpc.IoTDBConnectionException; +import org.apache.iotdb.rpc.StatementExecutionException; +import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.write.record.Tablet; +import org.apache.tsfile.write.schema.IMeasurementSchema; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -34,6 +43,8 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; +import java.util.Collections; +import java.util.List; import java.util.Locale; import static org.junit.Assert.assertEquals; @@ -181,4 +192,75 @@ public void testFlushNotExistGroupNoData() { fail(e.getMessage()); } } + + @Test + public void testStreamingQueryMemTableWithOverlappedData() + throws IoTDBConnectionException, StatementExecutionException { + String device = "root.stream1.d1"; + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.open(); + generateTimeRangeWithTimestamp(session, device, 1, 10); + + generateTimeRangeWithTimestamp(session, device, 500000, 510000); + session.executeNonQueryStatement("flush"); + generateTimeRangeWithTimestamp(session, device, 100000, 350000); + + SessionDataSet sessionDataSet = + session.executeQueryStatement("select count(*) from root.stream1.d1"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + long count = 0; + while (iterator.next()) { + count = iterator.getLong(1); + } + Assert.assertEquals(10 + 10001 + 250001, count); + } + } + + @Test + public void testStreamingQueryMemTableWithOverlappedData2() + throws IoTDBConnectionException, StatementExecutionException { + String device = "root.stream2.d1"; + try (ISession session = EnvFactory.getEnv().getSessionConnection()) { + session.open(); + generateTimeRangeWithTimestamp(session, device, 1, 10); + + generateTimeRangeWithTimestamp(session, device, 500000, 510000); + session.executeNonQueryStatement("flush"); + generateTimeRangeWithTimestamp(session, device, 1, 20); + generateTimeRangeWithTimestamp(session, device, 100000, 210000); + session.executeNonQueryStatement("flush"); + + generateTimeRangeWithTimestamp(session, device, 150000, 450000); + + SessionDataSet sessionDataSet = + session.executeQueryStatement("select count(*) from root.stream2.d1"); + SessionDataSet.DataIterator iterator = sessionDataSet.iterator(); + long count = 0; + while (iterator.next()) { + count = iterator.getLong(1); + } + Assert.assertEquals(20 + 10001 + 350001, count); + } + } + + private static void generateTimeRangeWithTimestamp( + ISession session, String device, long start, long end) + throws IoTDBConnectionException, StatementExecutionException { + List measurementSchemas = + Collections.singletonList(new MeasurementSchema("s1", TSDataType.INT64)); + Tablet tablet = new Tablet(device, measurementSchemas); + for (long currentTime = start; currentTime <= end; currentTime++) { + int rowIndex = tablet.getRowSize(); + if (rowIndex == tablet.getMaxRowNumber()) { + session.insertTablet(tablet); + tablet.reset(); + rowIndex = 0; + } + tablet.addTimestamp(rowIndex, currentTime); + tablet.addValue(rowIndex, 0, currentTime); + } + if (tablet.getRowSize() > 0) { + session.insertTablet(tablet); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java index e2fbb4be4eca..741961961045 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/SeriesScanUtil.java @@ -890,7 +890,17 @@ private boolean hasNextOverlappedPage() throws IOException { return true; } - tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); + // init the merge reader for current call + // The original process is changed to lazy loading because different mem page readers + // belonging to the same mem chunk need to be read in a streaming manner. Therefore, it is + // necessary to ensure that these mem page readers cannot coexist in the mergeReader at the + // same time. + // The initial endPointTime is calculated as follows: + // 1. If mergeReader is empty, use the endpoint of firstPageReader to find all overlapped + // unseq pages and take the end point. + // 2. If mergeReader is not empty, use the readStopTime of mergeReader to find all overlapping + // unseq pages and take the end point. + long initialEndPointTime = tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader(); while (true) { @@ -898,7 +908,10 @@ private boolean hasNextOverlappedPage() throws IOException { if (mergeReader.hasNextTimeValuePair()) { TsBlockBuilder builder = new TsBlockBuilder(getTsDataTypeList()); - long currentPageEndPointTime = mergeReader.getCurrentReadStopTime(); + long currentPageEndPointTime = + orderUtils.getAscending() + ? Math.max(mergeReader.getCurrentReadStopTime(), initialEndPointTime) + : Math.min(mergeReader.getCurrentReadStopTime(), initialEndPointTime); while (mergeReader.hasNextTimeValuePair()) { /* @@ -928,7 +941,7 @@ private boolean hasNextOverlappedPage() throws IOException { unpackAllOverlappedTimeSeriesMetadataToCachedChunkMetadata( timeValuePair.getTimestamp(), false); unpackAllOverlappedChunkMetadataToPageReaders(timeValuePair.getTimestamp(), false); - unpackAllOverlappedUnseqPageReadersToMergeReader(timeValuePair.getTimestamp()); + unpackAllOverlappedUnseqPageReadersToMergeReader(); // update if there are unpacked unSeqPageReaders timeValuePair = mergeReader.currentTimeValuePair(); @@ -1017,33 +1030,71 @@ private long updateEndPointTime(long currentPageEndPointTime, IVersionPageReader } } - private void tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { + private long tryToPutAllDirectlyOverlappedUnseqPageReadersIntoMergeReader() throws IOException { + do { + /* + * no cached page readers + */ + if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { + return mergeReader.getCurrentReadStopTime(); + } - /* - * no cached page readers - */ - if (firstPageReader == null && unSeqPageReaders.isEmpty() && seqPageReaders.isEmpty()) { - return; - } + /* + * init firstPageReader + */ + if (firstPageReader == null) { + initFirstPageReader(); + } + if (!mergeReader.hasNextTimeValuePair()) { + putPageReaderToMergeReader(firstPageReader); + firstPageReader = null; + } + } while (!mergeReader.hasNextTimeValuePair()); /* - * init firstPageReader + * put all currently directly overlapped unseq page reader to merge reader */ - if (firstPageReader == null) { - initFirstPageReader(); - } + long mergeReaderStopTime = mergeReader.getCurrentReadStopTime(); + unpackAllOverlappedUnseqPageReadersToMergeReader(); - long currentPageEndpointTime; - if (mergeReader.hasNextTimeValuePair()) { - currentPageEndpointTime = mergeReader.getCurrentReadStopTime(); - } else { - currentPageEndpointTime = orderUtils.getOverlapCheckTime(firstPageReader.getStatistics()); - } + return calculateInitialEndPointTime(mergeReaderStopTime); + } - /* - * put all currently directly overlapped unseq page reader to merge reader - */ - unpackAllOverlappedUnseqPageReadersToMergeReader(currentPageEndpointTime); + private long calculateInitialEndPointTime(final long currentReadStopTime) { + long initialReadStopTime = currentReadStopTime; + if (firstPageReader != null + && !firstPageReader.isSeq() + && orderUtils.isOverlapped(currentReadStopTime, firstPageReader.getStatistics())) { + if (orderUtils.getAscending()) { + initialReadStopTime = + Math.max( + initialReadStopTime, + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); + } else { + initialReadStopTime = + Math.min( + initialReadStopTime, + orderUtils.getOverlapCheckTime(firstPageReader.getStatistics())); + } + } + for (IVersionPageReader unSeqPageReader : unSeqPageReaders) { + if (orderUtils.isOverlapped(currentReadStopTime, unSeqPageReader.getStatistics())) { + if (orderUtils.getAscending()) { + initialReadStopTime = + Math.max( + initialReadStopTime, + orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics())); + } else { + initialReadStopTime = + Math.min( + initialReadStopTime, + orderUtils.getOverlapCheckTime(unSeqPageReader.getStatistics())); + } + } else { + break; + } + } + return initialReadStopTime; } private void addTimeValuePairToResult(TimeValuePair timeValuePair, TsBlockBuilder builder) { @@ -1135,17 +1186,26 @@ private IVersionPageReader getFirstPageReaderFromCachedReaders() { return firstPageReader; } - private void unpackAllOverlappedUnseqPageReadersToMergeReader(long endpointTime) - throws IOException { - while (!unSeqPageReaders.isEmpty() - && orderUtils.isOverlapped(endpointTime, unSeqPageReaders.peek().getStatistics())) { - putPageReaderToMergeReader(unSeqPageReaders.poll()); - } + // This process loads overlapped unseq pages based on the current time value pair of the + // mergeReader. The current time value pair of the mergeReader is recalculated each time an unseq + // page is added. + // The current time obtained from mergeReader each time is not necessarily the minimum among all + // the actual unseq data, so it is necessary to repeatedly calculate and include potentially + // overlapping unseq pages. + private void unpackAllOverlappedUnseqPageReadersToMergeReader() throws IOException { + long actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); if (firstPageReader != null && !firstPageReader.isSeq() - && orderUtils.isOverlapped(endpointTime, firstPageReader.getStatistics())) { + && orderUtils.isOverlapped(actualFirstTimeOfMergeReader, firstPageReader.getStatistics())) { putPageReaderToMergeReader(firstPageReader); firstPageReader = null; + actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); + } + while (!unSeqPageReaders.isEmpty() + && orderUtils.isOverlapped( + actualFirstTimeOfMergeReader, unSeqPageReaders.peek().getStatistics())) { + putPageReaderToMergeReader(unSeqPageReaders.poll()); + actualFirstTimeOfMergeReader = mergeReader.currentTimeValuePair().getTimestamp(); } }