Source code for roslibpy.tf

"""
TF
==

ROS provides a very powerful transform library called `TF2 <http://wiki.ros.org/tf2>`_,
which lets the user keep track of multiple coordinate frames over time.

The **roslibpy** library offers access to it through the
`tf2_web_republisher <http://wiki.ros.org/tf2_web_republisher>`_
via the :class:`TFClient` class.

.. autoclass:: TFClient
   :members:

"""
from __future__ import print_function

import logging
import math

from . import Service, ServiceRequest, Topic

__all__ = ["TFClient"]

LOGGER = logging.getLogger("roslibpy.tf")


[docs]class TFClient(object): """A TF Client that listens to TFs from tf2_web_republisher. Args: ros (:class:`.Ros`): Instance of the ROS connection. fixed_frame (:obj:`str`): Fixed frame, e.g. ``/base_link``. angular_threshold (:obj:`float`): Angular threshold for the TF republisher. translation_threshold (:obj:`float`): Translation threshold for the TF republisher. rate (:obj:`float`): Rate for the TF republisher. update_delay (:obj:`int`): Time expressed in milliseconds to wait after a new subscription before update TFs. topic_timeout (:obj:`int`): Timeout parameter for the TF republisher expressed in milliseconds. repub_service_name (:obj:`str`): Name of the republish tfs service, e.g. ``/republish_tfs``. """ def __init__( self, ros, fixed_frame="/base_link", angular_threshold=2.0, translation_threshold=0.01, rate=10.0, update_delay=50, topic_timeout=2000.0, server_name="/tf2_web_republisher", repub_service_name="/republish_tfs", ): self.ros = ros self.fixed_frame = fixed_frame self.angular_threshold = angular_threshold self.translation_threshold = translation_threshold self.rate = rate self.update_delay = update_delay seconds = topic_timeout / 1000.0 secs = math.floor(seconds) nsecs = math.floor((seconds - secs) * int(1e9)) self.topic_timeout = dict(secs=secs, nsecs=nsecs) self.repub_service_name = repub_service_name self.current_topic = False self.frame_info = {} self.republisher_update_requested = False self.service_client = Service(ros, self.repub_service_name, "tf2_web_republisher/RepublishTFs") def _process_tf_array(self, tf): """Process an incoming TF message and send it out using the callback functions. Args: tf (:obj:`list`): TF message from the server. """ # TODO: Test this function for transform in tf["transforms"]: frame_id = self._normalize_frame_id(transform["child_frame_id"]) frame = self.frame_info.get(frame_id, None) if frame: frame["transform"] = dict( translation=transform["transform"]["translation"], rotation=transform["transform"]["rotation"] ) for callback in frame["cbs"]: callback(frame["transform"])
[docs] def update_goal(self): """Send a new service request to the tf2_web_republisher based on the current list of TFs.""" message = dict( source_frames=list(self.frame_info.keys()), target_frame=self.fixed_frame, angular_thres=self.angular_threshold, trans_thres=self.translation_threshold, rate=self.rate, ) # In contrast to roslibjs, we do not support groovy compatibility mode # and only use the service interface to the TF republisher message["timeout"] = self.topic_timeout request = ServiceRequest(message) self.service_client.call(request, self._process_response, self._process_error) self.republisher_update_requested = False
def _process_error(self, response): LOGGER.error("The TF republisher service interface returned an error. %s", str(response)) def _process_response(self, response): """Process the service response and subscribe to the tf republisher topic.""" LOGGER.info("Received response from TF Republisher service interface") if self.current_topic: self.current_topic.unsubscribe() self.current_topic = Topic(self.ros, response["topic_name"], "tf2_web_republisher/TFArray") self.current_topic.subscribe(self._process_tf_array) def _normalize_frame_id(self, frame_id): # Remove leading slash, if it's there if frame_id[0] == "/": return frame_id[1:] return frame_id
[docs] def subscribe(self, frame_id, callback): """Subscribe to the given TF frame. Args: frame_id (:obj:`str`): TF frame identifier to subscribe to. callback (:obj:`callable`): A callable functions receiving one parameter with `transform` data. """ frame_id = self._normalize_frame_id(frame_id) frame = self.frame_info.get(frame_id, None) # If there is no callback registered for the given frame, create emtpy callback list if not frame: frame = dict(cbs=[]) self.frame_info[frame_id] = frame if not self.republisher_update_requested: self.ros.call_later(self.update_delay / 1000.0, self.update_goal) self.republisher_update_requested = True else: # If we already have a transform, call back immediately if "transform" in frame: callback(frame["transform"]) frame["cbs"].append(callback)
[docs] def unsubscribe(self, frame_id, callback): """Unsubscribe from the given TF frame. Args: frame_id (:obj:`str`): TF frame identifier to unsubscribe from. callback (:obj:`callable`): The callback function to remove. """ frame_id = self._normalize_frame_id(frame_id) frame = self.frame_info.get(frame_id, None) if "cbs" in frame: frame["cbs"].pop(callback) if not callback or ("cbs" in frame and len(frame["cbs"]) == 0): self.frame_info.pop(frame_id)
[docs] def dispose(self): """Unsubscribe and unadvertise all topics associated with this instance.""" if self.current_topic: self.current_topic.unsubscribe()
if __name__ == "__main__": from roslibpy import Ros FORMAT = "%(asctime)-15s [%(levelname)s] %(message)s" logging.basicConfig(level=logging.DEBUG, format=FORMAT) ros_client = Ros("127.0.0.1", 9090) def run_tf_example(): tfclient = TFClient(ros_client, fixed_frame="world", angular_threshold=0.01, translation_threshold=0.01) tfclient.subscribe("turtle2", print) def dispose_server(): tfclient.dispose() ros_client.call_later(10, dispose_server) ros_client.call_later(11, ros_client.close) ros_client.call_later(12, ros_client.terminate) run_tf_example() ros_client.run_forever()