net.py
import joey as ml
import numpy as np
from devito import Eq, Inc, Operator, ConditionalDimension, Ne, Function, \
    Constant
from joey import default_name_allocator as alloc
from joey import default_dim_allocator as dim_alloc
from sympy import And


class Net:
    """Wraps a list of Joey layers in forward and backprop Devito operators."""

    def __init__(self, layers: list):
        self._layers = layers
        self._batch_constant = Constant(name='batch', dtype=np.int32)
        self._forward_arg_dict = {}
        self._backward_arg_dict = {}

        eqs, args = self._gen_eqs()
        backprop_eqs, backprop_args = self._gen_backprop_eqs()

        for (key, value) in args:
            self._forward_arg_dict[key] = value

        for (key, value) in backprop_args:
            self._backward_arg_dict[key] = value

        # Collect the PyTorch-compatible kernel/bias parameters of every
        # layer so an external optimizer can update them.
        parameter_lists = list(map(ml.Layer.pytorch_parameters, self._layers))
        parameters = []

        for (kernel_parameter, bias_parameter) in parameter_lists:
            if kernel_parameter is not None:
                parameters.append(kernel_parameter)

            if bias_parameter is not None:
                parameters.append(bias_parameter)

        self._parameters = parameters

        self._init_parameters()

        self._forward_operator = Operator(eqs)
        self._backward_operator = Operator(backprop_eqs)

    def _init_parameters(self):
        # Initialise kernels and biases with uniform random values
        # in [-0.5, 0.5).
        for layer in self._layers:
            if layer.kernel is not None:
                layer.kernel.data[:] = \
                    np.random.rand(*layer.kernel.shape) - 0.5

            if layer.bias is not None:
                layer.bias.data[:] = np.random.rand(*layer.bias.shape) - 0.5

    def _gen_eqs(self):
        # Chain the layers' forward equations, feeding each layer's result
        # into the next layer's input.
        eqs = []
        args = []

        input_function = None

        for layer in self._layers:
            if input_function is not None:
                dims = input_function.dimensions
                eqs.append(Eq(layer.input[dims], input_function[dims]))

            layer_eqs, layer_args = layer.equations()

            args += layer_args
            eqs += layer_eqs

            input_function = layer.result

        return (eqs, args)

    def _gen_backprop_eqs(self):
        # Walk the layers in reverse order. prev_layer/next_layer refer to
        # the backpropagation order, i.e. prev_layer is the layer that comes
        # after the current one in the forward pass.
        eqs = []
        args = []

        for i in range(len(self._layers) - 1, -1, -1):
            if i < len(self._layers) - 1:
                prev_layer = self._layers[i + 1]
            else:
                prev_layer = None

            if i > 0:
                next_layer = self._layers[i - 1]
            else:
                next_layer = None

            layer_eqs, layer_args = \
                self._layers[i].backprop_equations(prev_layer, next_layer,
                                                   self._batch_constant)

            args += layer_args
            eqs += layer_eqs

        return (eqs, args)

    @property
    def pytorch_parameters(self):
        return self._parameters

    def forward(self, input_data):
        # Reset intermediate results, load the input and run the forward
        # operator; returns the last layer's result data.
        for layer in self._layers:
            layer.result.data[:] = 0

        self._layers[0].input.data[:] = input_data
        self._forward_operator.apply(**self._forward_arg_dict)

        return self._layers[-1].result.data

    def backward(self, loss_gradient_func, pytorch_optimizer=None):
        # Zero the accumulated gradients, then run backpropagation once per
        # batch element, selecting the element through the batch constant.
        for layer in self._layers:
            if layer.kernel_gradients is not None:
                layer.kernel_gradients.data[:] = 0

            if layer.bias_gradients is not None:
                layer.bias_gradients.data[:] = 0

        if len(self._layers[-1].result.shape) < 2:
            batch_size = 1
        else:
            batch_size = self._layers[-1].result.shape[1]

        for i in range(batch_size):
            self._batch_constant.data = i
            self._layers[-1].result_gradients.data[:] = \
                loss_gradient_func(self._layers[-1], i)
            self._backward_operator.apply(**self._backward_arg_dict)

        if pytorch_optimizer is not None:
            pytorch_optimizer.step()
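

# Example usage (a minimal sketch, not part of the original module): it
# assumes `layers` is a list of joey.Layer instances built elsewhere,
# `input_data` matches the shape of layers[0].input, and `expected` holds
# one-hot labels. The loss gradient callback follows the contract used by
# Net.backward above: it receives the last layer and a batch index and
# returns the gradient of the loss with respect to that layer's result for
# that batch element.
#
# def loss_gradient(last_layer, batch_index):
#     # Assumed softmax-plus-cross-entropy gradient; swap in the gradient of
#     # whichever loss is actually used.
#     return last_layer.result.data[:, batch_index] - expected[batch_index]
#
# net = Net(layers)
# output = net.forward(input_data)
# net.backward(loss_gradient)  # optionally pass a torch optimizer built from
#                              # net.pytorch_parameters; backward() then calls
#                              # its step() once the gradients are accumulated.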