# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""For clients to use the world object service"""
from bosdyn.api import geometry_pb2 as geom
from bosdyn.api import world_object_pb2, world_object_service_pb2
from bosdyn.api import world_object_service_pb2_grpc as world_object_service
from bosdyn.client.common import BaseClient, common_header_errors
from bosdyn.client.frame_helpers import *
from bosdyn.client.robot_command import NoTimeSyncError
from bosdyn.client.time_sync import update_time_filter, update_timestamp_filter
from bosdyn.util import now_timestamp
[docs]class WorldObjectClient(BaseClient):
"""Client for World Object service."""
default_service_name = 'world-objects'
service_type = 'bosdyn.api.WorldObjectService'
def __init__(self):
super(WorldObjectClient, self).__init__(world_object_service.WorldObjectServiceStub)
self._timesync_endpoint = None
[docs] def update_from(self, other):
super(WorldObjectClient, self).update_from(other)
# Grab a timesync endpoint if it is available.
try:
self._timesync_endpoint = other.time_sync.endpoint
except AttributeError:
pass # other doesn't have a time_sync accessor
@property
def timesync_endpoint(self):
"""Accessor for timesync-endpoint that is grabbed via 'update_from()'.
Raises:
bosdyn.client.robot_command.NoTimeSyncError: Could not find the timesync endpoint for
the robot.
"""
if not self._timesync_endpoint:
raise NoTimeSyncError("[world object service] No timesync endpoint set for the robot")
return self._timesync_endpoint
[docs] def list_world_objects(self, object_type=None, time_start_point=None, **kwargs):
"""Get a list of World Objects.
Args:
object_type (list of bosdyn.api.WorldObjectType): Specific types to include in the
response, all other types will be
filtered out.
time_start_point (float): A client timestamp to filter objects in the response. All objects
will have a timestamp after this time.
Returns:
The response message, which includes the filtered list of all world objects.
Raises:
RpcError: Problem communicating with the robot.
bosdyn.client.robot_command.NoTimeSyncError: Couldn't convert the timestamp into robot
time.
"""
if time_start_point is not None:
time_start_point = update_time_filter(self, time_start_point, self.timesync_endpoint)
req = world_object_pb2.ListWorldObjectRequest(object_type=object_type,
timestamp_filter=time_start_point)
return self.call(self._stub.ListWorldObjects, req,
value_from_response=_get_world_object_value,
error_from_response=common_header_errors, copy_request=False, **kwargs)
[docs] def list_world_objects_async(self, object_type=None, time_start_point=None, **kwargs):
"""Async version of list_world_objects()."""
if time_start_point is not None:
time_start_point = update_time_filter(self, time_start_point, self.timesync_endpoint)
req = world_object_pb2.ListWorldObjectRequest(object_type=object_type,
timestamp_filter=time_start_point)
return self.call_async(self._stub.ListWorldObjects, req,
value_from_response=_get_world_object_value,
error_from_response=common_header_errors, copy_request=False,
**kwargs)
[docs] def mutate_world_objects(self, mutation_req, **kwargs):
"""Mutate (add, change, delete) world objects.
Args:
mutation_req (world_object_pb2.MutateWorldObjectRequest): The request including
the object to be mutated and the
type of mutation.
Returns:
The response, which includes the id of the mutated object.
Raises:
RpcError: Problem communicating with the robot.
bosdyn.client.robot_command.NoTimeSyncError: Couldn't convert the timestamp into robot
time.
"""
if mutation_req.mutation.object.HasField("acquisition_time"):
# Ensure the mutation request's object's time of detection is in robot time.
client_timestamp = mutation_req.mutation.object.acquisition_time
mutation_req.mutation.object.acquisition_time.CopyFrom(
update_timestamp_filter(self, client_timestamp, self.timesync_endpoint))
return self.call(self._stub.MutateWorldObjects, mutation_req,
value_from_response=_get_status, error_from_response=common_header_errors,
**kwargs)
[docs] def mutate_world_objects_async(self, mutation_req, **kwargs):
"""Async version of mutate_world_objects()."""
if mutation_req.mutation.object.HasField("acquisition_time"):
# Ensure the mutation request's object's time of detection is in robot time.
client_timestamp = mutation_req.mutation.object.acquisition_time
mutation_req.mutation.object.acquisition_time.CopyFrom(
update_timestamp_filter(self, client_timestamp, self.timesync_endpoint))
return self.call_async(self._stub.MutateWorldObjects, mutation_req,
value_from_response=_get_status,
error_from_response=common_header_errors, **kwargs)
[docs] def draw_sphere(self, name, x_rt_frame_name, y_rt_frame_name, z_rt_frame_name, frame_name,
radius=0.05, rgba=(255, 0, 0, 1), list_objects_now=True):
"""Create a drawable sphere world object that will be sent to the world object service
with a mutation request.
Args:
name (string): The human-readable name of the world object.
x_rt_frame_name,y_rt_frame_name,z_rt_frame_name (int): The coordinate position (x,y,z) of
the drawable sphere.
frame_name (string): the frame in which the sphere's position is described.
radius (float): The radius for the drawn sphere.
rgba (4 valued tuple): The RGBA color, where RGB are int values in [0,255] and A is a float in [0,1].
list_objects_now (boolean): Should the ListWorldObjects request be made after creating
the sphere world object.
Returns:
The MutateWorldObjectResponse for the addition of the sphere world object.
"""
vision_tform_drawable = geom.SE3Pose(
position=geom.Vec3(x=x_rt_frame_name, y=y_rt_frame_name, z=z_rt_frame_name),
rotation=geom.Quaternion(w=1, x=0, y=0, z=0))
# Create a map between the child frame name and the parent frame name/SE3Pose parent_tform_child
edges = {}
# Create an edge in the frame tree snapshot that includes vision_tform_drawable
drawable_frame_name = name
edges = add_edge_to_tree(edges, vision_tform_drawable, frame_name, drawable_frame_name)
snapshot = geom.FrameTreeSnapshot(child_to_parent_edge_map=edges)
# Set the acquisition time for the sphere using a function to get google.protobuf.Timestamp of the current system time.
time_now = now_timestamp()
# Create the sphere drawable object
sphere = world_object_pb2.DrawableSphere(radius=radius)
draw_color = world_object_pb2.DrawableProperties.Color(r=rgba[0], g=rgba[1], b=rgba[2],
a=rgba[3])
sphere_drawable_prop = world_object_pb2.DrawableProperties(
color=draw_color, label=name, wireframe=False, sphere=sphere,
frame_name_drawable=drawable_frame_name)
# Create the complete world object with transform information, a unique name, and the drawable sphere properties.
sphere_to_add = world_object_pb2.WorldObject(name=name, transforms_snapshot=snapshot,
acquisition_time=time_now,
drawable_properties=[sphere_drawable_prop])
# Add the sphere to the robot's world object service
add_sphere = make_add_world_object_req(sphere_to_add)
resp = self.mutate_world_objects(mutation_req=add_sphere)
if list_objects_now:
# Request a listing of the world objects so that the sphere shows up in the log.
self.list_world_objects()
return resp
[docs] def draw_oriented_bounding_box(self, name, drawable_box_frame_name, frame_name,
frame_name_tform_drawable_box, size_ewrt_box_vec3,
rgba=(255, 0, 0, 1), wireframe=True, list_objects_now=False):
"""Create a drawable 3D box world object that will be sent to the world object service
with a mutation request.
Args:
name (string): The human-readable name of the world object.
drawable_box_frame_name (string): The frame name for the drawable box frame.
frame_name (string): The frame name which the drawable box is described relative to.
frame_name_tform_drawable_box (geometry_pb2.SE3Pose): the SE3 pose of the drawable box relative to frame name.
size_ewrt_box_vec3 (float): The size of the box (x,y,z) expressed with respect to the
drawable box frame.
rgba (4 valued tuple): The RGBA color, where RGB are int values in [0,255] and A is a float in [0,1].
wireframe (boolean): Should this be drawn as a wireframe [wireframe=true] or a solid object [wireframe=false].
list_objects_now (boolean): Should the ListWorldObjects request be made after creating
the sphere world object.
Returns:
The MutateWorldObjectResponse for the addition of the sphere world object.
"""
# Create a map between the child frame name and the parent frame name/SE3Pose parent_tform_child
edges = {}
# Create an edge in the frame tree snapshot that includes frame_tform_box
drawable_frame_name = name
edges = add_edge_to_tree(edges, frame_name_tform_drawable_box, frame_name,
drawable_frame_name)
snapshot = geom.FrameTreeSnapshot(child_to_parent_edge_map=edges)
# Set the acquisition time for the box using a function to get google.protobuf.Timestamp of the current system time.
time_now = now_timestamp()
# Create the box drawable object
box = world_object_pb2.DrawableBox(size=size_ewrt_box_vec3)
draw_color = world_object_pb2.DrawableProperties.Color(r=rgba[0], g=rgba[1], b=rgba[2],
a=rgba[3])
box_drawable_prop = world_object_pb2.DrawableProperties(
color=draw_color, label=name, wireframe=wireframe, box=box,
frame_name_drawable=drawable_box_frame_name)
# Create the complete world object with transform information, a unique name, and the drawable box properties.
box_to_add = world_object_pb2.WorldObject(name=name, transforms_snapshot=snapshot,
acquisition_time=time_now,
drawable_properties=[box_drawable_prop])
# Add the box to the robot's world object service
add_box = make_add_world_object_req(box_to_add)
resp = self.mutate_world_objects(mutation_req=add_box)
if list_objects_now:
# Request a listing of the world objects so that the box shows up in the log.
self.list_world_objects()
return resp
def _get_world_object_value(response):
return response
def _get_status(response):
if (response.status != world_object_pb2.MutateWorldObjectResponse.STATUS_OK):
if (response.status == world_object_pb2.MutateWorldObjectResponse.STATUS_INVALID_MUTATION_ID
):
print("Object id not found, and could not be mutated.")
if (response.status == world_object_pb2.MutateWorldObjectResponse.STATUS_NO_PERMISSION):
print(
"Cannot change/delete objects detected by Spot's perception system, only client objects."
)
return response
'''
Static helper methods for constructing and sending mutation requests for a given world object.
'''
[docs]def make_add_world_object_req(world_obj):
"""Add a world object to the scene.
Args:
world_obj (WorldObject): The world object to be added into the robot's perception scene.
Returns:
A MutateWorldObjectRequest where the action is to "add" the object to the scene.
"""
add_obj = world_object_pb2.MutateWorldObjectRequest.Mutation(
action=world_object_pb2.MutateWorldObjectRequest.ACTION_ADD, object=world_obj)
req = world_object_pb2.MutateWorldObjectRequest(mutation=add_obj)
return req
[docs]def make_delete_world_object_req(world_obj):
"""Delete a world object from the scene.
Args:
world_obj (WorldObject): The world object to be deleted in the robot's perception scene. The
object must be a client-added object and have the correct world object
id returned by the service after adding the object.
Returns:
A MutateWorldObjectRequest where the action is to "delete" the object to the scene.
"""
del_obj = world_object_pb2.MutateWorldObjectRequest.Mutation(
action=world_object_pb2.MutateWorldObjectRequest.ACTION_DELETE, object=world_obj)
req = world_object_pb2.MutateWorldObjectRequest(mutation=del_obj)
return req
[docs]def make_change_world_object_req(world_obj):
"""Change/update an existing world object in the scene.
Args:
world_obj (WorldObject): The world object to be changed/updated in the robot's perception scene.
The object must be a client-added object and have the correct world object
id returned by the service after adding the object.
Returns:
A MutateWorldObjectRequest where the action is to "change" the object to the scene.
"""
change_obj = world_object_pb2.MutateWorldObjectRequest.Mutation(
action=world_object_pb2.MutateWorldObjectRequest.ACTION_CHANGE, object=world_obj)
req = world_object_pb2.MutateWorldObjectRequest(mutation=change_obj)
return req
[docs]def send_add_mutation_requests(world_object_client, world_object_array):
"""
Create and send an "add" mutation request for each world object in an array. Return a matching
array of the object id's that are assigned when the object is created, so that each object we add
can be identified and removed individually (if desired) later.
Args:
world_object_client (WorldObjectClient): Client for World Object service.
world_object_array (List): List of objects to add.
Returns:
A List containing the object ids associated with the objects created.
"""
obj_id = [-1] * len(world_object_array)
for i, obj in enumerate(world_object_array):
add_req = make_add_world_object_req(obj)
add_resp = world_object_client.mutate_world_objects(mutation_req=add_req)
obj_id[i] = add_resp.mutated_object_id
return obj_id
[docs]def send_delete_mutation_requests(world_object_client, delete_object_id_array):
"""
Create and send a "delete" mutation request for each world object successfully identified from a
given list of object id's.
Args:
world_object_client (WorldObjectClient): Client for World Object service.
delete_object_id_array (List): List of object id's to send delete requests for.
"""
world_objects = world_object_client.list_world_objects().world_objects
for obj in world_objects:
this_object_id = obj.id
if isinstance(delete_object_id_array, int):
delete_object_id_array = [delete_object_id_array]
for i in range(len(delete_object_id_array)):
delete_id = delete_object_id_array[i]
if this_object_id == delete_id:
del_req = make_delete_world_object_req(obj)
del_resp = world_object_client.mutate_world_objects(mutation_req=del_req)
continue