15
15
"""Function to iterate graph nodes. Implements functions that can be
16
16
used to implement the experiments of the Sandnes and Sinnen paper."""
17
17
18
- from decyclify . functions import create_intraiteration_matrix , create_interiteration_matrix , decyclify
19
- from networkx import DiGraph
18
+ from typing import Union
19
+
20
20
import numpy as np
21
+ from networkx import DiGraph
22
+
23
+ from decyclify .functions import create_intraiteration_matrix , create_interiteration_matrix
21
24
22
25
23
26
class CycleIterator :
24
27
"""An iterator that will iterate over all the nodes in a cycle,
25
28
before moving to the next cycles."""
26
29
27
- def __init__ (self , graph : DiGraph , cycles = 2 ):
30
+ def __init__ (self , graph : DiGraph , cycles = 2 , include_cycle = True ):
28
31
if not isinstance (graph , DiGraph ):
29
32
raise TypeError ('graph must be a non-empty DiGraph' )
30
33
if not isinstance (cycles , int ):
@@ -39,10 +42,17 @@ def __init__(self, graph: DiGraph, cycles=2):
39
42
self .current_column = - 1
40
43
self .nodes = [node for node in self .graph .nodes ]
41
44
self .count_nodes = len (self .nodes )
45
+ self .include_cycle = include_cycle
42
46
43
47
def __iter__ (self ):
44
48
return self
45
49
50
+ def _get_node_text (self , node ):
51
+ if not self .include_cycle :
52
+ return node
53
+ return f'{ node } .{ self .current_cycle } '
54
+
55
+
46
56
def __next__ (self ):
47
57
# if we have completed a cycle, we must reset the indexes
48
58
if self .current_column == self .count_nodes - 1 :
@@ -55,14 +65,14 @@ def __next__(self):
55
65
# dependency
56
66
if self .current_column == - 1 :
57
67
node = self .nodes [self .current_column + 1 ]
58
- nodes .append (f' { node } . { self .current_cycle } ' )
68
+ nodes .append (self ._get_node_text ( node ) )
59
69
else :
60
70
while True :
61
71
empty_column = True
62
72
for row_index in range (0 , self .count_nodes ):
63
73
if self .matrix [row_index , self .current_column ] == 1 :
64
74
node = self .nodes [row_index ]
65
- nodes .append (f' { node } . { self .current_cycle } ' )
75
+ nodes .append (self ._get_node_text ( node ) )
66
76
empty_column = False
67
77
if not empty_column :
68
78
break
@@ -71,32 +81,154 @@ def __next__(self):
71
81
return nodes
72
82
73
83
84
+ class Cycle :
85
+
86
+ def __init__ (
87
+ self ,
88
+ cycle_number :int = 1 ,
89
+ graph :DiGraph = None ,
90
+ nodes :[]= None ,
91
+ interiteration_matrix :np .ndarray = None
92
+ ):
93
+ self .cycle_number = cycle_number
94
+ self .nodes : Union [None , list ] = nodes
95
+ self .current_nodes = [node for node in nodes ]
96
+ # one-cycle iteration (i.e. intra-iteration)
97
+ self .intraiteration_nodes = [node for node in CycleIterator (graph , cycles = 1 , include_cycle = False )]
98
+ self .interiteration_matrix = interiteration_matrix
99
+
100
+ self .previous : Union [Cycle , None ] = None
101
+ self .next = None
102
+
103
+ def iterate (self , new_nodes_in_this_cycle ) -> list :
104
+ new_nodes = []
105
+ if self ._is_first ():
106
+ intraiteration_nodes = self ._pop_next ()
107
+ if intraiteration_nodes :
108
+ new_nodes .extend (intraiteration_nodes )
109
+ else :
110
+ # we have iterated through all possible nodes in the current cycle
111
+ raise RemoveCycle ()
112
+ else :
113
+ # for the next cycle, we can only add; the next cycle later becomes current cycle,
114
+ # and is then later removed
115
+ new_nodes .extend (self ._pop_next_interiteration (new_nodes_in_this_cycle ))
116
+ return new_nodes
117
+
118
+ def _get_node_text (self , node ):
119
+ return f'{ node } .{ self .cycle_number } '
120
+
121
+ def _next_nodes (self ):
122
+ if not self .intraiteration_nodes :
123
+ return []
124
+ return [self ._get_node_text (node ) for node in self .intraiteration_nodes [0 ].copy ()]
125
+
126
+ def _remove_nodes (self , nodes ):
127
+ for (key , intraiteration_node ) in enumerate (self .intraiteration_nodes ):
128
+ for node in intraiteration_node :
129
+ if node in nodes :
130
+ intraiteration_node .remove (node )
131
+ self .current_nodes .remove (node )
132
+ if not intraiteration_node :
133
+ self .intraiteration_nodes .pop (key )
134
+
135
+ def _pop_next (self ) -> []:
136
+ nodes = self ._next_nodes ()
137
+ self ._remove_nodes ([node .split ('.' , 1 )[0 ] for node in nodes ])
138
+ return nodes
139
+
140
+ def _is_first (self ) -> bool :
141
+ return self .previous is None
142
+
143
+ def _is_last (self ) -> bool :
144
+ return self .next is None
145
+
146
+ def _pop_next_interiteration (self , new_nodes_in_this_cycle ):
147
+ nodes = []
148
+ new_nodes_in_this_cycle = [node .split ('.' , 1 )[0 ] for node in new_nodes_in_this_cycle ]
149
+ if not self ._is_first ():
150
+ next_intraiteration_nodes = self ._next_nodes ()
151
+ for next_intraiteration_node in next_intraiteration_nodes :
152
+ # given an intraiteration node, let's find its upstream nodes,
153
+ # from the previous cycle
154
+ node = next_intraiteration_node .split ('.' , 1 )[0 ]
155
+ if node in self .previous .current_nodes or node in new_nodes_in_this_cycle :
156
+ continue
157
+ row_index = self .nodes .index (node )
158
+ row = self .interiteration_matrix [row_index ]
159
+
160
+ # these are the nodes triggering the `next_intraiteration_node` from the
161
+ # previous cycle
162
+ interiteration_trigger_nodes = [value for (key , value ) in enumerate (row ) if value == 1 ]
163
+ if not interiteration_trigger_nodes :
164
+ # node has no intercycle dependency, fine to return it
165
+ nodes .append (next_intraiteration_node )
166
+ self ._remove_nodes ([node ])
167
+ if interiteration_trigger_nodes :
168
+ for interiteration_trigger_node in interiteration_trigger_nodes :
169
+ # if this is True, it means the node in the previous cycle has been
170
+ # returned, so we are good to return this downstream dependency
171
+ if interiteration_trigger_node not in self .previous .current_nodes :
172
+ nodes .append (next_intraiteration_node )
173
+ self ._remove_nodes ([next_intraiteration_node ])
174
+ return nodes
175
+
176
+
177
+ class RemoveCycle (BaseException ):
178
+ ...
179
+
180
+
74
181
class TasksIterator :
75
182
"""An iterator that will iterate over all the nodes in a cycle,
76
183
starting a new cycle as soon as a task has a interiteration
77
184
dependency (IOW, if a task c.1 has a back-edge to a.2, once
78
185
c.1 is found, it will start the cycle 2 and iterate over a.2,
79
186
even if the cycle 1 is still being processed."""
80
187
81
- def __init__ (self , graph : DiGraph , cycles = 2 ):
188
+ def __init__ (self , graph : DiGraph , cycles_removed : Union [ None , list ], cycles = 2 ):
82
189
if not isinstance (graph , DiGraph ):
83
190
raise TypeError ('graph must be a non-empty DiGraph' )
84
191
if not isinstance (cycles , int ):
85
192
raise TypeError ('cycles must be an integer' )
86
193
if cycles <= 0 :
87
194
raise ValueError ('cycles value must be greater than zero' )
88
- graph , cycles_removed = decyclify (graph )
89
195
self .graph = graph
90
- self .cycles = cycles
91
- self .cycles_removed = cycles_removed
92
-
93
- self .intraiteration_matrix = create_intraiteration_matrix (graph )
196
+ self .nodes = [node for node in self .graph .nodes ]
197
+ self .intraiteration_matrix = np .array (create_intraiteration_matrix (graph ), copy = True )
198
+ self .interiteration_matrix = np .array (create_interiteration_matrix (graph .nodes , cycles_removed ), copy = True )
94
199
95
- self .interiteration_matrix = create_interiteration_matrix (graph .nodes , cycles_removed )
200
+ # here we create a linked-list of cycles; where each cycle knows its previous
201
+ # cycle and the next cycle (if available).
202
+ self .cycles = []
203
+ for cycle_number in range (0 , cycles ):
204
+ cycle = Cycle (
205
+ cycle_number = cycle_number ,
206
+ graph = self .graph ,
207
+ nodes = self .nodes ,
208
+ interiteration_matrix = self .interiteration_matrix
209
+ )
210
+ if self .cycles :
211
+ cycle .previous = self .cycles [- 1 ]
212
+ self .cycles [- 1 ].next = cycle
213
+ self .cycles .append (cycle )
96
214
97
215
def __iter__ (self ):
98
216
return self
99
217
100
218
def __next__ (self ):
101
- # WIP
102
- raise StopIteration
219
+ if not self .cycles :
220
+ raise StopIteration
221
+
222
+ # collect tasks/nodes ready to execute in each cycle
223
+ new_nodes_in_this_cycle = []
224
+ for cycle in self .cycles :
225
+ try :
226
+ new_nodes_in_this_cycle .extend (cycle .iterate (new_nodes_in_this_cycle ))
227
+ except RemoveCycle :
228
+ self .cycles .remove (cycle )
229
+
230
+ # no more nodes, stop iterating
231
+ if not new_nodes_in_this_cycle :
232
+ raise StopIteration
233
+
234
+ return new_nodes_in_this_cycle
0 commit comments