|
| 1 | +""" |
| 2 | +Elastic Bands |
| 3 | +
|
| 4 | +author: Wang Zheng (@Aglargil) |
| 5 | +
|
| 6 | +Ref: |
| 7 | +
|
| 8 | +- [Elastic Bands: Connecting Path Planning and Control] |
| 9 | +(http://www8.cs.umu.se/research/ifor/dl/Control/elastic%20bands.pdf) |
| 10 | +""" |
| 11 | + |
| 12 | +import numpy as np |
| 13 | +import sys |
| 14 | +import pathlib |
| 15 | +import matplotlib.pyplot as plt |
| 16 | +from matplotlib.patches import Circle |
| 17 | + |
| 18 | +sys.path.append(str(pathlib.Path(__file__).parent.parent.parent)) |
| 19 | + |
| 20 | +from Mapping.DistanceMap.distance_map import compute_sdf_scipy |
| 21 | + |
| 22 | +# Elastic Bands Params |
| 23 | +MAX_BUBBLE_RADIUS = 100 |
| 24 | +MIN_BUBBLE_RADIUS = 10 |
| 25 | +RHO0 = 20.0 # Maximum distance for applying repulsive force |
| 26 | +KC = 0.05 # Contraction force gain |
| 27 | +KR = -0.1 # Repulsive force gain |
| 28 | +LAMBDA = 0.7 # Overlap constraint factor |
| 29 | +STEP_SIZE = 3.0 # Step size for calculating gradient |
| 30 | + |
| 31 | +# Visualization Params |
| 32 | +ENABLE_PLOT = True |
| 33 | +# ENABLE_INTERACTIVE is True allows user to add obstacles by left clicking |
| 34 | +# and add path points by right clicking and start planning by middle clicking |
| 35 | +ENABLE_INTERACTIVE = False |
| 36 | +# ENABLE_SAVE_DATA is True allows saving the path and obstacles which added |
| 37 | +# by user in interactive mode to file |
| 38 | +ENABLE_SAVE_DATA = False |
| 39 | +MAX_ITER = 50 |
| 40 | + |
| 41 | + |
| 42 | +class Bubble: |
| 43 | + def __init__(self, position, radius): |
| 44 | + self.pos = np.array(position) # Bubble center coordinates [x, y] |
| 45 | + self.radius = radius # Safety distance radius ρ(b) |
| 46 | + if self.radius > MAX_BUBBLE_RADIUS: |
| 47 | + self.radius = MAX_BUBBLE_RADIUS |
| 48 | + if self.radius < MIN_BUBBLE_RADIUS: |
| 49 | + self.radius = MIN_BUBBLE_RADIUS |
| 50 | + |
| 51 | + |
| 52 | +class ElasticBands: |
| 53 | + def __init__( |
| 54 | + self, |
| 55 | + initial_path, |
| 56 | + obstacles, |
| 57 | + rho0=RHO0, |
| 58 | + kc=KC, |
| 59 | + kr=KR, |
| 60 | + lambda_=LAMBDA, |
| 61 | + step_size=STEP_SIZE, |
| 62 | + ): |
| 63 | + self.distance_map = compute_sdf_scipy(obstacles) |
| 64 | + self.bubbles = [ |
| 65 | + Bubble(p, self.compute_rho(p)) for p in initial_path |
| 66 | + ] # Initialize bubble chain |
| 67 | + self.kc = kc # Contraction force gain |
| 68 | + self.kr = kr # Repulsive force gain |
| 69 | + self.rho0 = rho0 # Maximum distance for applying repulsive force |
| 70 | + self.lambda_ = lambda_ # Overlap constraint factor |
| 71 | + self.step_size = step_size # Step size for calculating gradient |
| 72 | + self._maintain_overlap() |
| 73 | + |
| 74 | + def compute_rho(self, position): |
| 75 | + """Compute the distance field value at the position""" |
| 76 | + return self.distance_map[int(position[0]), int(position[1])] |
| 77 | + |
| 78 | + def contraction_force(self, i): |
| 79 | + """Calculate internal contraction force for the i-th bubble""" |
| 80 | + if i == 0 or i == len(self.bubbles) - 1: |
| 81 | + return np.zeros(2) |
| 82 | + |
| 83 | + prev = self.bubbles[i - 1].pos |
| 84 | + next_ = self.bubbles[i + 1].pos |
| 85 | + current = self.bubbles[i].pos |
| 86 | + |
| 87 | + # f_c = kc * ( (prev-current)/|prev-current| + (next-current)/|next-current| ) |
| 88 | + dir_prev = (prev - current) / (np.linalg.norm(prev - current) + 1e-6) |
| 89 | + dir_next = (next_ - current) / (np.linalg.norm(next_ - current) + 1e-6) |
| 90 | + return self.kc * (dir_prev + dir_next) |
| 91 | + |
| 92 | + def repulsive_force(self, i): |
| 93 | + """Calculate external repulsive force for the i-th bubble""" |
| 94 | + h = self.step_size # Step size |
| 95 | + b = self.bubbles[i].pos |
| 96 | + rho = self.bubbles[i].radius |
| 97 | + |
| 98 | + if rho >= self.rho0: |
| 99 | + return np.zeros(2) |
| 100 | + |
| 101 | + # Finite difference approximation of the gradient ∂ρ/∂b |
| 102 | + dx = np.array([h, 0]) |
| 103 | + dy = np.array([0, h]) |
| 104 | + grad_x = (self.compute_rho(b - dx) - self.compute_rho(b + dx)) / (2 * h) |
| 105 | + grad_y = (self.compute_rho(b - dy) - self.compute_rho(b + dy)) / (2 * h) |
| 106 | + grad = np.array([grad_x, grad_y]) |
| 107 | + |
| 108 | + return self.kr * (self.rho0 - rho) * grad |
| 109 | + |
| 110 | + def update_bubbles(self): |
| 111 | + """Update bubble positions""" |
| 112 | + new_bubbles = [] |
| 113 | + for i in range(len(self.bubbles)): |
| 114 | + if i == 0 or i == len(self.bubbles) - 1: |
| 115 | + new_bubbles.append(self.bubbles[i]) # Fixed start and end points |
| 116 | + continue |
| 117 | + |
| 118 | + f_total = self.contraction_force(i) + self.repulsive_force(i) |
| 119 | + v = self.bubbles[i - 1].pos - self.bubbles[i + 1].pos |
| 120 | + |
| 121 | + # Remove tangential component |
| 122 | + f_star = f_total - f_total * v * v / (np.linalg.norm(v) ** 2 + 1e-6) |
| 123 | + |
| 124 | + alpha = self.bubbles[i].radius # Adaptive step size |
| 125 | + new_pos = self.bubbles[i].pos + alpha * f_star |
| 126 | + new_pos = np.clip(new_pos, 0, 499) |
| 127 | + new_radius = self.compute_rho(new_pos) |
| 128 | + |
| 129 | + # Update bubble and maintain overlap constraint |
| 130 | + new_bubble = Bubble(new_pos, new_radius) |
| 131 | + new_bubbles.append(new_bubble) |
| 132 | + |
| 133 | + self.bubbles = new_bubbles |
| 134 | + self._maintain_overlap() |
| 135 | + |
| 136 | + def _maintain_overlap(self): |
| 137 | + """Maintain bubble chain continuity (simplified insertion/deletion mechanism)""" |
| 138 | + # Insert bubbles |
| 139 | + i = 0 |
| 140 | + while i < len(self.bubbles) - 1: |
| 141 | + bi, bj = self.bubbles[i], self.bubbles[i + 1] |
| 142 | + dist = np.linalg.norm(bi.pos - bj.pos) |
| 143 | + if dist > self.lambda_ * (bi.radius + bj.radius): |
| 144 | + new_pos = (bi.pos + bj.pos) / 2 |
| 145 | + rho = self.compute_rho( |
| 146 | + new_pos |
| 147 | + ) # Calculate new radius using environment model |
| 148 | + self.bubbles.insert(i + 1, Bubble(new_pos, rho)) |
| 149 | + i += 2 # Skip the processed region |
| 150 | + else: |
| 151 | + i += 1 |
| 152 | + |
| 153 | + # Delete redundant bubbles |
| 154 | + i = 1 |
| 155 | + while i < len(self.bubbles) - 1: |
| 156 | + prev = self.bubbles[i - 1] |
| 157 | + next_ = self.bubbles[i + 1] |
| 158 | + dist = np.linalg.norm(prev.pos - next_.pos) |
| 159 | + if dist <= self.lambda_ * (prev.radius + next_.radius): |
| 160 | + del self.bubbles[i] # Delete if redundant |
| 161 | + else: |
| 162 | + i += 1 |
| 163 | + |
| 164 | + |
| 165 | +class ElasticBandsVisualizer: |
| 166 | + def __init__(self): |
| 167 | + self.obstacles = np.zeros((500, 500)) |
| 168 | + self.obstacles_points = [] |
| 169 | + self.path_points = [] |
| 170 | + self.elastic_band = None |
| 171 | + self.running = True |
| 172 | + |
| 173 | + if ENABLE_PLOT: |
| 174 | + self.fig, self.ax = plt.subplots(figsize=(8, 8)) |
| 175 | + self.fig.canvas.mpl_connect("close_event", self.on_close) |
| 176 | + self.ax.set_xlim(0, 500) |
| 177 | + self.ax.set_ylim(0, 500) |
| 178 | + |
| 179 | + if ENABLE_INTERACTIVE: |
| 180 | + self.path_points = [] # Add a list to store path points |
| 181 | + # Connect mouse events |
| 182 | + self.fig.canvas.mpl_connect("button_press_event", self.on_click) |
| 183 | + else: |
| 184 | + self.path_points = np.load(pathlib.Path(__file__).parent / "path.npy") |
| 185 | + self.obstacles_points = np.load( |
| 186 | + pathlib.Path(__file__).parent / "obstacles.npy" |
| 187 | + ) |
| 188 | + for x, y in self.obstacles_points: |
| 189 | + self.add_obstacle(x, y) |
| 190 | + self.plan_path() |
| 191 | + |
| 192 | + self.plot_background() |
| 193 | + |
| 194 | + def on_close(self, event): |
| 195 | + """Handle window close event""" |
| 196 | + self.running = False |
| 197 | + plt.close("all") # Close all figure windows |
| 198 | + |
| 199 | + def plot_background(self): |
| 200 | + """Plot the background grid""" |
| 201 | + if not ENABLE_PLOT or not self.running: |
| 202 | + return |
| 203 | + |
| 204 | + self.ax.cla() |
| 205 | + self.ax.set_xlim(0, 500) |
| 206 | + self.ax.set_ylim(0, 500) |
| 207 | + self.ax.grid(True) |
| 208 | + |
| 209 | + if ENABLE_INTERACTIVE: |
| 210 | + self.ax.set_title( |
| 211 | + "Elastic Bands Path Planning\n" |
| 212 | + "Left click: Add obstacles\n" |
| 213 | + "Right click: Add path points\n" |
| 214 | + "Middle click: Start planning", |
| 215 | + pad=20, |
| 216 | + ) |
| 217 | + else: |
| 218 | + self.ax.set_title("Elastic Bands Path Planning", pad=20) |
| 219 | + |
| 220 | + if self.path_points: |
| 221 | + self.ax.plot( |
| 222 | + [p[0] for p in self.path_points], |
| 223 | + [p[1] for p in self.path_points], |
| 224 | + "yo", |
| 225 | + markersize=8, |
| 226 | + ) |
| 227 | + |
| 228 | + self.ax.imshow(self.obstacles.T, origin="lower", cmap="binary", alpha=0.8) |
| 229 | + self.ax.plot([], [], color="black", label="obstacles") |
| 230 | + if self.elastic_band is not None: |
| 231 | + path = [b.pos.tolist() for b in self.elastic_band.bubbles] |
| 232 | + path = np.array(path) |
| 233 | + self.ax.plot(path[:, 0], path[:, 1], "b-", linewidth=2, label="path") |
| 234 | + |
| 235 | + for bubble in self.elastic_band.bubbles: |
| 236 | + circle = Circle( |
| 237 | + bubble.pos, bubble.radius, fill=False, color="g", alpha=0.3 |
| 238 | + ) |
| 239 | + self.ax.add_patch(circle) |
| 240 | + self.ax.plot(bubble.pos[0], bubble.pos[1], "bo", markersize=10) |
| 241 | + self.ax.plot([], [], color="green", label="bubbles") |
| 242 | + |
| 243 | + self.ax.legend(loc="upper right") |
| 244 | + plt.draw() |
| 245 | + plt.pause(0.01) |
| 246 | + |
| 247 | + def add_obstacle(self, x, y): |
| 248 | + """Add an obstacle at the given coordinates""" |
| 249 | + size = 30 # Side length of the square |
| 250 | + half_size = size // 2 |
| 251 | + x_start = max(0, x - half_size) |
| 252 | + x_end = min(self.obstacles.shape[0], x + half_size) |
| 253 | + y_start = max(0, y - half_size) |
| 254 | + y_end = min(self.obstacles.shape[1], y + half_size) |
| 255 | + self.obstacles[x_start:x_end, y_start:y_end] = 1 |
| 256 | + |
| 257 | + def on_click(self, event): |
| 258 | + """Handle mouse click events""" |
| 259 | + if event.inaxes != self.ax: |
| 260 | + return |
| 261 | + |
| 262 | + x, y = int(event.xdata), int(event.ydata) |
| 263 | + |
| 264 | + if event.button == 1: # Left click to add obstacles |
| 265 | + self.add_obstacle(x, y) |
| 266 | + self.obstacles_points.append([x, y]) |
| 267 | + |
| 268 | + elif event.button == 3: # Right click to add path points |
| 269 | + self.path_points.append([x, y]) |
| 270 | + |
| 271 | + elif event.button == 2: # Middle click to end path input and start planning |
| 272 | + if len(self.path_points) >= 2: |
| 273 | + if ENABLE_SAVE_DATA: |
| 274 | + np.save( |
| 275 | + pathlib.Path(__file__).parent / "path.npy", self.path_points |
| 276 | + ) |
| 277 | + np.save( |
| 278 | + pathlib.Path(__file__).parent / "obstacles.npy", |
| 279 | + self.obstacles_points, |
| 280 | + ) |
| 281 | + self.plan_path() |
| 282 | + |
| 283 | + self.plot_background() |
| 284 | + |
| 285 | + def plan_path(self): |
| 286 | + """Plan the path""" |
| 287 | + |
| 288 | + initial_path = self.path_points |
| 289 | + # Create an elastic band object and optimize |
| 290 | + self.elastic_band = ElasticBands(initial_path, self.obstacles) |
| 291 | + for _ in range(MAX_ITER): |
| 292 | + self.elastic_band.update_bubbles() |
| 293 | + self.path_points = [b.pos for b in self.elastic_band.bubbles] |
| 294 | + self.plot_background() |
| 295 | + |
| 296 | + |
| 297 | +if __name__ == "__main__": |
| 298 | + _ = ElasticBandsVisualizer() |
| 299 | + if ENABLE_PLOT: |
| 300 | + plt.show(block=True) |
0 commit comments