-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathprototype_system.py
162 lines (124 loc) · 5.54 KB
/
prototype_system.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Created on Tue Jul 19 2022
@author: Jack Burgess
"""
import numpy as np
from sklearn.linear_model import LinearRegression
#from sklearn.linear_model import LogisticRegression
import matplotlib.pyplot as plt
class Env:
    """Non-stationary environment emitting (X, Y) batches.

    Labels follow a logistic rule whose steepness flips sign after the
    15th draw, so the label-generating process reverses mid-run.
    """

    def __init__(self, sample_size):
        """Store the batch size and set up the RNG and draw counter."""
        self.sample_size = sample_size
        self.rng = np.random.default_rng()  # pass a seed here if reproducibility is wanted
        self.counter = 0

    def sample(self):
        """Draw one batch: X ~ Unif[0, 1), Y ~ Bernoulli(sigmoid(k * (X - 0.5))).

        The steepness k is -5 for the first 15 calls and +5 afterwards.
        Returns a tuple (X, Y) with Y cast to integer 0/1 labels.
        """
        steepness = -5 if self.counter < 15 else 5
        features = self.rng.random(self.sample_size)
        prob_one = 1 / (1 + np.exp(-steepness * (features - 0.5)))
        labels = (self.rng.random(self.sample_size) < prob_one).astype(int)
        self.counter += 1
        return (features, labels)
# Learner class
class Learner:
    """Agent maintaining three linear models over an Env:

    m_e -- "environment" model, fit on real environment samples
    m_i -- "imagination" model, fit on self-generated (imagined) data
    m_c -- "combined" model, whose fitted parameters are blended from the
           other two in update()
    """

    def __init__(self, env):
        """Bind the environment and warm-start all three models.

        Each model is fit once on random features/labels purely so that
        the fitted attributes (coef_, intercept_) exist before the first
        call to update(), which reads and writes them directly.
        """
        self.env = env
        self.m_e = LinearRegression()
        self.m_i = LinearRegression()
        self.m_c = LinearRegression()
        for model in (self.m_e, self.m_i, self.m_c):
            model.fit(self.env.rng.random(self.env.sample_size).reshape(-1, 1),
                      self.env.rng.choice(2, self.env.sample_size))

    def run(self, params):
        """Interact with the environment for params['n_runs'] steps.

        Each step draws a sample, scores all three models by accuracy of
        their predictions thresholded at 0.5, then calls update().
        Returns a tuple of three per-step accuracy lists (m_e, m_i, m_c).
        """
        m_e_scores = []
        m_i_scores = []
        m_c_scores = []
        for _ in range(params['n_runs']):
            (X_w, Y_w) = self.env.sample()
            X_col = X_w.reshape(-1, 1)  # sklearn expects a 2-D feature matrix
            truth = Y_w.astype(bool)
            n = self.env.sample_size
            # Fraction of samples where the thresholded prediction matches.
            m_e_scores.append(sum((self.m_e.predict(X_col) > 0.5) == truth) / n)
            m_i_scores.append(sum((self.m_i.predict(X_col) > 0.5) == truth) / n)
            m_c_scores.append(sum((self.m_c.predict(X_col) > 0.5) == truth) / n)
            self.update(X_w, Y_w, params)
        return (m_e_scores, m_i_scores, m_c_scores)

    def evaluate(self, X_w, Y_w):
        """Return the combined model's R^2 score on the given sample."""
        return self.m_c.score(X_w.reshape(-1, 1), Y_w)

    def update(self, X_w, Y_w, params):
        """Refit m_e on real data, m_i on imagined data, then blend into m_c.

        params keys: 'gam' (required) -- inertia of m_c in [0, 1]; 1 keeps
        the old m_c, 0 adopts the fresh e/i mix entirely. 'phi' (optional)
        -- fixed mixing weight toward m_i; when absent, phi comes from the
        models' relative squared losses on the current sample.
        """
        X_col = X_w.reshape(-1, 1)
        # Relative squared-error losses determine the default mixing weight.
        loss_e = np.sum((Y_w - self.m_e.predict(X_col)) ** 2)
        loss_i = np.sum((Y_w - self.m_i.predict(X_col)) ** 2)
        if (loss_e + loss_i) == 0:  # both models perfect: avoid divide by zero
            phi = 0.5
        else:
            phi = loss_e / (loss_e + loss_i)  # relative loss proportion
        # Train environment model on real environment data.
        self.m_e.fit(X_col, Y_w)
        # Train imagination model on imagined data: fresh inputs X_i with
        # labels imagined by the combined model.
        # BUG FIX: predict the imagined labels from X_i, not from X_w --
        # the original paired the fresh X_i with labels predicted from the
        # unrelated environment sample.
        X_i = self.env.rng.random(self.env.sample_size)  # X ~ Unif[0,1)
        Y_i = self.m_c.predict(X_i.reshape(-1, 1))  # imagine from combo model
        self.m_i.fit(X_i.reshape(-1, 1), Y_i)
        theta_e = (self.m_e.coef_, self.m_e.intercept_)
        theta_i = (self.m_i.coef_, self.m_i.intercept_)
        theta_c = (self.m_c.coef_, self.m_c.intercept_)
        if 'phi' in params:
            # A caller-supplied phi overrides the loss-derived value above.
            phi = params['phi']
        # Mix environment and imagination parameters by phi...
        theta_x = (theta_e[0] * (1 - phi) + theta_i[0] * phi,
                   theta_e[1] * (1 - phi) + theta_i[1] * phi)
        gam = params['gam']
        # ...then blend the mix into the combined model with inertia gam.
        theta_c = (theta_x[0] * (1 - gam) + theta_c[0] * gam,
                   theta_x[1] * (1 - gam) + theta_c[1] * gam)
        (self.m_c.coef_, self.m_c.intercept_) = theta_c
# Plot combined-model accuracy per step, averaged over many batches,
# for several values of the inertia parameter gam.
plt.figure()
plt.title('ImaginAgent performance over batches across gammas')
plt.xlabel('batch')
plt.ylabel('Accuracy')
plt.ylim(0, 1)
for gam in [0.00, 0.10, 0.50, 0.75, 0.85, 0.90]:
    params = {'n_runs': 30, 'phi': 0, 'gam': gam}
    n_batches = 1000
    results = np.full([n_batches, params['n_runs']], np.nan)
    for batch in range(n_batches):
        # Fresh environment and learner each batch; keep only m_c's scores.
        learner = Learner(Env(sample_size=3))
        results[batch] = learner.run(params)[2]
    plt.plot(results.mean(axis=0), label=f'gam = {gam}')
plt.legend(title=f'Avg over {n_batches} runs')