Source code for ruptures.costs.costlinear

r"""

.. _sec-linear:

Linear model change
====================================================================================================

Description
----------------------------------------------------------------------------------------------------

Let :math:`0<t_1<t_2<\dots<n` be unknown change points indexes.
Consider the following multiple linear regression model

.. math::

    y_t = z_t' \delta_j + \varepsilon_t, \quad \forall t=t_j,\dots,t_{j+1}-1

for :math:`j>1`.
Here, the observed dependant variable is :math:`y_t\in\mathbb{R}`, the covariate vector is
:math:`x_t \in\mathbb{R}^p`, the disturbance is :math:`\varepsilon_t\in\mathbb{R}`.
The vectors :math:`\delta_j\in\mathbb{R}^p` are the paramater vectors (or regression coefficients).

The least-squares estimates of the break dates is obtained by minimiming the sum of squared
residuals :cite:`cl-Bai2003`.
Formally, the associated cost function on an interval :math:`I` is

    .. math:: c(y_{I}) = \min_{\delta\in\mathbb{R}^p} \sum_{t\in I} \|y_t - \delta' z_t \|_2^2


Usage
----------------------------------------------------------------------------------------------------

Start with the usual imports and create a signal with piecewise linear trends.

.. code-block:: python

    import numpy as np
    import matplotlib.pylab as plt
    import ruptures as rpt
    # creation of data
    n, n_reg = 2000, 3  # number of samples, number of regressors (including intercept)
    n_bkps, sigma = 3, 5  # number of change points, noise standart deviation
    # regressors
    tt = np.linspace(0, 10*np.pi, n)
    X = np.vstack((np.sin(tt), np.sin(5*tt), np.ones(n))).T
    # parameter vectors
    deltas, bkps = rpt.pw_constant(n, n_reg, n_bkps, noise_std=None, delta=(1, 3))
    # observed signal
    y = np.sum(X*deltas, axis=1)
    y += np.random.normal(size=y.shape)
    # display signal
    rpt.show.display(y, bkps, figsize=(10, 6))
    plt.show()

Then create a :class:`CostLinear` instance and print the cost of the sub-signal
:code:`signal[50:150]`.

.. code-block:: python

    # stack observed signal and regressors.
    # first dimension is the observed signal.
    signal = np.column_stack((y.reshape(-1, 1), X))
    c = rpt.costs.CostLinear().fit(signal)
    print(c.error(50, 150))


You can also compute the sum of costs for a given list of change points.

.. code-block:: python

    print(c.sum_of_costs(bkps))
    print(c.sum_of_costs([10, 100, 200, 250, n]))


In order to use this cost class in a change point detection algorithm (inheriting from
:class:`BaseEstimator`), either pass a :class:`CostLinear` instance (through the argument
``'custom_cost'``) or set :code:`model="linear"`.

.. code-block:: python

    c = rpt.costs.CostLinear(); algo = rpt.Dynp(custom_cost=c)
    # is equivalent to
    algo = rpt.Dynp(model="linear")


Code explanation
----------------------------------------------------------------------------------------------------

.. autoclass:: ruptures.costs.CostLinear
    :members:
    :special-members: __init__


.. rubric:: References

.. bibliography:: ../biblio.bib
    :style: alpha
    :cited:
    :labelprefix: CL
    :keyprefix: cl-

"""
from numpy.linalg import lstsq

from ruptures.base import BaseCost
from ruptures.costs import NotEnoughPoints


[docs]class CostLinear(BaseCost): r""" Least-squares estimate for linear changes. """ model = "linear"
[docs] def __init__(self): self.signal = None self.covar = None self.min_size = 2
[docs] def fit(self, signal): """Set parameters of the instance. The first column contains the observed variable. The other columns contains the covariates. Args: signal (array): signal. Shape (n_samples, n_regressors+1) Returns: self """ assert signal.ndim > 1, "Not enough dimensions" self.signal = signal[:, 0].reshape(-1, 1) self.covar = signal[:, 1:] return self
[docs] def error(self, start, end): """Return the approximation cost on the segment [start:end]. Args: start (int): start of the segment end (int): end of the segment Returns: float: segment cost Raises: NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples). """ if end - start < self.min_size: raise NotEnoughPoints y, X = self.signal[start:end], self.covar[start:end] _, residual, _, _ = lstsq(X, y, rcond=None) return residual.sum()
# class Cost(BaseCost): # """Compute error (in different norms) when approximating a signal with a constant value.""" # def __init__(self, model="l2"): # assert model in [ # "l1", "l2", "rbf"], "Choose different model." # self.model = model # if self.model in ["l1", "l2", "rbf"]: # self.min_size = 2 # self.signal = None # self.gram = None # def fit(self, signal): # """Update the parameters of the instance to fit the signal. # Detailled description # Args: # signal (array): signal of shape (n_samples, n_features) of (n_samples,) # Returns: # self: # """ # if signal.ndim == 1: # self.signal = signal.reshape(-1, 1) # else: # self.signal = signal # if self.model == "rbf": # pairwise_dists = pdist(self.signal, 'sqeuclidean') # pairwise_dists /= np.median(pairwise_dists) # scaling # self.gram = squareform(np.exp(-pairwise_dists)) # np.fill_diagonal(self.gram, 1) # elif self.model == "l2": # self.gram = self.signal.dot(self.signal.T) # return self # def error(self, start, end): # """Return squared error on the interval start:end # Detailled description # Args: # start (int): start index (inclusive) # end (int): end index (exclusive) # Returns: # float: error # Raises: # NotEnoughPoints: when not enough points # """ # if end - start < self.min_size: # raise NotEnoughPoints # if self.model in ["l2", "rbf"]: # sub_gram = self.gram[start:end, start:end] # cost = np.diagonal(sub_gram).sum() # cost -= sub_gram.sum() / (end - start) # elif self.model == "l1": # med = np.median(self.signal[start:end], axis=0) # cost = abs(self.signal[start:end] - med).sum() # return cost