Another way to interact with ROS is through the **actionlib** stack. Actions in
ROS allow to execute preemptable tasks, i.e. tasks that can be interrupted by the client.

Actions are used via the :class:`ActionClient` to which :class:`Goals <Goal>`
can be added. Each goal emits events that can be listened to in order to react to the
updates from the action server. There are four events emmitted: **status**, **result**,
**feedback**, and **timeout**.

from __future__ import print_function

import logging
import math
import random
import threading
import time

from . import Message
from . import Topic
from .event_emitter import EventEmitterMixin

__all__ = ['Goal', 'GoalStatus', 'ActionClient', 'SimpleActionServer']

LOGGER = logging.getLogger('roslibpy')

def _is_earlier(t1, t2):
    """Compares two timestamps."""
    if t1['secs'] > t2['secs']:
        return False
    elif t1['secs'] < t2['secs']:
        return True
    elif t1['nsecs'] < t2['nsecs']:
        return True

    return False

[docs]class GoalStatus: """Valid goal statuses.""" PENDING = 0 ACTIVE = 1 PREEMPTED = 2 SUCCEEDED = 3 ABORTED = 4 REJECTED = 5 PREEMPTING = 6 RECALLING = 7 RECALLED = 8 LOST = 9
[docs]class Goal(EventEmitterMixin): """Goal for an action server. After an event has been added to an action client, it will emit different events to indicate its progress: * ``status``: fires to notify clients on the current state of the goal. * ``feedback``: fires to send clients periodic auxiliary information of the goal. * ``result``: fires to send clients the result upon completion of the goal. * ``timeout``: fires when the goal did not complete in the specified timeout window. Args: action_client (:class:`.ActionClient`): Instance of the action client associated with the goal. goal_message (:class:`.Message`): Goal for the action server. """ def __init__(self, action_client, goal_message): super(Goal, self).__init__() self.action_client = action_client self.goal_message = goal_message self.goal_id = 'goal_%s_%d' % (random.random(), time.time() * 1000) self.wait_result = threading.Event() self.result = None self.status = None = None self.goal_message = Message({ 'goal_id': { 'stamp': { 'secs': 0, 'nsecs': 0 }, 'id': self.goal_id }, 'goal': dict(self.goal_message) }) self.action_client.add_goal(self) self.on('status', self._set_status) self.on('result', self._set_result) self.on('feedback', self._set_feedback)
[docs] def send(self, result_callback=None, timeout=None): """Send goal to the action server. Args: timeout (:obj:`int`): Timeout for the goal's result expressed in seconds. callback (:obj:`callable`): Function to be called when a result is received. It is a shorthand for hooking on the ``result`` event. """ if result_callback: self.on('result', result_callback) self.action_client.goal_topic.publish(self.goal_message) if timeout: self.action_client.ros.call_later(timeout, self._trigger_timeout)
[docs] def cancel(self): """Cancel the current goal.""" self.action_client.cancel_topic.publish(Message({'id': self.goal_id}))
[docs] def wait(self, timeout=None): """Block until the result is available. If ``timeout`` is ``None``, it will wait indefinitely. Args: timeout (:obj:`int`): Timeout to wait for the result expressed in seconds. Returns: Result of the goal. """ if not self.wait_result.wait(timeout): raise Exception('Goal failed to receive result') return self.result
def _trigger_timeout(self): if not self.is_finished: self.emit('timeout') def _set_status(self, status): self.status = status def _set_result(self, result): self.result = result self.wait_result.set() def _set_feedback(self, feedback): = feedback @property def is_finished(self): """Indicate if the goal is finished or not. Returns: bool: True if finished, False otherwise. """ return self.result is not None
[docs]class ActionClient(EventEmitterMixin): """Client to use ROS actions. Args: ros (:class:`.Ros`): Instance of the ROS connection. server_name (:obj:`str`): Action server name, e.g. ``/fibonacci``. action_name (:obj:`str`): Action message name, e.g. ``actionlib_tutorials/FibonacciAction``. timeout (:obj:`int`): **Deprecated.** Connection timeout, expressed in seconds. """ def __init__(self, ros, server_name, action_name, timeout=None, omit_feedback=False, omit_status=False, omit_result=False): super(ActionClient, self).__init__() self.ros = ros self.server_name = server_name self.action_name = action_name self.omit_feedback = omit_feedback self.omit_status = omit_status self.omit_result = omit_result self.goals = {} # Create the topics associated with actionlib self.feedback_listener = Topic(ros, server_name + '/feedback', action_name + 'Feedback') self.status_listener = Topic(ros, server_name + '/status', 'actionlib_msgs/GoalStatusArray') self.result_listener = Topic(ros, server_name + '/result', action_name + 'Result') self.goal_topic = Topic(ros, server_name + '/goal', action_name + 'Goal') self.cancel_topic = Topic(ros, server_name + '/cancel', 'actionlib_msgs/GoalID') # Advertise the goal and cancel topics self.goal_topic.advertise() self.cancel_topic.advertise() # Subscribe to the status topic if not self.omit_status: self.status_listener.subscribe(self._on_status_message) # Subscribe to the feedback topic if not self.omit_feedback: self.feedback_listener.subscribe(self._on_feedback_message) # Subscribe to the result topic if not self.omit_result: self.result_listener.subscribe(self._on_result_message) if timeout: LOGGER.warn( 'Deprecation warning: timeout parameter is ignored, and replaced by the DEFAULT_CONNECTION_TIMEOUT constant.') self.wait_status = threading.Event() if not self.wait_status.wait(DEFAULT_CONNECTION_TIMEOUT): raise Exception('Action client failed to connect, no status received.') def _on_status_message(self, message): self.wait_status.set() for status in message['status_list']: goal_id = status['goal_id']['id'] goal = self.goals.get(goal_id, None) if goal: goal.emit('status', status) def _on_feedback_message(self, message): goal_id = message['status']['goal_id']['id'] goal = self.goals.get(goal_id, None) if goal: goal.emit('status', message['status']) goal.emit('feedback', message['feedback']) def _on_result_message(self, message): goal_id = message['status']['goal_id']['id'] goal = self.goals.get(goal_id, None) if goal: goal.emit('status', message['status']) goal.emit('result', message['result'])
[docs] def add_goal(self, goal): """Add a goal to this action client. Args: goal (:class:`.Goal`): Goal to add. """ self.goals[goal.goal_id] = goal
[docs] def cancel(self): """Cancel all goals associated with this action client.""" self.cancel_topic.publish(Message())
[docs] def dispose(self): """Unsubscribe and unadvertise all topics associated with this action client.""" self.goal_topic.unadvertise() self.cancel_topic.unadvertise() if not self.omit_status: self.status_listener.unsubscribe() if not self.omit_feedback: self.feedback_listener.unsubscribe() if not self.omit_result: self.result_listener.unsubscribe()
[docs]class SimpleActionServer(EventEmitterMixin): """Implementation of the simple action server. The server emits the following events: * ``goal``: fires when a new goal has been received by the server. * ``cancel``: fires when the client has requested the cancellation of the action. Args: ros (:class:`.Ros`): Instance of the ROS connection. server_name (:obj:`str`): Action server name, e.g. ``/fibonacci``. action_name (:obj:`str`): Action message name, e.g. ``actionlib_tutorials/FibonacciAction``. """ STATUS_PUBLISH_INTERVAL = 0.5 # In seconds def __init__(self, ros, server_name, action_name): super(SimpleActionServer, self).__init__() self.ros = ros self.server_name = server_name self.action_name = action_name self._lock = threading.Lock() # Create all required publishers and listeners self.feedback_publisher = Topic(ros, server_name + '/feedback', action_name + 'Feedback') self.status_publisher = Topic(ros, server_name + '/status', 'actionlib_msgs/GoalStatusArray') self.result_publisher = Topic(ros, server_name + '/result', action_name + 'Result') self.goal_listener = Topic(ros, server_name + '/goal', action_name + 'Goal') self.cancel_listener = Topic(ros, server_name + '/cancel', 'actionlib_msgs/GoalID') # Advertise all publishers self.feedback_publisher.advertise() self.status_publisher.advertise() self.result_publisher.advertise() # Track the goals and their status in order to publish status self.status_message = Message(dict( header=dict( stamp=dict(secs=0, nsecs=100), frame_id='' ), status_list=[] )) # Needed for handling preemption prompted by a new goal being received self.current_goal = None # currently tracked goal self.next_goal = None # the one that'll be preempting self.preempt_request = False self.goal_listener.subscribe(self._on_goal_message) self.cancel_listener.subscribe(self._on_cancel_message) # Intentionally not publishing immediately and instead # waiting one interval for the first message self.ros.call_later(self.STATUS_PUBLISH_INTERVAL, self._publish_status)
[docs] def start(self, action_callback): """Start the action server. Args: action_callback: Callable function to be invoked when a new goal is received. It takes one paramter containing the goal message. """'Action server {} started'.format(self.server_name)) def _internal_goal_callback(goal):'Action server {} received new goal'.format(self.server_name)) self.ros.call_in_thread(lambda: action_callback(goal)) def _internal_preempt_callback():'Action server {} received preemption request'.format(self.server_name)) self.preempt_request = True self.on('goal', _internal_goal_callback) self.on('cancel', _internal_preempt_callback)
def _publish_status(self): # Status publishing is required for clients to know they've connected with self._lock: current_time = time.time() secs = int(math.floor(current_time)) nsecs = int(round(1e9 * (current_time - secs))) self.status_message['header']['stamp']['secs'] = secs self.status_message['header']['stamp']['nsecs'] = nsecs self.status_publisher.publish(self.status_message) # Invoke again in the defined interval self.ros.call_later(self.STATUS_PUBLISH_INTERVAL, self._publish_status) def _on_goal_message(self, message): will_cancel = False will_emit_goal = None with self._lock: if self.current_goal: self.next_goal = message # needs to happen AFTER rest is set up will_cancel = True else: self.status_message['status_list'] = [ dict(goal_id=message['goal_id'], status=GoalStatus.ACTIVE)] self.current_goal = message will_emit_goal = message['goal'] if will_cancel: self.emit('cancel') if will_emit_goal: self.emit('goal', will_emit_goal) def _on_cancel_message(self, message): # As described in the comments of the original roslibjs code # this may be more complicated than necessary, since it's # not sure the callbacks can ever wind up with a scenario # where we've been preempted by a next goal, it hasn't finished # processing, and then we get a cancel message. will_cancel = False with self._lock: message_id = message['id'] message_stamp = message['stamp'] secs = message['stamp']['secs'] if secs == 0 and secs == 0 and message_id == '': self.next_goal = None if self.current_goal: will_cancel = True else: # treat id and stamp independently if self.current_goal and message_id == self.current_goal['goal_id']['id']: will_cancel = True elif self.next_goal and message_id == self.next_goal['goal_id']['id']: self.next_goal = None if self.next_goal and _is_earlier(self.next_goal['goal_id']['stamp'], message_stamp): self.next_goal = None if self.current_goal and _is_earlier(self.current_goal['goal_id']['stamp'], message_stamp): will_cancel = True if will_cancel: self.emit('cancel')
[docs] def is_preempt_requested(self): """Indicate whether the client has requested preemption of the current goal.""" with self._lock: return self.preempt_request
[docs] def set_succeeded(self, result): """Set the current action state to succeeded. Args: result (:obj:`dict`): Dictionary of key/values to set as the result of the action. """ 'Action server {} setting current goal to SUCCEEDED'.format(self.server_name)) with self._lock: result_message = Message({ 'status': { 'goal_id': self.current_goal['goal_id'], 'status': GoalStatus.SUCCEEDED }, 'result': result }) self.result_publisher.publish(result_message) self.status_message['status_list'] = [] if self.next_goal: self.current_goal = self.next_goal self.next_goal = None else: self.current_goal = None self.preempt_request = False # If there's a new current goal assigned, emit it if self.current_goal: self.emit('goal', self.current_goal['goal'])
[docs] def send_feedback(self, feedback): """Send feedback. Args: feedback (:obj:`dict`): Dictionary of key/values of the feedback message. """ feedback_message = Message({ 'status': { 'goal_id': self.current_goal['goal_id'], 'status': GoalStatus.ACTIVE }, 'feedback': feedback }) self.feedback_publisher.publish(feedback_message)
[docs] def set_preempted(self): """Set the current action to preempted (cancelled)."""'Action server {} preempting current goal'.format(self.server_name)) with self._lock: self.status_message['status_list'] = [] result_message = Message({ 'status': { 'goal_id': self.current_goal['goal_id'], 'status': GoalStatus.PREEMPTED } }) self.result_publisher.publish(result_message) if self.next_goal: self.current_goal = self.next_goal self.next_goal = None else: self.current_goal = None # Preemption completed self.preempt_request = False # If there's a new current goal assigned, emit it if self.current_goal: self.emit('goal', self.current_goal['goal'])