diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index b63b6a0d9f3..5163074dc0d 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -1611,16 +1611,15 @@ public static boolean isValidTablePropertyKey(String key) { // MANAGER options MANAGER_THREADCHECK, MANAGER_FATE_METRICS_MIN_UPDATE_INTERVAL, MANAGER_METADATA_SUSPENDABLE, MANAGER_STARTUP_TSERVER_AVAIL_MIN_COUNT, MANAGER_STARTUP_TSERVER_AVAIL_MAX_WAIT, - MANAGER_CLIENTPORT, MANAGER_MINTHREADS, MANAGER_MINTHREADS_TIMEOUT, - MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, - MANAGER_TABLET_REFRESH_MINTHREADS, MANAGER_TABLET_REFRESH_MAXTHREADS, - MANAGER_TABLET_MERGEABILITY_INTERVAL, MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX, + MANAGER_CLIENTPORT, MANAGER_MINTHREADS_TIMEOUT, MANAGER_RECOVERY_WAL_EXISTENCE_CACHE_TIME, + MANAGER_COMPACTION_SERVICE_PRIORITY_QUEUE_SIZE, MANAGER_TABLET_MERGEABILITY_INTERVAL, + MANAGER_FATE_CONDITIONAL_WRITER_THREADS_MAX, // SSERV options SSERV_CACHED_TABLET_METADATA_REFRESH_PERCENT, SSERV_THREADCHECK, SSERV_CLIENTPORT, SSERV_PORTSEARCH, SSERV_DATACACHE_SIZE, SSERV_INDEXCACHE_SIZE, SSERV_SUMMARYCACHE_SIZE, SSERV_DEFAULT_BLOCKSIZE, SSERV_SCAN_REFERENCE_EXPIRATION_TIME, - SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS, SSERV_MINTHREADS_TIMEOUT, + SSERV_CACHED_TABLET_METADATA_EXPIRATION, SSERV_MINTHREADS_TIMEOUT, SSERV_WAL_SORT_MAX_CONCURRENT, SSERV_GROUP_NAME, // TSERV options @@ -1631,8 +1630,8 @@ public static boolean isValidTablePropertyKey(String key) { TSERV_LOG_BUSY_TABLETS_COUNT, TSERV_LOG_BUSY_TABLETS_INTERVAL, TSERV_WAL_SORT_MAX_CONCURRENT, TSERV_SLOW_FILEPERMIT_MILLIS, TSERV_WAL_BLOCKSIZE, TSERV_CLIENTPORT, TSERV_PORTSEARCH, TSERV_DATACACHE_SIZE, TSERV_INDEXCACHE_SIZE, TSERV_SUMMARYCACHE_SIZE, TSERV_DEFAULT_BLOCKSIZE, - TSERV_MINTHREADS, TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, - TSERV_SCAN_MAX_OPENFILES, TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME, + TSERV_MINTHREADS_TIMEOUT, TSERV_NATIVEMAP_ENABLED, TSERV_MAXMEM, TSERV_SCAN_MAX_OPENFILES, + TSERV_ONDEMAND_UNLOADER_INTERVAL, TSERV_GROUP_NAME, // GC options GC_CANDIDATE_BATCH_SIZE, GC_CYCLE_START, GC_PORT, @@ -1643,8 +1642,7 @@ public static boolean isValidTablePropertyKey(String key) { // COMPACTOR options COMPACTOR_CANCEL_CHECK_INTERVAL, COMPACTOR_CLIENTPORT, COMPACTOR_THREADCHECK, - COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS, COMPACTOR_MINTHREADS_TIMEOUT, - COMPACTOR_GROUP_NAME, + COMPACTOR_PORTSEARCH, COMPACTOR_MINTHREADS_TIMEOUT, COMPACTOR_GROUP_NAME, // COMPACTION_COORDINATOR options COMPACTION_COORDINATOR_DEAD_COMPACTOR_CHECK_INTERVAL, diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java index 6eb22ff58bc..7c21119848a 100644 --- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java +++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java @@ -219,6 +219,43 @@ public static void ensureRunning(ScheduledFuture future, String message) { } } + /** + * Resize ThreadPoolExecutor based on current value of minThreads + * + * @param pool the ThreadPoolExecutor to modify + * @param minThreads supplier of minThreads value + * @param poolName name of the thread pool + */ + public static void resizeCorePool(final ThreadPoolExecutor pool, final IntSupplier minThreads, + String poolName) { + int count = pool.getPoolSize(); + int newCount = minThreads.getAsInt(); + if (count == newCount) { + return; + } + LOG.info("Changing min threads for {} from {} to {}", poolName, count, newCount); + if (newCount < count) { + pool.setCorePoolSize(newCount); + } else { + if (newCount > pool.getMaximumPoolSize()) { + pool.setMaximumPoolSize(newCount); + } + pool.setCorePoolSize(newCount); + } + } + + /** + * Resize ThreadPoolExecutor based on current value of Property p + * + * @param pool the ThreadPoolExecutor to modify + * @param conf the AccumuloConfiguration + * @param p the property to base the size from + */ + public static void resizeCorePool(final ThreadPoolExecutor pool, final AccumuloConfiguration conf, + final Property p) { + resizeCorePool(pool, () -> conf.getCount(p), p.getKey()); + } + /** * Resize ThreadPoolExecutor based on current value of maxThreads * diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java index a972b4e13b5..2759de71951 100644 --- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java +++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java @@ -50,6 +50,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -262,6 +263,7 @@ void setTserverStatus(LiveTServersSnapshot snapshot, private final long timeToCacheRecoveryWalExistence; private ExecutorService tableInformationStatusPool = null; private ThreadPoolExecutor tabletRefreshThreadPool; + private ScheduledExecutorService refreshCheckerScheduler; private final TabletStateStore rootTabletStore; private final TabletStateStore metadataTabletStore; @@ -905,6 +907,24 @@ private void checkForHeldServer(SortedMap ts return info; } + private class RefreshThreads implements Runnable { + @Override + public void run() { + ThreadPools.resizeCorePool(tabletRefreshThreadPool, getConfiguration(), + Property.MANAGER_TABLET_REFRESH_MINTHREADS); + ThreadPools.resizePool(tabletRefreshThreadPool, getConfiguration(), + Property.MANAGER_TABLET_REFRESH_MAXTHREADS); + + } + } + + public void startRefreshChecker() { + refreshCheckerScheduler = ThreadPools.getServerThreadPools().createScheduledExecutorService(1, + "ManagerTabletRefreshThreadsExecutor"); + ThreadPools.watchNonCriticalScheduledTask( + refreshCheckerScheduler.scheduleAtFixedRate(new RefreshThreads(), 5, 5, MINUTES)); + } + @Override public void run() { final ServerContext context = getContext(); @@ -968,6 +988,8 @@ public void run() { .numMaxThreads(getConfiguration().getCount(Property.MANAGER_TABLET_REFRESH_MAXTHREADS)) .build(); + startRefreshChecker(); + Thread statusThread = Threads.createCriticalThread("Status Thread", new StatusThread()); statusThread.start(); @@ -1235,6 +1257,7 @@ boolean canSuspendTablets() { tableInformationStatusPool.shutdownNow(); tabletRefreshThreadPool.shutdownNow(); + refreshCheckerScheduler.shutdown(); compactionCoordinator.shutdown();