FastStreamingDenoiser.java
package com.morphiqlabs.wavelet.streaming;
import com.morphiqlabs.wavelet.modwt.streaming.MODWTStreamingDenoiser;
import com.morphiqlabs.wavelet.modwt.streaming.MODWTStreamingTransform;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
/**
* Fast implementation of streaming denoiser optimized for real-time processing.
*
* <p>This implementation prioritizes low latency over denoising quality,
* making it suitable for:</p>
* <ul>
* <li>Live audio processing</li>
* <li>Real-time sensor data filtering</li>
* <li>Low-latency financial data processing</li>
* </ul>
*/
final class FastStreamingDenoiser implements StreamingDenoiserStrategy {
private final MODWTStreamingDenoiser denoiser;
private final StreamingDenoiserConfig config;
private final SubmissionPublisher<double[]> publisher;
private final PerformanceProfile profile;
/**
* Calculates the optimal noise window size for fast streaming processing.
*
* <p>For fast processing, we limit the noise window to at most half the block size
* to minimize latency while maintaining reasonable noise estimation quality.
* The window size is clamped between 1 and the configured maximum.</p>
*
* @param config the streaming denoiser configuration
* @return the optimal noise window size in samples
*/
private static int calculateOptimalNoiseWindowSize(StreamingDenoiserConfig config) {
int maxAllowedSize = (int) Math.round(config.getBlockSize() / 2.0);
return Math.max(1, Math.min(config.getNoiseWindowSize(), maxAllowedSize));
}
FastStreamingDenoiser(StreamingDenoiserConfig config) {
this.config = config;
this.publisher = new SubmissionPublisher<>();
// Create MODWT denoiser with fast settings
this.denoiser = new MODWTStreamingDenoiser.Builder()
.wavelet(config.getWavelet())
.bufferSize(config.getBlockSize())
.thresholdMethod(config.getThresholdMethod())
.thresholdType(config.getThresholdType())
.thresholdMultiplier(config.getThresholdMultiplier())
.noiseEstimation(MODWTStreamingDenoiser.NoiseEstimation.MAD)
.noiseWindowSize(calculateOptimalNoiseWindowSize(config))
.build();
// Subscribe to denoiser output and forward to our subscribers
this.denoiser.subscribe(new Flow.Subscriber<double[]>() {
private Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(Long.MAX_VALUE);
}
@Override
public void onNext(double[] item) {
publisher.submit(item);
}
@Override
public void onError(Throwable throwable) {
publisher.closeExceptionally(throwable);
}
@Override
public void onComplete() {
publisher.close();
}
});
// Create performance profile
this.profile = PerformanceProfile.fastProfile(config.getBlockSize());
}
@Override
public void process(double[] samples) {
denoiser.denoise(samples);
}
@Override
public void flush() {
// Close the denoiser which will trigger onComplete on our subscriber
try {
denoiser.close();
} catch (Exception e) {
// Log or handle error if needed
}
}
@Override
public PerformanceProfile getPerformanceProfile() {
return profile;
}
@Override
public MODWTStreamingTransform.StreamingStatistics getStatistics() {
// Return basic statistics based on denoiser state
final long samples = denoiser.getSamplesProcessed();
final long blocks = samples / config.getBlockSize();
return new MODWTStreamingTransform.StreamingStatistics() {
@Override
public long getSamplesProcessed() {
return samples;
}
@Override
public long getBlocksProcessed() {
return blocks;
}
@Override
public long getAverageProcessingTimeNanos() {
// Estimate based on performance profile
return (long)(profile.expectedLatencyMicros() * 1000);
}
@Override
public long getMaxProcessingTimeNanos() {
// Estimate max as 2x average for fast implementation
return (long)(profile.expectedLatencyMicros() * 2000);
}
@Override
public long getMinProcessingTimeNanos() {
// Estimate min as 0.5x average for fast implementation
return (long)(profile.expectedLatencyMicros() * 500);
}
@Override
public double getThroughputSamplesPerSecond() {
// Samples per second estimate
return 1_000_000.0 / profile.expectedLatencyMicros();
}
@Override
public void reset() {
// Not implemented - statistics are read-only estimates
}
};
}
@Override
public void subscribe(Flow.Subscriber<? super double[]> subscriber) {
publisher.subscribe(subscriber);
}
@Override
public void close() {
denoiser.close();
publisher.close();
}
}