Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 7 additions & 9 deletions core/src/main/java/org/apache/accumulo/core/conf/Property.java
Original file line number Diff line number Diff line change
Expand Up @@ -1611,16 +1611,15 @@ public static boolean isValidTablePropertyKey(String key) {
// MANAGER options
MANAGER_THREADCHECK, MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, MANAGER_METADATA_SUSPENDABLE,
MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT,
MANAGER_CLIENTPORT, MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT,
MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE,
MANAGER_TABLET_REFRESH_MINTHREADS, MANAGER_TABLET_REFRESH_MAXTHREADS,
MANAGER_TABLET_MERGEABILITY_INTERVAL, MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX,
MANAGER_CLIENTPORT, MANAGER_MINTHREADS_TIMEOUT, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME,
MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, MANAGER_TABLET_MERGEABILITY_INTERVAL,
MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX,

// SSERV options
SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_CLIENTPORT,
SSERV_PORTSEARCH, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE,
SSERV_DEFAULT_BLOCKSIZE, SSERV_SCAN_REFERENCE_EXPIRATION_TIME,
SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT,
SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS_TIMEOUT,
SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME,

// TSERV options
Expand All @@ -1631,8 +1630,8 @@ public static boolean isValidTablePropertyKey(String key) {
TSERV_LOG_BUSY_TABLETS_COUNT, TSERV_LOG_BUSY_TABLETS_INTERVAL, TSERV_WAL_SORT_MAX_CONCURRENT,
TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_CLIENTPORT, TSERV_PORTSEARCH,
TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE,
TSERV_MINTHREADS, TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM,
TSERV_SCAN_MAX_OPENFILES, TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME,
TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, TSERV_SCAN_MAX_OPENFILES,
TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME,

// GC options
GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START, GC_PORT,
Expand All @@ -1643,8 +1642,7 @@ public static boolean isValidTablePropertyKey(String key) {

// COMPACTOR options
COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_CLIENTPORT, COMPACTOR_THREADCHECK,
COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS, COMPACTOR_MINTHREADS_TIMEOUT,
COMPACTOR_GROUP_NAME,
COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS_TIMEOUT, COMPACTOR_GROUP_NAME,

// COMPACTION_COORDINATOR options
COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,43 @@ public static void ensureRunning(ScheduledFuture<?> future, String message) {
}
}

/**
* Resize ThreadPoolExecutor based on current value of minThreads
*
* @param pool the ThreadPoolExecutor to modify
* @param minThreads supplier of minThreads value
* @param poolName name of the thread pool
*/
public static void resizeCorePool(final ThreadPoolExecutor pool, final IntSupplier minThreads,
String poolName) {
int count = pool.getPoolSize();
int newCount = minThreads.getAsInt();
if (count == newCount) {
return;
}
LOG.info("Changing min threads for {} from {} to {}", poolName, count, newCount);
if (newCount < count) {
pool.setCorePoolSize(newCount);
} else {
if (newCount > pool.getMaximumPoolSize()) {
pool.setMaximumPoolSize(newCount);
}
pool.setCorePoolSize(newCount);
}
}

/**
* Resize ThreadPoolExecutor based on current value of Property p
*
* @param pool the ThreadPoolExecutor to modify
* @param conf the AccumuloConfiguration
* @param p the property to base the size from
*/
public static void resizeCorePool(final ThreadPoolExecutor pool, final AccumuloConfiguration conf,
final Property p) {
resizeCorePool(pool, () -> conf.getCount(p), p.getKey());
}

/**
* Resize ThreadPoolExecutor based on current value of maxThreads
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -262,6 +263,7 @@ void setTserverStatus(LiveTServersSnapshot snapshot,
private final long timeToCacheRecoveryWalExistence;
private ExecutorService tableInformationStatusPool = null;
private ThreadPoolExecutor tabletRefreshThreadPool;
private ScheduledExecutorService refreshCheckerScheduler;

private final TabletStateStore rootTabletStore;
private final TabletStateStore metadataTabletStore;
Expand Down Expand Up @@ -905,6 +907,24 @@ private void checkForHeldServer(SortedMap<TServerInstance,TabletServerStatus> ts
return info;
}

private class RefreshThreads implements Runnable {
@Override
public void run() {
ThreadPools.resizeCorePool(tabletRefreshThreadPool, getConfiguration(),
Property.MANAGER_TABLET_REFRESH_MINTHREADS);
ThreadPools.resizePool(tabletRefreshThreadPool, getConfiguration(),
Property.MANAGER_TABLET_REFRESH_MAXTHREADS);

}
}

public void startRefreshChecker() {
refreshCheckerScheduler = ThreadPools.getServerThreadPools().createScheduledExecutorService(1,
"ManagerTabletRefreshThreadsExecutor");
ThreadPools.watchNonCriticalScheduledTask(
refreshCheckerScheduler.scheduleAtFixedRate(new RefreshThreads(), 5, 5, MINUTES));
}

@Override
public void run() {
final ServerContext context = getContext();
Expand Down Expand Up @@ -968,6 +988,8 @@ public void run() {
.numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS))
.build();

startRefreshChecker();

Thread statusThread = Threads.createCriticalThread("Status Thread", new StatusThread());
statusThread.start();

Expand Down Expand Up @@ -1235,6 +1257,7 @@ boolean canSuspendTablets() {

tableInformationStatusPool.shutdownNow();
tabletRefreshThreadPool.shutdownNow();
refreshCheckerScheduler.shutdown();

compactionCoordinator.shutdown();

Expand Down