From 7c9c80d5a181b90f9429ea2018328c323e6ac8f2 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 29 Dec 2025 19:02:37 +0800 Subject: [PATCH 1/2] Fix memory leak when allocation failure in IoTConsensus queue. --- .../IoTConsensusMemoryManager.java | 11 ++ .../IoTConsensusMemoryManagerTest.java | 183 ++++++++++++++++++ 2 files changed, 194 insertions(+) create mode 100644 iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java index bb46f20486f51..22e5484f5a203 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManager.java @@ -98,6 +98,7 @@ private boolean reserve(long size, boolean fromQueue) { result = queueMemorySizeInByte.addAndGet(size) < maxMemorySizeForQueueInByte; if (!result) { queueMemorySizeInByte.addAndGet(-size); + memorySizeInByte.addAndGet(-size); } } else { syncMemorySizeInByte.addAndGet(size); @@ -172,6 +173,16 @@ long getSyncMemorySizeInByte() { return syncMemorySizeInByte.get(); } + @TestOnly + public Long getMaxMemorySizeInByte() { + return maxMemorySizeInByte; + } + + @TestOnly + public Long getMaxMemorySizeForQueueInByte() { + return maxMemorySizeForQueueInByte; + } + private static final IoTConsensusMemoryManager INSTANCE = new IoTConsensusMemoryManager(); public static IoTConsensusMemoryManager getInstance() { diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java new file mode 100644 index 0000000000000..d68cf983fcd86 --- /dev/null +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.consensus.iot.logdispatcher; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; +import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; +import org.apache.iotdb.consensus.config.IoTConsensusConfig; +import org.apache.iotdb.consensus.iot.thrift.TLogEntry; +import org.junit.Test; + +public class IoTConsensusMemoryManagerTest { + + @Test + public void testAllocateQueue() { + IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance(); + long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte(); + + long occupiedMemory = 0; + IndexedConsensusRequest request; + List requestList = new ArrayList<>(); + while (occupiedMemory <= maxMemory) { + request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( + ByteBuffer.wrap(new byte[4 * 1024 * 1024])))); + request.buildSerializedRequests(); + long requestSize = request.getMemorySize(); + if (occupiedMemory + requestSize < maxMemory) { + boolean reserved = memoryManager.reserve(request); + assertTrue(reserved); + occupiedMemory += requestSize; + assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte()); + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + requestList.add(request); + } else { + assertFalse(memoryManager.reserve(request)); + break; + } + } + assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory); + + for (IndexedConsensusRequest indexedConsensusRequest : requestList) { + memoryManager.free(indexedConsensusRequest); + occupiedMemory -= indexedConsensusRequest.getMemorySize(); + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + assertEquals(occupiedMemory, memoryManager.getQueueMemorySizeInByte()); + } + } + + @Test + public void testAllocateBatch() { + IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance(); + long maxMemory = memoryManager.getQueueMemorySizeInByte(); + + long occupiedMemory = 0; + + Batch batch; + int batchSize = 5; + List batchList = new ArrayList<>(); + while (occupiedMemory < maxMemory) { + batch = new Batch(IoTConsensusConfig.newBuilder().build()); + for (int i = 0; i < batchSize; i++) { + IndexedConsensusRequest request; + request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( + ByteBuffer.wrap(new byte[1024 * 1024])))); + batch.addTLogEntry(new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false, request.getMemorySize())); + } + + long requestSize = batch.getMemorySize(); + if (occupiedMemory + requestSize < maxMemory) { + assertTrue(memoryManager.reserve(batch)); + occupiedMemory += requestSize; + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + batchList.add(batch); + } else { + assertFalse(memoryManager.reserve(batch)); + } + } + assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory); + + for (Batch b : batchList) { + memoryManager.free(b); + occupiedMemory -= b.getMemorySize(); + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + } + } + + @Test + public void testAllocateMixed() { + IoTConsensusMemoryManager memoryManager = IoTConsensusMemoryManager.getInstance(); + long maxMemory = memoryManager.getMaxMemorySizeForQueueInByte(); + + long occupiedMemory = 0; + IndexedConsensusRequest request; + List requestList = new ArrayList<>(); + Batch batch; + int batchSize = 5; + List batchList = new ArrayList<>(); + + int i = 0; + while (occupiedMemory <= maxMemory) { + if (i % 2 == 0) { + request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( + ByteBuffer.wrap(new byte[4 * 1024 * 1024])))); + request.buildSerializedRequests(); + long requestSize = request.getMemorySize(); + if (occupiedMemory + requestSize < maxMemory) { + boolean reserved = memoryManager.reserve(request); + assertTrue(reserved); + occupiedMemory += requestSize; + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + requestList.add(request); + } else { + assertFalse(memoryManager.reserve(request)); + break; + } + } else { + batch = new Batch(IoTConsensusConfig.newBuilder().build()); + for (int j = 0; j < batchSize; j++) { + IndexedConsensusRequest batchRequest; + batchRequest = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( + ByteBuffer.wrap(new byte[1024 * 1024])))); + batch.addTLogEntry(new TLogEntry(batchRequest.getSerializedRequests(), batchRequest.getSearchIndex(), false, batchRequest.getMemorySize())); + } + + long requestSize = batch.getMemorySize(); + if (occupiedMemory + requestSize < maxMemory) { + assertTrue(memoryManager.reserve(batch)); + occupiedMemory += requestSize; + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + batchList.add(batch); + } else { + assertFalse(memoryManager.reserve(batch)); + } + } + i ++; + } + assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory); + + while (!requestList.isEmpty() || !batchList.isEmpty()) { + if (!requestList.isEmpty()) { + request = requestList.remove(0); + memoryManager.free(request); + occupiedMemory -= request.getMemorySize(); + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + i --; + } + if (!batchList.isEmpty()) { + batch = batchList.remove(0); + memoryManager.free(batch); + occupiedMemory -= batch.getMemorySize(); + assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); + i --; + } + } + assertEquals(0, i); + assertEquals(0, memoryManager.getMemorySizeInByte()); + assertEquals(0, memoryManager.getQueueMemorySizeInByte()); + } +} \ No newline at end of file From f55d6b8c3df8bfa0b49385faf8b75c455eccc417 Mon Sep 17 00:00:00 2001 From: Tian Jiang Date: Mon, 29 Dec 2025 19:11:46 +0800 Subject: [PATCH 2/2] spotless --- .../IoTConsensusMemoryManagerTest.java | 68 +++++++++++++------ 1 file changed, 46 insertions(+), 22 deletions(-) diff --git a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java index d68cf983fcd86..f87d8cd7f9887 100644 --- a/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java +++ b/iotdb-core/consensus/src/test/java/org/apache/iotdb/consensus/iot/logdispatcher/IoTConsensusMemoryManagerTest.java @@ -19,20 +19,22 @@ package org.apache.iotdb.consensus.iot.logdispatcher; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Collections; -import java.util.List; import org.apache.iotdb.consensus.common.request.ByteBufferConsensusRequest; import org.apache.iotdb.consensus.common.request.IndexedConsensusRequest; import org.apache.iotdb.consensus.config.IoTConsensusConfig; import org.apache.iotdb.consensus.iot.thrift.TLogEntry; + import org.junit.Test; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + public class IoTConsensusMemoryManagerTest { @Test @@ -44,8 +46,11 @@ public void testAllocateQueue() { IndexedConsensusRequest request; List requestList = new ArrayList<>(); while (occupiedMemory <= maxMemory) { - request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( - ByteBuffer.wrap(new byte[4 * 1024 * 1024])))); + request = + new IndexedConsensusRequest( + 0, + Collections.singletonList( + new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4 * 1024 * 1024])))); request.buildSerializedRequests(); long requestSize = request.getMemorySize(); if (occupiedMemory + requestSize < maxMemory) { @@ -84,9 +89,17 @@ public void testAllocateBatch() { batch = new Batch(IoTConsensusConfig.newBuilder().build()); for (int i = 0; i < batchSize; i++) { IndexedConsensusRequest request; - request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( - ByteBuffer.wrap(new byte[1024 * 1024])))); - batch.addTLogEntry(new TLogEntry(request.getSerializedRequests(), request.getSearchIndex(), false, request.getMemorySize())); + request = + new IndexedConsensusRequest( + 0, + Collections.singletonList( + new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[1024 * 1024])))); + batch.addTLogEntry( + new TLogEntry( + request.getSerializedRequests(), + request.getSearchIndex(), + false, + request.getMemorySize())); } long requestSize = batch.getMemorySize(); @@ -123,8 +136,11 @@ public void testAllocateMixed() { int i = 0; while (occupiedMemory <= maxMemory) { if (i % 2 == 0) { - request = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( - ByteBuffer.wrap(new byte[4 * 1024 * 1024])))); + request = + new IndexedConsensusRequest( + 0, + Collections.singletonList( + new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[4 * 1024 * 1024])))); request.buildSerializedRequests(); long requestSize = request.getMemorySize(); if (occupiedMemory + requestSize < maxMemory) { @@ -141,9 +157,17 @@ public void testAllocateMixed() { batch = new Batch(IoTConsensusConfig.newBuilder().build()); for (int j = 0; j < batchSize; j++) { IndexedConsensusRequest batchRequest; - batchRequest = new IndexedConsensusRequest(0, Collections.singletonList(new ByteBufferConsensusRequest( - ByteBuffer.wrap(new byte[1024 * 1024])))); - batch.addTLogEntry(new TLogEntry(batchRequest.getSerializedRequests(), batchRequest.getSearchIndex(), false, batchRequest.getMemorySize())); + batchRequest = + new IndexedConsensusRequest( + 0, + Collections.singletonList( + new ByteBufferConsensusRequest(ByteBuffer.wrap(new byte[1024 * 1024])))); + batch.addTLogEntry( + new TLogEntry( + batchRequest.getSerializedRequests(), + batchRequest.getSearchIndex(), + false, + batchRequest.getMemorySize())); } long requestSize = batch.getMemorySize(); @@ -156,7 +180,7 @@ public void testAllocateMixed() { assertFalse(memoryManager.reserve(batch)); } } - i ++; + i++; } assertTrue(memoryManager.getMemorySizeInByte() <= maxMemory); @@ -166,18 +190,18 @@ public void testAllocateMixed() { memoryManager.free(request); occupiedMemory -= request.getMemorySize(); assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); - i --; + i--; } if (!batchList.isEmpty()) { batch = batchList.remove(0); memoryManager.free(batch); occupiedMemory -= batch.getMemorySize(); assertEquals(occupiedMemory, memoryManager.getMemorySizeInByte()); - i --; + i--; } } assertEquals(0, i); assertEquals(0, memoryManager.getMemorySizeInByte()); assertEquals(0, memoryManager.getQueueMemorySizeInByte()); } -} \ No newline at end of file +}