"""
Actionlib
=========
Another way to interact with ROS is through the **actionlib** stack. Actions in
ROS allow to execute preemptable tasks, i.e. tasks that can be interrupted by the client.
Actions are used via the :class:`ActionClient` to which :class:`Goals <Goal>`
can be added. Each goal emits events that can be listened to in order to react to the
updates from the action server. There are four events emmitted: **status**, **result**,
**feedback**, and **timeout**.
.. autoclass:: Goal
:members:
.. autoclass:: ActionClient
:members:
.. autoclass:: SimpleActionServer
:members:
.. autoclass:: GoalStatus
:members:
"""
from __future__ import print_function
import logging
import math
import random
import threading
import time
from . import Message, Topic
from .core import RosTimeoutError
from .event_emitter import EventEmitterMixin
__all__ = ["Goal", "GoalStatus", "ActionClient", "SimpleActionServer"]
LOGGER = logging.getLogger("roslibpy")
DEFAULT_CONNECTION_TIMEOUT = 3 # in seconds
def _is_earlier(t1, t2):
"""Compares two timestamps."""
if t1["secs"] > t2["secs"]:
return False
elif t1["secs"] < t2["secs"]:
return True
elif t1["nsecs"] < t2["nsecs"]:
return True
return False
[docs]class GoalStatus:
"""Valid goal statuses."""
PENDING = 0
ACTIVE = 1
PREEMPTED = 2
SUCCEEDED = 3
ABORTED = 4
REJECTED = 5
PREEMPTING = 6
RECALLING = 7
RECALLED = 8
LOST = 9
[docs]class Goal(EventEmitterMixin):
"""Goal for an action server.
After an event has been added to an action client, it will emit different
events to indicate its progress:
* ``status``: fires to notify clients on the current state of the goal.
* ``feedback``: fires to send clients periodic auxiliary information of the goal.
* ``result``: fires to send clients the result upon completion of the goal.
* ``timeout``: fires when the goal did not complete in the specified timeout window.
Args:
action_client (:class:`.ActionClient`): Instance of the action client associated with the goal.
goal_message (:class:`.Message`): Goal for the action server.
"""
def __init__(self, action_client, goal_message):
super(Goal, self).__init__()
self.action_client = action_client
self.goal_message = goal_message
self.goal_id = "goal_%s_%d" % (random.random(), time.time() * 1000)
self.wait_result = threading.Event()
self.result = None
self.status = None
self.feedback = None
self.goal_message = Message(
{"goal_id": {"stamp": {"secs": 0, "nsecs": 0}, "id": self.goal_id}, "goal": dict(self.goal_message)}
)
self.action_client.add_goal(self)
self.on("status", self._set_status)
self.on("result", self._set_result)
self.on("feedback", self._set_feedback)
[docs] def send(self, result_callback=None, timeout=None):
"""Send goal to the action server.
Args:
timeout (:obj:`int`): Timeout for the goal's result expressed in seconds.
callback (:obj:`callable`): Function to be called when a result is received.
It is a shorthand for hooking on the ``result`` event.
"""
if result_callback:
self.on("result", result_callback)
self.status = {"status": GoalStatus.PENDING}
self.action_client.goal_topic.publish(self.goal_message)
if timeout:
self.action_client.ros.call_later(timeout, self._trigger_timeout)
[docs] def cancel(self):
"""Cancel the current goal."""
self.action_client.cancel_topic.publish(Message({"id": self.goal_id}))
[docs] def wait(self, timeout=None):
"""Block until the result is available.
If ``timeout`` is ``None``, it will wait indefinitely.
Args:
timeout (:obj:`int`): Timeout to wait for the result expressed in seconds.
Returns:
Result of the goal.
"""
if not self.wait_result.wait(timeout):
raise RosTimeoutError("Goal failed to receive result")
return self.result
def _trigger_timeout(self):
if not self.is_finished:
self.emit("timeout")
def _set_status(self, status):
self.status = status
if self.is_finished:
self.wait_result.set()
def _set_result(self, result):
self.result = result
if self.is_finished:
self.wait_result.set()
def _set_feedback(self, feedback):
self.feedback = feedback
@property
def is_active(self):
if self.status is None:
return False
return self.status["status"] == GoalStatus.ACTIVE or self.status["status"] == GoalStatus.PENDING
@property
def is_finished(self):
"""Indicate if the goal is finished or not.
Returns:
bool: True if finished, False otherwise.
"""
return self.result is not None and not self.is_active
[docs]class ActionClient(EventEmitterMixin):
"""Client to use ROS actions.
Args:
ros (:class:`.Ros`): Instance of the ROS connection.
server_name (:obj:`str`): Action server name, e.g. ``/fibonacci``.
action_name (:obj:`str`): Action message name, e.g. ``actionlib_tutorials/FibonacciAction``.
timeout (:obj:`int`): **Deprecated.** Connection timeout, expressed in seconds.
"""
def __init__(
self, ros, server_name, action_name, timeout=None, omit_feedback=False, omit_status=False, omit_result=False
):
super(ActionClient, self).__init__()
self.ros = ros
self.server_name = server_name
self.action_name = action_name
self.omit_feedback = omit_feedback
self.omit_status = omit_status
self.omit_result = omit_result
self.goals = {}
# Create the topics associated with actionlib
self.feedback_listener = Topic(ros, server_name + "/feedback", action_name + "Feedback")
self.status_listener = Topic(ros, server_name + "/status", "actionlib_msgs/GoalStatusArray")
self.result_listener = Topic(ros, server_name + "/result", action_name + "Result")
self.goal_topic = Topic(ros, server_name + "/goal", action_name + "Goal")
self.cancel_topic = Topic(ros, server_name + "/cancel", "actionlib_msgs/GoalID")
# Advertise the goal and cancel topics
self.goal_topic.advertise()
self.cancel_topic.advertise()
# Subscribe to the status topic
if not self.omit_status:
self.status_listener.subscribe(self._on_status_message)
# Subscribe to the feedback topic
if not self.omit_feedback:
self.feedback_listener.subscribe(self._on_feedback_message)
# Subscribe to the result topic
if not self.omit_result:
self.result_listener.subscribe(self._on_result_message)
if timeout:
LOGGER.warn(
"Deprecation warning: timeout parameter ignored and replaced by DEFAULT_CONNECTION_TIMEOUT constant."
)
self.wait_status = threading.Event()
if not self.wait_status.wait(DEFAULT_CONNECTION_TIMEOUT):
raise RosTimeoutError("Action client failed to connect, no status received.")
def _on_status_message(self, message):
self.wait_status.set()
for status in message["status_list"]:
goal_id = status["goal_id"]["id"]
goal = self.goals.get(goal_id, None)
if goal:
goal.emit("status", status)
def _on_feedback_message(self, message):
goal_id = message["status"]["goal_id"]["id"]
goal = self.goals.get(goal_id, None)
if goal:
goal.emit("feedback", message["feedback"])
def _on_result_message(self, message):
goal_id = message["status"]["goal_id"]["id"]
goal = self.goals.get(goal_id, None)
if goal:
goal.emit("result", message["result"])
[docs] def add_goal(self, goal):
"""Add a goal to this action client.
Args:
goal (:class:`.Goal`): Goal to add.
"""
self.goals[goal.goal_id] = goal
[docs] def cancel(self):
"""Cancel all goals associated with this action client."""
self.cancel_topic.publish(Message())
[docs] def dispose(self):
"""Unsubscribe and unadvertise all topics associated with this action client."""
self.goal_topic.unadvertise()
self.cancel_topic.unadvertise()
if not self.omit_status:
self.status_listener.unsubscribe()
if not self.omit_feedback:
self.feedback_listener.unsubscribe()
if not self.omit_result:
self.result_listener.unsubscribe()
[docs]class SimpleActionServer(EventEmitterMixin):
"""Implementation of the simple action server.
The server emits the following events:
* ``goal``: fires when a new goal has been received by the server.
* ``cancel``: fires when the client has requested the cancellation of the action.
Args:
ros (:class:`.Ros`): Instance of the ROS connection.
server_name (:obj:`str`): Action server name, e.g. ``/fibonacci``.
action_name (:obj:`str`): Action message name, e.g. ``actionlib_tutorials/FibonacciAction``.
"""
STATUS_PUBLISH_INTERVAL = 0.5 # In seconds
def __init__(self, ros, server_name, action_name):
super(SimpleActionServer, self).__init__()
self.ros = ros
self.server_name = server_name
self.action_name = action_name
self._lock = threading.Lock()
# Create all required publishers and listeners
self.feedback_publisher = Topic(ros, server_name + "/feedback", action_name + "Feedback")
self.status_publisher = Topic(ros, server_name + "/status", "actionlib_msgs/GoalStatusArray")
self.result_publisher = Topic(ros, server_name + "/result", action_name + "Result")
self.goal_listener = Topic(ros, server_name + "/goal", action_name + "Goal")
self.cancel_listener = Topic(ros, server_name + "/cancel", "actionlib_msgs/GoalID")
# Advertise all publishers
self.feedback_publisher.advertise()
self.status_publisher.advertise()
self.result_publisher.advertise()
# Track the goals and their status in order to publish status
self.status_message = Message(dict(header=dict(stamp=dict(secs=0, nsecs=100), frame_id=""), status_list=[]))
# Needed for handling preemption prompted by a new goal being received
self.current_goal = None # currently tracked goal
self.next_goal = None # the one that'll be preempting
self.preempt_request = False
self.goal_listener.subscribe(self._on_goal_message)
self.cancel_listener.subscribe(self._on_cancel_message)
# Intentionally not publishing immediately and instead
# waiting one interval for the first message
self.ros.call_later(self.STATUS_PUBLISH_INTERVAL, self._periodic_publish_status)
[docs] def start(self, action_callback):
"""Start the action server.
Args:
action_callback: Callable function to be invoked when a new goal is received.
It takes one paramter containing the goal message.
"""
LOGGER.info("Action server {} started".format(self.server_name))
def _internal_goal_callback(goal):
LOGGER.info("Action server {} received new goal".format(self.server_name))
self.ros.call_in_thread(lambda: action_callback(goal))
def _internal_preempt_callback():
LOGGER.info("Action server {} received preemption request".format(self.server_name))
self.preempt_request = True
self.on("goal", _internal_goal_callback)
self.on("cancel", _internal_preempt_callback)
def _publish_status(self):
current_time = time.time()
secs = int(math.floor(current_time))
nsecs = int(round(1e9 * (current_time - secs)))
self.status_message["header"]["stamp"]["secs"] = secs
self.status_message["header"]["stamp"]["nsecs"] = nsecs
self.status_publisher.publish(self.status_message)
def _periodic_publish_status(self):
# Status publishing is required for clients to know they've connected
with self._lock:
self._publish_status()
# Invoke again in the defined interval
self.ros.call_later(self.STATUS_PUBLISH_INTERVAL, self._periodic_publish_status)
def _on_goal_message(self, message):
will_cancel = False
will_emit_goal = None
with self._lock:
if self.current_goal:
self.next_goal = message
# needs to happen AFTER rest is set up
will_cancel = True
else:
self.status_message["status_list"] = [dict(goal_id=message["goal_id"], status=GoalStatus.ACTIVE)]
self.current_goal = message
will_emit_goal = message["goal"]
if will_cancel:
self.emit("cancel")
if will_emit_goal:
self.emit("goal", will_emit_goal)
def _on_cancel_message(self, message):
# As described in the comments of the original roslibjs code
# this may be more complicated than necessary, since it's
# not sure the callbacks can ever wind up with a scenario
# where we've been preempted by a next goal, it hasn't finished
# processing, and then we get a cancel message.
will_cancel = False
with self._lock:
message_id = message["id"]
message_stamp = message["stamp"]
secs = message["stamp"]["secs"]
if secs == 0 and secs == 0 and message_id == "":
self.next_goal = None
if self.current_goal:
will_cancel = True
else: # treat id and stamp independently
if self.current_goal and message_id == self.current_goal["goal_id"]["id"]:
will_cancel = True
elif self.next_goal and message_id == self.next_goal["goal_id"]["id"]:
self.next_goal = None
if self.next_goal and _is_earlier(self.next_goal["goal_id"]["stamp"], message_stamp):
self.next_goal = None
if self.current_goal and _is_earlier(self.current_goal["goal_id"]["stamp"], message_stamp):
will_cancel = True
if will_cancel:
self.emit("cancel")
[docs] def is_preempt_requested(self):
"""Indicate whether the client has requested preemption of the current goal."""
with self._lock:
return self.preempt_request
[docs] def set_succeeded(self, result):
"""Set the current action state to succeeded.
Args:
result (:obj:`dict`): Dictionary of key/values to set as the result of the action.
"""
LOGGER.info("Action server {} setting current goal to SUCCEEDED".format(self.server_name))
with self._lock:
status = dict(goal_id=self.current_goal["goal_id"], status=GoalStatus.SUCCEEDED)
self.status_message["status_list"] = [status]
self._publish_status()
self.status_message["status_list"] = []
result_message = Message({"status": status, "result": result})
self.result_publisher.publish(result_message)
if self.next_goal:
self.current_goal = self.next_goal
self.next_goal = None
else:
self.current_goal = None
self.preempt_request = False
# If there's a new current goal assigned, emit it
if self.current_goal:
self.emit("goal", self.current_goal["goal"])
[docs] def send_feedback(self, feedback):
"""Send feedback.
Args:
feedback (:obj:`dict`): Dictionary of key/values of the feedback message.
"""
feedback_message = Message(
{"status": {"goal_id": self.current_goal["goal_id"], "status": GoalStatus.ACTIVE}, "feedback": feedback}
)
self.feedback_publisher.publish(feedback_message)
[docs] def set_preempted(self):
"""Set the current action to preempted (cancelled)."""
LOGGER.info("Action server {} preempting current goal".format(self.server_name))
with self._lock:
status = dict(goal_id=self.current_goal["goal_id"], status=GoalStatus.PREEMPTED)
self.status_message["status_list"] = [status]
self._publish_status()
self.status_message["status_list"] = []
result_message = Message({"status": status})
self.result_publisher.publish(result_message)
if self.next_goal:
self.current_goal = self.next_goal
self.next_goal = None
else:
self.current_goal = None
# Preemption completed
self.preempt_request = False
# If there's a new current goal assigned, emit it
if self.current_goal:
self.emit("goal", self.current_goal["goal"])