r"""
.. _sec-autoregressive:
Autoregressive model change
====================================================================================================
Description
----------------------------------------------------------------------------------------------------
Let :math:`0<t_1<t_2<\dots<n` be unknown change points indexes.
Consider the following piecewise autoregressive model
.. math::
y_t = z_t' \delta_j + \varepsilon_t, \quad \forall t=t_j,\dots,t_{j+1}-1
where :math:`j>1` is the segment number, :math:`z_t=[y_{t-1}, y_{t-2},\dots,y_{t-p}]` is the lag vector,and :math:`p>0` is the order of the process.
The least-squares estimates of the break dates is obtained by minimiming the sum of squared
residuals :cite:`ar-Bai2000`.
Formally, the associated cost function on an interval :math:`I` is
.. math:: c(y_{I}) = \min_{\delta\in\mathbb{R}^p} \sum_{t\in I} \|y_t - \delta' z_t \|_2^2
Usage
----------------------------------------------------------------------------------------------------
Start with the usual imports and create a signal with piecewise linear trends.
.. code-block:: python
from itertools import cycle
import numpy as np
import matplotlib.pylab as plt
import ruptures as rpt
# creation of data
n = 2000
n_bkps, sigma = 4, 0.5 # number of change points, noise standart deviation
bkps = [400, 1000, 1300, 1800, n]
f1 = np.array([0.075, 0.1])
f2 = np.array([0.1, 0.125])
freqs = np.zeros((n, 2))
for sub, val in zip(np.split(freqs, bkps[:-1]), cycle([f1, f2])):
sub += val
tt = np.arange(n)
signal = np.sum((np.sin(2*np.pi*tt*f) for f in freqs.T))
signal += np.random.normal(scale=sigma, size=signal.shape)
# display signal
rpt.show.display(signal, bkps, figsize=(10, 6))
plt.show()
Then create a :class:`CostAR` instance and print the cost of the sub-signal
:code:`signal[50:150]`.
The autoregressive order can be specified through the keyword ``'order'``.
.. code-block:: python
c = rpt.costs.CostAR(order=10).fit(signal)
print(c.error(50, 150))
You can also compute the sum of costs for a given list of change points.
.. code-block:: python
print(c.sum_of_costs(bkps))
print(c.sum_of_costs([10, 100, 200, 250, n]))
In order to use this cost class in a change point detection algorithm (inheriting from
:class:`BaseEstimator`), either pass a :class:`CostAR` instance (through the argument
``'custom_cost'``) or set :code:`model="ar"`.
Additional parameters can be passed to the cost instance through the keyword ``'params'``.
.. code-block:: python
c = rpt.costs.CostAR(order=10); algo = rpt.Dynp(custom_cost=c)
# is equivalent to
algo = rpt.Dynp(model="ar", params={"order": 10})
Code explanation
----------------------------------------------------------------------------------------------------
.. autoclass:: ruptures.costs.CostAR
:members:
:special-members: __init__
.. rubric:: References
.. bibliography:: ../biblio.bib
:style: alpha
:cited:
:labelprefix: AR
:keyprefix: ar-
"""
import numpy as np
from numpy.lib.stride_tricks import as_strided
from numpy.linalg import lstsq
from ruptures.base import BaseCost
from ruptures.costs import NotEnoughPoints
[docs]class CostAR(BaseCost):
r"""
Least-squares estimate for changes in autoregressive coefficients.
"""
model = "ar"
[docs] def __init__(self, order=4):
self.signal = None
self.covar = None
self.min_size = max(5, order + 1)
self.order = order
[docs] def fit(self, signal):
"""Set parameters of the instance.
The signal must be 1D.
Args:
signal (array): 1d signal. Shape (n_samples, 1) or (n_samples,).
Returns:
self
"""
if signal.ndim == 1:
self.signal = signal.reshape(-1, 1)
else:
self.signal = signal
# lagged covariates
n_samples, _ = self.signal.shape
strides = (self.signal.itemsize, self.signal.itemsize)
shape = (n_samples - self.order, self.order)
lagged = as_strided(self.signal, shape=shape, strides=strides)
# pad the first columns
lagged_after_padding = np.pad(lagged,
((self.order, 0), (0, 0)),
mode="edge")
# add intercept
self.covar = np.c_[lagged_after_padding, np.ones(n_samples)]
# pad signal on the edges
self.signal[:self.order] = self.signal[self.order]
return self
[docs] def error(self, start, end):
"""Return the approximation cost on the segment [start:end].
Args:
start (int): start of the segment
end (int): end of the segment
Returns:
float: segment cost
Raises:
NotEnoughPoints: when the segment is too short (less than ``'min_size'`` samples).
"""
if end - start < self.min_size:
raise NotEnoughPoints
y, X = self.signal[start:end], self.covar[start:end]
_, residual, _, _ = lstsq(X, y, rcond=None)
return residual.sum()