From 2e59f9d382f4dbe39e490518d5f906c147f99318 Mon Sep 17 00:00:00 2001 From: shizy Date: Mon, 29 Dec 2025 08:57:07 +0800 Subject: [PATCH 1/2] fix: reserve memory for sorting indices during query execution --- .../memory/FakedMemoryReservationManager.java | 5 + .../memory/MemoryReservationManager.java | 11 +++ ...NotThreadSafeMemoryReservationManager.java | 5 + .../ThreadSafeMemoryReservationManager.java | 5 + .../iotdb/db/utils/datastructure/TVList.java | 9 ++ .../FragmentInstanceExecutionTest.java | 99 +++++++++++++++++++ 6 files changed, 134 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java index 35ded4d62521..75c9d08fae2c 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java @@ -43,4 +43,9 @@ public Pair releaseMemoryVirtually(final long size) { @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) {} + + @Override + public long getReservedMemory() { + return 0; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java index 623936731201..a000bd97c574 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java @@ -19,6 +19,8 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory; +import org.apache.iotdb.commons.utils.TestOnly; + import org.apache.tsfile.utils.Pair; public interface MemoryReservationManager { @@ -72,4 +74,13 @@ public interface MemoryReservationManager { * @param bytesAlreadyReserved the amount of memory that has already been reserved */ void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved); + + /** + * Get the total amount of memory currently reserved. This includes memory that has been reserved, + * plus memory pending to be reserved, minus memory pending to be released. + * + * @return the total amount of memory in bytes that is currently reserved + */ + @TestOnly + long getReservedMemory(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index 4fa97f368adf..b0bd4db469d5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -114,4 +114,9 @@ public void reserveMemoryVirtually( reservedBytesInTotal += bytesAlreadyReserved; reserveMemoryCumulatively(bytesToBeReserved); } + + @Override + public long getReservedMemory() { + return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal; + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java index d167eae354ff..34ed67d19f9b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java @@ -61,4 +61,9 @@ public synchronized void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved); } + + @Override + public synchronized long getReservedMemory() { + return super.getReservedMemory(); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index a1f2842d34a9..91a9b9206534 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -21,7 +21,9 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.utils.TestOnly; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; +import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; @@ -325,6 +327,13 @@ protected void set(int index, long timestamp, int valueIndex) { int offset = i * ARRAY_SIZE; Arrays.setAll(indices.get(i), j -> offset + j); } + // Reserve memory for indices if the TVList is owned by a query + if (ownerQuery != null) { + long indicesBytes = indices.size() * PrimitiveArrayManager.ARRAY_SIZE * 4L; + MemoryReservationManager memoryReservationManager = + ((FragmentInstanceContext) ownerQuery).getMemoryReservationContext(); + memoryReservationManager.reserveMemoryCumulatively(indicesBytes); + } } indices.get(arrayIndex)[elementIndex] = valueIndex; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index e0655edc55a4..298d442b52a2 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -20,7 +20,12 @@ package org.apache.iotdb.db.queryengine.execution.fragment; import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory; +import org.apache.iotdb.commons.exception.IllegalPathException; +import org.apache.iotdb.commons.exception.MetadataException; +import org.apache.iotdb.commons.path.NonAlignedFullPath; +import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.db.conf.IoTDBDescriptor; +import org.apache.iotdb.db.exception.query.QueryProcessException; import org.apache.iotdb.db.queryengine.common.FragmentInstanceId; import org.apache.iotdb.db.queryengine.common.PlanFragmentId; import org.apache.iotdb.db.queryengine.exception.CpuNotEnoughException; @@ -30,15 +35,27 @@ import org.apache.iotdb.db.queryengine.execution.exchange.sink.ISink; import org.apache.iotdb.db.queryengine.execution.schedule.IDriverScheduler; import org.apache.iotdb.db.storageengine.dataregion.DataRegion; +import org.apache.iotdb.db.storageengine.dataregion.memtable.DeviceIDFactory; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IMemTable; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunk; +import org.apache.iotdb.db.storageengine.dataregion.memtable.IWritableMemChunkGroup; +import org.apache.iotdb.db.storageengine.dataregion.memtable.PrimitiveMemTable; +import org.apache.iotdb.db.storageengine.dataregion.memtable.ReadOnlyMemChunk; import org.apache.iotdb.db.utils.datastructure.AlignedTVList; import org.apache.iotdb.db.utils.datastructure.TVList; import com.google.common.collect.ImmutableMap; import org.apache.tsfile.enums.TSDataType; +import org.apache.tsfile.file.metadata.IDeviceID; +import org.apache.tsfile.file.metadata.enums.CompressionType; +import org.apache.tsfile.file.metadata.enums.TSEncoding; +import org.apache.tsfile.read.reader.IPointReader; +import org.apache.tsfile.write.schema.MeasurementSchema; import org.junit.Test; import org.mockito.Mockito; import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.io.PrintStream; import java.util.ArrayList; import java.util.Collections; @@ -49,6 +66,7 @@ import static org.apache.iotdb.db.queryengine.common.QueryId.MOCK_QUERY_ID; import static org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -157,6 +175,71 @@ public void testTVListOwnerTransfer() throws InterruptedException { } } + @Test + public void testTVListCloneForQuery() { + IoTDBDescriptor.getInstance().getConfig().setDataNodeId(1); + + ExecutorService instanceNotificationExecutor = + IoTDBThreadPoolFactory.newFixedThreadPool(1, "test-instance-notification"); + + try { + String deviceId = "d1"; + String measurementId = "s1"; + IMemTable memTable = createMemTable(deviceId, measurementId); + assertEquals(1, memTable.getMemTableMap().size()); + IWritableMemChunkGroup memChunkGroup = memTable.getMemTableMap().values().iterator().next(); + assertEquals(1, memChunkGroup.getMemChunkMap().size()); + IWritableMemChunk memChunk = memChunkGroup.getMemChunkMap().values().iterator().next(); + TVList tvList = memChunk.getWorkingTVList(); + assertFalse(tvList.isSorted()); + + // FragmentInstance Context + FragmentInstanceId id1 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 1), "1"); + FragmentInstanceStateMachine stateMachine1 = + new FragmentInstanceStateMachine(id1, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext1 = + createFragmentInstanceContext(id1, stateMachine1); + + FragmentInstanceId id2 = new FragmentInstanceId(new PlanFragmentId(MOCK_QUERY_ID, 2), "2"); + FragmentInstanceStateMachine stateMachine2 = + new FragmentInstanceStateMachine(id2, instanceNotificationExecutor); + FragmentInstanceContext fragmentInstanceContext2 = + createFragmentInstanceContext(id2, stateMachine2); + + // query on memtable + NonAlignedFullPath fullPath = + new NonAlignedFullPath( + IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId), + new MeasurementSchema( + measurementId, + TSDataType.INT32, + TSEncoding.RLE, + CompressionType.UNCOMPRESSED, + Collections.emptyMap())); + ReadOnlyMemChunk readOnlyMemChunk1 = + memTable.query(fragmentInstanceContext1, fullPath, Long.MIN_VALUE, null, null); + ReadOnlyMemChunk readOnlyMemChunk2 = + memTable.query(fragmentInstanceContext2, fullPath, Long.MIN_VALUE, null, null); + + IPointReader pointReader = readOnlyMemChunk1.getPointReader(); + while (pointReader.hasNextTimeValuePair()) { + pointReader.nextTimeValuePair(); + } + assertTrue(tvList.isSorted()); + assertEquals( + tvList.calculateRamSize(), + fragmentInstanceContext1.getMemoryReservationContext().getReservedMemory()); + } catch (QueryProcessException + | IOException + | MetadataException + | MemoryNotEnoughException + | IllegalArgumentException e) { + fail(e.getMessage()); + } finally { + instanceNotificationExecutor.shutdown(); + } + } + private FragmentInstanceExecution createFragmentInstanceExecution(int id, Executor executor) throws CpuNotEnoughException { IDriverScheduler scheduler = Mockito.mock(IDriverScheduler.class); @@ -201,4 +284,20 @@ private TVList buildTVList() { } return tvList; } + + private IMemTable createMemTable(String deviceId, String measurementId) + throws IllegalPathException { + IMemTable memTable = new PrimitiveMemTable("root.test", "1"); + + int rows = 100; + for (int i = 0; i < 100; i++) { + memTable.write( + DeviceIDFactory.getInstance().getDeviceID(new PartialPath(deviceId)), + Collections.singletonList( + new MeasurementSchema(measurementId, TSDataType.INT32, TSEncoding.PLAIN)), + rows - i - 1, + new Object[] {i + 10}); + } + return memTable; + } } From bcae238269a6dc1103c6cd55cc6957da4a8b7c2a Mon Sep 17 00:00:00 2001 From: shizy Date: Mon, 29 Dec 2025 13:47:19 +0800 Subject: [PATCH 2/2] Sort vs setOwner --- .../fragment/FragmentInstanceContext.java | 11 +++++- .../memory/FakedMemoryReservationManager.java | 5 --- .../memory/MemoryReservationManager.java | 11 ------ ...NotThreadSafeMemoryReservationManager.java | 5 --- .../ThreadSafeMemoryReservationManager.java | 5 --- .../utils/ResourceByPathUtils.java | 4 +- .../memtable/AbstractWritableMemChunk.java | 4 +- .../memtable/AlignedReadOnlyMemChunk.java | 37 +++++++++++++++++-- .../dataregion/memtable/ReadOnlyMemChunk.java | 29 +++++++++++++++ .../db/utils/datastructure/AlignedTVList.java | 2 +- .../iotdb/db/utils/datastructure/TVList.java | 30 +++++++++------ .../FragmentInstanceExecutionTest.java | 4 +- 12 files changed, 99 insertions(+), 48 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java index f0b46d9df284..681e77518b78 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceContext.java @@ -897,11 +897,18 @@ public void releaseResourceWhenAllDriversAreClosed() { */ private void releaseTVListOwnedByQuery() { for (TVList tvList : tvListSet) { + long tvListRamSize = tvList.calculateRamSize(); tvList.lockQueryList(); Set queryContextSet = tvList.getQueryContextSet(); try { queryContextSet.remove(this); if (tvList.getOwnerQuery() == this) { + if (tvList.getReservedMemoryBytes() != tvListRamSize) { + LOGGER.warn( + "Release TVList owned by query: allocate size {}, release size {}", + tvList.getReservedMemoryBytes(), + tvListRamSize); + } if (queryContextSet.isEmpty()) { if (LOGGER.isDebugEnabled()) { LOGGER.debug( @@ -909,14 +916,14 @@ private void releaseTVListOwnedByQuery() { tvList, this.getId()); } - memoryReservationManager.releaseMemoryCumulatively(tvList.calculateRamSize()); + memoryReservationManager.releaseMemoryCumulatively(tvList.getReservedMemoryBytes()); tvList.clear(); } else { // Transfer memory to next query. It must be exception-safe as this method is called // during FragmentInstanceExecution cleanup. Any exception during this process could // prevent proper resource cleanup and cause memory leaks. Pair releasedBytes = - memoryReservationManager.releaseMemoryVirtually(tvList.calculateRamSize()); + memoryReservationManager.releaseMemoryVirtually(tvList.getReservedMemoryBytes()); FragmentInstanceContext queryContext = (FragmentInstanceContext) queryContextSet.iterator().next(); queryContext diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java index 75c9d08fae2c..35ded4d62521 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/FakedMemoryReservationManager.java @@ -43,9 +43,4 @@ public Pair releaseMemoryVirtually(final long size) { @Override public void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) {} - - @Override - public long getReservedMemory() { - return 0; - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java index a000bd97c574..623936731201 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/MemoryReservationManager.java @@ -19,8 +19,6 @@ package org.apache.iotdb.db.queryengine.plan.planner.memory; -import org.apache.iotdb.commons.utils.TestOnly; - import org.apache.tsfile.utils.Pair; public interface MemoryReservationManager { @@ -74,13 +72,4 @@ public interface MemoryReservationManager { * @param bytesAlreadyReserved the amount of memory that has already been reserved */ void reserveMemoryVirtually(final long bytesToBeReserved, final long bytesAlreadyReserved); - - /** - * Get the total amount of memory currently reserved. This includes memory that has been reserved, - * plus memory pending to be reserved, minus memory pending to be released. - * - * @return the total amount of memory in bytes that is currently reserved - */ - @TestOnly - long getReservedMemory(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java index b0bd4db469d5..4fa97f368adf 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/NotThreadSafeMemoryReservationManager.java @@ -114,9 +114,4 @@ public void reserveMemoryVirtually( reservedBytesInTotal += bytesAlreadyReserved; reserveMemoryCumulatively(bytesToBeReserved); } - - @Override - public long getReservedMemory() { - return bytesToBeReserved - bytesToBeReleased + reservedBytesInTotal; - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java index 34ed67d19f9b..d167eae354ff 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/memory/ThreadSafeMemoryReservationManager.java @@ -61,9 +61,4 @@ public synchronized void reserveMemoryVirtually( final long bytesToBeReserved, final long bytesAlreadyReserved) { super.reserveMemoryVirtually(bytesToBeReserved, bytesAlreadyReserved); } - - @Override - public synchronized long getReservedMemory() { - return super.getReservedMemory(); - } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java index 7c27307e61e7..d4d84c636601 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/schemaregion/utils/ResourceByPathUtils.java @@ -153,6 +153,7 @@ protected Map prepareTvListMapForQuery( // mutable tvlist TVList list = memChunk.getWorkingTVList(); TVList cloneList = null; + long tvListRamSize = list.calculateRamSize(); list.lockQueryList(); try { if (copyTimeFilter != null @@ -193,7 +194,8 @@ protected Map prepareTvListMapForQuery( if (firstQuery instanceof FragmentInstanceContext) { MemoryReservationManager memoryReservationManager = ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(list.calculateRamSize()); + memoryReservationManager.reserveMemoryCumulatively(tvListRamSize); + list.setReservedMemoryBytes(tvListRamSize); } list.setOwnerQuery(firstQuery); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java index d1dd692303db..f33182fc680e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AbstractWritableMemChunk.java @@ -101,6 +101,7 @@ protected void maybeReleaseTvList(TVList tvList) { } private void tryReleaseTvList(TVList tvList) { + long tvListRamSize = tvList.calculateRamSize(); tvList.lockQueryList(); try { if (tvList.getQueryContextSet().isEmpty()) { @@ -112,7 +113,8 @@ private void tryReleaseTvList(TVList tvList) { if (firstQuery instanceof FragmentInstanceContext) { MemoryReservationManager memoryReservationManager = ((FragmentInstanceContext) firstQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(tvList.calculateRamSize()); + memoryReservationManager.reserveMemoryCumulatively(tvListRamSize); + tvList.setReservedMemoryBytes(tvListRamSize); } // update current TVList owner to first query in the list tvList.setOwnerQuery(firstQuery); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java index 25c4a876ab7d..2157623e1c67 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/AlignedReadOnlyMemChunk.java @@ -19,6 +19,7 @@ package org.apache.iotdb.db.storageengine.dataregion.memtable; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemAlignedChunkLoader; @@ -118,6 +119,21 @@ public void sortTvLists() { int queryRowCount = entry.getValue(); if (!alignedTvList.isSorted() && queryRowCount > alignedTvList.seqRowCount()) { alignedTvList.sort(); + long alignedTvListRamSize = alignedTvList.calculateRamSize(); + alignedTvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = + (FragmentInstanceContext) alignedTvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + alignedTvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + alignedTvList.unlockQueryList(); + } } } } @@ -356,10 +372,25 @@ public boolean isEmpty() { @Override public IPointReader getPointReader() { for (Map.Entry entry : alignedTvListQueryMap.entrySet()) { - AlignedTVList tvList = (AlignedTVList) entry.getKey(); + AlignedTVList alignedTvList = (AlignedTVList) entry.getKey(); int queryLength = entry.getValue(); - if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) { - tvList.sort(); + if (!alignedTvList.isSorted() && queryLength > alignedTvList.seqRowCount()) { + alignedTvList.sort(); + long alignedTvListRamSize = alignedTvList.calculateRamSize(); + alignedTvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = + (FragmentInstanceContext) alignedTvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = alignedTvListRamSize - alignedTvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + alignedTvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + alignedTvList.unlockQueryList(); + } } } TsBlock tsBlock = buildTsBlock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java index f2fe63c02c62..ac2a78734978 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/memtable/ReadOnlyMemChunk.java @@ -21,6 +21,7 @@ import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.db.exception.query.QueryProcessException; +import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.storageengine.dataregion.read.reader.chunk.MemChunkLoader; @@ -135,6 +136,20 @@ public void sortTvLists() { int queryRowCount = entry.getValue(); if (!tvList.isSorted() && queryRowCount > tvList.seqRowCount()) { tvList.sort(); + long tvListRamSize = tvList.calculateRamSize(); + tvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + tvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + tvList.unlockQueryList(); + } } } } @@ -274,6 +289,20 @@ public IPointReader getPointReader() { int queryLength = entry.getValue(); if (!tvList.isSorted() && queryLength > tvList.seqRowCount()) { tvList.sort(); + long tvListRamSize = tvList.calculateRamSize(); + tvList.lockQueryList(); + try { + FragmentInstanceContext ownerQuery = (FragmentInstanceContext) tvList.getOwnerQuery(); + if (ownerQuery != null) { + long deltaBytes = tvListRamSize - tvList.getReservedMemoryBytes(); + if (deltaBytes > 0) { + ownerQuery.getMemoryReservationContext().reserveMemoryCumulatively(deltaBytes); + tvList.addReservedMemoryBytes(deltaBytes); + } + } + } finally { + tvList.unlockQueryList(); + } } } TsBlock tsBlock = buildTsBlock(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java index 844adfb0e512..d96fb2f98154 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/AlignedTVList.java @@ -961,7 +961,7 @@ public TSDataType getDataType() { } @Override - public long calculateRamSize() { + public synchronized long calculateRamSize() { return timestamps.size() * alignedTvListArrayMemCost(); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java index 91a9b9206534..dd5f10f37d45 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/datastructure/TVList.java @@ -21,9 +21,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.utils.TestOnly; -import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext; import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext; -import org.apache.iotdb.db.queryengine.plan.planner.memory.MemoryReservationManager; import org.apache.iotdb.db.queryengine.plan.statement.component.Ordering; import org.apache.iotdb.db.service.metrics.WritingMetrics; import org.apache.iotdb.db.storageengine.dataregion.wal.buffer.WALEntryValue; @@ -79,8 +77,10 @@ public abstract class TVList implements WALEntryValue { // Index relation: arrayIndex -> elementIndex protected List bitMap; - // lock to provide synchronization for query list + // Guards queryContextSet, ownerQuery, and reservedMemoryBytes. + // Always acquire this lock before accessing/modifying these fields. private final ReentrantLock queryListLock = new ReentrantLock(); + // set of query that this TVList is used protected final Set queryContextSet; @@ -88,6 +88,9 @@ public abstract class TVList implements WALEntryValue { // When it is null, the TVList is owned by insert thread and released after flush. protected QueryContext ownerQuery; + // Reserved memory by the query. Ensure to acquire queryListLock before update. + protected long reservedMemoryBytes = 0L; + protected boolean sorted = true; protected long maxTime; protected long minTime; @@ -159,7 +162,7 @@ public static long tvListArrayMemCost(TSDataType type) { return size; } - public long calculateRamSize() { + public synchronized long calculateRamSize() { return timestamps.size() * tvListArrayMemCost(); } @@ -167,6 +170,18 @@ public synchronized boolean isSorted() { return sorted; } + public void setReservedMemoryBytes(long bytes) { + this.reservedMemoryBytes = bytes; + } + + public void addReservedMemoryBytes(long bytes) { + this.reservedMemoryBytes += bytes; + } + + public long getReservedMemoryBytes() { + return reservedMemoryBytes; + } + public abstract void sort(); public void increaseReferenceCount() { @@ -327,13 +342,6 @@ protected void set(int index, long timestamp, int valueIndex) { int offset = i * ARRAY_SIZE; Arrays.setAll(indices.get(i), j -> offset + j); } - // Reserve memory for indices if the TVList is owned by a query - if (ownerQuery != null) { - long indicesBytes = indices.size() * PrimitiveArrayManager.ARRAY_SIZE * 4L; - MemoryReservationManager memoryReservationManager = - ((FragmentInstanceContext) ownerQuery).getMemoryReservationContext(); - memoryReservationManager.reserveMemoryCumulatively(indicesBytes); - } } indices.get(arrayIndex)[elementIndex] = valueIndex; } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java index 298d442b52a2..61b24cd4ed96 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/fragment/FragmentInstanceExecutionTest.java @@ -226,9 +226,7 @@ public void testTVListCloneForQuery() { pointReader.nextTimeValuePair(); } assertTrue(tvList.isSorted()); - assertEquals( - tvList.calculateRamSize(), - fragmentInstanceContext1.getMemoryReservationContext().getReservedMemory()); + assertEquals(tvList.calculateRamSize(), tvList.getReservedMemoryBytes()); } catch (QueryProcessException | IOException | MetadataException