ThreadLocalManager.java
package com.morphiqlabs.wavelet.util;
import java.lang.ref.WeakReference;
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
/**
* Centralized management for ThreadLocal instances to prevent memory leaks.
*
* <p>This class provides lifecycle management for ThreadLocal variables used
* throughout the VectorWave library. It ensures proper cleanup in thread pool
* environments where threads are reused.</p>
*
* <p>Key features:</p>
* <ul>
* <li>Automatic registration of ThreadLocal instances</li>
* <li>Cleanup all registered ThreadLocals for current thread</li>
* <li>Try-with-resources support for scoped cleanup</li>
* <li>Memory leak detection and warnings</li>
* </ul>
*/
public final class ThreadLocalManager {
/** Not instantiable: the class only exposes static members, and the constructor always throws. */
private ThreadLocalManager() { throw new AssertionError("No instances"); }
/** Logger for this class, obtained through the project's Logging helper. */
private static final System.Logger LOG = com.morphiqlabs.wavelet.util.Logging.getLogger(ThreadLocalManager.class);
// Every managed ThreadLocal known to this class, stored in a concurrent set.
// Entries are weak references, so registration alone does not keep an
// instance alive; cleanupCurrentThread() drops entries whose referent is gone.
private static final Set<WeakReference<ManagedThreadLocal<?>>> REGISTERED_LOCALS =
ConcurrentHashMap.newKeySet();
// Leak-detection switch. Initialised from the system property
// "vectorwave.threadlocal.leak.detection" (treated as "true" when unset) and
// changed at runtime via setLeakDetectionEnabled(). Note that, despite the
// constant-style name, this field is mutable (volatile, not final).
private static volatile boolean LEAK_DETECTION_ENABLED =
Boolean.parseBoolean(System.getProperty("vectorwave.threadlocal.leak.detection", "true"));
// Per-thread marker: false initially, set to true at the end of
// cleanupCurrentThread(), and set back to false by resetCleanupFlag().
private static final ThreadLocal<Boolean> CLEANUP_PERFORMED =
ThreadLocal.withInitial(() -> false);
/**
 * Creates a managed ThreadLocal and registers it for later cleanup.
 *
 * <p>The new instance is tracked through a weak reference, so being
 * registered does not by itself keep it reachable.</p>
 *
 * @param <T> the type of the thread-local value
 * @param supplier produces the initial value for each thread; must not be {@code null}
 * @return the newly created, registered instance
 * @throws NullPointerException if {@code supplier} is {@code null}
 */
public static <T> ManagedThreadLocal<T> withInitial(Supplier<T> supplier) {
    // Fail fast: a null supplier would otherwise only surface later, as a
    // NullPointerException inside initialValue() on whichever thread first
    // calls get().
    Objects.requireNonNull(supplier, "supplier");
    ManagedThreadLocal<T> local = new ManagedThreadLocal<>(supplier);
    REGISTERED_LOCALS.add(new WeakReference<>(local));
    return local;
}
/**
 * Removes the calling thread's value from every registered ThreadLocal.
 *
 * <p>Registrations whose target has already been garbage collected are
 * dropped along the way. When the pass is complete the per-thread cleanup
 * flag is set, and the number of cleaned instances is logged at DEBUG level.</p>
 */
public static void cleanupCurrentThread() {
    int removedFromThread = 0;
    for (WeakReference<ManagedThreadLocal<?>> entry : REGISTERED_LOCALS) {
        final ManagedThreadLocal<?> managed = entry.get();
        if (managed == null) {
            // Referent already collected: discard the stale registration.
            // The concurrent set tolerates removal while it is being iterated.
            REGISTERED_LOCALS.remove(entry);
            continue;
        }
        managed.removeForCurrentThread();
        removedFromThread++;
    }
    CLEANUP_PERFORMED.set(Boolean.TRUE);
    if (!LOG.isLoggable(System.Logger.Level.DEBUG)) {
        return;
    }
    final String threadName = Thread.currentThread().getName();
    LOG.log(System.Logger.Level.DEBUG,
            "Cleaned up " + removedFromThread + " ThreadLocal instances for thread: " + threadName);
}
/**
 * Reports the calling thread's cleanup flag.
 *
 * @return {@code true} if the flag is set for the current thread, i.e.
 *         cleanup has run and the flag has not been reset since
 */
public static boolean isCleanupPerformed() {
    final Boolean flag = CLEANUP_PERFORMED.get();
    return flag.booleanValue();
}
/**
 * Clears the calling thread's cleanup flag.
 * Intended for reused (pooled) threads, so that each new unit of work
 * starts with the flag unset.
 */
public static void resetCleanupFlag() {
    CLEANUP_PERFORMED.set(Boolean.FALSE);
}
/**
 * Opens a new cleanup scope for the calling thread.
 * Closing the returned scope cleans up the registered ThreadLocals.
 *
 * @return a fresh CleanupScope, meant to be used in a try-with-resources statement
 */
public static CleanupScope createScope() {
    final CleanupScope scope = new CleanupScope();
    return scope;
}
/**
 * Wrappers created for plain (non-managed) ThreadLocals, keyed by the
 * ThreadLocal they stand in for.
 *
 * <p>The registry only holds weak references, so something must keep each
 * wrapper alive or its registration would vanish at the next garbage
 * collection. The map is weak-keyed and a wrapper refers to its target only
 * weakly, so a wrapper stays reachable exactly as long as the ThreadLocal it
 * represents and neither is pinned by this class.</p>
 */
private static final Map<ThreadLocal<?>, ManagedThreadLocal<?>> LEGACY_WRAPPERS =
        Collections.synchronizedMap(new WeakHashMap<>());

/**
 * Registers an existing ThreadLocal for management.
 * Useful for integrating with legacy code.
 *
 * <p>Registering the same instance more than once has no additional effect.
 * A plain ThreadLocal is represented by an internal wrapper whose cleanup
 * calls {@code remove()} on the original; the wrapper itself never reports
 * a value for the current thread.</p>
 *
 * @param threadLocal the ThreadLocal to manage; must not be {@code null}
 * @throws NullPointerException if {@code threadLocal} is {@code null}
 */
public static void register(ThreadLocal<?> threadLocal) {
    // Fail fast: a null target would otherwise only blow up later, in the
    // middle of a cleanup pass.
    Objects.requireNonNull(threadLocal, "threadLocal");

    if (threadLocal instanceof ManagedThreadLocal<?> managed) {
        // Skip instances that are already tracked so they are not cleaned
        // and counted twice.
        for (WeakReference<ManagedThreadLocal<?>> ref : REGISTERED_LOCALS) {
            if (ref.get() == managed) {
                return;
            }
        }
        REGISTERED_LOCALS.add(new WeakReference<>(managed));
        return;
    }

    // At most one wrapper per legacy ThreadLocal; the map keeps it alive.
    LEGACY_WRAPPERS.computeIfAbsent(threadLocal, target -> {
        // Weak on purpose: a strong reference from the map value back to its
        // key would prevent the weak-keyed map from ever releasing the entry.
        final WeakReference<ThreadLocal<?>> targetRef = new WeakReference<>(target);
        ManagedThreadLocal<?> wrapper = new ManagedThreadLocal<Object>(() -> null) {
            @Override
            public void removeForCurrentThread() {
                ThreadLocal<?> live = targetRef.get();
                if (live != null) {
                    live.remove();
                }
            }
        };
        REGISTERED_LOCALS.add(new WeakReference<>(wrapper));
        return wrapper;
    });
}
/**
 * Takes a snapshot of ThreadLocal usage as seen from the calling thread.
 *
 * <p>Registrations whose target has been garbage collected are not counted.</p>
 *
 * @return the number of live registrations, how many of them report a value
 *         for the current thread, and the current thread's cleanup flag
 */
public static ThreadLocalStats getStats() {
    int live = 0;
    int populated = 0;
    for (WeakReference<ManagedThreadLocal<?>> entry : REGISTERED_LOCALS) {
        final ManagedThreadLocal<?> managed = entry.get();
        if (managed == null) {
            continue;
        }
        live++;
        if (managed.hasValueForCurrentThread()) {
            populated++;
        }
    }
    final boolean cleaned = CLEANUP_PERFORMED.get();
    return new ThreadLocalStats(live, populated, cleaned);
}
/**
 * Turns memory leak detection on or off and logs the new state at INFO level.
 *
 * @param enabled {@code true} to enable leak detection, {@code false} to disable it
 */
public static void setLeakDetectionEnabled(boolean enabled) {
    LEAK_DETECTION_ENABLED = enabled;
    final String state = enabled ? "enabled" : "disabled";
    LOG.log(System.Logger.Level.INFO, () -> "Leak detection " + state);
}
/**
 * Reports the current leak-detection setting.
 *
 * @return {@code true} if leak detection is enabled
 */
public static boolean isLeakDetectionEnabled() {
    final boolean enabled = LEAK_DETECTION_ENABLED;
    return enabled;
}
/**
 * A ThreadLocal that can be explicitly managed and cleaned up.
 *
 * <p>Besides the value itself, each instance keeps a per-thread marker that
 * records whether the current thread holds a value, so that presence can be
 * queried without triggering initialization of the value.</p>
 *
 * @param <T> The type of the thread-local value
 */
public static class ManagedThreadLocal<T> extends ThreadLocal<T> {
    /** Produces the initial value for each thread. */
    private final Supplier<T> supplier;

    /**
     * Set once {@link #remove()} has been called on this instance. The flag
     * is shared by all threads, is never reset, and is not touched by
     * {@link #removeForCurrentThread()}.
     */
    private final AtomicBoolean removed = new AtomicBoolean(false);

    /**
     * Per-thread marker: {@code true} once a value has been set, or a
     * non-null value has been read, on the current thread.
     */
    private final ThreadLocal<Boolean> isSet = ThreadLocal.withInitial(() -> false);

    private ManagedThreadLocal(Supplier<T> supplier) {
        this.supplier = supplier;
    }

    /** Delegates to the supplier given at construction time. */
    @Override
    protected T initialValue() {
        return supplier.get();
    }

    /** Stores the value for the current thread and marks it as present. */
    @Override
    public void set(T value) {
        super.set(value);
        isSet.set(true);
    }

    /**
     * Returns the current thread's value, initializing it if necessary.
     * A non-null result marks the value as present for the current thread.
     */
    @Override
    public T get() {
        T value = super.get();
        // Only write the marker when there is something to record. Reading
        // it first would create a default marker entry for this thread even
        // when the value turned out to be null.
        if (value != null) {
            isSet.set(true);
        }
        return value;
    }

    /**
     * Removes the value for the current thread.
     * Unlike {@link #remove()}, this does not mark the instance as removed.
     */
    public void removeForCurrentThread() {
        clearCurrentThreadState();
    }

    /**
     * Checks if this ThreadLocal has a value for the current thread.
     * This method doesn't trigger initialization of the value and leaves no
     * per-thread state behind.
     *
     * @return true if a value has been set or initialized for the current thread
     */
    public boolean hasValueForCurrentThread() {
        if (Boolean.TRUE.equals(isSet.get())) {
            return true;
        }
        // Reading an unset marker creates a default entry for this thread;
        // drop it again so that a pure query (for example while collecting
        // statistics after cleanup) does not re-populate the thread.
        isSet.remove();
        return false;
    }

    /**
     * Removes the value for the current thread and marks this instance as
     * removed (see {@link #isRemoved()}).
     */
    @Override
    public void remove() {
        clearCurrentThreadState();
        removed.set(true);
    }

    /**
     * Checks if this ThreadLocal has been removed.
     *
     * @return true once {@link #remove()} has been called on this instance,
     *         from any thread
     */
    public boolean isRemoved() {
        return removed.get();
    }

    /**
     * Drops the current thread's value and its presence marker. Kept private
     * so that {@link #remove()} behaves the same even when a subclass
     * overrides {@link #removeForCurrentThread()}.
     */
    private void clearCurrentThreadState() {
        super.remove();
        isSet.remove();
    }
}
/**
 * Scope object that cleans up the registered ThreadLocals when it is closed.
 * Designed for try-with-resources so that cleanup also runs on exceptions.
 */
public static class CleanupScope implements AutoCloseable {
    /** {@code System.nanoTime()} reading taken when the scope was opened. */
    private final long startTime;
    /** Guards against running the cleanup more than once. */
    private boolean closed = false;

    /** Records the start time and clears the current thread's cleanup flag. */
    private CleanupScope() {
        this.startTime = System.nanoTime();
        resetCleanupFlag();
    }

    /**
     * Cleans up the current thread's registered ThreadLocals and, at DEBUG
     * level, logs how long the scope was open. Only the first call has an
     * effect; later calls return immediately.
     */
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        cleanupCurrentThread();
        if (LOG.isLoggable(System.Logger.Level.DEBUG)) {
            final long elapsedMillis = (System.nanoTime() - startTime) / 1_000_000;
            LOG.log(System.Logger.Level.DEBUG, () -> "ThreadLocal scope lasted " + elapsedMillis + " ms");
        }
    }

    /**
     * Tells whether {@link #close()} has already been called.
     *
     * @return true if closed
     */
    public boolean isClosed() {
        return closed;
    }
}
/**
 * Immutable snapshot of thread-local usage.
 *
 * @param registeredCount number of registered ThreadLocals
 * @param activeCount number of ThreadLocals holding a value for the current thread
 * @param cleanupPerformed whether cleanup has been performed
 */
public record ThreadLocalStats(int registeredCount, int activeCount, boolean cleanupPerformed) {

    /**
     * Renders the snapshot as a single line of text.
     *
     * @return human-readable summary of the three components
     */
    public String summary() {
        final String cleanupState = cleanupPerformed ? "performed" : "pending";
        return "ThreadLocal Stats: %d registered, %d active, cleanup %s"
                .formatted(registeredCount, activeCount, cleanupState);
    }

    /**
     * Flags a possible leak: at least one active ThreadLocal while cleanup
     * has not been performed.
     *
     * @return true if a potential leak is detected
     */
    public boolean hasPotentialLeak() {
        if (cleanupPerformed) {
            return false;
        }
        return activeCount > 0;
    }
}
/**
 * Runs an operation inside a cleanup scope and returns its result.
 *
 * <p>The scope is closed, and the current thread's registered ThreadLocals
 * cleaned up, whether the operation completes normally or throws. If the
 * operation throws and the cleanup fails as well, the operation's exception
 * is the one propagated; the cleanup failure is attached to it as a
 * suppressed exception instead of replacing it.</p>
 *
 * @param <T> The return type
 * @param operation The operation to perform
 * @return The result of the operation
 * @throws Exception if the operation throws
 */
public static <T> T withCleanup(ThrowingSupplier<T> operation) throws Exception {
    // The resource is never referenced in the body; it exists only so that
    // close() runs on exit.
    try (CleanupScope ignored = createScope()) {
        return operation.get();
    }
}
/**
 * Runs a void operation inside a cleanup scope.
 *
 * <p>The scope is closed, and the current thread's registered ThreadLocals
 * cleaned up, whether the operation completes normally or throws. If the
 * operation throws and the cleanup fails as well, the operation's exception
 * is the one propagated; the cleanup failure is attached to it as a
 * suppressed exception instead of replacing it.</p>
 *
 * @param operation The operation to perform
 * @throws Exception if the operation throws
 */
public static void withCleanup(ThrowingRunnable operation) throws Exception {
    // The resource is never referenced in the body; it exists only so that
    // close() runs on exit.
    try (CleanupScope ignored = createScope()) {
        operation.run();
    }
}
/**
 * A supplier of results whose {@code get} method is allowed to throw a
 * checked exception. It is the parameter type of the value-returning
 * {@code withCleanup} overload, so lambdas that throw can be passed to it
 * directly.
 *
 * @param <T> The return type
 */
@FunctionalInterface
public interface ThrowingSupplier<T> {
/**
 * Produces a result.
 *
 * @return the result
 * @throws Exception if the operation fails
 */
T get() throws Exception;
}
/**
 * A void operation whose {@code run} method is allowed to throw a checked
 * exception. It is the parameter type of the void {@code withCleanup}
 * overload, so lambdas that throw can be passed to it directly.
 */
@FunctionalInterface
public interface ThrowingRunnable {
/**
 * Performs the operation.
 *
 * @throws Exception if the operation fails
 */
void run() throws Exception;
}
}