Skip to content

Pipeline

spectrakit.pipeline.Pipeline

Chain spectral processing steps into a reusable pipeline.

Each step is a callable that takes a NumPy array of shape (W,) or (N, W) and returns an array of the same shape. Steps are executed in order.

Examples:

>>> from spectrakit import Pipeline, baseline_als, normalize_snv
>>> pipe = Pipeline()
>>> pipe.add("baseline", baseline_als, lam=1e6)
>>> pipe.add("normalize", normalize_snv)
>>> corrected = pipe.transform(raw_intensities)
Source code in src/spectrakit/pipeline.py
class Pipeline:
    """Chain spectral processing steps into a reusable pipeline.

    Each step is a callable that receives the current intensities as its
    first positional argument, plus the keyword arguments registered with
    the step, and returns the array handed to the next step. Steps are
    expected to take a numpy array of shape (W,) or (N, W) and return the
    same shape; the pipeline itself does not check this. Steps are
    executed in order.

    Examples:
        >>> from spectrakit import Pipeline, baseline_als, normalize_snv
        >>> pipe = Pipeline()
        >>> pipe.add("baseline", baseline_als, lam=1e6)
        >>> pipe.add("normalize", normalize_snv)
        >>> corrected = pipe.transform(raw_intensities)
    """

    def __init__(
        self,
        steps: list[tuple[str, Callable[..., np.ndarray], dict[str, Any]]] | None = None,
    ) -> None:
        """Initialize pipeline with optional named steps.

        Args:
            steps: List of (name, callable, kwargs) tuples. The list is
                shallow-copied, so steps added later never mutate the
                caller's list. ``None`` creates an empty pipeline.
        """
        # Copy instead of `steps or []`: that form aliased non-empty lists
        # (so add() leaked into the caller's list) but not empty ones.
        self.steps: list[tuple[str, Callable[..., np.ndarray], dict[str, Any]]] = (
            list(steps) if steps is not None else []
        )

    def add(
        self,
        name: str,
        fn: Callable[..., np.ndarray],
        **kwargs: Any,
    ) -> Pipeline:
        """Add a processing step to the pipeline.

        Args:
            name: Human-readable step name for logging.
            fn: Processing function (e.g., baseline_als, normalize_snv).
            **kwargs: Keyword arguments passed to fn.

        Returns:
            Self, for method chaining.
        """
        self.steps.append((name, fn, kwargs))
        return self

    def transform(self, intensities: np.ndarray) -> np.ndarray:
        """Apply all pipeline steps to the input.

        Args:
            intensities: Input spectral data, shape (W,) or (N, W).

        Returns:
            Output of the last step, or a copy of the input when the
            pipeline has no steps.
        """
        # Start from a copy so the first step never receives the caller's array.
        result = intensities.copy()
        for name, fn, kwargs in self.steps:
            logger.debug("Pipeline step: %s", name)
            result = fn(result, **kwargs)
        return result

    def transform_spectrum(self, spectrum: Spectrum) -> Spectrum:
        """Apply pipeline to a Spectrum, returning a new Spectrum.

        Args:
            spectrum: Input Spectrum.

        Returns:
            New Spectrum with processed intensities, a copy of the
            wavenumbers (if any), and the step names stored under the
            "pipeline_steps" metadata key (replacing any existing entry).
        """
        new_intensities = self.transform(spectrum.intensities)
        return Spectrum(
            intensities=new_intensities,
            wavenumbers=spectrum.wavenumbers.copy() if spectrum.wavenumbers is not None else None,
            metadata={**spectrum.metadata, "pipeline_steps": [s[0] for s in self.steps]},
            source_format=spectrum.source_format,
            label=spectrum.label,
        )

    def __repr__(self) -> str:
        """Return a short representation listing the step names in order."""
        step_names = [name for name, _, _ in self.steps]
        return f"Pipeline(steps={step_names})"

__init__

__init__(
    steps: list[
        tuple[str, Callable[..., ndarray], dict[str, Any]]
    ]
    | None = None,
) -> None

Initialize pipeline with optional named steps.

Parameters:

Name Type Description Default
steps list[tuple[str, Callable[..., ndarray], dict[str, Any]]] | None

List of (name, callable, kwargs) tuples.

None
Source code in src/spectrakit/pipeline.py
def __init__(
    self,
    steps: list[tuple[str, Callable[..., np.ndarray], dict[str, Any]]] | None = None,
) -> None:
    """Initialize pipeline with optional named steps.

    Args:
        steps: List of (name, callable, kwargs) tuples. The list is
            shallow-copied, so steps added later never mutate the
            caller's list. ``None`` creates an empty pipeline.
    """
    # Copy instead of `steps or []`: that form aliased non-empty lists
    # (so later appends leaked into the caller's list) but not empty ones.
    self.steps: list[tuple[str, Callable[..., np.ndarray], dict[str, Any]]] = (
        list(steps) if steps is not None else []
    )

add

add(
    name: str, fn: Callable[..., ndarray], **kwargs: Any
) -> Pipeline

Add a processing step to the pipeline.

Parameters:

Name Type Description Default
name str

Human-readable step name for logging.

required
fn Callable[..., ndarray]

Processing function (e.g., baseline_als, normalize_snv).

required
**kwargs Any

Keyword arguments passed to fn.

{}

Returns:

Type Description
Pipeline

Self, for method chaining.

Source code in src/spectrakit/pipeline.py
def add(
    self,
    name: str,
    fn: Callable[..., np.ndarray],
    **kwargs: Any,
) -> Pipeline:
    """Register one more processing step at the end of the pipeline.

    Args:
        name: Human-readable step name for logging.
        fn: Processing function (e.g., baseline_als, normalize_snv).
        **kwargs: Keyword arguments stored with the step and passed to fn.

    Returns:
        This pipeline, so calls can be chained.
    """
    # A step is stored as a (name, callable, kwargs) triple.
    step = (name, fn, kwargs)
    self.steps.append(step)
    return self

transform

transform(intensities: ndarray) -> ndarray

Apply all pipeline steps to the input.

Parameters:

Name Type Description Default
intensities ndarray

Input spectral data, shape (W,) or (N, W).

required

Returns:

Type Description
ndarray

Processed spectral data, same shape.

Source code in src/spectrakit/pipeline.py
def transform(self, intensities: np.ndarray) -> np.ndarray:
    """Run the input through every pipeline step, in order.

    Args:
        intensities: Input spectral data, shape (W,) or (N, W).

    Returns:
        Output of the last step, or a copy of the input when the
        pipeline has no steps.
    """
    # Start from a copy so the first step never receives the caller's array.
    current = intensities.copy()
    for step in self.steps:
        step_name, step_fn, step_kwargs = step
        logger.debug("Pipeline step: %s", step_name)
        # Each step's output becomes the next step's input.
        current = step_fn(current, **step_kwargs)
    return current

transform_spectrum

transform_spectrum(spectrum: Spectrum) -> Spectrum

Apply pipeline to a Spectrum, returning a new Spectrum.

Parameters:

Name Type Description Default
spectrum Spectrum

Input Spectrum.

required

Returns:

Type Description
Spectrum

New Spectrum with processed intensities.

Source code in src/spectrakit/pipeline.py
def transform_spectrum(self, spectrum: Spectrum) -> Spectrum:
    """Run the pipeline on a Spectrum and wrap the result in a new Spectrum.

    Args:
        spectrum: Input Spectrum.

    Returns:
        New Spectrum holding the processed intensities, a copy of the
        wavenumbers (if any), and the step names stored under the
        "pipeline_steps" metadata key.
    """
    processed = self.transform(spectrum.intensities)

    # Copy the axis so the new Spectrum does not share it with the input.
    axis_copy = None if spectrum.wavenumbers is None else spectrum.wavenumbers.copy()

    # Record which steps ran; an existing "pipeline_steps" entry is replaced.
    step_names = [entry[0] for entry in self.steps]
    annotated_metadata = {**spectrum.metadata, "pipeline_steps": step_names}

    return Spectrum(
        intensities=processed,
        wavenumbers=axis_copy,
        metadata=annotated_metadata,
        source_format=spectrum.source_format,
        label=spectrum.label,
    )