Source code for ruptures.detection.pelt

Exact segmentation: Pelt


The method is implemented in :class:`ruptures.detection.Pelt`.

Because the enumeration of all possible partitions impossible, the algorithm relies on a pruning
rule. Many indexes are discarded, greatly reducing the computational cost while retaining the
ability to find the optimal segmentation.
The implementation follows :cite:`b-Killick2012a`. In addition, under certain conditions on the change
point repartition, the computational complexity is linear on average.

When calling :meth:`ruptures.detection.Pelt.__init__`, the minimum distance between change points
can be set through the keyword ``'min_size'``; through the parameter ``'jump'``, only change
point indexes multiple of a particular value are considered.


.. code-block:: python

    import numpy as np
    import matplotlib.pylab as plt
    import ruptures as rpt

    # creation of data
    n, dim = 500, 3
    n_bkps, sigma = 3, 1
    signal, b = rpt.pw_constant(n, dim, n_bkps, noise_std=sigma)

    # change point detection
    model = "l1"  # "l2", "rbf"
    algo = rpt.Pelt(model=model, min_size=3, jump=5).fit(signal)
    my_bkps = algo.predict(pen=3)

    # show results
    fig, (ax,) = rpt.display(signal, bkps, my_bkps, figsize=(10, 6))

Code explanation

.. autoclass:: ruptures.detection.Pelt
    :special-members: __init__

.. rubric:: References

.. bibliography:: ../biblio.bib
    :style: alpha
    :labelprefix: B
    :keyprefix: b-
from math import floor

from ruptures.costs import cost_factory
from ruptures.base import BaseCost, BaseEstimator

[docs]class Pelt(BaseEstimator): """Penalized change point detection. For a given model and penalty level, computes the segmentation which minimizes the constrained sum of approximation errors. """
[docs] def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None): """Initialize a Pelt instance. Args: model (str, optional): segment model, ["l1", "l2", "rbf"]. Not used if ``'custom_cost'`` is not None. custom_cost (BaseCost, optional): custom cost function. Defaults to None. min_size (int, optional): minimum segment length. jump (int, optional): subsample (one every *jump* points). params (dict, optional): a dictionary of parameters for the cost instance. Returns: self """ if custom_cost is not None and isinstance(custom_cost, BaseCost): self.cost = custom_cost else: if params is None: self.cost = cost_factory(model=model) else: self.cost = cost_factory(model=model, **params) self.min_size = max(min_size, self.cost.min_size) self.jump = jump self.n_samples = None
def _seg(self, pen): """Computes the segmentation for a given penalty using PELT (or a list of penalties). Args: penalty (float): penalty value Returns: dict: partition dict {(start, end): cost value,...} """ # initialization # partitions[t] contains the optimal partition of signal[0:t] partitions = dict() # this dict will be recursively filled partitions[0] = {(0, 0): 0} admissible = [] # Recursion ind = [ k for k in range(0, self.n_samples, self.jump) if k >= self.min_size] ind += [self.n_samples] for bkp in ind: # adding a point to the admissible set from the previous loop. new_adm_pt = floor((bkp - self.min_size) / self.jump) new_adm_pt *= self.jump admissible.append(new_adm_pt) subproblems = list() for t in admissible: # left partition try: tmp_partition = partitions[t].copy() except KeyError: # no partition of 0:t exists continue # we update with the right partition tmp_partition.update({(t, bkp): self.cost.error(t, bkp) + pen}) subproblems.append(tmp_partition) # finding the optimal partition partitions[bkp] = min( subproblems, key=lambda d: sum(d.values())) # trimming the admissible set admissible = [t for t, partition in zip(admissible, subproblems) if sum(partition.values()) <= sum(partitions[bkp].values()) + pen] best_partition = partitions[self.n_samples] del best_partition[(0, 0)] return best_partition
[docs] def fit(self, signal): """Set params. Args: signal (array): signal to segment. Shape (n_samples, n_features) or (n_samples,). Returns: self """ # update params if signal.ndim == 1: n_samples, = signal.shape else: n_samples, _ = signal.shape self.n_samples = n_samples return self
[docs] def predict(self, pen): """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed to fit(). Args: pen (float): penalty value (>0) Returns: list: sorted list of breakpoints """ partition = self._seg(pen) bkps = sorted(e for s, e in partition.keys()) return bkps
[docs] def fit_predict(self, signal, pen): """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once Args: signal (array): signal. Shape (n_samples, n_features) or (n_samples,). pen (float): penalty value (>0) Returns: list: sorted list of breakpoints """ return self.predict(pen)