IncrementalFinancialAnalyzer.java

package com.morphiqlabs.wavelet.cwt.finance;

import com.morphiqlabs.wavelet.cwt.*;
import com.morphiqlabs.wavelet.cwt.finance.FinancialWaveletAnalyzer.*;

import java.util.*;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Incremental financial analyzer that updates analysis results efficiently
 * as new data arrives without reprocessing entire history.
 * 
 * <p>This analyzer maintains running statistics and incremental calculations
 * to minimize computational overhead for real-time applications.</p>
 * 
 * <p>Key optimizations:</p>
 * <ul>
 *   <li>Incremental volatility calculation using Welford's algorithm</li>
 *   <li>Sliding window crash detection with cached coefficients</li>
 *   <li>Exponential moving averages for trend detection</li>
 *   <li>Circular buffer reuse for memory efficiency</li>
 * </ul>
 */
public class IncrementalFinancialAnalyzer {
    
    /**
     * Incremental analysis state that can be updated efficiently.
     */
    public static class IncrementalState {
        // Price statistics
        private double lastPrice;
        private double ema12;  // 12-period EMA
        private double ema26;  // 26-period EMA
        private double ema50;  // 50-period EMA
        
        // Volatility statistics (Welford's algorithm)
        private int volCount;
        private double volMean;
        private double volM2;  // Sum of squares of differences from mean
        
        // Crash detection state
        private final CircularBuffer<CWTCoefficients> cwtHistory;
        private double maxAsymmetryScore;
        private int lastCrashIndex;
        
        // Risk metrics
        private double currentRiskLevel;
        private MarketRegime currentRegime;
        
        // Performance tracking
        private long samplesProcessed;
        private double maxDrawdown;
        private double peakPrice;
        
        IncrementalState(int historySize) {
            this.cwtHistory = new CircularBuffer<>(historySize);
            this.currentRegime = MarketRegime.RANGING;
            this.currentRiskLevel = 0.5;
            this.lastCrashIndex = -1;
        }
        
        /**
         * Update volatility using Welford's online algorithm.
         */
        void updateVolatility(double return_) {
            volCount++;
            double delta = return_ - volMean;
            volMean += delta / volCount;
            double delta2 = return_ - volMean;
            volM2 += delta * delta2;
        }
        
        /**
         * Get current volatility (standard deviation).
         */
        double getCurrentVolatility() {
            return volCount > 1 ? Math.sqrt(volM2 / (volCount - 1)) : 0.0;
        }
        
        /**
         * Update exponential moving averages.
         */
        void updateEMAs(double price) {
            if (ema12 == 0) {
                ema12 = ema26 = ema50 = price;
            } else {
                ema12 = updateEMA(price, ema12, 12);
                ema26 = updateEMA(price, ema26, 26);
                ema50 = updateEMA(price, ema50, 50);
            }
        }
        
        private double updateEMA(double price, double prevEMA, int period) {
            double alpha = 2.0 / (period + 1);
            return price * alpha + prevEMA * (1 - alpha);
        }
        
        /**
         * Update drawdown tracking.
         */
        void updateDrawdown(double price) {
            if (price > peakPrice) {
                peakPrice = price;
            } else {
                double drawdown = (peakPrice - price) / peakPrice;
                maxDrawdown = Math.max(maxDrawdown, drawdown);
            }
        }
    }
    
    /**
     * Cached CWT coefficients for sliding window.
     */
    private static class CWTCoefficients {
        final int timeIndex;
        final double[] coefficients;
        final double asymmetryScore;
        
        CWTCoefficients(int timeIndex, double[] coefficients, double asymmetryScore) {
            this.timeIndex = timeIndex;
            this.coefficients = coefficients;
            this.asymmetryScore = asymmetryScore;
        }
    }
    
    /**
     * Simple circular buffer implementation.
     */
    private static class CircularBuffer<T> {
        private final Object[] buffer;
        private int head = 0;
        private int size = 0;
        
        CircularBuffer(int capacity) {
            this.buffer = new Object[capacity];
        }
        
        void add(T item) {
            buffer[head] = item;
            head = (head + 1) % buffer.length;
            size = Math.min(size + 1, buffer.length);
        }
        
        @SuppressWarnings("unchecked")
        T get(int index) {
            if (index >= size) return null;
            int actualIndex = (head - size + index + buffer.length) % buffer.length;
            return (T) buffer[actualIndex];
        }
        
        int size() {
            return size;
        }
        
        void clear() {
            Arrays.fill(buffer, null);
            head = size = 0;
        }
    }
    
    // Core components
    private final FinancialAnalysisParameters parameters;
    private final FinancialAnalysisObjectPool pool;
    private final IncrementalState state;
    
    // Wavelets for analysis
    private final PaulWavelet crashWavelet;
    private final CWTTransform crashTransform;
    
    // Configuration
    private final int slidingWindowSize;
    private final int updateInterval;
    
    /**
     * Creates an incremental analyzer with default settings.
     */
    public IncrementalFinancialAnalyzer() {
        this(FinancialAnalysisParameters.defaultParameters(), 256, 10);
    }
    
    /**
     * Creates an incremental analyzer with custom settings.
     * 
     * @param parameters analysis parameters
     * @param windowSize sliding window size for CWT
     * @param updateInterval how often to update CWT (every N samples)
     */
    public IncrementalFinancialAnalyzer(
            FinancialAnalysisParameters parameters,
            int windowSize,
            int updateInterval) {
        this.parameters = parameters;
        this.pool = new FinancialAnalysisObjectPool();
        this.slidingWindowSize = windowSize;
        this.updateInterval = updateInterval;
        this.state = new IncrementalState(windowSize / updateInterval);
        
        // Initialize wavelets
        this.crashWavelet = new PaulWavelet(parameters.getOptimization().getCrashPaulOrder());
        this.crashTransform = new CWTTransform(crashWavelet);
    }
    
    /**
     * Process a new price sample and update all metrics incrementally.
     * 
     * @param price current price
     * @param volume current volume (optional, use 0 if not available)
     * @return updated analysis result
     */
    public IncrementalAnalysisResult processSample(double price, double volume) {
        state.samplesProcessed++;
        
        // Calculate return if we have previous price
        double return_ = 0;
        if (state.lastPrice > 0) {
            return_ = (price - state.lastPrice) / state.lastPrice;
            state.updateVolatility(Math.abs(return_));
        }
        
        // Update EMAs
        state.updateEMAs(price);
        
        // Update drawdown
        state.updateDrawdown(price);
        
        // Update crash detection periodically
        boolean crashDetected = false;
        if (state.samplesProcessed % updateInterval == 0) {
            crashDetected = updateCrashDetection(price);
        }
        
        // Update regime
        updateRegime(price, return_);
        
        // Update risk level
        updateRiskLevel(crashDetected);
        
        // Store current price for next iteration
        state.lastPrice = price;
        
        return createResult(price, volume, return_, crashDetected);
    }
    
    /**
     * Process a batch of samples efficiently.
     * 
     * @param prices array of prices
     * @param volumes array of volumes (can be null)
     * @return list of incremental results
     */
    public List<IncrementalAnalysisResult> processBatch(double[] prices, double[] volumes) {
        List<IncrementalAnalysisResult> results = new ArrayList<>(prices.length);
        
        for (int i = 0; i < prices.length; i++) {
            double volume = volumes != null ? volumes[i] : 0;
            results.add(processSample(prices[i], volume));
        }
        
        return results;
    }
    
    /**
     * Update crash detection using sliding window CWT.
     */
    private boolean updateCrashDetection(double currentPrice) {
        // Build price window from recent history
        int windowStart = (int) Math.max(0, state.samplesProcessed - slidingWindowSize);
        int windowSize = (int) (state.samplesProcessed - windowStart);
        
        if (windowSize < 10) return false;  // Need minimum data
        
        // This is simplified - in production, maintain a proper price history
        // For now, generate synthetic window based on EMAs
        FinancialAnalysisObjectPool.ArrayHolder windowHolder = pool.borrowArray(windowSize);
        
        try {
            double[] priceWindow = windowHolder.array;
            
            // Simplified: interpolate between EMAs
            for (int i = 0; i < windowSize; i++) {
                double t = (double) i / windowSize;
                priceWindow[i] = state.ema50 * (1 - t) + currentPrice * t;
            }
            
            // Analyze with CWT
            ScaleSpace scales = ScaleSpace.logarithmic(1.0, 10.0, 5);
            CWTResult result = crashTransform.analyze(
                Arrays.copyOfRange(priceWindow, 0, windowSize), scales);
            
            // Calculate asymmetry score
            double[][] coeffs = result.getCoefficients();
            double asymmetryScore = 0;
            
            for (int s = 0; s < scales.getNumScales(); s++) {
                double coeff = coeffs[s][windowSize - 1];
                if (coeff < 0) {
                    asymmetryScore += Math.abs(coeff) * scales.getScale(s);
                }
            }
            
            // Cache coefficients
            state.cwtHistory.add(new CWTCoefficients(
                (int) state.samplesProcessed,
                coeffs[coeffs.length - 1],  // Last scale
                asymmetryScore
            ));
            
            // Update max score
            state.maxAsymmetryScore = Math.max(state.maxAsymmetryScore, asymmetryScore);
            
            // Check for crash
            if (asymmetryScore > parameters.getCrashAsymmetryThreshold()) {
                state.lastCrashIndex = (int) state.samplesProcessed;
                return true;
            }
            
            return false;
            
        } finally {
            pool.returnArray(windowHolder);
        }
    }
    
    /**
     * Update market regime based on EMAs and volatility.
     */
    private void updateRegime(double price, double return_) {
        double volatility = state.getCurrentVolatility();
        double avgVolatility = parameters.getDefaultAverageVolatility();
        
        // MACD-like indicator
        double macd = state.ema12 - state.ema26;
        double signal = state.ema12 - state.ema50;
        
        if (volatility > avgVolatility * 2) {
            state.currentRegime = MarketRegime.VOLATILE;
        } else if (macd > 0 && signal > 0 && price > state.ema50) {
            state.currentRegime = MarketRegime.TRENDING_UP;
        } else if (macd < 0 && signal < 0 && price < state.ema50) {
            state.currentRegime = MarketRegime.TRENDING_DOWN;
        } else {
            state.currentRegime = MarketRegime.RANGING;
        }
    }
    
    /**
     * Update risk level based on multiple factors.
     */
    private void updateRiskLevel(boolean crashDetected) {
        double baseRisk = parameters.getBaseRiskLevel();
        double volatility = state.getCurrentVolatility();
        double avgVolatility = parameters.getDefaultAverageVolatility();
        
        // Start with base risk
        double risk = baseRisk;
        
        // Add volatility component
        risk += 0.3 * Math.min(volatility / avgVolatility, 2.0);
        
        // Add crash component
        if (crashDetected) {
            risk += 0.4;
        } else if (state.lastCrashIndex > 0 && 
                   state.samplesProcessed - state.lastCrashIndex < 50) {
            // Recent crash increases risk
            risk += 0.2;
        }
        
        // Add drawdown component
        risk += 0.1 * state.maxDrawdown;
        
        // Smooth risk level changes
        state.currentRiskLevel = 0.7 * state.currentRiskLevel + 0.3 * Math.min(risk, 1.0);
    }
    
    /**
     * Create analysis result from current state.
     */
    private IncrementalAnalysisResult createResult(
            double price, double volume, double return_, boolean crashDetected) {
        
        // Generate trading signal based on current state
        SignalType signal = SignalType.HOLD;
        double signalStrength = 0;
        
        if (state.currentRegime == MarketRegime.TRENDING_UP && 
            state.currentRiskLevel < 0.7) {
            signal = SignalType.BUY;
            signalStrength = Math.min(0.8, (state.ema12 - state.ema50) / state.ema50 * 100);
        } else if (state.currentRegime == MarketRegime.TRENDING_DOWN || 
                   state.currentRiskLevel > 0.8) {
            signal = SignalType.SELL;
            signalStrength = state.currentRiskLevel;
        }
        
        return new IncrementalAnalysisResult(
            state.samplesProcessed,
            price,
            volume,
            return_,
            state.getCurrentVolatility(),
            state.currentRegime,
            state.currentRiskLevel,
            signal,
            signalStrength,
            crashDetected,
            state.maxDrawdown,
            state.ema12,
            state.ema26,
            state.ema50
        );
    }
    
    /**
     * Get current state snapshot.
     */
    public IncrementalState getState() {
        return state;
    }
    
    /**
     * Reset analyzer state.
     */
    public void reset() {
        state.volCount = 0;
        state.volMean = 0;
        state.volM2 = 0;
        state.lastPrice = 0;
        state.ema12 = state.ema26 = state.ema50 = 0;
        state.cwtHistory.clear();
        state.maxAsymmetryScore = 0;
        state.lastCrashIndex = -1;
        state.currentRiskLevel = 0.5;
        state.currentRegime = MarketRegime.RANGING;
        state.samplesProcessed = 0;
        state.maxDrawdown = 0;
        state.peakPrice = 0;
    }
    
    /**
     * Result of incremental analysis.
     *
     * @param sampleIndex sequential sample index
     * @param price current price
     * @param volume current volume
     * @param return_ instantaneous return
     * @param volatility instantaneous volatility
     * @param regime detected market regime
     * @param riskLevel current risk level
     * @param signal detected signal type (if any)
     * @param signalStrength signal confidence/strength
     * @param crashDetected whether a crash event was detected
     * @param maxDrawdown running maximum drawdown
     * @param ema12 12-period EMA
     * @param ema26 26-period EMA
     * @param ema50 50-period EMA
     */
    public record IncrementalAnalysisResult(
        long sampleIndex,
        double price,
        double volume,
        double return_,
        double volatility,
        MarketRegime regime,
        double riskLevel,
        SignalType signal,
        double signalStrength,
        boolean crashDetected,
        double maxDrawdown,
        double ema12,
        double ema26,
        double ema50
    ) {
        /**
         * Check if this result contains a trading signal.
         */
        public boolean hasSignal() {
            return signal != SignalType.HOLD && signalStrength > 0;
        }
        
        /**
         * Check if this result indicates high risk.
         */
        public boolean isHighRisk() {
            return riskLevel > 0.8 || crashDetected;
        }
    }
}