Custom statespace models¶
The true power of the state space model is to allow the creation and estimation of custom models. This notebook shows various statespace models that subclass sm.tsa.statespace.MLEModel.
Remember that the general state space model can be written in the following general way:
\[ y_t = d_t + Z_t \alpha_t + \varepsilon_t, \qquad \varepsilon_t \sim N(0, H_t) \]
\[ \alpha_{t+1} = c_t + T_t \alpha_t + R_t \eta_t, \qquad \eta_t \sim N(0, Q_t) \]
You can check the details and dimensions of the objects in this link.
Most models will not include all of these elements. For example, the design matrix \(Z_t\) might not depend on time (\(\forall t \; Z_t = Z\)), or the model might not have an observation intercept \(d_t\).
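For quick reference, each piece of this general form maps onto a named entry of the model's underlying representation. A minimal sketch (the mod instance here is hypothetical; the key names are the ones statsmodels uses internally):
# Mapping from the general-form matrices to representation keys,
# for any constructed MLEModel subclass instance `mod`:
# mod.ssm["obs_intercept"]   -> d_t
# mod.ssm["design"]          -> Z_t  (k_endog x k_states [x nobs])
# mod.ssm["obs_cov"]         -> H_t
# mod.ssm["state_intercept"] -> c_t
# mod.ssm["transition"]      -> T_t
# mod.ssm["selection"]       -> R_t
# mod.ssm["state_cov"]       -> Q_t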
We will start with something relatively simple and then show how to extend the model bit by bit to include more elements.
Model 1: time-varying coefficients. One observation equation with two state equations
Model 2: time-varying parameters with a non-identity transition matrix
Model 3: multiple observation and state equations
Bonus: pymc3 for fast Bayesian estimation
[ ]:
%matplotlib inline
from collections import OrderedDict
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import statsmodels.api as sm
plt.rc("figure", figsize=(16, 8))
plt.rc("font", size=15)
Model 1: time-varying coefficient¶
The model is
\[ y_t = d + \beta_{x,t} x_t + \beta_{w,t} w_t + \epsilon_t, \qquad \epsilon_t \sim N(0, \sigma_\epsilon^2) \]
\[ \beta_{x,t+1} = \beta_{x,t} + \eta_{x,t+1}, \qquad \eta_{x,t+1} \sim N(0, \sigma_{\beta,x}^2) \]
\[ \beta_{w,t+1} = \beta_{w,t} + \eta_{w,t+1}, \qquad \eta_{w,t+1} \sim N(0, \sigma_{\beta,w}^2) \]
The observed data is \(y_t, x_t, w_t\), where \(x_t\) and \(w_t\) are exogenous variables. Notice that the design matrix is time-varying, so it will have three dimensions (k_endog x k_states x nobs).
The states are \(\beta_{x,t}\) and \(\beta_{w,t}\). The state equations tell us these states evolve with a random walk. Thus, in this case, the transition matrix is a 2 by 2 identity matrix.
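As a quick illustration of that three-dimensional design shape, here is a hedged toy sketch (the arrays x_toy and w_toy are made up purely for this check):
import numpy as np

x_toy = np.random.uniform(size=5)  # 5 toy observations
w_toy = np.random.uniform(size=5)
# Stack the regressors (nobs x 2), transpose to k_states x nobs,
# then add a leading axis for the single observation equation
design_toy = np.c_[x_toy, w_toy].T[np.newaxis, :, :]
print(design_toy.shape)  # (1, 2, 5) == (k_endog, k_states, nobs)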
We will first simulate the data, then construct the model, and finally estimate it.
[ ]:
def gen_data_for_model1():
nobs = 1000
rs = np.random.RandomState(seed=93572)
d = 5
var_y = 5
var_coeff_x = 0.01
var_coeff_w = 0.5
x_t = rs.uniform(size=nobs)
w_t = rs.uniform(size=nobs)
eps = rs.normal(scale=var_y ** 0.5, size=nobs)
beta_x = np.cumsum(rs.normal(size=nobs, scale=var_coeff_x ** 0.5))
beta_w = np.cumsum(rs.normal(size=nobs, scale=var_coeff_w ** 0.5))
y_t = d + beta_x * x_t + beta_w * w_t + eps
return y_t, x_t, w_t, beta_x, beta_w
y_t, x_t, w_t, beta_x, beta_w = gen_data_for_model1()
_ = plt.plot(y_t)
[ ]:
class TVRegression(sm.tsa.statespace.MLEModel):
def __init__(self, y_t, x_t, w_t):
exog = np.c_[x_t, w_t] # shaped nobs x 2
super(TVRegression, self).__init__(
endog=y_t, exog=exog, k_states=2, initialization="diffuse"
)
# Since the design matrix is time-varying, it must be
# shaped k_endog x k_states x nobs
# Notice that exog.T is shaped k_states x nobs, so we
# just need to add a new first axis with shape 1
self.ssm["design"] = exog.T[np.newaxis, :, :] # shaped 1 x 2 x nobs
self.ssm["selection"] = np.eye(self.k_states)
self.ssm["transition"] = np.eye(self.k_states)
# Which parameters need to be positive?
self.positive_parameters = slice(1, 4)
@property
def param_names(self):
return ["intercept", "var.e", "var.x.coeff", "var.w.coeff"]
@property
def start_params(self):
"""
Defines the starting values for the parameters
The linear regression gives us reasonable starting values for the constant
d and the variance of the epsilon error
"""
exog = sm.add_constant(self.exog)
res = sm.OLS(self.endog, exog).fit()
params = np.r_[res.params[0], res.scale, 0.001, 0.001]
return params
def transform_params(self, unconstrained):
"""
We constrain the last three parameters
('var.e', 'var.x.coeff', 'var.w.coeff') to be positive,
because they are variances
"""
constrained = unconstrained.copy()
constrained[self.positive_parameters] = (
constrained[self.positive_parameters] ** 2
)
return constrained
def untransform_params(self, constrained):
"""
Need to untransform all the parameters you transformed
in the `transform_params` function
"""
unconstrained = constrained.copy()
unconstrained[self.positive_parameters] = (
unconstrained[self.positive_parameters] ** 0.5
)
return unconstrained
def update(self, params, **kwargs):
params = super(TVRegression, self).update(params, **kwargs)
self["obs_intercept", 0, 0] = params[0]
self["obs_cov", 0, 0] = params[1]
self["state_cov"] = np.diag(params[2:4])
And then estimate it with our custom model class¶
[ ]:
mod = TVRegression(y_t, x_t, w_t)
res = mod.fit()
print(res.summary())
The values used to generate the data were:
intercept = 5
var.e = 5
var.x.coeff = 0.01
var.w.coeff = 0.5
As you can see, the estimates recovered the true parameters pretty well.
We can also recover the estimated evolution of the underlying coefficients (or, in Kalman filter talk, the states).
[ ]:
fig, axes = plt.subplots(2, figsize=(16, 8))
ss = pd.DataFrame(res.smoothed_state.T, columns=["x", "w"])
axes[0].plot(beta_x, label="True")
axes[0].plot(ss["x"], label="Smoothed estimate")
axes[0].set(title="Time-varying coefficient on x_t")
axes[0].legend()
axes[1].plot(beta_w, label="True")
axes[1].plot(ss["w"], label="Smoothed estimate")
axes[1].set(title="Time-varying coefficient on w_t")
axes[1].legend()
fig.tight_layout();
Model 2: time-varying parameters with a non-identity transition matrix¶
This is a small extension from Model 1. Instead of an identity transition matrix, we will have one with two parameters (\(\rho_1, \rho_2\)) that we need to estimate. From the update code below, the implied model is:
\[ y_t = d + \beta_{x,t} x_t + \beta_{w,t} w_t + \epsilon_t \]
\[ \beta_{x,t+1} = \rho_1 \beta_{x,t} + \eta_{x,t+1} \]
\[ \beta_{w,t+1} = \rho_2 \beta_{w,t} + \eta_{w,t+1} \]
What should we modify in our previous class to make things work?
+ Good news: not a lot!
+ Bad news: we need to be careful about a few things
1) Change the starting parameters function¶
We need to add names for the new parameters \(\rho_1, \rho_2\), and we need corresponding starting values for them.
The param_names function goes from:
def param_names(self):
return ['intercept', 'var.e', 'var.x.coeff', 'var.w.coeff']
to
def param_names(self):
return ['intercept', 'var.e', 'var.x.coeff', 'var.w.coeff',
'rho1', 'rho2']
and we change the start_params function from
def start_params(self):
exog = sm.add_constant(self.exog)
res = sm.OLS(self.endog, exog).fit()
params = np.r_[res.params[0], res.scale, 0.001, 0.001]
return params
to
def start_params(self):
exog = sm.add_constant(self.exog)
res = sm.OLS(self.endog, exog).fit()
params = np.r_[res.params[0], res.scale, 0.001, 0.001, 0.8, 0.8]
return params
2) Change the update function¶
It goes from
def update(self, params, **kwargs):
params = super(TVRegression, self).update(params, **kwargs)
self['obs_intercept', 0, 0] = params[0]
self['obs_cov', 0, 0] = params[1]
self['state_cov'] = np.diag(params[2:4])
to
def update(self, params, **kwargs):
params = super(TVRegression, self).update(params, **kwargs)
self['obs_intercept', 0, 0] = params[0]
self['obs_cov', 0, 0] = params[1]
self['state_cov'] = np.diag(params[2:4])
self['transition', 0, 0] = params[4]
self['transition', 1, 1] = params[5]
(optional) Change the transform_params and untransform_params functions¶
This is not required, but you might want to restrict \(\rho_1, \rho_2\) to lie between -1 and 1. In that case, we first import two utility functions from statsmodels.
from statsmodels.tsa.statespace.tools import (
constrain_stationary_univariate, unconstrain_stationary_univariate)
constrain_stationary_univariate constrains a value to be within -1 and 1; unconstrain_stationary_univariate provides the inverse function. The transform and untransform parameters functions would look like this (remember that \(\rho_1, \rho_2\) are in the 4th and 5th index):
def transform_params(self, unconstrained):
constrained = unconstrained.copy()
constrained[self.positive_parameters] = constrained[self.positive_parameters]**2
constrained[4] = constrain_stationary_univariate(constrained[4:5])
constrained[5] = constrain_stationary_univariate(constrained[5:6])
return constrained
def untransform_params(self, constrained):
unconstrained = constrained.copy()
unconstrained[self.positive_parameters] = unconstrained[self.positive_parameters]**0.5
unconstrained[4] = unconstrain_stationary_univariate(constrained[4:5])
unconstrained[5] = unconstrain_stationary_univariate(constrained[5:6])
return unconstrained
I will write the full class below (without the optional changes we just discussed).
[ ]:
class TVRegressionExtended(sm.tsa.statespace.MLEModel):
def __init__(self, y_t, x_t, w_t):
exog = np.c_[x_t, w_t] # shaped nobs x 2
super(TVRegressionExtended, self).__init__(
endog=y_t, exog=exog, k_states=2, initialization="diffuse"
)
# Since the design matrix is time-varying, it must be
# shaped k_endog x k_states x nobs
# Notice that exog.T is shaped k_states x nobs, so we
# just need to add a new first axis with shape 1
self.ssm["design"] = exog.T[np.newaxis, :, :] # shaped 1 x 2 x nobs
self.ssm["selection"] = np.eye(self.k_states)
self.ssm["transition"] = np.eye(self.k_states)
# Which parameters need to be positive?
self.positive_parameters = slice(1, 4)
@property
def param_names(self):
return ["intercept", "var.e", "var.x.coeff", "var.w.coeff", "rho1", "rho2"]
@property
def start_params(self):
"""
Defines the starting values for the parameters
The linear regression gives us reasonable starting values for the constant
d and the variance of the epsilon error
"""
exog = sm.add_constant(self.exog)
res = sm.OLS(self.endog, exog).fit()
params = np.r_[res.params[0], res.scale, 0.001, 0.001, 0.7, 0.8]
return params
def transform_params(self, unconstrained):
"""
We constrain the last three parameters
('var.e', 'var.x.coeff', 'var.w.coeff') to be positive,
because they are variances
"""
constrained = unconstrained.copy()
constrained[self.positive_parameters] = (
constrained[self.positive_parameters] ** 2
)
return constrained
def untransform_params(self, constrained):
"""
Need to untransform all the parameters you transformed
in the `transform_params` function
"""
unconstrained = constrained.copy()
unconstrained[self.positive_parameters] = (
unconstrained[self.positive_parameters] ** 0.5
)
return unconstrained
def update(self, params, **kwargs):
params = super(TVRegressionExtended, self).update(params, **kwargs)
self["obs_intercept", 0, 0] = params[0]
self["obs_cov", 0, 0] = params[1]
self["state_cov"] = np.diag(params[2:4])
self["transition", 0, 0] = params[4]
self["transition", 1, 1] = params[5]
For the estimation we will use the same data as in Model 1 and expect \(\rho_1, \rho_2\) to be near 1.
The results look pretty good! Note that this estimation can be quite sensitive to the starting values of \(\rho_1, \rho_2\). If you try lower ones, you will see it fails to converge.
[ ]:
mod = TVRegressionExtended(y_t, x_t, w_t)
res = mod.fit(maxiter=2000) # it doesn't converge with 50 iters
print(res.summary())
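To see that sensitivity yourself, you can pass explicit starting values through the start_params argument that fit() accepts. A hypothetical probe (low_start is made up; with values this low the optimizer may well fail to converge):
low_start = np.r_[mod.start_params[:4], 0.2, 0.2]  # deliberately low rho1, rho2
res_low = mod.fit(start_params=low_start, maxiter=2000)
print(res_low.mle_retvals["converged"])  # check whether the optimizer converged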
Model 3: multiple observation and state equations¶
We will keep the time-varying parameters, but this time we will also have two observation equations.
Observation equations¶
\(\hat{i_t}, \hat{M_t}, \hat{s_t}\) are observed each period.
The model for the observation equation has two equations:
\[ \hat{i_t} = \alpha_{1,t} \hat{s_t} + \varepsilon_{1,t}, \qquad \varepsilon_{1,t} \sim N(0, \sigma_{\varepsilon_1}^2) \]
\[ \hat{M_t} = \alpha_{2,t} + \varepsilon_{2,t}, \qquad \varepsilon_{2,t} \sim N(0, \sigma_{\varepsilon_2}^2) \]
Following the general notation from state space models, the endogenous part of the observation equation is \(y_t = (\hat{i_t}, \hat{M_t})\) and we only have one exogenous variable, \(\hat{s_t}\).
State equations¶
\[ \alpha_{1,t+1} = \delta_1 \alpha_{1,t} + \delta_2 \alpha_{2,t} + W_{1,t+1}, \qquad W_{1,t+1} \sim N(0, \sigma_{W_1}^2) \]
\[ \alpha_{2,t+1} = \delta_3 \alpha_{2,t} + W_{2,t+1}, \qquad W_{2,t+1} \sim N(0, \sigma_{W_2}^2) \]
Matrix notation for the state space model¶
\[
\begin{bmatrix} \hat{i_t} \\ \hat{M_t} \end{bmatrix} =
\begin{bmatrix} \hat{s_t} & 0 \\ 0 & 1 \end{bmatrix}
\begin{bmatrix} \alpha_{1,t} \\ \alpha_{2,t} \end{bmatrix} +
\begin{bmatrix} \varepsilon_{1,t} \\ \varepsilon_{2,t} \end{bmatrix}
\]
\[
\begin{bmatrix} \alpha_{1,t+1} \\ \alpha_{2,t+1} \end{bmatrix} =
\begin{bmatrix} \delta_1 & \delta_2 \\ 0 & \delta_3 \end{bmatrix}
\begin{bmatrix} \alpha_{1,t} \\ \alpha_{2,t} \end{bmatrix} +
\begin{bmatrix} W_{1,t+1} \\ W_{2,t+1} \end{bmatrix}
\]
I will simulate some data, talk about what we need to modify, and finally estimate the model to see whether we recover something reasonable.
[ ]:
true_values = {
"var_e1": 0.01,
"var_e2": 0.01,
"var_w1": 0.01,
"var_w2": 0.01,
"delta1": 0.8,
"delta2": 0.5,
"delta3": 0.7,
}
def gen_data_for_model3():
# Starting values
alpha1_0 = 2.1
alpha2_0 = 1.1
t_max = 500
def gen_i(alpha1, s):
return alpha1 * s + np.sqrt(true_values["var_e1"]) * np.random.randn()
def gen_m_hat(alpha2):
return 1 * alpha2 + np.sqrt(true_values["var_e2"]) * np.random.randn()
def gen_alpha1(alpha1, alpha2):
w1 = np.sqrt(true_values["var_w1"]) * np.random.randn()
return true_values["delta1"] * alpha1 + true_values["delta2"] * alpha2 + w1
def gen_alpha2(alpha2):
w2 = np.sqrt(true_values["var_w2"]) * np.random.randn()
return true_values["delta3"] * alpha2 + w2
s_t = 0.3 + np.sqrt(1.4) * np.random.randn(t_max)
i_hat = np.empty(t_max)
m_hat = np.empty(t_max)
current_alpha1 = alpha1_0
current_alpha2 = alpha2_0
for t in range(t_max):
# Obs eqns
i_hat[t] = gen_i(current_alpha1, s_t[t])
m_hat[t] = gen_m_hat(current_alpha2)
# state eqns
new_alpha1 = gen_alpha1(current_alpha1, current_alpha2)
new_alpha2 = gen_alpha2(current_alpha2)
# Update states for next period
current_alpha1 = new_alpha1
current_alpha2 = new_alpha2
return i_hat, m_hat, s_t
i_hat, m_hat, s_t = gen_data_for_model3()
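A quick look at the simulated observables (an illustrative plot; it was not part of the original walkthrough):
fig, axes = plt.subplots(2, figsize=(16, 8))
axes[0].plot(i_hat)
axes[0].set(title="Simulated i_hat")
axes[1].plot(m_hat)
axes[1].set(title="Simulated m_hat")
fig.tight_layout()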
What do we need to modify?¶
Once again, we do not need to change much, but we do need to be careful about the dimensions.
1) Change the __init__ function¶
It goes from
def __init__(self, y_t, x_t, w_t):
exog = np.c_[x_t, w_t]
super(TVRegressionExtended, self).__init__(
endog=y_t, exog=exog, k_states=2,
initialization='diffuse')
self.ssm['design'] = exog.T[np.newaxis, :, :] # shaped 1 x 2 x nobs
self.ssm['selection'] = np.eye(self.k_states)
self.ssm['transition'] = np.eye(self.k_states)
to
def __init__(self, i_t: np.array, s_t: np.array, m_t: np.array):
exog = np.c_[s_t, np.repeat(1, len(s_t))] # exog.shape => (nobs, 2)
super(MultipleYsModel, self).__init__(
endog=np.c_[i_t, m_t], exog=exog, k_states=2,
initialization='diffuse')
self.ssm['design'] = np.zeros((self.k_endog, self.k_states, self.nobs))
self.ssm['design', 0, 0, :] = s_t
self.ssm['design', 1, 1, :] = 1
Note that we did not have to specify k_endog anywhere. The initialization does that for us after checking the dimensions of the endog matrix.
2) Change the update() function¶
It goes from
def update(self, params, **kwargs):
params = super(TVRegressionExtended, self).update(params, **kwargs)
self['obs_intercept', 0, 0] = params[0]
self['obs_cov', 0, 0] = params[1]
self['state_cov'] = np.diag(params[2:4])
self['transition', 0, 0] = params[4]
self['transition', 1, 1] = params[5]
to
def update(self, params, **kwargs):
params = super(MultipleYsModel, self).update(params, **kwargs)
# The following line is not needed (by default, this matrix is initialized to zeros),
# but I leave it here so the dimensions are clearer
self['obs_intercept'] = np.repeat([np.array([0, 0])], self.nobs, axis=0).T
self['obs_cov', 0, 0] = params[0]
self['obs_cov', 1, 1] = params[1]
self['state_cov'] = np.diag(params[2:4])
#delta1, delta2, delta3
self['transition', 0, 0] = params[4]
self['transition', 0, 1] = params[5]
self['transition', 1, 1] = params[6]
The rest of the methods change in pretty obvious ways (we need to add parameter names, make sure the indexes work, etc.). The full code for the class is right below.
[ ]:
starting_values = {
"var_e1": 0.2,
"var_e2": 0.1,
"var_w1": 0.15,
"var_w2": 0.18,
"delta1": 0.7,
"delta2": 0.1,
"delta3": 0.85,
}
class MultipleYsModel(sm.tsa.statespace.MLEModel):
def __init__(self, i_t: np.array, s_t: np.array, m_t: np.array):
exog = np.c_[s_t, np.repeat(1, len(s_t))] # exog.shape => (nobs, 2)
super(MultipleYsModel, self).__init__(
endog=np.c_[i_t, m_t], exog=exog, k_states=2, initialization="diffuse"
)
self.ssm["design"] = np.zeros((self.k_endog, self.k_states, self.nobs))
self.ssm["design", 0, 0, :] = s_t
self.ssm["design", 1, 1, :] = 1
# These have ok shape. Placeholders since I'm changing them
# in the update() function
self.ssm["selection"] = np.eye(self.k_states)
self.ssm["transition"] = np.eye(self.k_states)
# Dictionary of positions to names
self.position_dict = OrderedDict(
var_e1=1, var_e2=2, var_w1=3, var_w2=4, delta1=5, delta2=6, delta3=7
)
self.initial_values = starting_values
self.positive_parameters = slice(0, 4)
@property
def param_names(self):
return list(self.position_dict.keys())
@property
def start_params(self):
"""
Initial values
"""
# (optional) Use scale for var_e1 and var_e2 starting values
params = np.r_[
self.initial_values["var_e1"],
self.initial_values["var_e2"],
self.initial_values["var_w1"],
self.initial_values["var_w2"],
self.initial_values["delta1"],
self.initial_values["delta2"],
self.initial_values["delta3"],
]
return params
def transform_params(self, unconstrained):
"""
If you need to restrict parameters
For example, variances should be > 0
Parameters may need to be within -1 and 1
"""
constrained = unconstrained.copy()
constrained[self.positive_parameters] = (
constrained[self.positive_parameters] ** 2
)
return constrained
def untransform_params(self, constrained):
"""
Need to reverse what you did in transform_params()
"""
unconstrained = constrained.copy()
unconstrained[self.positive_parameters] = (
unconstrained[self.positive_parameters] ** 0.5
)
return unconstrained
def update(self, params, **kwargs):
params = super(MultipleYsModel, self).update(params, **kwargs)
# The following line is not needed (by default, this matrix is initialized to zeros),
# but I leave it here so the dimensions are clearer
self["obs_intercept"] = np.repeat([np.array([0, 0])], self.nobs, axis=0).T
self["obs_cov", 0, 0] = params[0]
self["obs_cov", 1, 1] = params[1]
self["state_cov"] = np.diag(params[2:4])
# delta1, delta2, delta3
self["transition", 0, 0] = params[4]
self["transition", 0, 1] = params[5]
self["transition", 1, 1] = params[6]
[ ]:
mod = MultipleYsModel(i_hat, s_t, m_hat)
res = mod.fit()
print(res.summary())
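As with Model 1, we can inspect the smoothed state paths (an illustrative follow-up; the labels alpha1 and alpha2 are just names for the two states):
ss3 = pd.DataFrame(res.smoothed_state.T, columns=["alpha1", "alpha2"])
fig, axes = plt.subplots(2, figsize=(16, 8))
axes[0].plot(ss3["alpha1"])
axes[0].set(title="Smoothed alpha1")
axes[1].plot(ss3["alpha2"])
axes[1].set(title="Smoothed alpha2")
fig.tight_layout()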
Bonus: pymc3 for fast Bayesian estimation¶
In this section I will show how you can take your custom state space model and easily plug it into pymc3 to estimate it with Bayesian methods. In particular, this example will show you an estimation with a version of Hamiltonian Monte Carlo called the No-U-Turn Sampler (NUTS).
I am basically copying the ideas contained in this notebook, so make sure to check that for more details.
[ ]:
# Extra requirements
import pymc3 as pm
import theano
import theano.tensor as tt
We need to define some helper functions to connect theano to the likelihood function that is implied in our model.
[ ]:
class Loglike(tt.Op):
itypes = [tt.dvector] # expects a vector of parameter values when called
otypes = [tt.dscalar] # outputs a single scalar value (the log likelihood)
def __init__(self, model):
self.model = model
self.score = Score(self.model)
def perform(self, node, inputs, outputs):
(theta,) = inputs # contains the vector of parameters
llf = self.model.loglike(theta)
outputs[0][0] = np.array(llf) # output the log-likelihood
def grad(self, inputs, g):
# the method that calculates the gradients - it actually returns the
# vector-Jacobian product - g[0] is a vector of parameter values
(theta,) = inputs # our parameters
out = [g[0] * self.score(theta)]
return out
class Score(tt.Op):
itypes = [tt.dvector]
otypes = [tt.dvector]
def __init__(self, model):
self.model = model
def perform(self, node, inputs, outputs):
(theta,) = inputs
outputs[0][0] = self.model.score(theta)
We will simulate again the data we used for Model 1. We will also fit it and save the results to compare them to the Bayesian posterior we get.
[ ]:
y_t, x_t, w_t, beta_x, beta_w = gen_data_for_model1()
plt.plot(y_t)
[ ]:
mod = TVRegression(y_t, x_t, w_t)
res_mle = mod.fit(disp=False)
print(res_mle.summary())
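As a quick sanity check (illustrative, not in the original notebook), the wrapped Op should reproduce the model's log-likelihood when evaluated at the MLE parameter vector:
loglike_check = Loglike(mod)
theta_mle = tt.as_tensor_variable(res_mle.params)
print(loglike_check(theta_mle).eval())  # should match the line below
print(res_mle.llf)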
Bayesian estimation¶
We need to define a prior for each parameter, as well as the number of draws and burn-in points.
[ ]:
# Set sampling params
ndraws = 3000  # number of draws from the distribution
nburn = 600  # number of "burn-in points" (which will be discarded)
[ ]:
# Construct an instance of the Theano wrapper defined above, which
# will allow PyMC3 to compute the likelihood and Jacobian in a way
# that it can make use of. Here we are using the same model instance
# created earlier for MLE analysis (we could also create a new model
# instance if we preferred)
loglike = Loglike(mod)
with pm.Model():
# Priors
intercept = pm.Uniform("intercept", 1, 10)
var_e = pm.InverseGamma("var.e", 2.3, 0.5)
var_x_coeff = pm.InverseGamma("var.x.coeff", 2.3, 0.1)
var_w_coeff = pm.InverseGamma("var.w.coeff", 2.3, 0.1)
# convert variables to tensor vectors
theta = tt.as_tensor_variable([intercept, var_e, var_x_coeff, var_w_coeff])
# use a DensityDist (the Op is passed directly as the log-likelihood function)
pm.DensityDist("likelihood", loglike, observed=theta)
# Draw samples
trace = pm.sample(
ndraws,
tune=nburn,
return_inferencedata=True,
cores=1,
compute_convergence_checks=False,
)
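Since we asked for return_inferencedata=True, the trace is an ArviZ InferenceData object, so we can summarize the posterior draws directly. A brief example (assumes arviz is installed, which pymc3 already requires):
import arviz as az

print(az.summary(trace, round_to=4))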
How does the posterior distribution compare with the MLE estimation?¶
The posterior distributions are visibly peaked around the MLE estimates.
[ ]:
results_dict = {
"intercept": res_mle.params[0],
"var.e": res_mle.params[1],
"var.x.coeff": res_mle.params[2],
"var.w.coeff": res_mle.params[3],
}
plt.tight_layout()
_ = pm.plot_trace(
trace,
lines=[(k, {}, [v]) for k, v in dict(results_dict).items()],
combined=True,
figsize=(12, 12),
)