SimpleStreamingAnalyzer.java

package com.morphiqlabs.wavelet.cwt.finance;

import com.morphiqlabs.wavelet.cwt.*;
import com.morphiqlabs.wavelet.cwt.finance.FinancialWaveletAnalyzer.*;

import java.util.*;
import java.util.function.Consumer;

/**
 * Simple streaming financial analyzer optimized for minimal memory usage.
 * 
 * <p>This analyzer processes financial data in a streaming fashion using
 * fixed-size sliding windows and incremental calculations.</p>
 */
public class SimpleStreamingAnalyzer {
    
    /**
     * Streaming result emitted for each update.
     *
     * @param index sample index of this result
     * @param price current price
     * @param instantVolatility instantaneous volatility
     * @param avgVolatility average volatility over window
     * @param regime detected market regime
     * @param riskLevel current risk level
     * @param signal optional trading signal
     */
    public record StreamingResult(
        int index,
        double price,
        double instantVolatility,
        double avgVolatility,
        MarketRegime regime,
        double riskLevel,
        Optional<TradingSignal> signal
    ) {}
    
    // Configuration
    private final int windowSize;
    private final int updateInterval;
    private final FinancialAnalysisParameters parameters;
    private final FinancialAnalysisObjectPool pool;
    
    // Circular buffers (simple arrays with index tracking)
    private final double[] priceWindow;
    private final double[] volatilityWindow;
    private int windowIndex = 0;
    private int samplesProcessed = 0;
    
    // Running statistics
    private double volatilitySum = 0;
    private double volatilityMean = 0;
    private MarketRegime currentRegime = MarketRegime.RANGING;
    
    // Wavelets
    private final PaulWavelet crashWavelet;
    private final CWTTransform crashTransform;
    
    // Result consumer
    private Consumer<StreamingResult> resultConsumer;
    
    /**
     * Creates a simple streaming analyzer.
     * 
     * @param windowSize size of sliding window
     * @param updateInterval how often to emit results
     */
    public SimpleStreamingAnalyzer(int windowSize, int updateInterval) {
        this.windowSize = windowSize;
        this.updateInterval = updateInterval;
        this.parameters = FinancialAnalysisParameters.defaultParameters();
        this.pool = new FinancialAnalysisObjectPool();
        
        this.priceWindow = new double[windowSize];
        this.volatilityWindow = new double[windowSize];
        
        this.crashWavelet = new PaulWavelet(4);
        this.crashTransform = new CWTTransform(crashWavelet);
    }
    
    /**
     * Set the result consumer.
     */
    public void onResult(Consumer<StreamingResult> consumer) {
        this.resultConsumer = consumer;
    }
    
    /**
     * Process a new price sample.
     */
    public void processSample(double price) {
        samplesProcessed++;
        
        // Update circular buffer
        int oldIndex = windowIndex;
        windowIndex = (windowIndex + 1) % windowSize;
        
        // Calculate instant volatility
        double instantVol = 0;
        if (samplesProcessed > 1) {
            double prevPrice = priceWindow[oldIndex];
            if (prevPrice > 0) {
                instantVol = Math.abs((price - prevPrice) / prevPrice);
            }
        }
        
        // Update windows
        double oldVolatility = volatilityWindow[windowIndex];
        priceWindow[windowIndex] = price;
        volatilityWindow[windowIndex] = instantVol;
        
        // Update running sum
        volatilitySum = volatilitySum - oldVolatility + instantVol;
        int effectiveSize = Math.min(samplesProcessed, windowSize);
        volatilityMean = effectiveSize > 0 ? volatilitySum / effectiveSize : 0;
        
        // Analyze and emit results at intervals
        if (samplesProcessed % updateInterval == 0 && resultConsumer != null) {
            analyzeAndEmit(price, instantVol);
        }
    }
    
    /**
     * Process a batch of prices.
     */
    public void processBatch(double[] prices) {
        for (double price : prices) {
            processSample(price);
        }
    }
    
    /**
     * Analyze current state and emit result.
     */
    private void analyzeAndEmit(double currentPrice, double instantVol) {
        // Detect regime
        currentRegime = detectRegime(instantVol);
        
        // Calculate risk level
        double riskLevel = calculateRiskLevel(instantVol);
        
        // Generate signal
        Optional<TradingSignal> signal = generateSignal(currentPrice, instantVol);
        
        // Emit result
        StreamingResult result = new StreamingResult(
            samplesProcessed,
            currentPrice,
            instantVol,
            volatilityMean,
            currentRegime,
            riskLevel,
            signal
        );
        
        resultConsumer.accept(result);
    }
    
    /**
     * Detect market regime based on volatility.
     */
    private MarketRegime detectRegime(double instantVol) {
        if (instantVol > volatilityMean * 2) {
            return MarketRegime.VOLATILE;
        }
        
        // Simple trend detection using price window
        int effectiveSize = Math.min(samplesProcessed, windowSize);
        if (effectiveSize < 10) {
            return MarketRegime.RANGING;
        }
        
        // Calculate simple trend
        double oldPrice = priceWindow[(windowIndex - effectiveSize + 1 + windowSize) % windowSize];
        double currentPrice = priceWindow[windowIndex];
        double trend = (currentPrice - oldPrice) / oldPrice;
        
        if (trend > parameters.getRegimeTrendThreshold()) {
            return MarketRegime.TRENDING_UP;
        } else if (trend < -parameters.getRegimeTrendThreshold()) {
            return MarketRegime.TRENDING_DOWN;
        } else {
            return MarketRegime.RANGING;
        }
    }
    
    /**
     * Calculate risk level based on volatility.
     */
    private double calculateRiskLevel(double instantVol) {
        double baseRisk = parameters.getBaseRiskLevel();
        double volComponent = 0;
        
        if (volatilityMean > 0) {
            volComponent = Math.min(0.5, (instantVol / volatilityMean - 1) * 0.2);
        }
        
        return Math.min(1.0, baseRisk + volComponent);
    }
    
    /**
     * Generate trading signal.
     */
    private Optional<TradingSignal> generateSignal(double currentPrice, double instantVol) {
        int effectiveSize = Math.min(samplesProcessed, windowSize);
        if (effectiveSize < 20) {
            return Optional.empty();
        }
        
        // Simple momentum signal
        double oldPrice = priceWindow[(windowIndex - 20 + windowSize) % windowSize];
        double momentum = (currentPrice - oldPrice) / oldPrice;
        
        if (momentum > 0.02 && instantVol < volatilityMean * 1.5) {
            return Optional.of(new TradingSignal(
                samplesProcessed,
                SignalType.BUY,
                Math.min(0.8, momentum * 10),
                "Positive momentum with controlled volatility"
            ));
        } else if (momentum < -0.02 || instantVol > volatilityMean * 2) {
            return Optional.of(new TradingSignal(
                samplesProcessed,
                SignalType.SELL,
                Math.min(0.9, instantVol / volatilityMean),
                "Negative momentum or high volatility"
            ));
        }
        
        return Optional.empty();
    }
    
    /**
     * Get current statistics.
     */
    public StreamingStatistics getStatistics() {
        return new StreamingStatistics(
            samplesProcessed,
            volatilityMean,
            currentRegime
        );
    }
    
    /**
     * Simple statistics.
     *
     * @param samplesProcessed number of samples processed
     * @param averageVolatility running average volatility
     * @param currentRegime current detected regime
     */
    public record StreamingStatistics(
        int samplesProcessed,
        double averageVolatility,
        MarketRegime currentRegime
    ) {}
}