Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,28 +1,21 @@

import math as maths
import random

import numpy as np
import cv2
import matplotlib.pyplot as plt
from matplotlib.path import Path
import numpy as np
from scipy.spatial import ConvexHull, Delaunay
from sklearn.cluster import DBSCAN
from auto_shepherd_simulation_ros2.sheep_simulation.simulation import Simulation
from auto_shepherd_simulation_ros2.utils.geo_converter import load_coords_from_yaml, MapConverter

import numpy as np
from scipy.spatial import ConvexHull, Delaunay
from visualization_msgs.msg import Marker, MarkerArray
from std_msgs.msg import Header, ColorRGBA
from builtin_interfaces.msg import Duration
from geometry_msgs.msg import Point
from std_msgs.msg import Header, ColorRGBA
import random

try:
mc = MapConverter(load_coords_from_yaml("/home/ros/map/map1.yaml"))
except:
mc = MapConverter(load_coords_from_yaml("../configs/map/map1.yaml"))
from visualization_msgs.msg import Marker, MarkerArray

map_polygon = Path(np.array(mc.map_coords_xy_meters))
#check if points are valid using `map_polygon.contains_point(point)`
from auto_shepherd_simulation_ros2.sheep_simulation.simulation import Simulation
from auto_shepherd_simulation_ros2.utils.geo_converter import load_coords_from_yaml, MapConverter

def circle_around_points(points):
diff = points[:, np.newaxis, :] - points[np.newaxis, :, :]
Expand Down Expand Up @@ -75,7 +68,7 @@ def color_for_label(label):
random.seed(label)
return ColorRGBA(r=random.random(), g=random.random(), b=random.random(), a=0.3)

def render_dbscan_convex_hulls(pub, db, points, frame_id="map", z=0.0):
def render_dbscan_convex_hulls(pub, db, points, frame_id="field_frame", z=0.0):
marker_array = MarkerArray()
labels = db.labels_
unique_labels = set(labels)
Expand Down Expand Up @@ -120,11 +113,61 @@ def render_dbscan_convex_hulls(pub, db, points, frame_id="map", z=0.0):

pub.publish(marker_array)


def render_targets_points(targets_pub, scores, best):
marker_array = MarkerArray()
header = Header(frame_id="field_frame")

max_score = max(
(abs(s['cost']) for s in scores.values() if s['cost'] != -1),
default=1.0
)

for i, data in scores.items():
marker = Marker()
marker.header = header
marker.ns = "target_points"
marker.id = i
marker.type = Marker.CYLINDER
marker.action = Marker.ADD

marker.pose.position.x = data['x']
marker.pose.position.y = data['y']
marker.pose.position.z = 0.2

# Twice as big and proportional to normalized score (except for -1)
if data['cost'] == -1:
scale = 0.3 # default size for invalid scores
elif data['cost'] == -2:
scale = 0.3 # default size for invalid scores
else:
scale = 2.0 * (0.2 + 0.8 * (abs(data['cost']) / max_score))

marker.scale.x = marker.scale.y = scale
marker.scale.z = 0.1 # Cylinder height

# Color conditions
if data == best:
marker.color = ColorRGBA(r=0.0, g=1.0, b=0.0, a=0.5) # Green
elif data['cost'] == -1:
marker.color = ColorRGBA(r=0.0, g=0.0, b=0.0, a=0.5) # Black
elif data['cost'] == -2:
marker.color = ColorRGBA(r=0.0, g=0.0, b=1.0, a=0.5) # Blue
else:
marker.color = ColorRGBA(r=1.0, g=1.0, b=0.0, a=0.5) # Yellow

marker.lifetime.sec = 1
marker_array.markers.append(marker)

targets_pub.publish(marker_array)


def find_best_dog_position(x, y, xd, yd, xc, yc, field_boundary, # ← flock, dog, goal
radius_d=1.4, n_candidates=15, early_exit_threshold=5,
default_goto=np.asarray((0,0)), boundary_pub=None):
"""Return optimal dog (x_d*, y_d*) given current flock and goal."""
default_goto=np.asarray((0,0)),
boundary_pub=None, targets_pub=None):

"""Return optimal dog (x_d*, y_d*) given current flock and goal."""
points = np.stack([x, y], axis=1)
goal_point = np.array([xc,yc])

Expand All @@ -150,25 +193,41 @@ def find_best_dog_position(x, y, xd, yd, xc, yc, field_boundary, # ← flock, d
d = np.linalg.norm(np.asarray([xmean, ymean]) - np.asarray([xd, yd]))
if np.linalg.norm(np.asarray([xmean, ymean]) - np.asarray([xc, yc])) < 5:
d = np.linalg.norm(np.asarray([-10, 10]) - np.asarray([xd, yd]))
if d < 3: optimal_xd, optimal_yd = xd, yd
if d < 3:
optimal_xd, optimal_yd = xd, yd
else:
closest_point = (xd + (-10 - xd) * radius_d / d, yd + (10 - yd) * radius_d / d)
points = np.array([closest_point])
optimal_xd, optimal_yd = closest_point
scores = {0: {'x': optimal_xd, 'y': optimal_yd, 'cost': 1}}
best = min(scores.values(), key=lambda s: s['cost'])
optimal_xd, optimal_yd = best['x'], best['y']

#
elif d > radius_d + radius_sheep:
print("Moving towards sheep")
closest_point = (xd + (xmean - xd) * radius_d / d, yd + (ymean - yd) * radius_d / d)
points = np.array([closest_point])
optimal_xd, optimal_yd = closest_point
scores = {0: {'x': optimal_xd, 'y': optimal_yd, 'cost': 1}}
best = min(scores.values(), key=lambda s: s['cost'])
optimal_xd, optimal_yd = best['x'], best['y']

#
elif d < abs(radius_d - radius_sheep):
print("Moving away from sheep")
d = np.linalg.norm(default_goto - np.asarray([xd, yd]))
closest_point = (xd + (default_goto[0] - xd) * radius_d / d, yd + (default_goto[1] - yd) * radius_d / d)
points = np.array([closest_point])
optimal_xd, optimal_yd = closest_point
scores = {0: {'x': optimal_xd, 'y': optimal_yd, 'cost': 1}}
best = min(scores.values(), key=lambda s: s['cost'])
optimal_xd, optimal_yd = best['x'], best['y']

# Otherwise is actively pressuring the closest group
else:
print("Herding sheep")
# get points around sheep
# get points in an arc around the sheep
angle_a = np.arccos((radius_sheep**2 + d**2 - radius_d**2) / (2 * radius_sheep * d))
angle_start = np.arctan2(yd - ymean, xd - xmean)
angle_range = (angle_start - angle_a, angle_start + angle_a)
Expand All @@ -183,15 +242,37 @@ def find_best_dog_position(x, y, xd, yd, xc, yc, field_boundary, # ← flock, d
# optimise new dog position
last_update = 0
optimal_xd, optimal_yd, optimal_cost = xd, yd, cost(x, y, xd, yd, xc, yc, simulation)
scores = dict()
for i, (new_xd, new_yd) in enumerate(points):
if not map_polygon.contains_point((new_xd, new_yd)): continue
scores[i] = {'x': new_xd, 'y': new_yd, 'cost': -1}

# Check if point is within map
map_polygon = Path(np.array(field_boundary))
if not map_polygon.contains_point((new_xd, new_yd)):
scores[i]['cost'] = -2
continue

# Calculate cost of point using simulation
new_cost = cost(x, y, new_xd, new_yd, xc, yc, simulation)
scores[i]['cost'] = new_cost

# If cost is better, reject old cost
if new_cost < optimal_cost:
optimal_cost = new_cost
optimal_xd, optimal_yd = new_xd, new_yd
last_update += 1
print(f"Best Cost: {optimal_cost}")
if i-last_update > early_exit_threshold: break

# Exit once min threshold is completed
#if i-last_update > early_exit_threshold:
# break

best = min(scores.values(), key=lambda s: s['cost'])
optimal_xd, optimal_yd = best['x'], best['y']

# publish the options for points
if targets_pub:
render_targets_points(targets_pub, scores, best)
return optimal_xd, optimal_yd

def pure_pursuit(dog_xy, target_xy, lookahead=2.0, step=0.5):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from nav_msgs.msg import Path
from visualization_msgs.msg import MarkerArray

from auto_shepherd_simulation_ros2.tmpDogSim.dog_control_lib import find_best_dog_position, pure_pursuit
from auto_shepherd_simulation_ros2.control.dog_control_lib import find_best_dog_position, pure_pursuit
from auto_shepherd_simulation_ros2.utils.geo_converter import load_coords_from_yaml, MapConverter

class DogController(Node):
Expand All @@ -22,38 +22,19 @@ def __init__(self):
self.create_subscription(Path, '/sheep/poses_sim', self._sheep_cb, 10)
self.create_subscription(PoseStamped, '/sheep/goal', self._goal_cb, self.get_qos())
self.marker_pub = self.create_publisher(MarkerArray, "/dbscan_hulls", 10)
self.targets_pub = self.create_publisher(MarkerArray, "/dog/options", 10)
self.create_subscription(Path, "/field/fence/path", self._fence_cb, self.get_qos())

# state caches ----------------------------------------------------
self.dog_xy = None # (x, y)
self._planned_dog_xy = None # (x, y) of the last planned dog position
self.sheep_xy = None # Nx2 array
self.goal_xy = None # (x, y)
self.field_boundary = None

# control timer ----------------------------------------------------
self.timer = self.create_timer(0.1, self._control_step)

yaml_map_file_path = "/home/ros/map/map1.yaml"
print(f"Attempting to load field coordinates from: {yaml_map_file_path}")
try:
field_coords_latlon = load_coords_from_yaml(yaml_map_file_path)
print(f"Successfully loaded {len(field_coords_latlon)} coordinates from YAML.")
except (FileNotFoundError, ValueError) as e:
print(f"Failed to load coordinates from YAML: {e}")
print("Please ensure the file path is correct and the YAML format matches 'field_boundary: - latitude: X - longitude: Y'.")
print("Exiting example.")
exit(1) # Exit if cannot load map data


# Create Map Bounding Box & Convert All Coords
try:
self.map_converter = MapConverter(field_coords_latlon)
map_data = self.map_converter.get_map_data()
self.field_boundary = map_data['map_coords_xy_meters']

except ValueError as e:
print(f"Error during map conversion: {e}")
self.map_converter = None # Ensure map_converter is not set if initialization failed


def get_qos(self):
qos_profile = QoSProfile(
Expand All @@ -65,6 +46,9 @@ def get_qos(self):


# ------------ message callbacks -------------------------------------
def _fence_cb(self, msg: Path):
self.field_boundary = [(p.pose.position.x, p.pose.position.y) for p in msg.poses]

def _dog_cb(self, msg: PoseStamped):
self.dog_xy = (msg.pose.position.x, msg.pose.position.y)

Expand Down Expand Up @@ -124,13 +108,14 @@ def _boundary_follow_step(self):

# ------------ closed-loop control -----------------------------------
def _control_step(self):
print("_control_step")

# make sure we have the three inputs we need
if self.sheep_xy is None or self.goal_xy is None:
# make sure we have the inputs we need
opt = [self.sheep_xy, self.goal_xy, self.field_boundary]
if any(o is None for o in opt):
return
opt = [self.dog_xy, self._planned_dog_xy]
if all(o is None for o in opt):
return
if self.dog_xy is None and self._planned_dog_xy is None:
return # still waiting for the very first dog pose

# ---------------------------------------------
# choose the starting point for optimisation
Expand All @@ -144,7 +129,7 @@ def _control_step(self):
xs, ys = self.sheep_xy[:, 0], self.sheep_xy[:, 1]
xc, yc = self.goal_xy

xd_opt, yd_opt = find_best_dog_position(xs, ys, xd_start, yd_start, xc, yc, self.field_boundary, boundary_pub=self.marker_pub)
xd_opt, yd_opt = find_best_dog_position(xs, ys, xd_start, yd_start, xc, yc, self.field_boundary, boundary_pub=self.marker_pub, targets_pub=self.targets_pub)
print(f"Optimised dog position: ({xd_opt:.2f}, {yd_opt:.2f})")

ps = PoseStamped()
Expand Down
Loading