MODWTStreamingTransform.java

package com.morphiqlabs.wavelet.modwt.streaming;

import com.morphiqlabs.wavelet.api.BoundaryMode;
import com.morphiqlabs.wavelet.api.Wavelet;
import com.morphiqlabs.wavelet.exception.InvalidArgumentException;
import com.morphiqlabs.wavelet.exception.InvalidSignalException;
import com.morphiqlabs.wavelet.modwt.MODWTResult;

import java.util.concurrent.Flow;

/**
 * Streaming MODWT transform for processing continuous data streams.
 *
 * <p>This interface enables real-time MODWT analysis of streaming data without
 * requiring the entire signal to be available in memory. Key advantages over
 * streaming DWT:</p>
 * <ul>
 *   <li>No block size restrictions (works with any length)</li>
 *   <li>Shift-invariant processing</li>
 *   <li>Better continuity across block boundaries</li>
 *   <li>Maintains time alignment with input</li>
 * </ul>
 *
 * <p>Particularly useful for:</p>
 * <ul>
 *   <li>Live audio processing with precise timing</li>
 *   <li>Real-time financial data analysis</li>
 *   <li>Continuous sensor monitoring</li>
 *   <li>Edge detection in streaming video</li>
 * </ul>
 *
 * <p>Example usage:</p>
 * <pre>{@code
 * MODWTStreamingTransform transform = MODWTStreamingTransform.create(
 *     Daubechies.DB4,
 *     BoundaryMode.PERIODIC,
 *     256  // buffer size (can be any size, not just power of 2)
 * );
 *
 * // Subscribe to transform results
 * transform.subscribe(new Flow.Subscriber<MODWTResult>() {
 *     // Handle results...
 * });
 *
 * // Feed data into the transform
 * transform.process(dataChunk);
 * }</pre>
 * 
 * @since 1.0.0
 */
public interface MODWTStreamingTransform extends Flow.Publisher<MODWTResult>, AutoCloseable {
    
    /**
     * Closes this streaming transform and releases any resources.
     * This method does not throw InterruptedException.
     */
    @Override
    void close();

    /**
     * Create a streaming MODWT transform with default buffer size.
     *
     * @param wavelet      the wavelet to use
     * @param boundaryMode the boundary handling mode
     * @return a new streaming MODWT transform
     */
    static MODWTStreamingTransform create(Wavelet wavelet, BoundaryMode boundaryMode) {
        return create(wavelet, boundaryMode, 256); // Default buffer size
    }

    /**
     * Create a streaming MODWT transform.
     *
     * @param wavelet      the wavelet to use
     * @param boundaryMode the boundary handling mode
     * @param bufferSize   the processing buffer size (any positive value)
     * @return a new streaming MODWT transform
     * @throws InvalidArgumentException if bufferSize is not positive
     */
    static MODWTStreamingTransform create(Wavelet wavelet, BoundaryMode boundaryMode, int bufferSize) {
        return new MODWTStreamingTransformImpl(wavelet, boundaryMode, bufferSize);
    }

    /**
     * Create a multi-level streaming MODWT transform.
     *
     * @param wavelet      the wavelet to use
     * @param boundaryMode the boundary handling mode
     * @param bufferSize   the processing buffer size
     * @param levels       number of decomposition levels
     * @return a new multi-level streaming MODWT transform
     */
    static MODWTStreamingTransform createMultiLevel(
            Wavelet wavelet, BoundaryMode boundaryMode, int bufferSize, int levels) {
        return new MultiLevelMODWTStreamingTransform(wavelet, boundaryMode, bufferSize, levels);
    }

    /**
     * Process a chunk of streaming data.
     *
     * @param data the data chunk to process
     * @throws InvalidSignalException if data is null or empty
     * @throws IllegalStateException  if the transform is closed
     */
    void process(double[] data);

    /**
     * Process a single sample.
     *
     * @param sample the sample value
     * @throws IllegalStateException if the transform is closed
     */
    void processSample(double sample);

    /**
     * Flush any buffered data and emit final results.
     * Call this when the stream ends to process remaining samples.
     *
     * @throws IllegalStateException if the transform is closed
     */
    void flush();

    /**
     * Get current streaming statistics.
     *
     * @return statistics about the streaming performance
     */
    StreamingStatistics getStatistics();

    /**
     * Reset the transform state, clearing buffers and statistics.
     *
     * @throws IllegalStateException if the transform is closed
     */
    void reset();

    /**
     * Get the current buffer fill level.
     *
     * @return number of samples currently buffered
     */
    int getBufferLevel();

    /**
     * Check if the transform is closed.
     *
     * @return true if closed, false otherwise
     */
    boolean isClosed();

    /**
     * Streaming statistics for monitoring performance.
     */
    interface StreamingStatistics {
        /**
         * Total number of samples processed.
         */
        long getSamplesProcessed();

        /**
         * Total number of blocks processed.
         */
        long getBlocksProcessed();

        /**
         * Average processing time per block in nanoseconds.
         */
        long getAverageProcessingTimeNanos();

        /**
         * Maximum processing time for a single block.
         */
        long getMaxProcessingTimeNanos();

        /**
         * Minimum processing time for a single block.
         */
        long getMinProcessingTimeNanos();

        /**
         * Get throughput in samples per second.
         */
        double getThroughputSamplesPerSecond();

        /**
         * Reset all statistics.
         */
        void reset();
    }
}