Source code for ruptures.detection.dynp

r"""
Exact segmentation: dynamic programming
====================================================================================================

Description
----------------------------------------------------------------------------------------------------

The method is implemented in :class:`ruptures.detection.Dynp`.

Roughly speaking, it computes the cost of all subsequences of a given signal.
The number of computed costs is of the order :math:`\mathcal{O}(Kn^2)`, where :math:`K` is the number
of change points and :math:`n` the number of samples.
This has to be multiplied by the computational cost of computing the approximation error on one
sub-sequence.
Consequently, piecewise constant models are significantly faster than linear or autoregressive
models.

Computational cost is drastically reduced when considering only a subsample of possible change
points.
When calling :meth:`ruptures.detection.Dynp.__init__`, the minimum distance between change points
can be set through the keyword ``'min_size'``; through the parameter ``'jump'``, only change
point indexes multiple of a particular value are considered.


Usage
----------------------------------------------------------------------------------------------------

.. code-block:: python

    import numpy as np
    import matplotlib.pylab as plt
    import ruptures as rpt

    # creation of data
    n, dim = 500, 3
    n_bkps, sigma = 3, 5
    signal, bkps = rpt.pw_constant(n, dim, n_bkps, noise_std=sigma)

    # change point detection
    model = "l1"  # "l2", "rbf"
    algo = rpt.Dynp(model=model, min_size=3, jump=5).fit(signal)
    my_bkps = algo.predict(n_bkps=3)

    # show results
    rpt.show.display(signal, bkps, my_bkps, figsize=(10, 6))
    plt.show()


Code explanation
----------------------------------------------------------------------------------------------------
.. autoclass:: ruptures.detection.Dynp
    :members:
    :special-members: __init__

"""
from functools import lru_cache

from ruptures.utils import sanity_check
from ruptures.costs import cost_factory
from ruptures.base import BaseCost, BaseEstimator


[docs]class Dynp(BaseEstimator): """ Find optimal change points using dynamic programming. Given a segment model, it computes the best partition for which the sum of errors is minimum. """
[docs] def __init__(self, model="l2", custom_cost=None, min_size=2, jump=5, params=None): """Creates a Dynp instance. Args: model (str, optional): segment model, ["l1", "l2", "rbf"]. Not used if ``'custom_cost'`` is not None. custom_cost (BaseCost, optional): custom cost function. Defaults to None. min_size (int, optional): minimum segment length. jump (int, optional): subsample (one every *jump* points). params (dict, optional): a dictionary of parameters for the cost instance. Returns: self """ self.seg = lru_cache(maxsize=None)(self._seg) # dynamic programming if custom_cost is not None and isinstance(custom_cost, BaseCost): self.cost = custom_cost else: if params is None: self.cost = cost_factory(model=model) else: self.cost = cost_factory(model=model, **params) self.min_size = max(min_size, self.cost.min_size) self.jump = jump self.n_samples = None
def _seg(self, start, end, n_bkps): """Recurrence to find the optimal partition of signal[start:end]. This method is to be memoized and then used. Args: start (int): start of the segment (inclusive) end (int): end of the segment (exclusive) n_bkps (int): number of breakpoints Returns: dict: {(start, end): cost value, ...} """ jump, min_size = self.jump, self.min_size if n_bkps == 0: cost = self.cost.error(start, end) return {(start, end): cost} elif n_bkps > 0: # Let's fill the list of admissible last breakpoints multiple_of_jump = (k for k in range(start, end) if k % jump == 0) admissible_bkps = list() for bkp in multiple_of_jump: n_samples = bkp - start # first check if left subproblem is possible if sanity_check(n_samples, n_bkps, jump, min_size): # second check if the right subproblem has enough points if end - bkp >= min_size: admissible_bkps.append(bkp) assert len( admissible_bkps) > 0, "No admissible last breakpoints found.\ start, end: ({},{}), n_bkps: {}.".format(start, end, n_bkps) # Compute the subproblems sub_problems = list() for bkp in admissible_bkps: left_partition = self.seg(start, bkp, n_bkps - 1) right_partition = self.seg(bkp, end, 0) tmp_partition = dict(left_partition) tmp_partition[(bkp, end)] = right_partition[(bkp, end)] sub_problems.append(tmp_partition) # Find the optimal partition return min(sub_problems, key=lambda d: sum(d.values()))
[docs] def fit(self, signal): """Create the cache associated with the signal. Dynamic programming is a recurrence; intermediate results are cached to speed up computations. This method sets up the cache. Args: signal (array): signal. Shape (n_samples, n_features) or (n_samples,). Returns: self """ # clear cache self.seg.cache_clear() # update some params self.cost.fit(signal) self.n_samples = signal.shape[0] return self
[docs] def predict(self, n_bkps): """Return the optimal breakpoints. Must be called after the fit method. The breakpoints are associated with the signal passed to fit(). Args: n_bkps (int): number of breakpoints. Returns: list: sorted list of breakpoints """ partition = self.seg(0, self.n_samples, n_bkps) bkps = sorted(e for s, e in partition.keys()) return bkps
[docs] def fit_predict(self, signal, n_bkps): """Fit to the signal and return the optimal breakpoints. Helper method to call fit and predict once Args: signal (array): signal. Shape (n_samples, n_features) or (n_samples,). n_bkps (int): number of breakpoints. Returns: list: sorted list of breakpoints """ self.fit(signal) return self.predict(n_bkps)