前言

因为在项目中单次的治疗效果找不到什么指标,用的是模型倒数第二层输出的特征向量和不患病的人之间做特征相似度的分析,但是长期的治疗分析找不到很好的指标,于是想到了使用时间序列的趋势做分析,对于时间序列趋势的检验有 mann-kendall 算法。

mann-kendall 检验

mann-kendall 检验用于判断给定的时间序列是否存在线性单调趋势。

  • H0H_0假设:不存在单调趋势
  • HaH_a假设:三种
    • 存在单调上升趋势
    • 存在单调下降趋势
    • 存在单调上升或者单调下降趋势

mann-kendall 的假设

以下假设是 mk 测试检验的基础

  • 在没有趋势的时候,数据成独立同分布
  • 随时间推移的观察结果代表真实情况的采样
  • 用于采样的方法、仪器测量是无偏差的

优点

  • 不假设数据是符合特定的分布

  • 不受缺省值的影响,受采样值数量的影响

  • 不受采样时间间距不规则影响

  • 不受时间序列长度影响

局限性

  • 不适用与具有周期性的数据,为了使数据有效,建建议预处理前删除已知的周期性影响
  • 检验对较短的数据给出较多坏结果,序列越长趋势检测越有效

计算方法

  1. 按照时间顺序列出所有数据 x1,x2,...,xnx_1, x_2, ..., x_n
  2. 计算所有 n(n1)/2n(n-1)/2 种的 xjxkx_j-x_k 的差值其中 j>kj \gt k
  3. 计算上一步中差值的 sgnsgn 函数结果

sgn(xixj)={1,xixj>00,xixj=01,xixj<0sgn(x_i - x_j) = \begin{cases} 1, & x_i - x_j > 0\\ 0, & x_i - x_j = 0\\ -1, & x_i - x_j < 0 \end{cases}

  1. 计算所有 sgnsgn 的和

S=k1n1jk+1nsgn(xjxk)S = \displaystyle\sum_{k-1}^{n-1}\displaystyle\sum_{j-k+1}^{n}\text{sgn}(x_j-x_k)

  1. 计算方差 VAR(S)VAR(S),公式如下。其中 g 是有相同数据的组的数量,tpt_p是相同数据的数量。例如序列 [23, 24, 29, 6, 29, 24, 24, 29, 23] 中,我们有 g=3 个相同数据组,t1=2t_1=2,因为序列中有 2 个 23,t2=3t_2=3,因为数据中有 3 个 24,t3=3t_3=3 因为数据中有 3 个 29

VAR(S)=118[n(n1)(2n+5)p1gtp(tp1)(2tp+5)]\text{VAR}(S) = \frac{1}{18}\Big[n(n-1)(2n+5) - \displaystyle\sum_{p-1}^{g}t_p(t_p-1)(2t_p+5)\Big]

  1. 计算 mann kendall 检验的统计量

ZMK={S1VAR(S),S>00,S=0S+1VAR(S),S<0Z_{MK} = \begin{cases} \frac{S - 1} {\sqrt{VAR(S)}}, & S > 0\\ 0, & S = 0\\ \frac{S + 1} {\sqrt{VAR(S)}}, & S < 0\\ \end{cases}

  1. 检验假设:显著性水平 aa0<a<0.50<a<0.5)表示了 MK 检验中错误的拒绝原假设的可容忍概率
    • H0H_0假设:无趋势
    • 单调上升趋势
      • HaH_a假设:单调上升趋势
      • 如果 ZMKZ1aZ_{MK} \geq Z_{1-a} 就接受假设 HaH_aZ1aZ_{1-a}表示正态分布中第 100 (1-a) 个百分位数
    • 单调下降趋势
      • HaH_a假设:单调下降趋势
      • 如果 $Z_{MK} \leq -Z_{1-a} $ 就接受假设 HaH_a
    • 存在单调上升或者单调下降趋势
      • HaH_a假设:存在单调上升或者单调下降趋势
      • 如果ZMKZ1a/2|Z_{MK}| \geq Z_{1-a/2} 就接受假设 HaH_a

修正公式

因为 mk 检验中一个重要的步骤就是计算两次测量之间的差异大于、等于还是小于 0. 如果我们的测量的 x 精度 ε=0.01\varepsilon = 0.01,由于存在浮点计算误差可能有 x27x11=0.000251>0x_{27} - x_{11} = 0.000251 > 0,这个大于 0 的值其实是没有意义的,因为在实际的测量中是无法确定这么小的差异,因此在 mk 检验的实现中我们加入了一个 ε\varepsilon 来解决
修改上面两个公式

sgn(xixj)={1,xixj>ε0,xixjε1,xixj<εsgn(x_i - x_j) = \begin{cases} 1, & x_i - x_j > \varepsilon\\ 0, & |x_i - x_j| \leq \varepsilon\\ -1, & x_i - x_j < -\varepsilon \end{cases}

ZMK={S1VAR(S),S>ε0,SεS+1VAR(S),S<εZ_{MK} = \begin{cases} \frac{S - 1} {\sqrt{VAR(S)}}, & S > \varepsilon\\ 0, & |S| \leq \varepsilon\\ \frac{S + 1} {\sqrt{VAR(S)}}, & S < -\varepsilon\\ \end{cases}

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import numpy as np
from scipy.stats import norm
from collections import namedtuple

def _preprocess(x):
"""
数据预处理 返回拉平的x和通道数
"""
x = np.asarray(x).astype(float)
dim = x.ndim

if dim == 1:
c = 1
elif dim == 2:
(_, c) = x.shape

if c == 1:
x = x.flatten()
else:
assert 1 == dim, "please check you input shape"

return x, c


def _missing_values_analysis(x, method: str = "skip"):
"""
缺省值预处理,默认抛弃
"""
method = method.lower()
assert method in ["skip"], f"method: {method} is not allow"
if method == "skip":
if x.ndim == 1:
x = x[~np.isnan(x)]
else:
x = x[~np.isnan(x).any(axis=1)]

n = len(x)

return x, n


def _mk_score(x, n, eps=1e-6):
"""
计算 mk算法s分数
"""

sgn = np.zeros((n, n), dtype="int")
for i in range(n):
tmp = x - x[i]
tmp[np.where(np.fabs(tmp) <= eps)] = 0.0 # 对0特殊处理
sgn[i] = np.sign(tmp)

s = sgn[np.triu_indices(n, k=1)].sum() # 取上三角,不含对角线
return s, sgn


def _variance_s(x, sgn, n,eps=1e-6):
"""
计算方差
"""
np.fill_diagonal(sgn,eps*1e6)
i,_ = np.where(sgn==0.)
ties = np.unique(x[i]) #重复的点
p = len(ties)
q = np.zeros(len(ties), dtype="int")
for k in range(p):
idx = np.where(np.fabs(x-ties[k])<eps)
q[k] = len(idx)

term1 = n * (n - 1) * (2 * n + 5)
term2 = (q * (q - 1) * (2 * q + 5)).sum()

var_s = float(term1 - term2) / 18.
return var_s

def _z_score(s,var_s,eps=1e-6):
if s > eps:
z = (s-1)/np.sqrt(var_s)
elif np.fabs(s) <= eps:
z = 0.
else:
z = (s+1) / np.sqrt(var_s)
return z

def _p_value(z,alpha):
# 双端测试
p = 2*(1-norm.cdf(abs(z)))
h = abs(z) > norm.ppf(1-alpha/2)

if (z < 0) and h:
trend = 'decreasing'
elif (z > 0) and h:
trend = 'increasing'
else:
trend = 'no trend'

return p, h, trend

def mann_kendall_test(x,alpha = 0.05, eps=1e-6):
'''
x: 待检验数据
alpha: 显著性水平 允许错误否定假设的误差 0~0.5
eps: 最小计数误差
'''
res = namedtuple("mann_kendall_result",['trend','h','p'])
x, _ = _preprocess(x)
x, n = _missing_values_analysis(x)
s, sgn = _mk_score(x, n, eps=eps)
var_s = _variance_s(x,sgn,n,eps=eps)
z = _z_score(s,var_s,eps=eps)
p,h,trend = _p_value(z,alpha)
# 对于mk算法 p不是必须的
# 可以算斜率,但用不上
return res(trend,h,p)

测试效果

1
2
3
4
5
6
x = np.arange(9)
print(mann_kendall_test(x))
x = np.random.sample(60)
print(mann_kendall_test(x))
x = np.random.sample(60) - np.arange(60)
print(mann_kendall_test(x))

输出如下

1
2
3
mann_kendall_result(trend='increasing', h=True, p=0.00026326080270355767)
mann_kendall_result(trend='no trend', h=False, p=0.754646525401216)
mann_kendall_result(trend='decreasing', h=True, p=0.0)

参考文献