-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathrisk_parity.py
132 lines (106 loc) · 3.69 KB
/
risk_parity.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import pandas as pd
import numpy as np
from cvxpy import *
def get_smart_weight(cov_mat, method='min variance', wts_adjusted=False):
'''
功能:输入协方差矩阵,得到不同优化方法下的权重配置
输入:
cov_mat pd.DataFrame,协方差矩阵,index和column均为资产名称
method 优化方法,可选的有min variance、risk parity、max diversification、equal weight
输出:
pd.Series index为资产名,values为weight
PS:
依赖scipy package
'''
if not isinstance(cov_mat, pd.DataFrame):
raise ValueError('cov_mat should be pandas DataFrame!')
omega = np.matrix(cov_mat.values) # 协方差矩阵
# 定义目标函数
def fun1(x):
return np.matrix(x) * omega * np.matrix(x).T
def fun2(x):
tmp = (omega * np.matrix(x).T).A1
risk = x * tmp
delta_risk = [sum((i - risk)**2) for i in risk]
return sum(delta_risk)
def fun3(x):
den = x * omega.diagonal().T
num = np.sqrt(np.matrix(x) * omega * np.matrix(x).T)
return num/den
# 初始值 + 约束条件
x0 = np.ones(omega.shape[0]) / omega.shape[0]
bnds = tuple((0,None) for x in x0)
cons = ({'type':'eq', 'fun': lambda x: sum(x) - 1})
options={'disp':False, 'maxiter':1000, 'ftol':1e-20}
if method == 'min variance':
res = minimize(fun1, x0, bounds=bnds, constraints=cons, method='SLSQP', options=options)
elif method == 'risk parity':
res = minimize(fun2, x0, bounds=bnds, constraints=cons, method='SLSQP', options=options)
elif method == 'max diversification':
res = minimize(fun3, x0, bounds=bnds, constraints=cons, method='SLSQP', options=options)
elif method == 'equal weight':
return pd.Series(index=cov_mat.index, data=1.0 / cov_mat.shape[0])
else:
raise ValueError('method should be min variance/risk parity/max diversification/equal weight!!!')
# 权重调整
if res['success'] == False:
# print res['message']
pass
wts = pd.Series(index=cov_mat.index, data=res['x'])
if wts_adjusted == True:
wts = wts[wts >= 0.0001]
return wts / wts.sum() * 1.0
elif wts_adjusted == False:
return wts
else:
raise ValueError('wts_adjusted should be True/False!')
def risk_parity(df, epsilon):
n = df.shape[1]
Sigma = df.cov().values
y0 = np.random.random(n + 1)
y0[:-1] = 1.0 / (Sigma.diagonal() ** 0.5)
y0[-1] = 0.5
flag = True
y_old = y0
while flag:
y_new = np.array(y_old - J_inverse(y_old, Sigma).dot(F(y_old, Sigma)))[0]
print y_new
if np.sum((y_new - y_old)**2) < epsilon:
flag = False
y_old = y_new
return y_old
def F(y, Sigma):
x = y[: -1]
Lambda = y[-1]
n = Sigma.shape[0]
a = np.zeros(n + 1)
a[: -1] = Sigma.dot(x) - Lambda * 1.0 / x
a[-1] = x.sum() - 1
return a
def J_inverse(y, Sigma):
x = y[:-1]
Lambda = y[-1]
n = Sigma.shape[0]
a = np.matrix(np.zeros([n +1, n + 1]))
a[:-1,:-1] = Sigma + Lambda * np.diag(1.0 / x ** 2) - 1.0 / x
a[-1, :-1] = np.zeros(n) + 1
a[:,-1][:-1] = (- 1.0 / x).reshape(-1, 1)
a[-1][-1] = 0
return a.I
def func(x, Sigma):
n = Sigma.shape[0]
f = 0.0
for i in range(n):
for j in range(n):
TRCi = x[i] *((Sigma.dot(x))[i])
TRCj = x[j] *((Sigma.dot(x))[j])
f += (TRCi - TRCj) ** 2
return f
def func(x, Sigma):
n = Sigma.shape[0]
f = 0.0
for i in range(n):
for j in range(n):
TRCi = x[i] *((Sigma.dot(x))[i])
TRCj = x[j] *((Sigma.dot(x))[j])
f += (TRCi - TRCj) ** 2